diff --git a/internal/database/feed_cache.go b/internal/database/feed_cache.go index 5d0fb6c..e289ac1 100644 --- a/internal/database/feed_cache.go +++ b/internal/database/feed_cache.go @@ -28,14 +28,14 @@ func NewFeedCacheRepo(log logger.Logger, db *DB) domain.FeedCacheRepo { } } -func (r *FeedCacheRepo) Get(bucket string, key string) ([]byte, error) { +func (r *FeedCacheRepo) Get(feedId int, key string) ([]byte, error) { queryBuilder := r.db.squirrel. Select( "value", "ttl", ). From("feed_cache"). - Where(sq.Eq{"bucket": bucket}). + Where(sq.Eq{"feed_id": feedId}). Where(sq.Eq{"key": key}). Where(sq.Gt{"ttl": time.Now()}) @@ -63,16 +63,16 @@ func (r *FeedCacheRepo) Get(bucket string, key string) ([]byte, error) { return value, nil } -func (r *FeedCacheRepo) GetByBucket(ctx context.Context, bucket string) ([]domain.FeedCacheItem, error) { +func (r *FeedCacheRepo) GetByFeed(ctx context.Context, feedId int) ([]domain.FeedCacheItem, error) { queryBuilder := r.db.squirrel. Select( - "bucket", + "feed_id", "key", "value", "ttl", ). From("feed_cache"). - Where(sq.Eq{"bucket": bucket}) + Where(sq.Eq{"feed_id": feedId}) query, args, err := queryBuilder.ToSql() if err != nil { @@ -91,7 +91,7 @@ func (r *FeedCacheRepo) GetByBucket(ctx context.Context, bucket string) ([]domai for rows.Next() { var d domain.FeedCacheItem - if err := rows.Scan(&d.Bucket, &d.Key, &d.Value, &d.TTL); err != nil { + if err := rows.Scan(&d.FeedId, &d.Key, &d.Value, &d.TTL); err != nil { return nil, errors.Wrap(err, "error scanning row") } @@ -105,12 +105,11 @@ func (r *FeedCacheRepo) GetByBucket(ctx context.Context, bucket string) ([]domai return data, nil } -func (r *FeedCacheRepo) GetCountByBucket(ctx context.Context, bucket string) (int, error) { - +func (r *FeedCacheRepo) GetCountByFeed(ctx context.Context, feedId int) (int, error) { queryBuilder := r.db.squirrel. Select("COUNT(*)"). From("feed_cache"). - Where(sq.Eq{"bucket": bucket}) + Where(sq.Eq{"feed_id": feedId}) query, args, err := queryBuilder.ToSql() if err != nil { @@ -131,12 +130,12 @@ func (r *FeedCacheRepo) GetCountByBucket(ctx context.Context, bucket string) (in return count, nil } -func (r *FeedCacheRepo) Exists(bucket string, key string) (bool, error) { +func (r *FeedCacheRepo) Exists(feedId int, key string) (bool, error) { queryBuilder := r.db.squirrel. Select("1"). Prefix("SELECT EXISTS ("). From("feed_cache"). - Where(sq.Eq{"bucket": bucket}). + Where(sq.Eq{"feed_id": feedId}). Where(sq.Eq{"key": key}). Suffix(")") @@ -149,7 +148,7 @@ func (r *FeedCacheRepo) Exists(bucket string, key string) (bool, error) { err = r.db.handler.QueryRow(query, args...).Scan(&exists) if err != nil { if errors.Is(err, sql.ErrNoRows) { - return false, domain.ErrRecordNotFound + return false, nil } return false, errors.Wrap(err, "error query") @@ -158,11 +157,11 @@ func (r *FeedCacheRepo) Exists(bucket string, key string) (bool, error) { return exists, nil } -func (r *FeedCacheRepo) Put(bucket string, key string, val []byte, ttl time.Time) error { +func (r *FeedCacheRepo) Put(feedId int, key string, val []byte, ttl time.Time) error { queryBuilder := r.db.squirrel. Insert("feed_cache"). - Columns("bucket", "key", "value", "ttl"). - Values(bucket, key, val, ttl) + Columns("feed_id", "key", "value", "ttl"). + Values(feedId, key, val, ttl) query, args, err := queryBuilder.ToSql() if err != nil { @@ -176,10 +175,10 @@ func (r *FeedCacheRepo) Put(bucket string, key string, val []byte, ttl time.Time return nil } -func (r *FeedCacheRepo) Delete(ctx context.Context, bucket string, key string) error { +func (r *FeedCacheRepo) Delete(ctx context.Context, feedId int, key string) error { queryBuilder := r.db.squirrel. Delete("feed_cache"). - Where(sq.Eq{"bucket": bucket}). + Where(sq.Eq{"feed_id": feedId}). Where(sq.Eq{"key": key}) query, args, err := queryBuilder.ToSql() @@ -195,10 +194,8 @@ func (r *FeedCacheRepo) Delete(ctx context.Context, bucket string, key string) e return nil } -func (r *FeedCacheRepo) DeleteBucket(ctx context.Context, bucket string) error { - queryBuilder := r.db.squirrel. - Delete("feed_cache"). - Where(sq.Eq{"bucket": bucket}) +func (r *FeedCacheRepo) DeleteByFeed(ctx context.Context, feedId int) error { + queryBuilder := r.db.squirrel.Delete("feed_cache").Where(sq.Eq{"feed_id": feedId}) query, args, err := queryBuilder.ToSql() if err != nil { @@ -215,9 +212,36 @@ func (r *FeedCacheRepo) DeleteBucket(ctx context.Context, bucket string) error { return errors.Wrap(err, "error exec result") } - if rows == 0 { - r.log.Warn().Msgf("no rows affected for delete of bucket: %s", bucket) - } + r.log.Debug().Msgf("deleted %d rows from feed cache: %d", rows, feedId) + + return nil +} + +func (r *FeedCacheRepo) DeleteStale(ctx context.Context) error { + queryBuilder := r.db.squirrel.Delete("feed_cache") + + if r.db.Driver == "sqlite" { + queryBuilder = queryBuilder.Where(sq.Expr("ttl < datetime('now', 'localtime', '-30 days')")) + } else { + queryBuilder = queryBuilder.Where(sq.Lt{"ttl": time.Now().AddDate(0, 0, -30)}) + } + + query, args, err := queryBuilder.ToSql() + if err != nil { + return errors.Wrap(err, "error building query") + } + + result, err := r.db.handler.ExecContext(ctx, query, args...) + if err != nil { + return errors.Wrap(err, "error executing query") + } + + rows, err := result.RowsAffected() + if err != nil { + return errors.Wrap(err, "error exec result") + } + + r.log.Debug().Msgf("deleted %d rows from stale feed cache", rows) return nil } diff --git a/internal/database/postgres_migrate.go b/internal/database/postgres_migrate.go index 2eda0a4..07a6d8a 100644 --- a/internal/database/postgres_migrate.go +++ b/internal/database/postgres_migrate.go @@ -353,12 +353,16 @@ CREATE TABLE feed CREATE TABLE feed_cache ( - bucket TEXT, - key TEXT, - value TEXT, - ttl TIMESTAMP + feed_id INTEGER NOT NULL, + key TEXT, + value TEXT, + ttl TIMESTAMP, + FOREIGN KEY (feed_id) REFERENCES feed (id) ON DELETE cascade ); +CREATE INDEX feed_cache_feed_id_key_index + ON feed_cache (feed_id, key); + CREATE TABLE api_key ( name TEXT, @@ -776,4 +780,18 @@ ALTER TABLE release_action_status ALTER TABLE filter DROP COLUMN IF EXISTS external_webhook_expect_status; `, + `DROP TABLE IF EXISTS feed_cache; + +CREATE TABLE feed_cache +( + feed_id INTEGER NOT NULL, + key TEXT, + value TEXT, + ttl TIMESTAMP, + FOREIGN KEY (feed_id) REFERENCES feed (id) ON DELETE cascade +); + +CREATE INDEX feed_cache_feed_id_key_index + ON feed_cache (feed_id, key); +`, } diff --git a/internal/database/sqlite_migrate.go b/internal/database/sqlite_migrate.go index 16d0184..db4a09e 100644 --- a/internal/database/sqlite_migrate.go +++ b/internal/database/sqlite_migrate.go @@ -346,12 +346,16 @@ CREATE TABLE feed CREATE TABLE feed_cache ( - bucket TEXT, - key TEXT, - value TEXT, - ttl TIMESTAMP + feed_id INTEGER NOT NULL, + key TEXT, + value TEXT, + ttl TIMESTAMP, + FOREIGN KEY (feed_id) REFERENCES feed (id) ON DELETE cascade ); +CREATE INDEX feed_cache_feed_id_key_index + ON feed_cache (feed_id, key); + CREATE TABLE api_key ( name TEXT, @@ -1326,5 +1330,19 @@ drop table filter; alter table filter_dg_tmp rename to filter; +`, + `DROP TABLE IF EXISTS feed_cache; + +CREATE TABLE feed_cache +( + feed_id INTEGER NOT NULL, + key TEXT, + value TEXT, + ttl TIMESTAMP, + FOREIGN KEY (feed_id) REFERENCES feed (id) ON DELETE cascade +); + +CREATE INDEX feed_cache_feed_id_key_index + ON feed_cache (feed_id, key); `, } diff --git a/internal/domain/feed.go b/internal/domain/feed.go index a9cdff0..00b5fdf 100644 --- a/internal/domain/feed.go +++ b/internal/domain/feed.go @@ -9,13 +9,14 @@ import ( ) type FeedCacheRepo interface { - Get(bucket string, key string) ([]byte, error) - GetByBucket(ctx context.Context, bucket string) ([]FeedCacheItem, error) - GetCountByBucket(ctx context.Context, bucket string) (int, error) - Exists(bucket string, key string) (bool, error) - Put(bucket string, key string, val []byte, ttl time.Time) error - Delete(ctx context.Context, bucket string, key string) error - DeleteBucket(ctx context.Context, bucket string) error + Get(feedId int, key string) ([]byte, error) + GetByFeed(ctx context.Context, feedId int) ([]FeedCacheItem, error) + GetCountByFeed(ctx context.Context, feedId int) (int, error) + Exists(feedId int, key string) (bool, error) + Put(feedId int, key string, val []byte, ttl time.Time) error + Delete(ctx context.Context, feedId int, key string) error + DeleteByFeed(ctx context.Context, feedId int) error + DeleteStale(ctx context.Context) error } type FeedRepo interface { @@ -79,7 +80,7 @@ const ( ) type FeedCacheItem struct { - Bucket string `json:"bucket"` + FeedId string `json:"feed_id"` Key string `json:"key"` Value []byte `json:"value"` TTL time.Time `json:"ttl"` diff --git a/internal/feed/cleanup.go b/internal/feed/cleanup.go new file mode 100644 index 0000000..74bd6b8 --- /dev/null +++ b/internal/feed/cleanup.go @@ -0,0 +1,35 @@ +// Copyright (c) 2021 - 2023, Ludvig Lundgren and the autobrr contributors. +// SPDX-License-Identifier: GPL-2.0-or-later + +package feed + +import ( + "context" + "time" + + "github.com/autobrr/autobrr/internal/domain" + + "github.com/rs/zerolog" +) + +type CleanupJob struct { + log zerolog.Logger + cacheRepo domain.FeedCacheRepo + + CronSchedule time.Duration +} + +func NewCleanupJob(log zerolog.Logger, cacheRepo domain.FeedCacheRepo) *CleanupJob { + return &CleanupJob{ + log: log, + cacheRepo: cacheRepo, + } +} + +func (j *CleanupJob) Run() { + if err := j.cacheRepo.DeleteStale(context.Background()); err != nil { + j.log.Error().Err(err).Msg("error when running feed cache cleanup job") + } + + j.log.Info().Msg("successfully ran feed-cache-cleanup job") +} diff --git a/internal/feed/newznab.go b/internal/feed/newznab.go index e82e042..30c248e 100644 --- a/internal/feed/newznab.go +++ b/internal/feed/newznab.go @@ -133,7 +133,7 @@ func (j *NewznabJob) getFeed(ctx context.Context) ([]newznab.FeedItem, error) { j.Log.Error().Err(err).Msgf("error updating last run for feed id: %v", j.Feed.ID) } - j.Log.Debug().Msgf("refreshing feed: %v, found (%d) items", j.Name, len(feed.Channel.Items)) + j.Log.Debug().Msgf("refreshing feed: %s, found (%d) items", j.Name, len(feed.Channel.Items)) items := make([]newznab.FeedItem, 0) if len(feed.Channel.Items) == 0 { @@ -146,15 +146,16 @@ func (j *NewznabJob) getFeed(ctx context.Context) ([]newznab.FeedItem, error) { for _, i := range feed.Channel.Items { if i.GUID == "" { - j.Log.Error().Err(err).Msgf("missing GUID from feed: %s", j.Feed.Name) + j.Log.Error().Msgf("missing GUID from feed: %s", j.Feed.Name) continue } - exists, err := j.CacheRepo.Exists(j.Name, i.GUID) + exists, err := j.CacheRepo.Exists(j.Feed.ID, i.GUID) if err != nil { j.Log.Error().Err(err).Msg("could not check if item exists") continue } + if exists { j.Log.Trace().Msgf("cache item exists, skipping release: %s", i.Title) continue @@ -165,7 +166,7 @@ func (j *NewznabJob) getFeed(ctx context.Context) ([]newznab.FeedItem, error) { // set ttl to 1 month ttl := time.Now().AddDate(0, 1, 0) - if err := j.CacheRepo.Put(j.Name, i.GUID, []byte(i.Title), ttl); err != nil { + if err := j.CacheRepo.Put(j.Feed.ID, i.GUID, []byte(i.Title), ttl); err != nil { j.Log.Error().Stack().Err(err).Str("guid", i.GUID).Msg("cache.Put: error storing item in cache") continue } diff --git a/internal/feed/rss.go b/internal/feed/rss.go index 37bdcf4..676f1a0 100644 --- a/internal/feed/rss.go +++ b/internal/feed/rss.go @@ -6,7 +6,6 @@ package feed import ( "context" "encoding/xml" - "fmt" "net/url" "regexp" "time" @@ -227,8 +226,6 @@ func (j *RSSJob) getFeed(ctx context.Context) (items []*gofeed.Item, err error) return } - bucketKey := fmt.Sprintf("%v+%v", j.IndexerIdentifier, j.Name) - //sort.Sort(feed) // set ttl to 1 month @@ -245,7 +242,7 @@ func (j *RSSJob) getFeed(ctx context.Context) (items []*gofeed.Item, err error) } } - exists, err := j.CacheRepo.Exists(bucketKey, key) + exists, err := j.CacheRepo.Exists(j.Feed.ID, key) if err != nil { j.Log.Error().Err(err).Msg("could not check if item exists") continue @@ -257,7 +254,7 @@ func (j *RSSJob) getFeed(ctx context.Context) (items []*gofeed.Item, err error) j.Log.Debug().Msgf("found new release: %s", i.Title) - if err := j.CacheRepo.Put(bucketKey, key, []byte(item.Title), ttl); err != nil { + if err := j.CacheRepo.Put(j.Feed.ID, key, []byte(item.Title), ttl); err != nil { j.Log.Error().Err(err).Str("entry", key).Msg("cache.Put: error storing item in cache") continue } diff --git a/internal/feed/service.go b/internal/feed/service.go index 38a6900..8329cbf 100644 --- a/internal/feed/service.go +++ b/internal/feed/service.go @@ -7,7 +7,6 @@ import ( "context" "fmt" "log" - "strconv" "time" "github.com/autobrr/autobrr/internal/domain" @@ -20,6 +19,7 @@ import ( "github.com/dcarbone/zadapters/zstdlog" "github.com/mmcdole/gofeed" + "github.com/robfig/cron/v3" "github.com/rs/zerolog" ) @@ -27,7 +27,7 @@ type Service interface { FindByID(ctx context.Context, id int) (*domain.Feed, error) FindByIndexerIdentifier(ctx context.Context, indexer string) (*domain.Feed, error) Find(ctx context.Context) ([]domain.Feed, error) - GetCacheByID(ctx context.Context, bucket string) ([]domain.FeedCacheItem, error) + GetCacheByID(ctx context.Context, feedId int) ([]domain.FeedCacheItem, error) Store(ctx context.Context, feed *domain.Feed) error Update(ctx context.Context, feed *domain.Feed) error Test(ctx context.Context, feed *domain.Feed) error @@ -35,6 +35,7 @@ type Service interface { Delete(ctx context.Context, id int) error DeleteFeedCache(ctx context.Context, id int) error GetLastRunData(ctx context.Context, id int) (string, error) + DeleteFeedCacheStale(ctx context.Context) error Start() error } @@ -50,14 +51,14 @@ type feedInstance struct { Timeout time.Duration } +// feedKey creates a unique identifier to be used for controlling jobs in the scheduler type feedKey struct { - id int - indexer string - name string + id int } +// ToString creates a string of the unique id to be used for controlling jobs in the scheduler func (k feedKey) ToString() string { - return fmt.Sprintf("%v+%v+%v", k.id, k.indexer, k.name) + return fmt.Sprintf("feed-%d", k.id) } type service struct { @@ -93,22 +94,8 @@ func (s *service) Find(ctx context.Context) ([]domain.Feed, error) { return s.repo.Find(ctx) } -func (s *service) GetCacheByID(ctx context.Context, bucket string) ([]domain.FeedCacheItem, error) { - id, _ := strconv.Atoi(bucket) - - feed, err := s.repo.FindByID(ctx, id) - if err != nil { - s.log.Error().Err(err).Msgf("could not find feed by id: %v", id) - return nil, err - } - - data, err := s.cacheRepo.GetByBucket(ctx, feed.Name) - if err != nil { - s.log.Error().Err(err).Msg("could not get feed cache") - return nil, err - } - - return data, err +func (s *service) GetCacheByID(ctx context.Context, feedId int) ([]domain.FeedCacheItem, error) { + return s.cacheRepo.GetByFeed(ctx, feedId) } func (s *service) Store(ctx context.Context, feed *domain.Feed) error { @@ -124,18 +111,11 @@ func (s *service) Delete(ctx context.Context, id int) error { } func (s *service) DeleteFeedCache(ctx context.Context, id int) error { - feed, err := s.repo.FindByID(ctx, id) - if err != nil { - s.log.Error().Err(err).Msgf("could not find feed by id: %d", id) - return err - } + return s.cacheRepo.DeleteByFeed(ctx, id) +} - if err := s.cacheRepo.DeleteBucket(ctx, feed.Name); err != nil { - s.log.Error().Err(err).Msgf("could not clear feed cache: %d", id) - return err - } - - return nil +func (s *service) DeleteFeedCacheStale(ctx context.Context) error { + return s.cacheRepo.DeleteStale(ctx) } func (s *service) ToggleEnabled(ctx context.Context, id int, enabled bool) error { @@ -171,22 +151,16 @@ func (s *service) delete(ctx context.Context, id int) error { return err } - s.log.Debug().Msgf("stopping and removing feed: %v", f.Name) + s.log.Debug().Msgf("stopping and removing feed: %s", f.Name) - identifierKey := feedKey{f.ID, f.Indexer, f.Name}.ToString() - - if err := s.stopFeedJob(identifierKey); err != nil { - s.log.Error().Err(err).Msg("error stopping rss job") + if err := s.stopFeedJob(f.ID); err != nil { + s.log.Error().Err(err).Msgf("error stopping rss job: %s id: %d", f.Name, f.ID) return err } - if err := s.repo.Delete(ctx, id); err != nil { - s.log.Error().Err(err).Msg("error deleting feed") - return err - } - - if err := s.cacheRepo.DeleteBucket(ctx, f.Name); err != nil { - s.log.Error().Err(err).Msgf("could not delete feedCache bucket by id: %v", id) + // delete feed and cascade delete feed_cache by fk + if err := s.repo.Delete(ctx, f.ID); err != nil { + s.log.Error().Err(err).Msgf("error deleting feed: %s", f.Name) return err } @@ -215,23 +189,21 @@ func (s *service) toggleEnabled(ctx context.Context, id int, enabled bool) error return err } - s.log.Debug().Msgf("feed started: %v", f.Name) - - return nil - } else { - s.log.Debug().Msgf("stopping feed: %v", f.Name) - - identifierKey := feedKey{f.ID, f.Indexer, f.Name}.ToString() - - if err := s.stopFeedJob(identifierKey); err != nil { - s.log.Error().Err(err).Msg("error stopping feed job") - return err - } - - s.log.Debug().Msgf("feed stopped: %v", f.Name) + s.log.Debug().Msgf("feed started: %s", f.Name) return nil } + + s.log.Debug().Msgf("stopping feed: %s", f.Name) + + if err := s.stopFeedJob(f.ID); err != nil { + s.log.Error().Err(err).Msg("error stopping feed job") + return err + } + + s.log.Debug().Msgf("feed stopped: %s", f.Name) + + return nil } return nil @@ -274,7 +246,7 @@ func (s *service) testRSS(ctx context.Context, feed *domain.Feed) error { return errors.Wrap(err, "error fetching rss feed items") } - s.log.Info().Msgf("refreshing rss feed: %v, found (%d) items", feed.Name, len(f.Items)) + s.log.Info().Msgf("refreshing rss feed: %s, found (%d) items", feed.Name, len(f.Items)) return nil } @@ -289,7 +261,7 @@ func (s *service) testTorznab(ctx context.Context, feed *domain.Feed, subLogger return err } - s.log.Info().Msgf("refreshing torznab feed: %v, found (%d) items", feed.Name, len(items.Channel.Items)) + s.log.Info().Msgf("refreshing torznab feed: %s, found (%d) items", feed.Name, len(items.Channel.Items)) return nil } @@ -304,13 +276,18 @@ func (s *service) testNewznab(ctx context.Context, feed *domain.Feed, subLogger return err } - s.log.Info().Msgf("refreshing newznab feed: %v, found (%d) items", feed.Name, len(items.Channel.Items)) + s.log.Info().Msgf("refreshing newznab feed: %s, found (%d) items", feed.Name, len(items.Channel.Items)) return nil } func (s *service) start() error { - // get all torznab indexer definitions + // always run feed cache maintenance job + if err := s.createCleanupJob(); err != nil { + s.log.Error().Err(err).Msg("could not start feed cache cleanup job") + } + + // get all feeds feeds, err := s.repo.Find(context.TODO()) if err != nil { s.log.Error().Err(err).Msg("error finding feeds") @@ -329,12 +306,10 @@ func (s *service) start() error { } func (s *service) restartJob(f *domain.Feed) error { - s.log.Debug().Msgf("stopping feed: %v", f.Name) - - identifierKey := feedKey{f.ID, f.Indexer, f.Name}.ToString() + s.log.Debug().Msgf("stopping feed: %s", f.Name) // stop feed job - if err := s.stopFeedJob(identifierKey); err != nil { + if err := s.stopFeedJob(f.ID); err != nil { s.log.Error().Err(err).Msg("error stopping feed job") return err } @@ -345,21 +320,21 @@ func (s *service) restartJob(f *domain.Feed) error { return err } - s.log.Debug().Msgf("restarted feed: %v", f.Name) + s.log.Debug().Msgf("restarted feed: %s", f.Name) } return nil } func (s *service) startJob(f *domain.Feed) error { - // get all torznab indexer definitions + // if it's not enabled we should not start it if !f.Enabled { return nil } // get torznab_url from settings if f.URL == "" { - return errors.New("no URL provided for feed: %v", f.Name) + return errors.New("no URL provided for feed: %s", f.Name) } // cron schedule to run every X minutes @@ -374,32 +349,49 @@ func (s *service) startJob(f *domain.Feed) error { Timeout: time.Duration(f.Timeout) * time.Second, } + var err error + var job cron.Job + switch fi.Implementation { case string(domain.FeedTypeTorznab): - if err := s.addTorznabJob(fi); err != nil { - s.log.Error().Err(err).Msg("failed to initialize torznab feed") - return err - } + job, err = s.createTorznabJob(fi) case string(domain.FeedTypeNewznab): - if err := s.addNewznabJob(fi); err != nil { - s.log.Error().Err(err).Msg("failed to initialize newznab feed") - return err - } + job, err = s.createNewznabJob(fi) case string(domain.FeedTypeRSS): - if err := s.addRSSJob(fi); err != nil { - s.log.Error().Err(err).Msg("failed to initialize rss feed") - return err - } + job, err = s.createRSSJob(fi) + + default: + return errors.New("unsupported feed type: %s", fi.Implementation) } + if err != nil { + s.log.Error().Err(err).Msgf("failed to initialize %s feed", fi.Implementation) + return err + } + + identifierKey := feedKey{f.ID}.ToString() + + // schedule job + id, err := s.scheduler.ScheduleJob(job, fi.CronSchedule, identifierKey) + if err != nil { + return errors.Wrap(err, "add job %s failed", identifierKey) + } + + // add to job map + s.jobs[identifierKey] = id + + s.log.Debug().Msgf("successfully started feed: %s", f.Name) + return nil } -func (s *service) addTorznabJob(f feedInstance) error { +func (s *service) createTorznabJob(f feedInstance) (cron.Job, error) { + s.log.Debug().Msgf("create torznab job: %s", f.Name) + if f.URL == "" { - return errors.New("torznab feed requires URL") + return nil, errors.New("torznab feed requires URL") } //if f.CronSchedule < 5*time.Minute { @@ -410,62 +402,38 @@ func (s *service) addTorznabJob(f feedInstance) error { l := s.log.With().Str("feed", f.Name).Logger() // setup torznab Client - c := torznab.NewClient(torznab.Config{Host: f.URL, ApiKey: f.ApiKey, Timeout: f.Timeout}) + client := torznab.NewClient(torznab.Config{Host: f.URL, ApiKey: f.ApiKey, Timeout: f.Timeout}) // create job - job := NewTorznabJob(f.Feed, f.Name, f.IndexerIdentifier, l, f.URL, c, s.repo, s.cacheRepo, s.releaseSvc) + job := NewTorznabJob(f.Feed, f.Name, f.IndexerIdentifier, l, f.URL, client, s.repo, s.cacheRepo, s.releaseSvc) - identifierKey := feedKey{f.Feed.ID, f.Feed.Indexer, f.Feed.Name}.ToString() - - // schedule job - id, err := s.scheduler.AddJob(job, f.CronSchedule, identifierKey) - if err != nil { - return errors.Wrap(err, "feed.AddTorznabJob: add job failed") - } - job.JobID = id - - // add to job map - s.jobs[identifierKey] = id - - s.log.Debug().Msgf("add torznab job: %v", f.Name) - - return nil + return job, nil } -func (s *service) addNewznabJob(f feedInstance) error { +func (s *service) createNewznabJob(f feedInstance) (cron.Job, error) { + s.log.Debug().Msgf("add newznab job: %s", f.Name) + if f.URL == "" { - return errors.New("newznab feed requires URL") + return nil, errors.New("newznab feed requires URL") } // setup logger l := s.log.With().Str("feed", f.Name).Logger() // setup newznab Client - c := newznab.NewClient(newznab.Config{Host: f.URL, ApiKey: f.ApiKey, Timeout: f.Timeout}) + client := newznab.NewClient(newznab.Config{Host: f.URL, ApiKey: f.ApiKey, Timeout: f.Timeout}) // create job - job := NewNewznabJob(f.Feed, f.Name, f.IndexerIdentifier, l, f.URL, c, s.repo, s.cacheRepo, s.releaseSvc) + job := NewNewznabJob(f.Feed, f.Name, f.IndexerIdentifier, l, f.URL, client, s.repo, s.cacheRepo, s.releaseSvc) - identifierKey := feedKey{f.Feed.ID, f.Feed.Indexer, f.Feed.Name}.ToString() - - // schedule job - id, err := s.scheduler.AddJob(job, f.CronSchedule, identifierKey) - if err != nil { - return errors.Wrap(err, "feed.AddNewznabJob: add job failed") - } - job.JobID = id - - // add to job map - s.jobs[identifierKey] = id - - s.log.Debug().Msgf("add newznab job: %v", f.Name) - - return nil + return job, nil } -func (s *service) addRSSJob(f feedInstance) error { +func (s *service) createRSSJob(f feedInstance) (cron.Job, error) { + s.log.Debug().Msgf("add rss job: %s", f.Name) + if f.URL == "" { - return errors.New("rss feed requires URL") + return nil, errors.New("rss feed requires URL") } //if f.CronSchedule < time.Duration(5*time.Minute) { @@ -478,36 +446,43 @@ func (s *service) addRSSJob(f feedInstance) error { // create job job := NewRSSJob(f.Feed, f.Name, f.IndexerIdentifier, l, f.URL, s.repo, s.cacheRepo, s.releaseSvc, f.Timeout) - identifierKey := feedKey{f.Feed.ID, f.Feed.Indexer, f.Feed.Name}.ToString() + return job, nil +} - // schedule job - id, err := s.scheduler.AddJob(job, f.CronSchedule, identifierKey) +func (s *service) createCleanupJob() error { + // setup logger + l := s.log.With().Str("job", "feed-cache-cleanup").Logger() + + // create job + job := NewCleanupJob(l, s.cacheRepo) + + identifierKey := "feed-cache-cleanup" + + // schedule job for every day at 03:05 + id, err := s.scheduler.AddJob(job, "5 3 * * *", identifierKey) if err != nil { - return errors.Wrap(err, "feed.AddRSSJob: add job failed") + return errors.Wrap(err, "add job %s failed", identifierKey) } - job.JobID = id // add to job map s.jobs[identifierKey] = id - s.log.Debug().Msgf("add rss job: %v", f.Name) - return nil } -func (s *service) stopFeedJob(indexer string) error { +func (s *service) stopFeedJob(id int) error { // remove job from scheduler - if err := s.scheduler.RemoveJobByIdentifier(indexer); err != nil { + if err := s.scheduler.RemoveJobByIdentifier(feedKey{id}.ToString()); err != nil { return errors.Wrap(err, "stop job failed") } - s.log.Debug().Msgf("stop feed job: %v", indexer) + s.log.Debug().Msgf("stop feed job: %d", id) return nil } -func (s *service) GetNextRun(indexer string) (time.Time, error) { - return s.scheduler.GetNextRun(indexer) +func (s *service) GetNextRun(id int) (time.Time, error) { + return s.scheduler.GetNextRun(feedKey{id}.ToString()) } func (s *service) GetLastRunData(ctx context.Context, id int) (string, error) { diff --git a/internal/feed/torznab.go b/internal/feed/torznab.go index 5f832ee..103fcd8 100644 --- a/internal/feed/torznab.go +++ b/internal/feed/torznab.go @@ -214,7 +214,7 @@ func (j *TorznabJob) getFeed(ctx context.Context) ([]torznab.FeedItem, error) { continue } - exists, err := j.CacheRepo.Exists(j.Name, i.GUID) + exists, err := j.CacheRepo.Exists(j.Feed.ID, i.GUID) if err != nil { j.Log.Error().Err(err).Msg("could not check if item exists") continue @@ -229,7 +229,7 @@ func (j *TorznabJob) getFeed(ctx context.Context) ([]torznab.FeedItem, error) { // set ttl to 1 month ttl := time.Now().AddDate(0, 1, 0) - if err := j.CacheRepo.Put(j.Name, i.GUID, []byte(i.Title), ttl); err != nil { + if err := j.CacheRepo.Put(j.Feed.ID, i.GUID, []byte(i.Title), ttl); err != nil { j.Log.Error().Stack().Err(err).Str("guid", i.GUID).Msg("cache.Put: error storing item in cache") continue } diff --git a/internal/indexer/service.go b/internal/indexer/service.go index 1d1e91b..829ea69 100644 --- a/internal/indexer/service.go +++ b/internal/indexer/service.go @@ -76,7 +76,6 @@ func NewService(log logger.Logger, config *domain.Config, repo domain.IndexerRep } func (s *service) Store(ctx context.Context, indexer domain.Indexer) (*domain.Indexer, error) { - // if indexer is rss or torznab do additional cleanup for identifier switch indexer.Implementation { case "torznab", "newznab", "rss": @@ -84,18 +83,18 @@ func (s *service) Store(ctx context.Context, indexer domain.Indexer) (*domain.In cleanName := strings.ToLower(indexer.Name) // torznab-name OR rss-name - indexer.Identifier = slug.Make(fmt.Sprintf("%v-%v", indexer.Implementation, cleanName)) + indexer.Identifier = slug.Make(fmt.Sprintf("%s-%s", indexer.Implementation, cleanName)) } i, err := s.repo.Store(ctx, indexer) if err != nil { - s.log.Error().Stack().Err(err).Msgf("failed to store indexer: %v", indexer.Name) + s.log.Error().Err(err).Msgf("failed to store indexer: %s", indexer.Name) return nil, err } // add to indexerInstances if err = s.addIndexer(*i); err != nil { - s.log.Error().Stack().Err(err).Msgf("failed to add indexer: %v", indexer.Name) + s.log.Error().Err(err).Msgf("failed to add indexer: %s", indexer.Name) return nil, err } @@ -111,7 +110,7 @@ func (s *service) Update(ctx context.Context, indexer domain.Indexer) (*domain.I // add to indexerInstances if err = s.updateIndexer(*i); err != nil { - s.log.Error().Err(err).Msgf("failed to add indexer: %v", indexer.Name) + s.log.Error().Err(err).Msgf("failed to add indexer: %s", indexer.Name) return nil, err } @@ -121,7 +120,7 @@ func (s *service) Update(ctx context.Context, indexer domain.Indexer) (*domain.I } } - s.log.Debug().Msgf("successfully updated indexer: %v", indexer.Name) + s.log.Debug().Msgf("successfully updated indexer: %s", indexer.Name) return i, nil } @@ -133,7 +132,7 @@ func (s *service) Delete(ctx context.Context, id int) error { } if err := s.repo.Delete(ctx, id); err != nil { - s.log.Error().Err(err).Msgf("could not delete indexer by id: %v", id) + s.log.Error().Err(err).Msgf("could not delete indexer by id: %d", id) return err } @@ -150,7 +149,7 @@ func (s *service) Delete(ctx context.Context, id int) error { func (s *service) FindByFilterID(ctx context.Context, id int) ([]domain.Indexer, error) { indexers, err := s.repo.FindByFilterID(ctx, id) if err != nil { - s.log.Error().Err(err).Msgf("could not find indexers by filter id: %v", id) + s.log.Error().Err(err).Msgf("could not find indexers by filter id: %d", id) return nil, err } @@ -160,7 +159,7 @@ func (s *service) FindByFilterID(ctx context.Context, id int) ([]domain.Indexer, func (s *service) FindByID(ctx context.Context, id int) (*domain.Indexer, error) { indexers, err := s.repo.FindByID(ctx, id) if err != nil { - s.log.Error().Err(err).Msgf("could not find indexer by id: %v", id) + s.log.Error().Err(err).Msgf("could not find indexer by id: %d", id) return nil, err } @@ -340,7 +339,7 @@ func (s *service) Start() error { // check if it has api and add to api service if indexer.Enabled && indexer.HasApi() { if err := s.ApiService.AddClient(indexer.Identifier, indexer.SettingsMap); err != nil { - s.log.Error().Stack().Err(err).Msgf("indexer.start: could not init api client for: '%v'", indexer.Identifier) + s.log.Error().Stack().Err(err).Msgf("indexer.start: could not init api client for: '%s'", indexer.Identifier) } } } @@ -391,7 +390,7 @@ func (s *service) addIndexer(indexer domain.Indexer) error { // check if it has api and add to api service if indexerDefinition.HasApi() { if err := s.ApiService.AddClient(indexerDefinition.Identifier, indexerDefinition.SettingsMap); err != nil { - s.log.Error().Stack().Err(err).Msgf("indexer.start: could not init api client for: '%v'", indexer.Identifier) + s.log.Error().Stack().Err(err).Msgf("indexer.start: could not init api client for: '%s'", indexer.Identifier) } } } @@ -481,18 +480,18 @@ func (s *service) LoadIndexerDefinitions() error { file := "definitions/" + f.Name() - s.log.Trace().Msgf("parsing: %v", file) + s.log.Trace().Msgf("parsing: %s", file) data, err := fs.ReadFile(Definitions, file) if err != nil { - s.log.Error().Stack().Err(err).Msgf("failed reading file: %v", file) - return errors.Wrap(err, "could not read file: %v", file) + s.log.Error().Stack().Err(err).Msgf("failed reading file: %s", file) + return errors.Wrap(err, "could not read file: %s", file) } var d domain.IndexerDefinition if err = yaml.Unmarshal(data, &d); err != nil { - s.log.Error().Stack().Err(err).Msgf("failed unmarshal file: %v", file) - return errors.Wrap(err, "could not unmarshal file: %v", file) + s.log.Error().Stack().Err(err).Msgf("failed unmarshal file: %s", file) + return errors.Wrap(err, "could not unmarshal file: %s", file) } if d.Implementation == "" { @@ -515,7 +514,7 @@ func (s *service) LoadCustomIndexerDefinitions() error { outputDirRead, err := os.Open(s.config.CustomDefinitions) if err != nil { - s.log.Warn().Stack().Msgf("failed opening custom definitions directory %q: %s", s.config.CustomDefinitions, err) + s.log.Error().Err(err).Msgf("failed opening custom definitions directory %s", s.config.CustomDefinitions) return nil } @@ -538,22 +537,22 @@ func (s *service) LoadCustomIndexerDefinitions() error { file := filepath.Join(s.config.CustomDefinitions, f.Name()) - s.log.Trace().Msgf("parsing custom: %v", file) + s.log.Trace().Msgf("parsing custom: %s", file) data, err := os.ReadFile(file) if err != nil { - s.log.Error().Stack().Err(err).Msgf("failed reading file: %v", file) - return errors.Wrap(err, "could not read file: %v", file) + s.log.Error().Stack().Err(err).Msgf("failed reading file: %s", file) + return errors.Wrap(err, "could not read file: %s", file) } var d *domain.IndexerDefinitionCustom if err = yaml.Unmarshal(data, &d); err != nil { - s.log.Error().Stack().Err(err).Msgf("failed unmarshal file: %v", file) - return errors.Wrap(err, "could not unmarshal file: %v", file) + s.log.Error().Stack().Err(err).Msgf("failed unmarshal file: %s", file) + return errors.Wrap(err, "could not unmarshal file: %s", file) } if d == nil { - s.log.Warn().Stack().Err(err).Msgf("skipping empty file: %v", file) + s.log.Warn().Stack().Err(err).Msgf("skipping empty file: %s", file) continue } @@ -563,7 +562,7 @@ func (s *service) LoadCustomIndexerDefinitions() error { // to prevent crashing from non-updated definitions lets skip if d.Implementation == "irc" && d.IRC.Parse == nil { - s.log.Warn().Msgf("DEPRECATED: indexer definition version: %v", file) + s.log.Warn().Msgf("DEPRECATED: indexer definition version: %s", file) } s.definitions[d.Identifier] = *d.ToIndexerDefinition() diff --git a/internal/scheduler/service.go b/internal/scheduler/service.go index e521be1..69cd74c 100644 --- a/internal/scheduler/service.go +++ b/internal/scheduler/service.go @@ -11,6 +11,7 @@ import ( "github.com/autobrr/autobrr/internal/logger" "github.com/autobrr/autobrr/internal/notification" "github.com/autobrr/autobrr/internal/update" + "github.com/autobrr/autobrr/pkg/errors" "github.com/robfig/cron/v3" "github.com/rs/zerolog" @@ -19,7 +20,8 @@ import ( type Service interface { Start() Stop() - AddJob(job cron.Job, interval time.Duration, identifier string) (int, error) + ScheduleJob(job cron.Job, interval time.Duration, identifier string) (int, error) + AddJob(job cron.Job, spec string, identifier string) (int, error) RemoveJobByIdentifier(id string) error GetNextRun(id string) (time.Time, error) } @@ -74,7 +76,7 @@ func (s *service) addAppJobs() { lastCheckVersion: s.version, } - if id, err := s.AddJob(checkUpdates, 2*time.Hour, "app-check-updates"); err != nil { + if id, err := s.ScheduleJob(checkUpdates, 2*time.Hour, "app-check-updates"); err != nil { s.log.Error().Err(err).Msgf("scheduler.addAppJobs: error adding job: %v", id) } } @@ -86,11 +88,27 @@ func (s *service) Stop() { return } -func (s *service) AddJob(job cron.Job, interval time.Duration, identifier string) (int, error) { +// ScheduleJob takes a time duration and adds a job +func (s *service) ScheduleJob(job cron.Job, interval time.Duration, identifier string) (int, error) { + id := s.cron.Schedule(cron.Every(interval), cron.NewChain(cron.SkipIfStillRunning(cron.DiscardLogger)).Then(job)) - id := s.cron.Schedule(cron.Every(interval), cron.NewChain( - cron.SkipIfStillRunning(cron.DiscardLogger)).Then(job), - ) + s.log.Debug().Msgf("scheduler.ScheduleJob: job successfully added: %s id %d", identifier, id) + + s.m.Lock() + // add to job map + s.jobs[identifier] = id + s.m.Unlock() + + return int(id), nil +} + +// AddJob takes a cron schedule and adds a job +func (s *service) AddJob(job cron.Job, spec string, identifier string) (int, error) { + id, err := s.cron.AddJob(spec, cron.NewChain(cron.SkipIfStillRunning(cron.DiscardLogger)).Then(job)) + + if err != nil { + return 0, errors.Wrap(err, "could not add job to cron") + } s.log.Debug().Msgf("scheduler.AddJob: job successfully added: %s id %d", identifier, id)