feat(feeds): improve caching (#1191)

* feat(feeds): improve caching

* fix(feeds): put cache if not empty

* fix(feeds): reassign loop var

* fix(feeds): enable busy_timeout again

* fix(feeds): enable busy_timeout again
This commit is contained in:
ze0s 2023-10-21 17:03:52 +02:00 committed by GitHub
parent 8c7c147328
commit 9793764905
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 88 additions and 22 deletions

View file

@ -175,6 +175,27 @@ func (r *FeedCacheRepo) Put(feedId int, key string, val []byte, ttl time.Time) e
return nil
}
func (r *FeedCacheRepo) PutMany(ctx context.Context, items []domain.FeedCacheItem) error {
queryBuilder := r.db.squirrel.
Insert("feed_cache").
Columns("feed_id", "key", "value", "ttl")
for _, item := range items {
queryBuilder = queryBuilder.Values(item.FeedId, item.Key, item.Value, item.TTL)
}
query, args, err := queryBuilder.ToSql()
if err != nil {
return errors.Wrap(err, "error building query")
}
if _, err = r.db.handler.ExecContext(ctx, query, args...); err != nil {
return errors.Wrap(err, "error executing query")
}
return nil
}
func (r *FeedCacheRepo) Delete(ctx context.Context, feedId int, key string) error {
queryBuilder := r.db.squirrel.
Delete("feed_cache").

View file

@ -27,9 +27,9 @@ func (db *DB) openSQLite() error {
}
// Set busy timeout
//if _, err = db.handler.Exec(`PRAGMA busy_timeout = 5000;`); err != nil {
// return errors.New("busy timeout pragma: %w", err)
//}
if _, err = db.handler.Exec(`PRAGMA busy_timeout = 5000;`); err != nil {
return errors.Wrap(err, "busy timeout pragma")
}
// Enable WAL. SQLite performs better with the WAL because it allows
// multiple readers to operate while data is being written.

View file

@ -14,6 +14,7 @@ type FeedCacheRepo interface {
GetCountByFeed(ctx context.Context, feedId int) (int, error)
Exists(feedId int, key string) (bool, error)
Put(feedId int, key string, val []byte, ttl time.Time) error
PutMany(ctx context.Context, items []FeedCacheItem) error
Delete(ctx context.Context, feedId int, key string) error
DeleteByFeed(ctx context.Context, feedId int) error
DeleteStale(ctx context.Context) error

View file

@ -144,7 +144,14 @@ func (j *NewznabJob) getFeed(ctx context.Context) ([]newznab.FeedItem, error) {
return feed.Channel.Items[i].PubDate.After(feed.Channel.Items[j].PubDate.Time)
})
toCache := make([]domain.FeedCacheItem, 0)
// set ttl to 1 month
ttl := time.Now().AddDate(0, 1, 0)
for _, i := range feed.Channel.Items {
i := i
if i.GUID == "" {
j.Log.Error().Msgf("missing GUID from feed: %s", j.Feed.Name)
continue
@ -163,18 +170,26 @@ func (j *NewznabJob) getFeed(ctx context.Context) ([]newznab.FeedItem, error) {
j.Log.Debug().Msgf("found new release: %s", i.Title)
// set ttl to 1 month
ttl := time.Now().AddDate(0, 1, 0)
if err := j.CacheRepo.Put(j.Feed.ID, i.GUID, []byte(i.Title), ttl); err != nil {
j.Log.Error().Stack().Err(err).Str("guid", i.GUID).Msg("cache.Put: error storing item in cache")
continue
}
toCache = append(toCache, domain.FeedCacheItem{
FeedId: strconv.Itoa(j.Feed.ID),
Key: i.GUID,
Value: []byte(i.Title),
TTL: ttl,
})
// only append if we successfully added to cache
items = append(items, *i)
}
if len(toCache) > 0 {
go func(items []domain.FeedCacheItem) {
ctx := context.Background()
if err := j.CacheRepo.PutMany(ctx, items); err != nil {
j.Log.Error().Err(err).Msg("cache.PutMany: error storing items in cache")
}
}(toCache)
}
// send to filters
return items, nil
}

View file

@ -8,6 +8,7 @@ import (
"encoding/xml"
"net/url"
"regexp"
"strconv"
"time"
"github.com/autobrr/autobrr/internal/domain"
@ -240,6 +241,8 @@ func (j *RSSJob) getFeed(ctx context.Context) (items []*gofeed.Item, err error)
//sort.Sort(feed)
toCache := make([]domain.FeedCacheItem, 0)
// set ttl to 1 month
ttl := time.Now().AddDate(0, 1, 0)
@ -266,15 +269,26 @@ func (j *RSSJob) getFeed(ctx context.Context) (items []*gofeed.Item, err error)
j.Log.Debug().Msgf("found new release: %s", i.Title)
if err := j.CacheRepo.Put(j.Feed.ID, key, []byte(item.Title), ttl); err != nil {
j.Log.Error().Err(err).Str("entry", key).Msg("cache.Put: error storing item in cache")
continue
}
toCache = append(toCache, domain.FeedCacheItem{
FeedId: strconv.Itoa(j.Feed.ID),
Key: i.GUID,
Value: []byte(i.Title),
TTL: ttl,
})
// only append if we successfully added to cache
items = append(items, item)
}
if len(toCache) > 0 {
go func(items []domain.FeedCacheItem) {
ctx := context.Background()
if err := j.CacheRepo.PutMany(ctx, items); err != nil {
j.Log.Error().Err(err).Msg("cache.PutMany: error storing items in cache")
}
}(toCache)
}
// send to filters
return
}

View file

@ -208,9 +208,16 @@ func (j *TorznabJob) getFeed(ctx context.Context) ([]torznab.FeedItem, error) {
return feed.Channel.Items[i].PubDate.After(feed.Channel.Items[j].PubDate.Time)
})
toCache := make([]domain.FeedCacheItem, 0)
// set ttl to 1 month
ttl := time.Now().AddDate(0, 1, 0)
for _, i := range feed.Channel.Items {
i := i
if i.GUID == "" {
j.Log.Error().Err(err).Msgf("missing GUID from feed: %s", j.Feed.Name)
j.Log.Error().Msgf("missing GUID from feed: %s", j.Feed.Name)
continue
}
@ -226,18 +233,26 @@ func (j *TorznabJob) getFeed(ctx context.Context) ([]torznab.FeedItem, error) {
j.Log.Debug().Msgf("found new release: %s", i.Title)
// set ttl to 1 month
ttl := time.Now().AddDate(0, 1, 0)
if err := j.CacheRepo.Put(j.Feed.ID, i.GUID, []byte(i.Title), ttl); err != nil {
j.Log.Error().Stack().Err(err).Str("guid", i.GUID).Msg("cache.Put: error storing item in cache")
continue
}
toCache = append(toCache, domain.FeedCacheItem{
FeedId: strconv.Itoa(j.Feed.ID),
Key: i.GUID,
Value: []byte(i.Title),
TTL: ttl,
})
// only append if we successfully added to cache
items = append(items, *i)
}
if len(toCache) > 0 {
go func(items []domain.FeedCacheItem) {
ctx := context.Background()
if err := j.CacheRepo.PutMany(ctx, items); err != nil {
j.Log.Error().Err(err).Msg("cache.PutMany: error storing items in cache")
}
}(toCache)
}
// send to filters
return items, nil
}