1
0
Fork 0

Add alternative scheduler based on the number of entries

This commit is contained in:
Shizun Ge 2020-05-25 16:06:56 -05:00 committed by GitHub
parent 25d4b9fc0c
commit cead85b165
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 423 additions and 119 deletions

View file

@ -692,6 +692,111 @@ func TestBatchSize(t *testing.T) {
}
}
// TestDefautPollingSchedulerValue checks that PollingScheduler reports the
// built-in default when POLLING_SCHEDULER is absent from the environment.
func TestDefautPollingSchedulerValue(t *testing.T) {
	os.Clearenv()

	opts, err := NewParser().ParseEnvironmentVariables()
	if err != nil {
		t.Fatalf(`Parsing failure: %v`, err)
	}

	if got, want := opts.PollingScheduler(), defaultPollingScheduler; got != want {
		t.Fatalf(`Unexpected POLLING_SCHEDULER value, got %v instead of %v`, got, want)
	}
}
// TestPollingScheduler checks that a POLLING_SCHEDULER value set in the
// environment is returned unchanged by PollingScheduler.
func TestPollingScheduler(t *testing.T) {
	os.Clearenv()
	os.Setenv("POLLING_SCHEDULER", "entry_count_based")

	opts, err := NewParser().ParseEnvironmentVariables()
	if err != nil {
		t.Fatalf(`Parsing failure: %v`, err)
	}

	if got, want := opts.PollingScheduler(), "entry_count_based"; got != want {
		t.Fatalf(`Unexpected POLLING_SCHEDULER value, got %v instead of %v`, got, want)
	}
}
// TestDefautSchedulerCountBasedMaxIntervalValue checks that the maximum
// interval falls back to its built-in default when
// SCHEDULER_ENTRY_COUNT_BASED_MAX_INTERVAL is absent from the environment.
func TestDefautSchedulerCountBasedMaxIntervalValue(t *testing.T) {
	os.Clearenv()

	opts, err := NewParser().ParseEnvironmentVariables()
	if err != nil {
		t.Fatalf(`Parsing failure: %v`, err)
	}

	if got, want := opts.SchedulerCountBasedMaxInterval(), defaultSchedulerCountBasedMaxInterval; got != want {
		t.Fatalf(`Unexpected SCHEDULER_ENTRY_COUNT_BASED_MAX_INTERVAL value, got %v instead of %v`, got, want)
	}
}
// TestDefautSchedulerCountBasedMaxInterval checks that an explicit
// SCHEDULER_ENTRY_COUNT_BASED_MAX_INTERVAL value is parsed and exposed.
// Despite its name, this test covers a configured value, not the default.
func TestDefautSchedulerCountBasedMaxInterval(t *testing.T) {
	os.Clearenv()
	os.Setenv("SCHEDULER_ENTRY_COUNT_BASED_MAX_INTERVAL", "30")

	opts, err := NewParser().ParseEnvironmentVariables()
	if err != nil {
		t.Fatalf(`Parsing failure: %v`, err)
	}

	if got, want := opts.SchedulerCountBasedMaxInterval(), 30; got != want {
		t.Fatalf(`Unexpected SCHEDULER_ENTRY_COUNT_BASED_MAX_INTERVAL value, got %v instead of %v`, got, want)
	}
}
// TestDefautSchedulerCountBasedMinIntervalValue checks that the minimum
// interval falls back to its built-in default when
// SCHEDULER_ENTRY_COUNT_BASED_MIN_INTERVAL is absent from the environment.
func TestDefautSchedulerCountBasedMinIntervalValue(t *testing.T) {
	os.Clearenv()

	opts, err := NewParser().ParseEnvironmentVariables()
	if err != nil {
		t.Fatalf(`Parsing failure: %v`, err)
	}

	if got, want := opts.SchedulerCountBasedMinInterval(), defaultSchedulerCountBasedMinInterval; got != want {
		t.Fatalf(`Unexpected SCHEDULER_ENTRY_COUNT_BASED_MIN_INTERVAL value, got %v instead of %v`, got, want)
	}
}
// TestDefautSchedulerCountBasedMinInterval checks that an explicit
// SCHEDULER_ENTRY_COUNT_BASED_MIN_INTERVAL value is parsed and exposed.
// Despite its name, this test covers a configured value, not the default.
func TestDefautSchedulerCountBasedMinInterval(t *testing.T) {
	os.Clearenv()
	os.Setenv("SCHEDULER_ENTRY_COUNT_BASED_MIN_INTERVAL", "30")

	opts, err := NewParser().ParseEnvironmentVariables()
	if err != nil {
		t.Fatalf(`Parsing failure: %v`, err)
	}

	if got, want := opts.SchedulerCountBasedMinInterval(), 30; got != want {
		t.Fatalf(`Unexpected SCHEDULER_ENTRY_COUNT_BASED_MIN_INTERVAL value, got %v instead of %v`, got, want)
	}
}
func TestOAuth2UserCreationWhenUnset(t *testing.T) {
os.Clearenv()

View file

@ -22,6 +22,9 @@ const (
defaultWorkerPoolSize = 5
defaultPollingFrequency = 60
defaultBatchSize = 10
defaultPollingScheduler = "round_robin"
defaultSchedulerCountBasedMinInterval = 5
defaultSchedulerCountBasedMaxInterval = 24 * 60
defaultRunMigrations = false
defaultDatabaseURL = "user=postgres password=postgres dbname=miniflux2 sslmode=disable"
defaultDatabaseMaxConns = 20
@ -74,6 +77,9 @@ type Options struct {
cleanupRemoveSessionsDays int
pollingFrequency int
batchSize int
pollingScheduler string
schedulerCountBasedMinInterval int
schedulerCountBasedMaxInterval int
workerPoolSize int
createAdmin bool
proxyImages string
@ -116,6 +122,9 @@ func NewOptions() *Options {
cleanupRemoveSessionsDays: defaultCleanupRemoveSessionsDays,
pollingFrequency: defaultPollingFrequency,
batchSize: defaultBatchSize,
pollingScheduler: defaultPollingScheduler,
schedulerCountBasedMinInterval: defaultSchedulerCountBasedMinInterval,
schedulerCountBasedMaxInterval: defaultSchedulerCountBasedMaxInterval,
workerPoolSize: defaultWorkerPoolSize,
createAdmin: defaultCreateAdmin,
proxyImages: defaultProxyImages,
@ -233,6 +242,21 @@ func (o *Options) BatchSize() int {
return o.batchSize
}
// PollingScheduler returns the name of the scheduler used for polling feeds.
// The value comes from POLLING_SCHEDULER and is defaultPollingScheduler when
// that variable is not provided.
func (o *Options) PollingScheduler() string {
return o.pollingScheduler
}
// SchedulerCountBasedMaxInterval returns the maximum interval in minutes for the count-based scheduler.
// The value comes from SCHEDULER_ENTRY_COUNT_BASED_MAX_INTERVAL and is
// defaultSchedulerCountBasedMaxInterval when that variable is not provided.
func (o *Options) SchedulerCountBasedMaxInterval() int {
return o.schedulerCountBasedMaxInterval
}
// SchedulerCountBasedMinInterval returns the minimum interval in minutes for the count-based scheduler.
// The value comes from SCHEDULER_ENTRY_COUNT_BASED_MIN_INTERVAL and is
// defaultSchedulerCountBasedMinInterval when that variable is not provided.
func (o *Options) SchedulerCountBasedMinInterval() int {
return o.schedulerCountBasedMinInterval
}
// IsOAuth2UserCreationAllowed returns true if user creation is allowed for OAuth2 users.
func (o *Options) IsOAuth2UserCreationAllowed() bool {
return o.oauth2UserCreationAllowed
@ -349,6 +373,9 @@ func (o *Options) String() string {
builder.WriteString(fmt.Sprintf("WORKER_POOL_SIZE: %v\n", o.workerPoolSize))
builder.WriteString(fmt.Sprintf("POLLING_FREQUENCY: %v\n", o.pollingFrequency))
builder.WriteString(fmt.Sprintf("BATCH_SIZE: %v\n", o.batchSize))
builder.WriteString(fmt.Sprintf("POLLING_SCHEDULER: %v\n", o.pollingScheduler))
builder.WriteString(fmt.Sprintf("SCHEDULER_ENTRY_COUNT_BASED_MAX_INTERVAL: %v\n", o.schedulerCountBasedMaxInterval))
builder.WriteString(fmt.Sprintf("SCHEDULER_ENTRY_COUNT_BASED_MIN_INTERVAL: %v\n", o.schedulerCountBasedMinInterval))
builder.WriteString(fmt.Sprintf("PROXY_IMAGES: %v\n", o.proxyImages))
builder.WriteString(fmt.Sprintf("CREATE_ADMIN: %v\n", o.createAdmin))
builder.WriteString(fmt.Sprintf("POCKET_CONSUMER_KEY: %v\n", o.pocketConsumerKey))

View file

@ -138,6 +138,12 @@ func (p *Parser) parseLines(lines []string) (err error) {
p.opts.pollingFrequency = parseInt(value, defaultPollingFrequency)
case "BATCH_SIZE":
p.opts.batchSize = parseInt(value, defaultBatchSize)
case "POLLING_SCHEDULER":
p.opts.pollingScheduler = parseString(value, defaultPollingScheduler)
case "SCHEDULER_ENTRY_COUNT_BASED_MAX_INTERVAL":
p.opts.schedulerCountBasedMaxInterval = parseInt(value, defaultSchedulerCountBasedMaxInterval)
case "SCHEDULER_ENTRY_COUNT_BASED_MIN_INTERVAL":
p.opts.schedulerCountBasedMinInterval = parseInt(value, defaultSchedulerCountBasedMinInterval)
case "PROXY_IMAGES":
p.opts.proxyImages = parseString(value, defaultProxyImages)
case "CREATE_ADMIN":

View file

@ -12,7 +12,7 @@ import (
"miniflux.app/logger"
)
const schemaVersion = 29
const schemaVersion = 30
// Migrate executes database migrations.
func Migrate(db *sql.DB) {

View file

@ -179,6 +179,9 @@ create unique index entries_share_code_idx on entries using btree(share_code) wh
created_at timestamp with time zone not null default now(),
primary key(id, value)
);`,
"schema_version_30": `alter table feeds add column next_check_at timestamp with time zone default now();
create index entries_user_feed_idx on entries (user_id, feed_id);
`,
"schema_version_4": `create type entry_sorting_direction as enum('asc', 'desc');
alter table users add column entry_direction entry_sorting_direction default 'asc';
`,
@ -231,6 +234,7 @@ var SqlMapChecksums = map[string]string{
"schema_version_28": "a64b5ba0b37fe3f209617b7d0e4dd05018d2b8362d2c9c528ba8cce19b77e326",
"schema_version_29": "527403d951d025b387baf7b1ab80c014752c5429cc0b9851aeb34b7716cf2c68",
"schema_version_3": "a54745dbc1c51c000f74d4e5068f1e2f43e83309f023415b1749a47d5c1e0f12",
"schema_version_30": "3ec48a9b2e7a0fc32c85f31652f723565c34213f5f2d7e5e5076aad8f0b40d23",
"schema_version_4": "216ea3a7d3e1704e40c797b5dc47456517c27dbb6ca98bf88812f4f63d74b5d9",
"schema_version_5": "46397e2f5f2c82116786127e9f6a403e975b14d2ca7b652a48cd1ba843e6a27c",
"schema_version_6": "9d05b4fb223f0e60efc716add5048b0ca9c37511cf2041721e20505d6d798ce4",

View file

@ -0,0 +1,2 @@
alter table feeds add column next_check_at timestamp with time zone default now();
create index entries_user_feed_idx on entries (user_id, feed_id);

View file

@ -110,6 +110,15 @@ Refresh interval in minutes for feeds (default is 60 minutes)\&.
.B BATCH_SIZE
Number of feeds to send to the queue for each interval (default is 10)\&.
.TP
.B POLLING_SCHEDULER
The scheduler used for polling feeds. Possible values are "round_robin" (default) and "entry_count_based"\&.
.TP
.B SCHEDULER_ENTRY_COUNT_BASED_MAX_INTERVAL
The maximum interval in minutes for the entry-count-based scheduler (default is 1440 minutes)\&.
.TP
.B SCHEDULER_ENTRY_COUNT_BASED_MIN_INTERVAL
The minimum interval in minutes for the entry-count-based scheduler (default is 5 minutes)\&.
.TP
.B DATABASE_URL
Postgresql connection parameters\&.
.br

View file

@ -6,8 +6,11 @@ package model // import "miniflux.app/model"
import (
"fmt"
"math"
"strings"
"time"
"miniflux.app/config"
"miniflux.app/http/client"
)
@ -19,6 +22,7 @@ type Feed struct {
SiteURL string `json:"site_url"`
Title string `json:"title"`
CheckedAt time.Time `json:"checked_at"`
NextCheckAt time.Time `json:"next_check_at"`
EtagHeader string `json:"etag_header"`
LastModifiedHeader string `json:"last_modified_header"`
ParsingErrorMsg string `json:"parsing_error_message"`
@ -37,6 +41,11 @@ type Feed struct {
ReadCount int `json:"-"`
}
const (
// SchedulerEntryCountBased represents the name of the scheduler based on entry counts.
SchedulerEntryCountBased = "entry_count_based"
)
func (f *Feed) String() string {
return fmt.Sprintf("ID=%d, UserID=%d, FeedURL=%s, SiteURL=%s, Title=%s, Category={%s}",
f.ID,
@ -91,5 +100,27 @@ func (f *Feed) CheckedNow() {
}
}
// ScheduleNextCheck sets "next_check_at" of a feed based on the scheduler
// selected from the configuration.
//
// With the entry-count-based scheduler, the interval is one week divided by
// weeklyCount (the number of entries counted over the last week), rounded to
// the nearest minute and clamped to the configured minimum and maximum
// intervals. A feed with no entries (weeklyCount <= 0) gets the maximum
// interval. With any other scheduler, the next check is set to the current
// time.
func (f *Feed) ScheduleNextCheck(weeklyCount int) {
	switch strings.ToLower(config.Opts.PollingScheduler()) {
	case SchedulerEntryCountBased:
		minInterval := config.Opts.SchedulerCountBasedMinInterval()
		maxInterval := config.Opts.SchedulerCountBasedMaxInterval()

		// A non-positive count means "no recent entries": poll as rarely as
		// allowed. Guarding negatives as well avoids a negative interval that
		// the lower clamp would turn into the most frequent polling rate.
		intervalMinutes := maxInterval
		if weeklyCount > 0 {
			intervalMinutes = int(math.Round(float64(7*24*60) / float64(weeklyCount)))
		}

		// Clamp with integer comparisons; the upper bound is applied first so
		// the minimum wins if the configuration has min > max.
		if intervalMinutes > maxInterval {
			intervalMinutes = maxInterval
		}
		if intervalMinutes < minInterval {
			intervalMinutes = minInterval
		}

		f.NextCheckAt = time.Now().Add(time.Minute * time.Duration(intervalMinutes))
	default:
		// Round robin: omit the interval because it is the same for all feeds.
		f.NextCheckAt = time.Now()
	}
}
// Feeds is a list of feed
type Feeds []*Feed

View file

@ -5,8 +5,12 @@
package model // import "miniflux.app/model"
import (
"fmt"
"os"
"testing"
"time"
"miniflux.app/config"
"miniflux.app/http/client"
)
@ -107,3 +111,74 @@ func TestFeedCheckedNow(t *testing.T) {
t.Error(`The checked date must be set`)
}
}
// TestFeedScheduleNextCheckDefault verifies that ScheduleNextCheck populates
// NextCheckAt when no scheduler is configured in the environment.
func TestFeedScheduleNextCheckDefault(t *testing.T) {
	// Start from an empty environment so that scheduler variables inherited
	// from the process or left behind by other tests cannot select a
	// non-default scheduler.
	os.Clearenv()

	var err error
	parser := config.NewParser()
	config.Opts, err = parser.ParseEnvironmentVariables()
	if err != nil {
		t.Fatalf(`Parsing failure: %v`, err)
	}

	feed := &Feed{}
	weeklyCount := 10
	feed.ScheduleNextCheck(weeklyCount)

	if feed.NextCheckAt.IsZero() {
		t.Error(`The next_check_at must be set`)
	}
}
// TestFeedScheduleNextCheckEntryCountBasedMaxInterval verifies that, with the
// entry-count-based scheduler, the scheduled time never exceeds
// now + SCHEDULER_ENTRY_COUNT_BASED_MAX_INTERVAL.
func TestFeedScheduleNextCheckEntryCountBasedMaxInterval(t *testing.T) {
	const (
		maxInterval = 5
		minInterval = 1
	)

	os.Clearenv()
	os.Setenv("POLLING_SCHEDULER", "entry_count_based")
	os.Setenv("SCHEDULER_ENTRY_COUNT_BASED_MAX_INTERVAL", fmt.Sprintf("%d", maxInterval))
	os.Setenv("SCHEDULER_ENTRY_COUNT_BASED_MIN_INTERVAL", fmt.Sprintf("%d", minInterval))

	opts, err := config.NewParser().ParseEnvironmentVariables()
	config.Opts = opts
	if err != nil {
		t.Fatalf(`Parsing failure: %v`, err)
	}

	var feed Feed
	feed.ScheduleNextCheck(maxInterval * 100)

	if feed.NextCheckAt.IsZero() {
		t.Error(`The next_check_at must be set`)
	}

	upperBound := time.Now().Add(time.Minute * time.Duration(maxInterval))
	if feed.NextCheckAt.After(upperBound) {
		t.Error(`The next_check_at should not be after the now + max interval`)
	}
}
// TestFeedScheduleNextCheckEntryCountBasedMinInterval verifies that, with the
// entry-count-based scheduler, the scheduled time is never earlier than
// now + SCHEDULER_ENTRY_COUNT_BASED_MIN_INTERVAL, even for a very active feed.
func TestFeedScheduleNextCheckEntryCountBasedMinInterval(t *testing.T) {
	maxInterval := 500
	minInterval := 100

	os.Clearenv()
	os.Setenv("POLLING_SCHEDULER", "entry_count_based")
	os.Setenv("SCHEDULER_ENTRY_COUNT_BASED_MAX_INTERVAL", fmt.Sprintf("%d", maxInterval))
	os.Setenv("SCHEDULER_ENTRY_COUNT_BASED_MIN_INTERVAL", fmt.Sprintf("%d", minInterval))

	var err error
	parser := config.NewParser()
	config.Opts, err = parser.ParseEnvironmentVariables()
	if err != nil {
		t.Fatalf(`Parsing failure: %v`, err)
	}

	feed := &Feed{}

	// One entry per minute over the whole week gives a raw interval of one
	// minute, well below minInterval, so the lower clamp is actually exercised.
	weeklyCount := 7 * 24 * 60

	// Capture the reference time before scheduling: a feed clamped exactly to
	// the minimum must not be reported as "too early" merely because the clock
	// advanced between the call and the assertion.
	earliest := time.Now().Add(time.Minute * time.Duration(minInterval))
	feed.ScheduleNextCheck(weeklyCount)

	if feed.NextCheckAt.IsZero() {
		t.Error(`The next_check_at must be set`)
	}

	if feed.NextCheckAt.Before(earliest) {
		t.Error(`The next_check_at should not be before the now + min interval`)
	}
}

View file

@ -90,7 +90,13 @@ func (h *Handler) RefreshFeed(userID, feedID int64) error {
return errors.NewLocalizedError(errNotFound, feedID)
}
weeklyCount, parametersErr := h.store.FeedSchedulerParameters(userID, feedID)
if parametersErr != nil {
return parametersErr
}
originalFeed.CheckedNow()
originalFeed.ScheduleNextCheck(weeklyCount)
request := client.New(originalFeed.FeedURL)
request.WithCredentials(originalFeed.Username, originalFeed.Password)

View file

@ -8,7 +8,9 @@ import (
"database/sql"
"errors"
"fmt"
"strings"
"miniflux.app/config"
"miniflux.app/model"
"miniflux.app/timezone"
)
@ -272,6 +274,39 @@ func (s *Storage) fetchFeeds(feedQuery, counterQuery string, args ...interface{}
return feeds, nil
}
// FeedSchedulerParameters returns the parameters used for the scheduler.
//
// When the configured polling scheduler is not the entry-count-based one, no
// query is issued and 0 is returned. Otherwise the result is the number of
// entries of the given user's feed whose published_at lies within the last
// week.
func (s *Storage) FeedSchedulerParameters(userID, feedID int64) (int, error) {
	if strings.ToLower(config.Opts.PollingScheduler()) != model.SchedulerEntryCountBased {
		return 0, nil
	}

	const query = `
SELECT
count(*)
FROM
entries
WHERE
entries.user_id=$1 AND
entries.feed_id=$2 AND
entries.published_at BETWEEN (now() - interval '1 week') AND now();
`

	var count int
	err := s.db.QueryRow(query, userID, feedID).Scan(&count)
	if err == sql.ErrNoRows {
		return 0, nil
	}
	if err != nil {
		return 0, fmt.Errorf(`store: unable to fetch scheduler parameters for feed #%d: %v`, feedID, err)
	}

	return count, nil
}
// FeedByID returns a feed by the ID.
func (s *Storage) FeedByID(userID, feedID int64) (*model.Feed, error) {
var feed model.Feed
@ -429,9 +464,10 @@ func (s *Storage) UpdateFeed(feed *model.Feed) (err error) {
user_agent=$13,
username=$14,
password=$15,
disabled=$16
disabled=$16,
next_check_at=$17
WHERE
id=$17 AND user_id=$18
id=$18 AND user_id=$19
`
_, err = s.db.Exec(query,
feed.FeedURL,
@ -450,6 +486,7 @@ func (s *Storage) UpdateFeed(feed *model.Feed) (err error) {
feed.Username,
feed.Password,
feed.Disabled,
feed.NextCheckAt,
feed.ID,
feed.UserID,
)
@ -469,14 +506,16 @@ func (s *Storage) UpdateFeedError(feed *model.Feed) (err error) {
SET
parsing_error_msg=$1,
parsing_error_count=$2,
checked_at=$3
checked_at=$3,
next_check_at=$4
WHERE
id=$4 AND user_id=$5
id=$5 AND user_id=$6
`
_, err = s.db.Exec(query,
feed.ParsingErrorMsg,
feed.ParsingErrorCount,
feed.CheckedAt,
feed.NextCheckAt,
feed.ID,
feed.UserID,
)

View file

@ -21,8 +21,8 @@ func (s *Storage) NewBatch(batchSize int) (jobs model.JobList, err error) {
FROM
feeds
WHERE
parsing_error_count < $1 AND disabled is false
ORDER BY checked_at ASC LIMIT %d
parsing_error_count < $1 AND disabled is false AND next_check_at < now()
ORDER BY next_check_at ASC LIMIT %d
`
return s.fetchBatchRows(fmt.Sprintf(query, batchSize), maxParsingError)
}
@ -39,7 +39,7 @@ func (s *Storage) NewUserBatch(userID int64, batchSize int) (jobs model.JobList,
feeds
WHERE
user_id=$1 AND disabled is false
ORDER BY checked_at ASC LIMIT %d
ORDER BY next_check_at ASC LIMIT %d
`
return s.fetchBatchRows(fmt.Sprintf(query, batchSize), userID)
}