feat(feeds): add scheduled cleanup (#1073)

* refactor(feeds): make feed scheduling more robust

* feat(feeds): add daily cleanup job

* removes feed cache older than 30 days

* fix(feeds): fmt wrong type
This commit is contained in:
ze0s 2023-09-02 22:44:28 +02:00 committed by GitHub
parent cfc2436d50
commit 6fd8626507
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 301 additions and 215 deletions

View file

@ -28,14 +28,14 @@ func NewFeedCacheRepo(log logger.Logger, db *DB) domain.FeedCacheRepo {
} }
} }
func (r *FeedCacheRepo) Get(bucket string, key string) ([]byte, error) { func (r *FeedCacheRepo) Get(feedId int, key string) ([]byte, error) {
queryBuilder := r.db.squirrel. queryBuilder := r.db.squirrel.
Select( Select(
"value", "value",
"ttl", "ttl",
). ).
From("feed_cache"). From("feed_cache").
Where(sq.Eq{"bucket": bucket}). Where(sq.Eq{"feed_id": feedId}).
Where(sq.Eq{"key": key}). Where(sq.Eq{"key": key}).
Where(sq.Gt{"ttl": time.Now()}) Where(sq.Gt{"ttl": time.Now()})
@ -63,16 +63,16 @@ func (r *FeedCacheRepo) Get(bucket string, key string) ([]byte, error) {
return value, nil return value, nil
} }
func (r *FeedCacheRepo) GetByBucket(ctx context.Context, bucket string) ([]domain.FeedCacheItem, error) { func (r *FeedCacheRepo) GetByFeed(ctx context.Context, feedId int) ([]domain.FeedCacheItem, error) {
queryBuilder := r.db.squirrel. queryBuilder := r.db.squirrel.
Select( Select(
"bucket", "feed_id",
"key", "key",
"value", "value",
"ttl", "ttl",
). ).
From("feed_cache"). From("feed_cache").
Where(sq.Eq{"bucket": bucket}) Where(sq.Eq{"feed_id": feedId})
query, args, err := queryBuilder.ToSql() query, args, err := queryBuilder.ToSql()
if err != nil { if err != nil {
@ -91,7 +91,7 @@ func (r *FeedCacheRepo) GetByBucket(ctx context.Context, bucket string) ([]domai
for rows.Next() { for rows.Next() {
var d domain.FeedCacheItem var d domain.FeedCacheItem
if err := rows.Scan(&d.Bucket, &d.Key, &d.Value, &d.TTL); err != nil { if err := rows.Scan(&d.FeedId, &d.Key, &d.Value, &d.TTL); err != nil {
return nil, errors.Wrap(err, "error scanning row") return nil, errors.Wrap(err, "error scanning row")
} }
@ -105,12 +105,11 @@ func (r *FeedCacheRepo) GetByBucket(ctx context.Context, bucket string) ([]domai
return data, nil return data, nil
} }
func (r *FeedCacheRepo) GetCountByBucket(ctx context.Context, bucket string) (int, error) { func (r *FeedCacheRepo) GetCountByFeed(ctx context.Context, feedId int) (int, error) {
queryBuilder := r.db.squirrel. queryBuilder := r.db.squirrel.
Select("COUNT(*)"). Select("COUNT(*)").
From("feed_cache"). From("feed_cache").
Where(sq.Eq{"bucket": bucket}) Where(sq.Eq{"feed_id": feedId})
query, args, err := queryBuilder.ToSql() query, args, err := queryBuilder.ToSql()
if err != nil { if err != nil {
@ -131,12 +130,12 @@ func (r *FeedCacheRepo) GetCountByBucket(ctx context.Context, bucket string) (in
return count, nil return count, nil
} }
func (r *FeedCacheRepo) Exists(bucket string, key string) (bool, error) { func (r *FeedCacheRepo) Exists(feedId int, key string) (bool, error) {
queryBuilder := r.db.squirrel. queryBuilder := r.db.squirrel.
Select("1"). Select("1").
Prefix("SELECT EXISTS ("). Prefix("SELECT EXISTS (").
From("feed_cache"). From("feed_cache").
Where(sq.Eq{"bucket": bucket}). Where(sq.Eq{"feed_id": feedId}).
Where(sq.Eq{"key": key}). Where(sq.Eq{"key": key}).
Suffix(")") Suffix(")")
@ -149,7 +148,7 @@ func (r *FeedCacheRepo) Exists(bucket string, key string) (bool, error) {
err = r.db.handler.QueryRow(query, args...).Scan(&exists) err = r.db.handler.QueryRow(query, args...).Scan(&exists)
if err != nil { if err != nil {
if errors.Is(err, sql.ErrNoRows) { if errors.Is(err, sql.ErrNoRows) {
return false, domain.ErrRecordNotFound return false, nil
} }
return false, errors.Wrap(err, "error query") return false, errors.Wrap(err, "error query")
@ -158,11 +157,11 @@ func (r *FeedCacheRepo) Exists(bucket string, key string) (bool, error) {
return exists, nil return exists, nil
} }
func (r *FeedCacheRepo) Put(bucket string, key string, val []byte, ttl time.Time) error { func (r *FeedCacheRepo) Put(feedId int, key string, val []byte, ttl time.Time) error {
queryBuilder := r.db.squirrel. queryBuilder := r.db.squirrel.
Insert("feed_cache"). Insert("feed_cache").
Columns("bucket", "key", "value", "ttl"). Columns("feed_id", "key", "value", "ttl").
Values(bucket, key, val, ttl) Values(feedId, key, val, ttl)
query, args, err := queryBuilder.ToSql() query, args, err := queryBuilder.ToSql()
if err != nil { if err != nil {
@ -176,10 +175,10 @@ func (r *FeedCacheRepo) Put(bucket string, key string, val []byte, ttl time.Time
return nil return nil
} }
func (r *FeedCacheRepo) Delete(ctx context.Context, bucket string, key string) error { func (r *FeedCacheRepo) Delete(ctx context.Context, feedId int, key string) error {
queryBuilder := r.db.squirrel. queryBuilder := r.db.squirrel.
Delete("feed_cache"). Delete("feed_cache").
Where(sq.Eq{"bucket": bucket}). Where(sq.Eq{"feed_id": feedId}).
Where(sq.Eq{"key": key}) Where(sq.Eq{"key": key})
query, args, err := queryBuilder.ToSql() query, args, err := queryBuilder.ToSql()
@ -195,10 +194,8 @@ func (r *FeedCacheRepo) Delete(ctx context.Context, bucket string, key string) e
return nil return nil
} }
func (r *FeedCacheRepo) DeleteBucket(ctx context.Context, bucket string) error { func (r *FeedCacheRepo) DeleteByFeed(ctx context.Context, feedId int) error {
queryBuilder := r.db.squirrel. queryBuilder := r.db.squirrel.Delete("feed_cache").Where(sq.Eq{"feed_id": feedId})
Delete("feed_cache").
Where(sq.Eq{"bucket": bucket})
query, args, err := queryBuilder.ToSql() query, args, err := queryBuilder.ToSql()
if err != nil { if err != nil {
@ -215,9 +212,36 @@ func (r *FeedCacheRepo) DeleteBucket(ctx context.Context, bucket string) error {
return errors.Wrap(err, "error exec result") return errors.Wrap(err, "error exec result")
} }
if rows == 0 { r.log.Debug().Msgf("deleted %d rows from feed cache: %d", rows, feedId)
r.log.Warn().Msgf("no rows affected for delete of bucket: %s", bucket)
} return nil
}
func (r *FeedCacheRepo) DeleteStale(ctx context.Context) error {
queryBuilder := r.db.squirrel.Delete("feed_cache")
if r.db.Driver == "sqlite" {
queryBuilder = queryBuilder.Where(sq.Expr("ttl < datetime('now', 'localtime', '-30 days')"))
} else {
queryBuilder = queryBuilder.Where(sq.Lt{"ttl": time.Now().AddDate(0, 0, -30)})
}
query, args, err := queryBuilder.ToSql()
if err != nil {
return errors.Wrap(err, "error building query")
}
result, err := r.db.handler.ExecContext(ctx, query, args...)
if err != nil {
return errors.Wrap(err, "error executing query")
}
rows, err := result.RowsAffected()
if err != nil {
return errors.Wrap(err, "error exec result")
}
r.log.Debug().Msgf("deleted %d rows from stale feed cache", rows)
return nil return nil
} }

View file

@ -353,12 +353,16 @@ CREATE TABLE feed
CREATE TABLE feed_cache CREATE TABLE feed_cache
( (
bucket TEXT, feed_id INTEGER NOT NULL,
key TEXT, key TEXT,
value TEXT, value TEXT,
ttl TIMESTAMP ttl TIMESTAMP,
FOREIGN KEY (feed_id) REFERENCES feed (id) ON DELETE cascade
); );
CREATE INDEX feed_cache_feed_id_key_index
ON feed_cache (feed_id, key);
CREATE TABLE api_key CREATE TABLE api_key
( (
name TEXT, name TEXT,
@ -776,4 +780,18 @@ ALTER TABLE release_action_status
ALTER TABLE filter ALTER TABLE filter
DROP COLUMN IF EXISTS external_webhook_expect_status; DROP COLUMN IF EXISTS external_webhook_expect_status;
`, `,
`DROP TABLE IF EXISTS feed_cache;
CREATE TABLE feed_cache
(
feed_id INTEGER NOT NULL,
key TEXT,
value TEXT,
ttl TIMESTAMP,
FOREIGN KEY (feed_id) REFERENCES feed (id) ON DELETE cascade
);
CREATE INDEX feed_cache_feed_id_key_index
ON feed_cache (feed_id, key);
`,
} }

View file

@ -346,12 +346,16 @@ CREATE TABLE feed
CREATE TABLE feed_cache CREATE TABLE feed_cache
( (
bucket TEXT, feed_id INTEGER NOT NULL,
key TEXT, key TEXT,
value TEXT, value TEXT,
ttl TIMESTAMP ttl TIMESTAMP,
FOREIGN KEY (feed_id) REFERENCES feed (id) ON DELETE cascade
); );
CREATE INDEX feed_cache_feed_id_key_index
ON feed_cache (feed_id, key);
CREATE TABLE api_key CREATE TABLE api_key
( (
name TEXT, name TEXT,
@ -1326,5 +1330,19 @@ drop table filter;
alter table filter_dg_tmp alter table filter_dg_tmp
rename to filter; rename to filter;
`,
`DROP TABLE IF EXISTS feed_cache;
CREATE TABLE feed_cache
(
feed_id INTEGER NOT NULL,
key TEXT,
value TEXT,
ttl TIMESTAMP,
FOREIGN KEY (feed_id) REFERENCES feed (id) ON DELETE cascade
);
CREATE INDEX feed_cache_feed_id_key_index
ON feed_cache (feed_id, key);
`, `,
} }

View file

@ -9,13 +9,14 @@ import (
) )
type FeedCacheRepo interface { type FeedCacheRepo interface {
Get(bucket string, key string) ([]byte, error) Get(feedId int, key string) ([]byte, error)
GetByBucket(ctx context.Context, bucket string) ([]FeedCacheItem, error) GetByFeed(ctx context.Context, feedId int) ([]FeedCacheItem, error)
GetCountByBucket(ctx context.Context, bucket string) (int, error) GetCountByFeed(ctx context.Context, feedId int) (int, error)
Exists(bucket string, key string) (bool, error) Exists(feedId int, key string) (bool, error)
Put(bucket string, key string, val []byte, ttl time.Time) error Put(feedId int, key string, val []byte, ttl time.Time) error
Delete(ctx context.Context, bucket string, key string) error Delete(ctx context.Context, feedId int, key string) error
DeleteBucket(ctx context.Context, bucket string) error DeleteByFeed(ctx context.Context, feedId int) error
DeleteStale(ctx context.Context) error
} }
type FeedRepo interface { type FeedRepo interface {
@ -79,7 +80,7 @@ const (
) )
type FeedCacheItem struct { type FeedCacheItem struct {
Bucket string `json:"bucket"` FeedId string `json:"feed_id"`
Key string `json:"key"` Key string `json:"key"`
Value []byte `json:"value"` Value []byte `json:"value"`
TTL time.Time `json:"ttl"` TTL time.Time `json:"ttl"`

35
internal/feed/cleanup.go Normal file
View file

@ -0,0 +1,35 @@
// Copyright (c) 2021 - 2023, Ludvig Lundgren and the autobrr contributors.
// SPDX-License-Identifier: GPL-2.0-or-later
package feed
import (
"context"
"time"
"github.com/autobrr/autobrr/internal/domain"
"github.com/rs/zerolog"
)
type CleanupJob struct {
log zerolog.Logger
cacheRepo domain.FeedCacheRepo
CronSchedule time.Duration
}
func NewCleanupJob(log zerolog.Logger, cacheRepo domain.FeedCacheRepo) *CleanupJob {
return &CleanupJob{
log: log,
cacheRepo: cacheRepo,
}
}
func (j *CleanupJob) Run() {
if err := j.cacheRepo.DeleteStale(context.Background()); err != nil {
j.log.Error().Err(err).Msg("error when running feed cache cleanup job")
}
j.log.Info().Msg("successfully ran feed-cache-cleanup job")
}

View file

@ -133,7 +133,7 @@ func (j *NewznabJob) getFeed(ctx context.Context) ([]newznab.FeedItem, error) {
j.Log.Error().Err(err).Msgf("error updating last run for feed id: %v", j.Feed.ID) j.Log.Error().Err(err).Msgf("error updating last run for feed id: %v", j.Feed.ID)
} }
j.Log.Debug().Msgf("refreshing feed: %v, found (%d) items", j.Name, len(feed.Channel.Items)) j.Log.Debug().Msgf("refreshing feed: %s, found (%d) items", j.Name, len(feed.Channel.Items))
items := make([]newznab.FeedItem, 0) items := make([]newznab.FeedItem, 0)
if len(feed.Channel.Items) == 0 { if len(feed.Channel.Items) == 0 {
@ -146,15 +146,16 @@ func (j *NewznabJob) getFeed(ctx context.Context) ([]newznab.FeedItem, error) {
for _, i := range feed.Channel.Items { for _, i := range feed.Channel.Items {
if i.GUID == "" { if i.GUID == "" {
j.Log.Error().Err(err).Msgf("missing GUID from feed: %s", j.Feed.Name) j.Log.Error().Msgf("missing GUID from feed: %s", j.Feed.Name)
continue continue
} }
exists, err := j.CacheRepo.Exists(j.Name, i.GUID) exists, err := j.CacheRepo.Exists(j.Feed.ID, i.GUID)
if err != nil { if err != nil {
j.Log.Error().Err(err).Msg("could not check if item exists") j.Log.Error().Err(err).Msg("could not check if item exists")
continue continue
} }
if exists { if exists {
j.Log.Trace().Msgf("cache item exists, skipping release: %s", i.Title) j.Log.Trace().Msgf("cache item exists, skipping release: %s", i.Title)
continue continue
@ -165,7 +166,7 @@ func (j *NewznabJob) getFeed(ctx context.Context) ([]newznab.FeedItem, error) {
// set ttl to 1 month // set ttl to 1 month
ttl := time.Now().AddDate(0, 1, 0) ttl := time.Now().AddDate(0, 1, 0)
if err := j.CacheRepo.Put(j.Name, i.GUID, []byte(i.Title), ttl); err != nil { if err := j.CacheRepo.Put(j.Feed.ID, i.GUID, []byte(i.Title), ttl); err != nil {
j.Log.Error().Stack().Err(err).Str("guid", i.GUID).Msg("cache.Put: error storing item in cache") j.Log.Error().Stack().Err(err).Str("guid", i.GUID).Msg("cache.Put: error storing item in cache")
continue continue
} }

View file

@ -6,7 +6,6 @@ package feed
import ( import (
"context" "context"
"encoding/xml" "encoding/xml"
"fmt"
"net/url" "net/url"
"regexp" "regexp"
"time" "time"
@ -227,8 +226,6 @@ func (j *RSSJob) getFeed(ctx context.Context) (items []*gofeed.Item, err error)
return return
} }
bucketKey := fmt.Sprintf("%v+%v", j.IndexerIdentifier, j.Name)
//sort.Sort(feed) //sort.Sort(feed)
// set ttl to 1 month // set ttl to 1 month
@ -245,7 +242,7 @@ func (j *RSSJob) getFeed(ctx context.Context) (items []*gofeed.Item, err error)
} }
} }
exists, err := j.CacheRepo.Exists(bucketKey, key) exists, err := j.CacheRepo.Exists(j.Feed.ID, key)
if err != nil { if err != nil {
j.Log.Error().Err(err).Msg("could not check if item exists") j.Log.Error().Err(err).Msg("could not check if item exists")
continue continue
@ -257,7 +254,7 @@ func (j *RSSJob) getFeed(ctx context.Context) (items []*gofeed.Item, err error)
j.Log.Debug().Msgf("found new release: %s", i.Title) j.Log.Debug().Msgf("found new release: %s", i.Title)
if err := j.CacheRepo.Put(bucketKey, key, []byte(item.Title), ttl); err != nil { if err := j.CacheRepo.Put(j.Feed.ID, key, []byte(item.Title), ttl); err != nil {
j.Log.Error().Err(err).Str("entry", key).Msg("cache.Put: error storing item in cache") j.Log.Error().Err(err).Str("entry", key).Msg("cache.Put: error storing item in cache")
continue continue
} }

View file

@ -7,7 +7,6 @@ import (
"context" "context"
"fmt" "fmt"
"log" "log"
"strconv"
"time" "time"
"github.com/autobrr/autobrr/internal/domain" "github.com/autobrr/autobrr/internal/domain"
@ -20,6 +19,7 @@ import (
"github.com/dcarbone/zadapters/zstdlog" "github.com/dcarbone/zadapters/zstdlog"
"github.com/mmcdole/gofeed" "github.com/mmcdole/gofeed"
"github.com/robfig/cron/v3"
"github.com/rs/zerolog" "github.com/rs/zerolog"
) )
@ -27,7 +27,7 @@ type Service interface {
FindByID(ctx context.Context, id int) (*domain.Feed, error) FindByID(ctx context.Context, id int) (*domain.Feed, error)
FindByIndexerIdentifier(ctx context.Context, indexer string) (*domain.Feed, error) FindByIndexerIdentifier(ctx context.Context, indexer string) (*domain.Feed, error)
Find(ctx context.Context) ([]domain.Feed, error) Find(ctx context.Context) ([]domain.Feed, error)
GetCacheByID(ctx context.Context, bucket string) ([]domain.FeedCacheItem, error) GetCacheByID(ctx context.Context, feedId int) ([]domain.FeedCacheItem, error)
Store(ctx context.Context, feed *domain.Feed) error Store(ctx context.Context, feed *domain.Feed) error
Update(ctx context.Context, feed *domain.Feed) error Update(ctx context.Context, feed *domain.Feed) error
Test(ctx context.Context, feed *domain.Feed) error Test(ctx context.Context, feed *domain.Feed) error
@ -35,6 +35,7 @@ type Service interface {
Delete(ctx context.Context, id int) error Delete(ctx context.Context, id int) error
DeleteFeedCache(ctx context.Context, id int) error DeleteFeedCache(ctx context.Context, id int) error
GetLastRunData(ctx context.Context, id int) (string, error) GetLastRunData(ctx context.Context, id int) (string, error)
DeleteFeedCacheStale(ctx context.Context) error
Start() error Start() error
} }
@ -50,14 +51,14 @@ type feedInstance struct {
Timeout time.Duration Timeout time.Duration
} }
// feedKey creates a unique identifier to be used for controlling jobs in the scheduler
type feedKey struct { type feedKey struct {
id int id int
indexer string
name string
} }
// ToString creates a string of the unique id to be used for controlling jobs in the scheduler
func (k feedKey) ToString() string { func (k feedKey) ToString() string {
return fmt.Sprintf("%v+%v+%v", k.id, k.indexer, k.name) return fmt.Sprintf("feed-%d", k.id)
} }
type service struct { type service struct {
@ -93,22 +94,8 @@ func (s *service) Find(ctx context.Context) ([]domain.Feed, error) {
return s.repo.Find(ctx) return s.repo.Find(ctx)
} }
func (s *service) GetCacheByID(ctx context.Context, bucket string) ([]domain.FeedCacheItem, error) { func (s *service) GetCacheByID(ctx context.Context, feedId int) ([]domain.FeedCacheItem, error) {
id, _ := strconv.Atoi(bucket) return s.cacheRepo.GetByFeed(ctx, feedId)
feed, err := s.repo.FindByID(ctx, id)
if err != nil {
s.log.Error().Err(err).Msgf("could not find feed by id: %v", id)
return nil, err
}
data, err := s.cacheRepo.GetByBucket(ctx, feed.Name)
if err != nil {
s.log.Error().Err(err).Msg("could not get feed cache")
return nil, err
}
return data, err
} }
func (s *service) Store(ctx context.Context, feed *domain.Feed) error { func (s *service) Store(ctx context.Context, feed *domain.Feed) error {
@ -124,18 +111,11 @@ func (s *service) Delete(ctx context.Context, id int) error {
} }
func (s *service) DeleteFeedCache(ctx context.Context, id int) error { func (s *service) DeleteFeedCache(ctx context.Context, id int) error {
feed, err := s.repo.FindByID(ctx, id) return s.cacheRepo.DeleteByFeed(ctx, id)
if err != nil { }
s.log.Error().Err(err).Msgf("could not find feed by id: %d", id)
return err
}
if err := s.cacheRepo.DeleteBucket(ctx, feed.Name); err != nil { func (s *service) DeleteFeedCacheStale(ctx context.Context) error {
s.log.Error().Err(err).Msgf("could not clear feed cache: %d", id) return s.cacheRepo.DeleteStale(ctx)
return err
}
return nil
} }
func (s *service) ToggleEnabled(ctx context.Context, id int, enabled bool) error { func (s *service) ToggleEnabled(ctx context.Context, id int, enabled bool) error {
@ -171,22 +151,16 @@ func (s *service) delete(ctx context.Context, id int) error {
return err return err
} }
s.log.Debug().Msgf("stopping and removing feed: %v", f.Name) s.log.Debug().Msgf("stopping and removing feed: %s", f.Name)
identifierKey := feedKey{f.ID, f.Indexer, f.Name}.ToString() if err := s.stopFeedJob(f.ID); err != nil {
s.log.Error().Err(err).Msgf("error stopping rss job: %s id: %d", f.Name, f.ID)
if err := s.stopFeedJob(identifierKey); err != nil {
s.log.Error().Err(err).Msg("error stopping rss job")
return err return err
} }
if err := s.repo.Delete(ctx, id); err != nil { // delete feed and cascade delete feed_cache by fk
s.log.Error().Err(err).Msg("error deleting feed") if err := s.repo.Delete(ctx, f.ID); err != nil {
return err s.log.Error().Err(err).Msgf("error deleting feed: %s", f.Name)
}
if err := s.cacheRepo.DeleteBucket(ctx, f.Name); err != nil {
s.log.Error().Err(err).Msgf("could not delete feedCache bucket by id: %v", id)
return err return err
} }
@ -215,24 +189,22 @@ func (s *service) toggleEnabled(ctx context.Context, id int, enabled bool) error
return err return err
} }
s.log.Debug().Msgf("feed started: %v", f.Name) s.log.Debug().Msgf("feed started: %s", f.Name)
return nil return nil
} else { }
s.log.Debug().Msgf("stopping feed: %v", f.Name)
identifierKey := feedKey{f.ID, f.Indexer, f.Name}.ToString() s.log.Debug().Msgf("stopping feed: %s", f.Name)
if err := s.stopFeedJob(identifierKey); err != nil { if err := s.stopFeedJob(f.ID); err != nil {
s.log.Error().Err(err).Msg("error stopping feed job") s.log.Error().Err(err).Msg("error stopping feed job")
return err return err
} }
s.log.Debug().Msgf("feed stopped: %v", f.Name) s.log.Debug().Msgf("feed stopped: %s", f.Name)
return nil return nil
} }
}
return nil return nil
} }
@ -274,7 +246,7 @@ func (s *service) testRSS(ctx context.Context, feed *domain.Feed) error {
return errors.Wrap(err, "error fetching rss feed items") return errors.Wrap(err, "error fetching rss feed items")
} }
s.log.Info().Msgf("refreshing rss feed: %v, found (%d) items", feed.Name, len(f.Items)) s.log.Info().Msgf("refreshing rss feed: %s, found (%d) items", feed.Name, len(f.Items))
return nil return nil
} }
@ -289,7 +261,7 @@ func (s *service) testTorznab(ctx context.Context, feed *domain.Feed, subLogger
return err return err
} }
s.log.Info().Msgf("refreshing torznab feed: %v, found (%d) items", feed.Name, len(items.Channel.Items)) s.log.Info().Msgf("refreshing torznab feed: %s, found (%d) items", feed.Name, len(items.Channel.Items))
return nil return nil
} }
@ -304,13 +276,18 @@ func (s *service) testNewznab(ctx context.Context, feed *domain.Feed, subLogger
return err return err
} }
s.log.Info().Msgf("refreshing newznab feed: %v, found (%d) items", feed.Name, len(items.Channel.Items)) s.log.Info().Msgf("refreshing newznab feed: %s, found (%d) items", feed.Name, len(items.Channel.Items))
return nil return nil
} }
func (s *service) start() error { func (s *service) start() error {
// get all torznab indexer definitions // always run feed cache maintenance job
if err := s.createCleanupJob(); err != nil {
s.log.Error().Err(err).Msg("could not start feed cache cleanup job")
}
// get all feeds
feeds, err := s.repo.Find(context.TODO()) feeds, err := s.repo.Find(context.TODO())
if err != nil { if err != nil {
s.log.Error().Err(err).Msg("error finding feeds") s.log.Error().Err(err).Msg("error finding feeds")
@ -329,12 +306,10 @@ func (s *service) start() error {
} }
func (s *service) restartJob(f *domain.Feed) error { func (s *service) restartJob(f *domain.Feed) error {
s.log.Debug().Msgf("stopping feed: %v", f.Name) s.log.Debug().Msgf("stopping feed: %s", f.Name)
identifierKey := feedKey{f.ID, f.Indexer, f.Name}.ToString()
// stop feed job // stop feed job
if err := s.stopFeedJob(identifierKey); err != nil { if err := s.stopFeedJob(f.ID); err != nil {
s.log.Error().Err(err).Msg("error stopping feed job") s.log.Error().Err(err).Msg("error stopping feed job")
return err return err
} }
@ -345,21 +320,21 @@ func (s *service) restartJob(f *domain.Feed) error {
return err return err
} }
s.log.Debug().Msgf("restarted feed: %v", f.Name) s.log.Debug().Msgf("restarted feed: %s", f.Name)
} }
return nil return nil
} }
func (s *service) startJob(f *domain.Feed) error { func (s *service) startJob(f *domain.Feed) error {
// get all torznab indexer definitions // if it's not enabled we should not start it
if !f.Enabled { if !f.Enabled {
return nil return nil
} }
// get torznab_url from settings // get torznab_url from settings
if f.URL == "" { if f.URL == "" {
return errors.New("no URL provided for feed: %v", f.Name) return errors.New("no URL provided for feed: %s", f.Name)
} }
// cron schedule to run every X minutes // cron schedule to run every X minutes
@ -374,32 +349,49 @@ func (s *service) startJob(f *domain.Feed) error {
Timeout: time.Duration(f.Timeout) * time.Second, Timeout: time.Duration(f.Timeout) * time.Second,
} }
var err error
var job cron.Job
switch fi.Implementation { switch fi.Implementation {
case string(domain.FeedTypeTorznab): case string(domain.FeedTypeTorznab):
if err := s.addTorznabJob(fi); err != nil { job, err = s.createTorznabJob(fi)
s.log.Error().Err(err).Msg("failed to initialize torznab feed")
return err
}
case string(domain.FeedTypeNewznab): case string(domain.FeedTypeNewznab):
if err := s.addNewznabJob(fi); err != nil { job, err = s.createNewznabJob(fi)
s.log.Error().Err(err).Msg("failed to initialize newznab feed")
case string(domain.FeedTypeRSS):
job, err = s.createRSSJob(fi)
default:
return errors.New("unsupported feed type: %s", fi.Implementation)
}
if err != nil {
s.log.Error().Err(err).Msgf("failed to initialize %s feed", fi.Implementation)
return err return err
} }
case string(domain.FeedTypeRSS): identifierKey := feedKey{f.ID}.ToString()
if err := s.addRSSJob(fi); err != nil {
s.log.Error().Err(err).Msg("failed to initialize rss feed") // schedule job
return err id, err := s.scheduler.ScheduleJob(job, fi.CronSchedule, identifierKey)
} if err != nil {
return errors.Wrap(err, "add job %s failed", identifierKey)
} }
// add to job map
s.jobs[identifierKey] = id
s.log.Debug().Msgf("successfully started feed: %s", f.Name)
return nil return nil
} }
func (s *service) addTorznabJob(f feedInstance) error { func (s *service) createTorznabJob(f feedInstance) (cron.Job, error) {
s.log.Debug().Msgf("create torznab job: %s", f.Name)
if f.URL == "" { if f.URL == "" {
return errors.New("torznab feed requires URL") return nil, errors.New("torznab feed requires URL")
} }
//if f.CronSchedule < 5*time.Minute { //if f.CronSchedule < 5*time.Minute {
@ -410,62 +402,38 @@ func (s *service) addTorznabJob(f feedInstance) error {
l := s.log.With().Str("feed", f.Name).Logger() l := s.log.With().Str("feed", f.Name).Logger()
// setup torznab Client // setup torznab Client
c := torznab.NewClient(torznab.Config{Host: f.URL, ApiKey: f.ApiKey, Timeout: f.Timeout}) client := torznab.NewClient(torznab.Config{Host: f.URL, ApiKey: f.ApiKey, Timeout: f.Timeout})
// create job // create job
job := NewTorznabJob(f.Feed, f.Name, f.IndexerIdentifier, l, f.URL, c, s.repo, s.cacheRepo, s.releaseSvc) job := NewTorznabJob(f.Feed, f.Name, f.IndexerIdentifier, l, f.URL, client, s.repo, s.cacheRepo, s.releaseSvc)
identifierKey := feedKey{f.Feed.ID, f.Feed.Indexer, f.Feed.Name}.ToString() return job, nil
// schedule job
id, err := s.scheduler.AddJob(job, f.CronSchedule, identifierKey)
if err != nil {
return errors.Wrap(err, "feed.AddTorznabJob: add job failed")
}
job.JobID = id
// add to job map
s.jobs[identifierKey] = id
s.log.Debug().Msgf("add torznab job: %v", f.Name)
return nil
} }
func (s *service) addNewznabJob(f feedInstance) error { func (s *service) createNewznabJob(f feedInstance) (cron.Job, error) {
s.log.Debug().Msgf("add newznab job: %s", f.Name)
if f.URL == "" { if f.URL == "" {
return errors.New("newznab feed requires URL") return nil, errors.New("newznab feed requires URL")
} }
// setup logger // setup logger
l := s.log.With().Str("feed", f.Name).Logger() l := s.log.With().Str("feed", f.Name).Logger()
// setup newznab Client // setup newznab Client
c := newznab.NewClient(newznab.Config{Host: f.URL, ApiKey: f.ApiKey, Timeout: f.Timeout}) client := newznab.NewClient(newznab.Config{Host: f.URL, ApiKey: f.ApiKey, Timeout: f.Timeout})
// create job // create job
job := NewNewznabJob(f.Feed, f.Name, f.IndexerIdentifier, l, f.URL, c, s.repo, s.cacheRepo, s.releaseSvc) job := NewNewznabJob(f.Feed, f.Name, f.IndexerIdentifier, l, f.URL, client, s.repo, s.cacheRepo, s.releaseSvc)
identifierKey := feedKey{f.Feed.ID, f.Feed.Indexer, f.Feed.Name}.ToString() return job, nil
// schedule job
id, err := s.scheduler.AddJob(job, f.CronSchedule, identifierKey)
if err != nil {
return errors.Wrap(err, "feed.AddNewznabJob: add job failed")
}
job.JobID = id
// add to job map
s.jobs[identifierKey] = id
s.log.Debug().Msgf("add newznab job: %v", f.Name)
return nil
} }
func (s *service) addRSSJob(f feedInstance) error { func (s *service) createRSSJob(f feedInstance) (cron.Job, error) {
s.log.Debug().Msgf("add rss job: %s", f.Name)
if f.URL == "" { if f.URL == "" {
return errors.New("rss feed requires URL") return nil, errors.New("rss feed requires URL")
} }
//if f.CronSchedule < time.Duration(5*time.Minute) { //if f.CronSchedule < time.Duration(5*time.Minute) {
@ -478,36 +446,43 @@ func (s *service) addRSSJob(f feedInstance) error {
// create job // create job
job := NewRSSJob(f.Feed, f.Name, f.IndexerIdentifier, l, f.URL, s.repo, s.cacheRepo, s.releaseSvc, f.Timeout) job := NewRSSJob(f.Feed, f.Name, f.IndexerIdentifier, l, f.URL, s.repo, s.cacheRepo, s.releaseSvc, f.Timeout)
identifierKey := feedKey{f.Feed.ID, f.Feed.Indexer, f.Feed.Name}.ToString() return job, nil
}
// schedule job func (s *service) createCleanupJob() error {
id, err := s.scheduler.AddJob(job, f.CronSchedule, identifierKey) // setup logger
l := s.log.With().Str("job", "feed-cache-cleanup").Logger()
// create job
job := NewCleanupJob(l, s.cacheRepo)
identifierKey := "feed-cache-cleanup"
// schedule job for every day at 03:05
id, err := s.scheduler.AddJob(job, "5 3 * * *", identifierKey)
if err != nil { if err != nil {
return errors.Wrap(err, "feed.AddRSSJob: add job failed") return errors.Wrap(err, "add job %s failed", identifierKey)
} }
job.JobID = id
// add to job map // add to job map
s.jobs[identifierKey] = id s.jobs[identifierKey] = id
s.log.Debug().Msgf("add rss job: %v", f.Name)
return nil return nil
} }
func (s *service) stopFeedJob(indexer string) error { func (s *service) stopFeedJob(id int) error {
// remove job from scheduler // remove job from scheduler
if err := s.scheduler.RemoveJobByIdentifier(indexer); err != nil { if err := s.scheduler.RemoveJobByIdentifier(feedKey{id}.ToString()); err != nil {
return errors.Wrap(err, "stop job failed") return errors.Wrap(err, "stop job failed")
} }
s.log.Debug().Msgf("stop feed job: %v", indexer) s.log.Debug().Msgf("stop feed job: %d", id)
return nil return nil
} }
func (s *service) GetNextRun(indexer string) (time.Time, error) { func (s *service) GetNextRun(id int) (time.Time, error) {
return s.scheduler.GetNextRun(indexer) return s.scheduler.GetNextRun(feedKey{id}.ToString())
} }
func (s *service) GetLastRunData(ctx context.Context, id int) (string, error) { func (s *service) GetLastRunData(ctx context.Context, id int) (string, error) {

View file

@ -214,7 +214,7 @@ func (j *TorznabJob) getFeed(ctx context.Context) ([]torznab.FeedItem, error) {
continue continue
} }
exists, err := j.CacheRepo.Exists(j.Name, i.GUID) exists, err := j.CacheRepo.Exists(j.Feed.ID, i.GUID)
if err != nil { if err != nil {
j.Log.Error().Err(err).Msg("could not check if item exists") j.Log.Error().Err(err).Msg("could not check if item exists")
continue continue
@ -229,7 +229,7 @@ func (j *TorznabJob) getFeed(ctx context.Context) ([]torznab.FeedItem, error) {
// set ttl to 1 month // set ttl to 1 month
ttl := time.Now().AddDate(0, 1, 0) ttl := time.Now().AddDate(0, 1, 0)
if err := j.CacheRepo.Put(j.Name, i.GUID, []byte(i.Title), ttl); err != nil { if err := j.CacheRepo.Put(j.Feed.ID, i.GUID, []byte(i.Title), ttl); err != nil {
j.Log.Error().Stack().Err(err).Str("guid", i.GUID).Msg("cache.Put: error storing item in cache") j.Log.Error().Stack().Err(err).Str("guid", i.GUID).Msg("cache.Put: error storing item in cache")
continue continue
} }

View file

@ -76,7 +76,6 @@ func NewService(log logger.Logger, config *domain.Config, repo domain.IndexerRep
} }
func (s *service) Store(ctx context.Context, indexer domain.Indexer) (*domain.Indexer, error) { func (s *service) Store(ctx context.Context, indexer domain.Indexer) (*domain.Indexer, error) {
// if indexer is rss or torznab do additional cleanup for identifier // if indexer is rss or torznab do additional cleanup for identifier
switch indexer.Implementation { switch indexer.Implementation {
case "torznab", "newznab", "rss": case "torznab", "newznab", "rss":
@ -84,18 +83,18 @@ func (s *service) Store(ctx context.Context, indexer domain.Indexer) (*domain.In
cleanName := strings.ToLower(indexer.Name) cleanName := strings.ToLower(indexer.Name)
// torznab-name OR rss-name // torznab-name OR rss-name
indexer.Identifier = slug.Make(fmt.Sprintf("%v-%v", indexer.Implementation, cleanName)) indexer.Identifier = slug.Make(fmt.Sprintf("%s-%s", indexer.Implementation, cleanName))
} }
i, err := s.repo.Store(ctx, indexer) i, err := s.repo.Store(ctx, indexer)
if err != nil { if err != nil {
s.log.Error().Stack().Err(err).Msgf("failed to store indexer: %v", indexer.Name) s.log.Error().Err(err).Msgf("failed to store indexer: %s", indexer.Name)
return nil, err return nil, err
} }
// add to indexerInstances // add to indexerInstances
if err = s.addIndexer(*i); err != nil { if err = s.addIndexer(*i); err != nil {
s.log.Error().Stack().Err(err).Msgf("failed to add indexer: %v", indexer.Name) s.log.Error().Err(err).Msgf("failed to add indexer: %s", indexer.Name)
return nil, err return nil, err
} }
@ -111,7 +110,7 @@ func (s *service) Update(ctx context.Context, indexer domain.Indexer) (*domain.I
// add to indexerInstances // add to indexerInstances
if err = s.updateIndexer(*i); err != nil { if err = s.updateIndexer(*i); err != nil {
s.log.Error().Err(err).Msgf("failed to add indexer: %v", indexer.Name) s.log.Error().Err(err).Msgf("failed to add indexer: %s", indexer.Name)
return nil, err return nil, err
} }
@ -121,7 +120,7 @@ func (s *service) Update(ctx context.Context, indexer domain.Indexer) (*domain.I
} }
} }
s.log.Debug().Msgf("successfully updated indexer: %v", indexer.Name) s.log.Debug().Msgf("successfully updated indexer: %s", indexer.Name)
return i, nil return i, nil
} }
@ -133,7 +132,7 @@ func (s *service) Delete(ctx context.Context, id int) error {
} }
if err := s.repo.Delete(ctx, id); err != nil { if err := s.repo.Delete(ctx, id); err != nil {
s.log.Error().Err(err).Msgf("could not delete indexer by id: %v", id) s.log.Error().Err(err).Msgf("could not delete indexer by id: %d", id)
return err return err
} }
@ -150,7 +149,7 @@ func (s *service) Delete(ctx context.Context, id int) error {
func (s *service) FindByFilterID(ctx context.Context, id int) ([]domain.Indexer, error) { func (s *service) FindByFilterID(ctx context.Context, id int) ([]domain.Indexer, error) {
indexers, err := s.repo.FindByFilterID(ctx, id) indexers, err := s.repo.FindByFilterID(ctx, id)
if err != nil { if err != nil {
s.log.Error().Err(err).Msgf("could not find indexers by filter id: %v", id) s.log.Error().Err(err).Msgf("could not find indexers by filter id: %d", id)
return nil, err return nil, err
} }
@ -160,7 +159,7 @@ func (s *service) FindByFilterID(ctx context.Context, id int) ([]domain.Indexer,
func (s *service) FindByID(ctx context.Context, id int) (*domain.Indexer, error) { func (s *service) FindByID(ctx context.Context, id int) (*domain.Indexer, error) {
indexers, err := s.repo.FindByID(ctx, id) indexers, err := s.repo.FindByID(ctx, id)
if err != nil { if err != nil {
s.log.Error().Err(err).Msgf("could not find indexer by id: %v", id) s.log.Error().Err(err).Msgf("could not find indexer by id: %d", id)
return nil, err return nil, err
} }
@ -340,7 +339,7 @@ func (s *service) Start() error {
// check if it has api and add to api service // check if it has api and add to api service
if indexer.Enabled && indexer.HasApi() { if indexer.Enabled && indexer.HasApi() {
if err := s.ApiService.AddClient(indexer.Identifier, indexer.SettingsMap); err != nil { if err := s.ApiService.AddClient(indexer.Identifier, indexer.SettingsMap); err != nil {
s.log.Error().Stack().Err(err).Msgf("indexer.start: could not init api client for: '%v'", indexer.Identifier) s.log.Error().Stack().Err(err).Msgf("indexer.start: could not init api client for: '%s'", indexer.Identifier)
} }
} }
} }
@ -391,7 +390,7 @@ func (s *service) addIndexer(indexer domain.Indexer) error {
// check if it has api and add to api service // check if it has api and add to api service
if indexerDefinition.HasApi() { if indexerDefinition.HasApi() {
if err := s.ApiService.AddClient(indexerDefinition.Identifier, indexerDefinition.SettingsMap); err != nil { if err := s.ApiService.AddClient(indexerDefinition.Identifier, indexerDefinition.SettingsMap); err != nil {
s.log.Error().Stack().Err(err).Msgf("indexer.start: could not init api client for: '%v'", indexer.Identifier) s.log.Error().Stack().Err(err).Msgf("indexer.start: could not init api client for: '%s'", indexer.Identifier)
} }
} }
} }
@ -481,18 +480,18 @@ func (s *service) LoadIndexerDefinitions() error {
file := "definitions/" + f.Name() file := "definitions/" + f.Name()
s.log.Trace().Msgf("parsing: %v", file) s.log.Trace().Msgf("parsing: %s", file)
data, err := fs.ReadFile(Definitions, file) data, err := fs.ReadFile(Definitions, file)
if err != nil { if err != nil {
s.log.Error().Stack().Err(err).Msgf("failed reading file: %v", file) s.log.Error().Stack().Err(err).Msgf("failed reading file: %s", file)
return errors.Wrap(err, "could not read file: %v", file) return errors.Wrap(err, "could not read file: %s", file)
} }
var d domain.IndexerDefinition var d domain.IndexerDefinition
if err = yaml.Unmarshal(data, &d); err != nil { if err = yaml.Unmarshal(data, &d); err != nil {
s.log.Error().Stack().Err(err).Msgf("failed unmarshal file: %v", file) s.log.Error().Stack().Err(err).Msgf("failed unmarshal file: %s", file)
return errors.Wrap(err, "could not unmarshal file: %v", file) return errors.Wrap(err, "could not unmarshal file: %s", file)
} }
if d.Implementation == "" { if d.Implementation == "" {
@ -515,7 +514,7 @@ func (s *service) LoadCustomIndexerDefinitions() error {
outputDirRead, err := os.Open(s.config.CustomDefinitions) outputDirRead, err := os.Open(s.config.CustomDefinitions)
if err != nil { if err != nil {
s.log.Warn().Stack().Msgf("failed opening custom definitions directory %q: %s", s.config.CustomDefinitions, err) s.log.Error().Err(err).Msgf("failed opening custom definitions directory %s", s.config.CustomDefinitions)
return nil return nil
} }
@ -538,22 +537,22 @@ func (s *service) LoadCustomIndexerDefinitions() error {
file := filepath.Join(s.config.CustomDefinitions, f.Name()) file := filepath.Join(s.config.CustomDefinitions, f.Name())
s.log.Trace().Msgf("parsing custom: %v", file) s.log.Trace().Msgf("parsing custom: %s", file)
data, err := os.ReadFile(file) data, err := os.ReadFile(file)
if err != nil { if err != nil {
s.log.Error().Stack().Err(err).Msgf("failed reading file: %v", file) s.log.Error().Stack().Err(err).Msgf("failed reading file: %s", file)
return errors.Wrap(err, "could not read file: %v", file) return errors.Wrap(err, "could not read file: %s", file)
} }
var d *domain.IndexerDefinitionCustom var d *domain.IndexerDefinitionCustom
if err = yaml.Unmarshal(data, &d); err != nil { if err = yaml.Unmarshal(data, &d); err != nil {
s.log.Error().Stack().Err(err).Msgf("failed unmarshal file: %v", file) s.log.Error().Stack().Err(err).Msgf("failed unmarshal file: %s", file)
return errors.Wrap(err, "could not unmarshal file: %v", file) return errors.Wrap(err, "could not unmarshal file: %s", file)
} }
if d == nil { if d == nil {
s.log.Warn().Stack().Err(err).Msgf("skipping empty file: %v", file) s.log.Warn().Stack().Err(err).Msgf("skipping empty file: %s", file)
continue continue
} }
@ -563,7 +562,7 @@ func (s *service) LoadCustomIndexerDefinitions() error {
// to prevent crashing from non-updated definitions lets skip // to prevent crashing from non-updated definitions lets skip
if d.Implementation == "irc" && d.IRC.Parse == nil { if d.Implementation == "irc" && d.IRC.Parse == nil {
s.log.Warn().Msgf("DEPRECATED: indexer definition version: %v", file) s.log.Warn().Msgf("DEPRECATED: indexer definition version: %s", file)
} }
s.definitions[d.Identifier] = *d.ToIndexerDefinition() s.definitions[d.Identifier] = *d.ToIndexerDefinition()

View file

@ -11,6 +11,7 @@ import (
"github.com/autobrr/autobrr/internal/logger" "github.com/autobrr/autobrr/internal/logger"
"github.com/autobrr/autobrr/internal/notification" "github.com/autobrr/autobrr/internal/notification"
"github.com/autobrr/autobrr/internal/update" "github.com/autobrr/autobrr/internal/update"
"github.com/autobrr/autobrr/pkg/errors"
"github.com/robfig/cron/v3" "github.com/robfig/cron/v3"
"github.com/rs/zerolog" "github.com/rs/zerolog"
@ -19,7 +20,8 @@ import (
type Service interface { type Service interface {
Start() Start()
Stop() Stop()
AddJob(job cron.Job, interval time.Duration, identifier string) (int, error) ScheduleJob(job cron.Job, interval time.Duration, identifier string) (int, error)
AddJob(job cron.Job, spec string, identifier string) (int, error)
RemoveJobByIdentifier(id string) error RemoveJobByIdentifier(id string) error
GetNextRun(id string) (time.Time, error) GetNextRun(id string) (time.Time, error)
} }
@ -74,7 +76,7 @@ func (s *service) addAppJobs() {
lastCheckVersion: s.version, lastCheckVersion: s.version,
} }
if id, err := s.AddJob(checkUpdates, 2*time.Hour, "app-check-updates"); err != nil { if id, err := s.ScheduleJob(checkUpdates, 2*time.Hour, "app-check-updates"); err != nil {
s.log.Error().Err(err).Msgf("scheduler.addAppJobs: error adding job: %v", id) s.log.Error().Err(err).Msgf("scheduler.addAppJobs: error adding job: %v", id)
} }
} }
@ -86,11 +88,27 @@ func (s *service) Stop() {
return return
} }
func (s *service) AddJob(job cron.Job, interval time.Duration, identifier string) (int, error) { // ScheduleJob takes a time duration and adds a job
func (s *service) ScheduleJob(job cron.Job, interval time.Duration, identifier string) (int, error) {
id := s.cron.Schedule(cron.Every(interval), cron.NewChain(cron.SkipIfStillRunning(cron.DiscardLogger)).Then(job))
id := s.cron.Schedule(cron.Every(interval), cron.NewChain( s.log.Debug().Msgf("scheduler.ScheduleJob: job successfully added: %s id %d", identifier, id)
cron.SkipIfStillRunning(cron.DiscardLogger)).Then(job),
) s.m.Lock()
// add to job map
s.jobs[identifier] = id
s.m.Unlock()
return int(id), nil
}
// AddJob takes a cron schedule and adds a job
func (s *service) AddJob(job cron.Job, spec string, identifier string) (int, error) {
id, err := s.cron.AddJob(spec, cron.NewChain(cron.SkipIfStillRunning(cron.DiscardLogger)).Then(job))
if err != nil {
return 0, errors.Wrap(err, "could not add job to cron")
}
s.log.Debug().Msgf("scheduler.AddJob: job successfully added: %s id %d", identifier, id) s.log.Debug().Msgf("scheduler.AddJob: job successfully added: %s id %d", identifier, id)