diff --git a/internal/database/feed_cache.go b/internal/database/feed_cache.go index e289ac1..52ec9d2 100644 --- a/internal/database/feed_cache.go +++ b/internal/database/feed_cache.go @@ -175,6 +175,27 @@ func (r *FeedCacheRepo) Put(feedId int, key string, val []byte, ttl time.Time) e return nil } +func (r *FeedCacheRepo) PutMany(ctx context.Context, items []domain.FeedCacheItem) error { + queryBuilder := r.db.squirrel. + Insert("feed_cache"). + Columns("feed_id", "key", "value", "ttl") + + for _, item := range items { + queryBuilder = queryBuilder.Values(item.FeedId, item.Key, item.Value, item.TTL) + } + + query, args, err := queryBuilder.ToSql() + if err != nil { + return errors.Wrap(err, "error building query") + } + + if _, err = r.db.handler.ExecContext(ctx, query, args...); err != nil { + return errors.Wrap(err, "error executing query") + } + + return nil +} + func (r *FeedCacheRepo) Delete(ctx context.Context, feedId int, key string) error { queryBuilder := r.db.squirrel. Delete("feed_cache"). diff --git a/internal/database/sqlite.go b/internal/database/sqlite.go index 18a4cb4..53fcd0a 100644 --- a/internal/database/sqlite.go +++ b/internal/database/sqlite.go @@ -27,9 +27,9 @@ func (db *DB) openSQLite() error { } // Set busy timeout - //if _, err = db.handler.Exec(`PRAGMA busy_timeout = 5000;`); err != nil { - // return errors.New("busy timeout pragma: %w", err) - //} + if _, err = db.handler.Exec(`PRAGMA busy_timeout = 5000;`); err != nil { + return errors.Wrap(err, "busy timeout pragma") + } // Enable WAL. SQLite performs better with the WAL because it allows // multiple readers to operate while data is being written. diff --git a/internal/domain/feed.go b/internal/domain/feed.go index 1b4f93c..df90bb9 100644 --- a/internal/domain/feed.go +++ b/internal/domain/feed.go @@ -14,6 +14,7 @@ type FeedCacheRepo interface { GetCountByFeed(ctx context.Context, feedId int) (int, error) Exists(feedId int, key string) (bool, error) Put(feedId int, key string, val []byte, ttl time.Time) error + PutMany(ctx context.Context, items []FeedCacheItem) error Delete(ctx context.Context, feedId int, key string) error DeleteByFeed(ctx context.Context, feedId int) error DeleteStale(ctx context.Context) error diff --git a/internal/feed/newznab.go b/internal/feed/newznab.go index 688216d..595addf 100644 --- a/internal/feed/newznab.go +++ b/internal/feed/newznab.go @@ -144,7 +144,14 @@ func (j *NewznabJob) getFeed(ctx context.Context) ([]newznab.FeedItem, error) { return feed.Channel.Items[i].PubDate.After(feed.Channel.Items[j].PubDate.Time) }) + toCache := make([]domain.FeedCacheItem, 0) + + // set ttl to 1 month + ttl := time.Now().AddDate(0, 1, 0) + for _, i := range feed.Channel.Items { + i := i + if i.GUID == "" { j.Log.Error().Msgf("missing GUID from feed: %s", j.Feed.Name) continue @@ -163,18 +170,26 @@ func (j *NewznabJob) getFeed(ctx context.Context) ([]newznab.FeedItem, error) { j.Log.Debug().Msgf("found new release: %s", i.Title) - // set ttl to 1 month - ttl := time.Now().AddDate(0, 1, 0) - - if err := j.CacheRepo.Put(j.Feed.ID, i.GUID, []byte(i.Title), ttl); err != nil { - j.Log.Error().Stack().Err(err).Str("guid", i.GUID).Msg("cache.Put: error storing item in cache") - continue - } + toCache = append(toCache, domain.FeedCacheItem{ + FeedId: strconv.Itoa(j.Feed.ID), + Key: i.GUID, + Value: []byte(i.Title), + TTL: ttl, + }) // only append if we successfully added to cache items = append(items, *i) } + if len(toCache) > 0 { + go func(items []domain.FeedCacheItem) { + ctx := context.Background() + if err := j.CacheRepo.PutMany(ctx, items); err != nil { + j.Log.Error().Err(err).Msg("cache.PutMany: error storing items in cache") + } + }(toCache) + } + // send to filters return items, nil } diff --git a/internal/feed/rss.go b/internal/feed/rss.go index a7baaf3..b8129e6 100644 --- a/internal/feed/rss.go +++ b/internal/feed/rss.go @@ -8,6 +8,7 @@ import ( "encoding/xml" "net/url" "regexp" + "strconv" "time" "github.com/autobrr/autobrr/internal/domain" @@ -240,6 +241,8 @@ func (j *RSSJob) getFeed(ctx context.Context) (items []*gofeed.Item, err error) //sort.Sort(feed) + toCache := make([]domain.FeedCacheItem, 0) + // set ttl to 1 month ttl := time.Now().AddDate(0, 1, 0) @@ -266,15 +269,26 @@ func (j *RSSJob) getFeed(ctx context.Context) (items []*gofeed.Item, err error) j.Log.Debug().Msgf("found new release: %s", i.Title) - if err := j.CacheRepo.Put(j.Feed.ID, key, []byte(item.Title), ttl); err != nil { - j.Log.Error().Err(err).Str("entry", key).Msg("cache.Put: error storing item in cache") - continue - } + toCache = append(toCache, domain.FeedCacheItem{ + FeedId: strconv.Itoa(j.Feed.ID), + Key: i.GUID, + Value: []byte(i.Title), + TTL: ttl, + }) // only append if we successfully added to cache items = append(items, item) } + if len(toCache) > 0 { + go func(items []domain.FeedCacheItem) { + ctx := context.Background() + if err := j.CacheRepo.PutMany(ctx, items); err != nil { + j.Log.Error().Err(err).Msg("cache.PutMany: error storing items in cache") + } + }(toCache) + } + // send to filters return } diff --git a/internal/feed/torznab.go b/internal/feed/torznab.go index 921b2e4..524aa2a 100644 --- a/internal/feed/torznab.go +++ b/internal/feed/torznab.go @@ -208,9 +208,16 @@ func (j *TorznabJob) getFeed(ctx context.Context) ([]torznab.FeedItem, error) { return feed.Channel.Items[i].PubDate.After(feed.Channel.Items[j].PubDate.Time) }) + toCache := make([]domain.FeedCacheItem, 0) + + // set ttl to 1 month + ttl := time.Now().AddDate(0, 1, 0) + for _, i := range feed.Channel.Items { + i := i + if i.GUID == "" { - j.Log.Error().Err(err).Msgf("missing GUID from feed: %s", j.Feed.Name) + j.Log.Error().Msgf("missing GUID from feed: %s", j.Feed.Name) continue } @@ -226,18 +233,26 @@ func (j *TorznabJob) getFeed(ctx context.Context) ([]torznab.FeedItem, error) { j.Log.Debug().Msgf("found new release: %s", i.Title) - // set ttl to 1 month - ttl := time.Now().AddDate(0, 1, 0) - - if err := j.CacheRepo.Put(j.Feed.ID, i.GUID, []byte(i.Title), ttl); err != nil { - j.Log.Error().Stack().Err(err).Str("guid", i.GUID).Msg("cache.Put: error storing item in cache") - continue - } + toCache = append(toCache, domain.FeedCacheItem{ + FeedId: strconv.Itoa(j.Feed.ID), + Key: i.GUID, + Value: []byte(i.Title), + TTL: ttl, + }) // only append if we successfully added to cache items = append(items, *i) } + if len(toCache) > 0 { + go func(items []domain.FeedCacheItem) { + ctx := context.Background() + if err := j.CacheRepo.PutMany(ctx, items); err != nil { + j.Log.Error().Err(err).Msg("cache.PutMany: error storing items in cache") + } + }(toCache) + } + // send to filters return items, nil }