From 46f6fbe5cc4d95b8d299eb4bbc222c3e12a2e566 Mon Sep 17 00:00:00 2001
From: ze0s <43699394+zze0s@users.noreply.github.com>
Date: Sat, 7 Jun 2025 12:46:08 +0200
Subject: [PATCH] feat(feeds): optimize existing cache items check (#2078)

* feat(feeds): optimize existing items cache check

* feat(feeds): remove ttl from repo method ExistingItems

* feat(feeds): add db integration test for ExistingItems

* feat(feeds): improve release and filter processing

* feat(feeds): fix failing test
---
 internal/database/feed_cache.go      |  42 +++++++++++
 internal/database/feed_cache_test.go | 101 +++++++++++++++++++++++++++
 internal/domain/error.go             |   7 +-
 internal/domain/feed.go              |   1 +
 internal/feed/newznab.go             |  32 +++++----
 internal/feed/rss.go                 |  33 +++++----
 internal/feed/torznab.go             |  37 ++++++----
 internal/release/service.go          |  58 +++++++++++++--
 8 files changed, 266 insertions(+), 45 deletions(-)

diff --git a/internal/database/feed_cache.go b/internal/database/feed_cache.go
index 4c323ae..2b707d0 100644
--- a/internal/database/feed_cache.go
+++ b/internal/database/feed_cache.go
@@ -157,6 +157,48 @@ func (r *FeedCacheRepo) Exists(feedId int, key string) (bool, error) {
 	return exists, nil
 }
 
+// ExistingItems checks multiple keys in the cache for a given feed ID
+// and returns a map marking which of those keys already exist
+func (r *FeedCacheRepo) ExistingItems(ctx context.Context, feedId int, keys []string) (map[string]bool, error) {
+	if len(keys) == 0 {
+		return make(map[string]bool), nil
+	}
+
+	// Build a single query that returns all of the given keys that exist in the cache
+	queryBuilder := r.db.squirrel.
+		Select("key").
+		From("feed_cache").
+		Where(sq.Eq{"feed_id": feedId}).
+		Where(sq.Eq{"key": keys})
+
+	query, args, err := queryBuilder.ToSql()
+	if err != nil {
+		return nil, errors.Wrap(err, "error building query")
+	}
+
+	rows, err := r.db.handler.QueryContext(ctx, query, args...)
+	if err != nil {
+		return nil, errors.Wrap(err, "error executing query")
+	}
+	defer rows.Close()
+
+	result := make(map[string]bool)
+
+	for rows.Next() {
+		var key string
+		if err := rows.Scan(&key); err != nil {
+			return nil, errors.Wrap(err, "error scanning row")
+		}
+		result[key] = true
+	}
+
+	if err := rows.Err(); err != nil {
+		return nil, errors.Wrap(err, "row error")
+	}
+
+	return result, nil
+}
+
 func (r *FeedCacheRepo) Put(feedId int, key string, val []byte, ttl time.Time) error {
 	queryBuilder := r.db.squirrel.
 		Insert("feed_cache").
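Reviewer note: squirrel expands sq.Eq with a slice value into a SQL IN clause, so the batch lookup above is a single round trip instead of one Exists query per GUID. A minimal sketch of what the builder emits, assuming the stock github.com/Masterminds/squirrel API with its default question-mark placeholders (the actual placeholder style depends on the dialect the repo configures):

    import sq "github.com/Masterminds/squirrel"

    // Same shape as the queryBuilder in ExistingItems, with literal inputs.
    query, args, err := sq.Select("key").
        From("feed_cache").
        Where(sq.Eq{"feed_id": 1}).
        Where(sq.Eq{"key": []string{"guid-1", "guid-2"}}).
        ToSql()
    // query: SELECT key FROM feed_cache WHERE feed_id = ? AND key IN (?,?)
    // args:  [1, "guid-1", "guid-2"]

One practical caveat: a very large feed means an equally large IN list, and SQLite caps the number of bound parameters (999 in older builds), so chunking the keys slice would be a reasonable follow-up if feeds ever approach that size.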
diff --git a/internal/database/feed_cache_test.go b/internal/database/feed_cache_test.go
index ba9948b..c7518bf 100644
--- a/internal/database/feed_cache_test.go
+++ b/internal/database/feed_cache_test.go
@@ -155,6 +155,107 @@ func TestFeedCacheRepo_Exists(t *testing.T) {
 	}
 }
 
+func TestFeedCacheRepo_ExistingItems(t *testing.T) {
+	for dbType, db := range testDBs {
+		log := setupLoggerForTest()
+		repo := NewFeedCacheRepo(log, db)
+		feedRepo := NewFeedRepo(log, db)
+		indexerRepo := NewIndexerRepo(log, db)
+		mockData := getMockFeed()
+		indexerMockData := getMockIndexer()
+
+		t.Run(fmt.Sprintf("ExistingItems_SingleItem_Multi_Keys [%s]", dbType), func(t *testing.T) {
+			// Setup
+			indexer, err := indexerRepo.Store(t.Context(), indexerMockData)
+			assert.NoError(t, err)
+			mockData.IndexerID = int(indexer.ID)
+
+			err = feedRepo.Store(t.Context(), mockData)
+			assert.NoError(t, err)
+
+			err = repo.Put(mockData.ID, "test_key", []byte("test_value"), time.Now().Add(time.Hour))
+			assert.NoError(t, err)
+
+			keys := []string{"test_key", "test_key_2"}
+
+			// Execute
+			items, err := repo.ExistingItems(t.Context(), mockData.ID, keys)
+			assert.NoError(t, err)
+			assert.Len(t, items, 1)
+
+			// Cleanup
+			_ = feedRepo.Delete(t.Context(), mockData.ID)
+			_ = indexerRepo.Delete(t.Context(), int(indexer.ID))
+			_ = repo.Delete(t.Context(), mockData.ID, "test_key")
+		})
+
+		t.Run(fmt.Sprintf("ExistingItems_MultipleItems [%s]", dbType), func(t *testing.T) {
+			// Setup
+			indexer, err := indexerRepo.Store(t.Context(), indexerMockData)
+			assert.NoError(t, err)
+			mockData.IndexerID = int(indexer.ID)
+
+			err = feedRepo.Store(t.Context(), mockData)
+			assert.NoError(t, err)
+
+			err = repo.Put(mockData.ID, "test_key", []byte("test_value"), time.Now().Add(time.Hour))
+			assert.NoError(t, err)
+
+			err = repo.Put(mockData.ID, "test_key_2", []byte("test_value_2"), time.Now().Add(time.Hour))
+			assert.NoError(t, err)
+
+			keys := []string{"test_key", "test_key_2"}
+
+			// Execute
+			items, err := repo.ExistingItems(t.Context(), mockData.ID, keys)
+			assert.NoError(t, err)
+			assert.Len(t, items, 2)
+
+			// Cleanup
+			_ = feedRepo.Delete(t.Context(), mockData.ID)
+			_ = indexerRepo.Delete(t.Context(), int(indexer.ID))
+			_ = repo.Delete(t.Context(), mockData.ID, "test_key")
+			_ = repo.Delete(t.Context(), mockData.ID, "test_key_2")
+		})
+
+		t.Run(fmt.Sprintf("ExistingItems_MultipleItems_Single_Key [%s]", dbType), func(t *testing.T) {
+			// Setup
+			indexer, err := indexerRepo.Store(t.Context(), indexerMockData)
+			assert.NoError(t, err)
+			mockData.IndexerID = int(indexer.ID)
+
+			err = feedRepo.Store(t.Context(), mockData)
+			assert.NoError(t, err)
+
+			err = repo.Put(mockData.ID, "test_key", []byte("test_value"), time.Now().Add(time.Hour))
+			assert.NoError(t, err)
+
+			err = repo.Put(mockData.ID, "test_key_2", []byte("test_value_2"), time.Now().Add(time.Hour))
+			assert.NoError(t, err)
+
+			keys := []string{"test_key"}
+
+			// Execute
+			items, err := repo.ExistingItems(t.Context(), mockData.ID, keys)
+			assert.NoError(t, err)
+			assert.Len(t, items, 1)
+
+			// Cleanup
+			_ = feedRepo.Delete(t.Context(), mockData.ID)
+			_ = indexerRepo.Delete(t.Context(), int(indexer.ID))
+			_ = repo.Delete(t.Context(), mockData.ID, "test_key")
+			_ = repo.Delete(t.Context(), mockData.ID, "test_key_2")
+		})
+
+		t.Run(fmt.Sprintf("ExistingItems_Nonexistent_Key [%s]", dbType), func(t *testing.T) {
+			// Execute
+			items, err := repo.ExistingItems(t.Context(), -1, []string{"nonexistent_key"})
+			assert.NoError(t, err)
+			assert.Empty(t, items)
+		})
+	}
+}
+
 func TestFeedCacheRepo_Put(t *testing.T) {
 	for dbType, db := range testDBs {
 		log := setupLoggerForTest()
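A note on the Cleanup blocks in the tests above: assert failures do not stop the subtest, but a panic or a future t.Fatal edit would skip the trailing deletes and leak rows into the next subtest. A sketch of the same teardown via t.Cleanup, which the harness runs even when the test fails; context.Background() is used in the callback because t.Context() is already canceled by the time cleanup functions run:

    indexer, err := indexerRepo.Store(t.Context(), indexerMockData)
    assert.NoError(t, err)
    t.Cleanup(func() {
        _ = repo.Delete(context.Background(), mockData.ID, "test_key")
        _ = feedRepo.Delete(context.Background(), mockData.ID)
        _ = indexerRepo.Delete(context.Background(), int(indexer.ID))
    })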
diff --git a/internal/domain/error.go b/internal/domain/error.go
index 9846f31..d8d3245 100644
--- a/internal/domain/error.go
+++ b/internal/domain/error.go
@@ -10,7 +10,8 @@ import (
 )
 
 var (
-	ErrRecordNotFound = sql.ErrNoRows
-	ErrUpdateFailed   = errors.New("update failed")
-	ErrDeleteFailed   = errors.New("delete failed")
+	ErrRecordNotFound                 = sql.ErrNoRows
+	ErrUpdateFailed                   = errors.New("update failed")
+	ErrDeleteFailed                   = errors.New("delete failed")
+	ErrNoActiveFiltersFoundForIndexer = errors.New("no active filters found for indexer")
 )
diff --git a/internal/domain/feed.go b/internal/domain/feed.go
index d829844..acb71d0 100644
--- a/internal/domain/feed.go
+++ b/internal/domain/feed.go
@@ -13,6 +13,7 @@ type FeedCacheRepo interface {
 	GetByFeed(ctx context.Context, feedId int) ([]FeedCacheItem, error)
 	GetCountByFeed(ctx context.Context, feedId int) (int, error)
 	Exists(feedId int, key string) (bool, error)
+	ExistingItems(ctx context.Context, feedId int, keys []string) (map[string]bool, error)
 	Put(feedId int, key string, val []byte, ttl time.Time) error
 	PutMany(ctx context.Context, items []FeedCacheItem) error
 	Delete(ctx context.Context, feedId int, key string) error
diff --git a/internal/feed/newznab.go b/internal/feed/newznab.go
index 3be8aa6..68ebae9 100644
--- a/internal/feed/newznab.go
+++ b/internal/feed/newznab.go
@@ -124,7 +124,7 @@ func (j *NewznabJob) process(ctx context.Context) error {
 	}
 
 	// process all new releases
-	go j.ReleaseSvc.ProcessMultiple(releases)
+	go j.ReleaseSvc.ProcessMultipleFromIndexer(releases, j.Feed.Indexer)
 
 	return nil
 }
@@ -164,10 +164,9 @@ func (j *NewznabJob) getFeed(ctx context.Context) ([]newznab.FeedItem, error) {
 		return feed.Channel.Items[i].PubDate.After(feed.Channel.Items[j].PubDate.Time)
 	})
 
-	toCache := make([]domain.FeedCacheItem, 0)
-
-	// set ttl to 1 month
-	ttl := time.Now().AddDate(0, 1, 0)
+	// Collect all valid GUIDs first
+	guidItemMap := make(map[string]*newznab.FeedItem)
+	var guids []string
 
 	for _, item := range feed.Channel.Items {
 		if item.GUID == "" {
@@ -175,13 +174,22 @@ func (j *NewznabJob) getFeed(ctx context.Context) ([]newznab.FeedItem, error) {
 			continue
 		}
 
-		exists, err := j.CacheRepo.Exists(j.Feed.ID, item.GUID)
-		if err != nil {
-			j.Log.Error().Err(err).Msg("could not check if item exists")
-			continue
-		}
+		guidItemMap[item.GUID] = item
+		guids = append(guids, item.GUID)
+	}
 
-		if exists {
+	existingGuids, err := j.CacheRepo.ExistingItems(ctx, j.Feed.ID, guids)
+	if err != nil {
+		j.Log.Error().Err(err).Msg("could not get existing items from cache")
+		return nil, errors.Wrap(err, "could not get existing items from cache")
+	}
+
+	// set ttl to 1 month
+	ttl := time.Now().AddDate(0, 1, 0)
+	toCache := make([]domain.FeedCacheItem, 0)
+
+	for guid, item := range guidItemMap {
+		if existingGuids[guid] {
 			j.Log.Trace().Msgf("cache item exists, skipping release: %s", item.Title)
 			continue
 		}
@@ -190,7 +198,7 @@ func (j *NewznabJob) getFeed(ctx context.Context) ([]newznab.FeedItem, error) {
 
 		toCache = append(toCache, domain.FeedCacheItem{
 			FeedId: strconv.Itoa(j.Feed.ID),
-			Key:    item.GUID,
+			Key:    guid,
 			Value:  []byte(item.Title),
 			TTL:    ttl,
 		})
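The feed jobs above now hand the whole batch to ProcessMultipleFromIndexer (added in internal/release/service.go further down) together with the feed's indexer, and that method returns the new ErrNoActiveFiltersFoundForIndexer sentinel. The jobs invoke it in a goroutine and discard the result, so the sentinel mainly serves synchronous callers and tests. A hypothetical synchronous call site, assuming the project's wrapped errors support errors.Is; releaseSvc and log here are illustrative names, not part of this patch:

    if err := releaseSvc.ProcessMultipleFromIndexer(releases, feed.Indexer); err != nil {
        if errors.Is(err, domain.ErrNoActiveFiltersFoundForIndexer) {
            // benign: nothing is enabled for this indexer, skip quietly
            log.Debug().Msgf("no active filters for indexer %s", feed.Indexer.Name)
        } else {
            log.Error().Err(err).Msg("failed to process releases")
        }
    }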
diff --git a/internal/feed/rss.go b/internal/feed/rss.go
index 8f5552d..397d20d 100644
--- a/internal/feed/rss.go
+++ b/internal/feed/rss.go
@@ -105,7 +105,7 @@ func (j *RSSJob) process(ctx context.Context) error {
 	}
 
 	// process all new releases
-	go j.ReleaseSvc.ProcessMultiple(releases)
+	go j.ReleaseSvc.ProcessMultipleFromIndexer(releases, j.Feed.Indexer)
 
 	return nil
 }
@@ -280,11 +280,8 @@ func (j *RSSJob) getFeed(ctx context.Context) (items []*gofeed.Item, err error)
 	}
 
 	//sort.Sort(feed)
-
-	toCache := make([]domain.FeedCacheItem, 0)
-
-	// set ttl to 1 month
-	ttl := time.Now().AddDate(0, 1, 0)
+	guidItemMap := make(map[string]*gofeed.Item)
+	var guids []string
 
 	for _, item := range feed.Items {
 		key := item.GUID
@@ -295,12 +292,22 @@ func (j *RSSJob) getFeed(ctx context.Context) (items []*gofeed.Item, err error)
 			}
 		}
 
-		exists, err := j.CacheRepo.Exists(j.Feed.ID, key)
-		if err != nil {
-			j.Log.Error().Err(err).Msg("could not check if item exists")
-			continue
-		}
-		if exists {
+		guidItemMap[key] = item
+		guids = append(guids, key)
+	}
+
+	existingGuids, err := j.CacheRepo.ExistingItems(ctx, j.Feed.ID, guids)
+	if err != nil {
+		j.Log.Error().Err(err).Msg("error getting existing items from cache")
+		return
+	}
+
+	// set ttl to 1 month
+	ttl := time.Now().AddDate(0, 1, 0)
+	toCache := make([]domain.FeedCacheItem, 0)
+
+	for guid, item := range guidItemMap {
+		if existingGuids[guid] {
 			j.Log.Trace().Msgf("cache item exists, skipping release: %s", item.Title)
 			continue
 		}
@@ -309,7 +316,7 @@ func (j *RSSJob) getFeed(ctx context.Context) (items []*gofeed.Item, err error)
 
 		toCache = append(toCache, domain.FeedCacheItem{
 			FeedId: strconv.Itoa(j.Feed.ID),
-			Key:    key,
+			Key:    guid,
 			Value:  []byte(item.Title),
 			TTL:    ttl,
 		})
diff --git a/internal/feed/torznab.go b/internal/feed/torznab.go
index b5c34ab..fc1142c 100644
--- a/internal/feed/torznab.go
+++ b/internal/feed/torznab.go
@@ -158,7 +158,7 @@ func (j *TorznabJob) process(ctx context.Context) error {
 	}
 
 	// process all new releases
-	go j.ReleaseSvc.ProcessMultiple(releases)
+	go j.ReleaseSvc.ProcessMultipleFromIndexer(releases, j.Feed.Indexer)
 
 	return nil
 }
@@ -259,10 +259,9 @@ func (j *TorznabJob) getFeed(ctx context.Context) ([]torznab.FeedItem, error) {
 		return feed.Channel.Items[i].PubDate.After(feed.Channel.Items[j].PubDate.Time)
 	})
 
-	toCache := make([]domain.FeedCacheItem, 0)
-
-	// set ttl to 1 month
-	ttl := time.Now().AddDate(0, 1, 0)
+	// Collect all valid GUIDs first
+	guidItemMap := make(map[string]*torznab.FeedItem)
+	var guids []string
 
 	for _, item := range feed.Channel.Items {
 		if item.GUID == "" {
@@ -270,12 +269,24 @@ func (j *TorznabJob) getFeed(ctx context.Context) ([]torznab.FeedItem, error) {
 			continue
 		}
 
-		exists, err := j.CacheRepo.Exists(j.Feed.ID, item.GUID)
-		if err != nil {
-			j.Log.Error().Err(err).Msg("could not check if item exists")
-			continue
-		}
-		if exists {
+		guidItemMap[item.GUID] = item
+		guids = append(guids, item.GUID)
+	}
+
+	// Batch check which GUIDs already exist in the cache
+	existingGuids, err := j.CacheRepo.ExistingItems(ctx, j.Feed.ID, guids)
+	if err != nil {
+		j.Log.Error().Err(err).Msg("could not check existing items")
+		return nil, errors.Wrap(err, "could not check existing items")
+	}
+
+	// set ttl to 1 month
+	ttl := time.Now().AddDate(0, 1, 0)
+	toCache := make([]domain.FeedCacheItem, 0)
+
+	// Process items that don't exist in the cache
+	for guid, item := range guidItemMap {
+		if existingGuids[guid] {
 			j.Log.Trace().Msgf("cache item exists, skipping release: %s", item.Title)
 			continue
 		}
@@ -284,12 +295,12 @@ func (j *TorznabJob) getFeed(ctx context.Context) ([]torznab.FeedItem, error) {
 
 		toCache = append(toCache, domain.FeedCacheItem{
 			FeedId: strconv.Itoa(j.Feed.ID),
-			Key:    item.GUID,
+			Key:    guid,
 			Value:  []byte(item.Title),
 			TTL:    ttl,
 		})
 
-		// only append if we successfully added to cache
+		// Add item to result list
 		items = append(items, *item)
 	}
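One behavioral detail worth flagging for all three jobs: items are sorted by PubDate before being collected, but Go randomizes map iteration order, so ranging over guidItemMap means toCache and the returned items no longer follow that sort, and duplicate GUIDs silently collapse to the last item seen. If downstream consumers care about feed order, a possible variant (same variables as above, sketched for the torznab job) iterates the guids slice instead, which preserves insertion order; note that duplicate GUIDs would then need a seen-map guard to avoid double processing:

    for _, guid := range guids {
        item := guidItemMap[guid]
        if existingGuids[guid] {
            j.Log.Trace().Msgf("cache item exists, skipping release: %s", item.Title)
            continue
        }
        // build the toCache entry and append to items exactly as above,
        // now in the PubDate order established by the earlier sort
    }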
diff --git a/internal/release/service.go b/internal/release/service.go
index 0e8878e..d55cb53 100644
--- a/internal/release/service.go
+++ b/internal/release/service.go
@@ -30,6 +30,7 @@ type Service interface {
 	Delete(ctx context.Context, req *domain.DeleteReleaseRequest) error
 	Process(release *domain.Release)
 	ProcessMultiple(releases []*domain.Release)
+	ProcessMultipleFromIndexer(releases []*domain.Release, indexer domain.IndexerMinimal) error
 	ProcessManual(ctx context.Context, req *domain.ReleaseProcessReq) error
 	Retry(ctx context.Context, req *domain.ReleaseActionRetryReq) error
@@ -175,8 +176,6 @@ func (s *service) Process(release *domain.Release) {
 		}
 	}()
 
-	defer release.CleanupTemporaryFiles()
-
 	ctx := context.Background()
 
 	// TODO check in config for "Save all releases"
@@ -195,10 +194,23 @@ func (s *service) Process(release *domain.Release) {
 		return
 	}
 
-	if err := s.processFilters(ctx, filters, release); err != nil {
+	if err := s.processRelease(ctx, release, filters); err != nil {
 		s.log.Error().Err(err).Msgf("release.Process: error processing filters for indexer: %s", release.Indexer.Name)
 		return
 	}
+
+	return
+}
+
+func (s *service) processRelease(ctx context.Context, release *domain.Release, filters []*domain.Filter) error {
+	defer release.CleanupTemporaryFiles()
+
+	if err := s.processFilters(ctx, filters, release); err != nil {
+		s.log.Error().Err(err).Msgf("release.Process: error processing filters for indexer: %s", release.Indexer.Name)
+		return err
+	}
+
+	return nil
 }
 
 func (s *service) processFilters(ctx context.Context, filters []*domain.Filter, release *domain.Release) error {
@@ -340,7 +352,6 @@ func (s *service) ProcessMultiple(releases []*domain.Release) {
 	s.log.Debug().Msgf("process (%d) new releases from feed", len(releases))
 
 	for _, rls := range releases {
-		rls := rls
 		if rls == nil {
 			continue
 		}
@@ -348,6 +359,45 @@ func (s *service) ProcessMultiple(releases []*domain.Release) {
 		s.Process(rls)
 	}
 }
 
+func (s *service) ProcessMultipleFromIndexer(releases []*domain.Release, indexer domain.IndexerMinimal) error {
+	s.log.Debug().Msgf("process (%d) new releases from feed %s", len(releases), indexer.Name)
+
+	defer func() {
+		if r := recover(); r != nil {
+			s.log.Error().Msgf("recovering from panic in release process: %v", r)
+		}
+	}()
+
+	ctx := context.Background()
+
+	// get filters by priority
+	filters, err := s.filterSvc.FindByIndexerIdentifier(ctx, indexer.Identifier)
+	if err != nil {
+		s.log.Error().Err(err).Msgf("release.ProcessMultipleFromIndexer: error finding filters for indexer: %s", indexer.Name)
+		return err
+	}
+
+	if len(filters) == 0 {
+		s.log.Warn().Msgf("no active filters found for indexer: %s, skipping releases", indexer.Name)
+		return domain.ErrNoActiveFiltersFoundForIndexer
+	}
+
+	for _, release := range releases {
+		if release == nil {
+			continue
+		}
+
+		if err := s.processRelease(ctx, release, filters); err != nil {
+			s.log.Error().Err(err).Msgf("release.ProcessMultipleFromIndexer: error processing filters for indexer: %s", indexer.Name)
+			continue
+		}
+	}
+
+	return nil
+}
+
 func (s *service) runAction(ctx context.Context, action *domain.Action, release *domain.Release, status *domain.ReleaseActionStatus) (*domain.ReleaseActionStatus, error) {
 	// add action status as pending
 	//status := domain.NewReleaseActionStatus(action, release)
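Since the loop in ProcessMultipleFromIndexer logs and continues on per-release failures, those errors are only visible in logs and the method still returns nil on partial failure. If callers should see them, a sketch using the standard library's errors.Join (Go 1.20+; the project otherwise uses its own errors package, so this is an assumption, not what the patch does):

    var errs []error
    for _, release := range releases {
        if release == nil {
            continue
        }
        if err := s.processRelease(ctx, release, filters); err != nil {
            errs = append(errs, err)
        }
    }
    return errors.Join(errs...) // nil when errs is empty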