mirror of
https://github.com/idanoo/autobrr
synced 2025-07-22 16:29:12 +00:00
fix(feeds): torznab update and delete (#346)
* fix(feeds): torznab update and delete * fix(feeds): repo cache exists check err
This commit is contained in:
parent
72d4942104
commit
c1df9c817f
6 changed files with 90 additions and 34 deletions
|
@ -1,6 +1,7 @@
|
|||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"time"
|
||||
|
||||
|
@ -18,7 +19,7 @@ type FeedCacheRepo struct {
|
|||
|
||||
func NewFeedCacheRepo(log logger.Logger, db *DB) domain.FeedCacheRepo {
|
||||
return &FeedCacheRepo{
|
||||
log: log.With().Str("repo", "feed_cache").Logger(),
|
||||
log: log.With().Str("module", "database").Str("repo", "feed_cache").Logger(),
|
||||
db: db,
|
||||
}
|
||||
}
|
||||
|
@ -95,7 +96,48 @@ func (r *FeedCacheRepo) Put(bucket string, key string, val []byte, ttl time.Time
|
|||
return nil
|
||||
}
|
||||
|
||||
func (r *FeedCacheRepo) Delete(bucket string, key string) error {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
func (r *FeedCacheRepo) Delete(ctx context.Context, bucket string, key string) error {
|
||||
queryBuilder := r.db.squirrel.
|
||||
Delete("feed_cache").
|
||||
Where("bucket = ?", bucket).
|
||||
Where("key = ?", key)
|
||||
|
||||
query, args, err := queryBuilder.ToSql()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error building query")
|
||||
}
|
||||
|
||||
_, err = r.db.handler.ExecContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error executing query")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *FeedCacheRepo) DeleteBucket(ctx context.Context, bucket string) error {
|
||||
queryBuilder := r.db.squirrel.
|
||||
Delete("feed_cache").
|
||||
Where("bucket = ?", bucket)
|
||||
|
||||
query, args, err := queryBuilder.ToSql()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error building query")
|
||||
}
|
||||
|
||||
result, err := r.db.handler.ExecContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error executing query")
|
||||
}
|
||||
|
||||
rows, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error exec result")
|
||||
}
|
||||
|
||||
if rows == 0 {
|
||||
return errors.Wrap(err, "error no rows affected")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
// FeedCacheRepo persists per-feed item caches used to deduplicate
// releases between feed refreshes. Delete and DeleteBucket take a
// context so callers can cancel the underlying database operations.
type FeedCacheRepo interface {
	Get(bucket string, key string) ([]byte, error)
	Exists(bucket string, key string) (bool, error)
	Put(bucket string, key string, val []byte, ttl time.Time) error
	Delete(ctx context.Context, bucket string, key string) error
	DeleteBucket(ctx context.Context, bucket string) error
}
|
||||
|
||||
type FeedRepo interface {
|
||||
|
|
|
@ -92,28 +92,33 @@ func (s *service) Store(ctx context.Context, feed *domain.Feed) error {
|
|||
return err
|
||||
}
|
||||
|
||||
s.log.Debug().Msgf("successfully added feed: %+v", feed)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *service) Update(ctx context.Context, feed *domain.Feed) error {
|
||||
if err := s.repo.Update(ctx, feed); err != nil {
|
||||
if err := s.update(ctx, feed); err != nil {
|
||||
s.log.Error().Err(err).Msgf("could not update feed: %+v", feed)
|
||||
return err
|
||||
}
|
||||
|
||||
s.log.Debug().Msgf("successfully updated feed: %+v", feed)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *service) Delete(ctx context.Context, id int) error {
|
||||
if err := s.repo.Delete(ctx, id); err != nil {
|
||||
if err := s.delete(ctx, id); err != nil {
|
||||
s.log.Error().Err(err).Msgf("could not delete feed by id: %v", id)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *service) ToggleEnabled(ctx context.Context, id int, enabled bool) error {
|
||||
err := s.repo.ToggleEnabled(ctx, id, enabled)
|
||||
if err != nil {
|
||||
if err := s.toggleEnabled(ctx, id, enabled); err != nil {
|
||||
s.log.Error().Err(err).Msgf("could not toggle feed by id: %v", id)
|
||||
return err
|
||||
}
|
||||
|
@ -137,17 +142,22 @@ func (s *service) update(ctx context.Context, feed *domain.Feed) error {
|
|||
func (s *service) delete(ctx context.Context, id int) error {
|
||||
f, err := s.repo.FindByID(ctx, id)
|
||||
if err != nil {
|
||||
s.log.Error().Err(err).Msg("feed.ToggleEnabled: error finding feed")
|
||||
s.log.Error().Err(err).Msg("error finding feed")
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.stopTorznabJob(f.Indexer); err != nil {
|
||||
s.log.Error().Err(err).Msg("feed.Delete: error stopping torznab job")
|
||||
s.log.Error().Err(err).Msg("error stopping torznab job")
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.repo.Delete(ctx, id); err != nil {
|
||||
s.log.Error().Err(err).Msg("feed.Delete: error deleting feed")
|
||||
s.log.Error().Err(err).Msg("error deleting feed")
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.cacheRepo.DeleteBucket(ctx, f.Name); err != nil {
|
||||
s.log.Error().Err(err).Msgf("could not delete feedCache bucket by id: %v", id)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -273,7 +283,7 @@ func (s *service) addTorznabJob(f feedInstance) error {
|
|||
}
|
||||
|
||||
// setup logger
|
||||
l := s.log.With().Str("feed_name", f.Name).Logger()
|
||||
l := s.log.With().Str("feed", f.Name).Logger()
|
||||
|
||||
// setup torznab Client
|
||||
c := torznab.NewClient(f.URL, f.ApiKey)
|
||||
|
|
|
@ -55,16 +55,16 @@ func (j *TorznabJob) process() error {
|
|||
// get feed
|
||||
items, err := j.getFeed()
|
||||
if err != nil {
|
||||
j.Log.Error().Err(err).Msgf("torznab.process: error fetching feed items")
|
||||
return errors.Wrap(err, "torznab.process: error getting feed items")
|
||||
j.Log.Error().Err(err).Msgf("error fetching feed items")
|
||||
return errors.Wrap(err, "error getting feed items")
|
||||
}
|
||||
|
||||
j.Log.Debug().Msgf("found (%d) new items to process", len(items))
|
||||
|
||||
if len(items) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
j.Log.Debug().Msgf("torznab.process: refreshing feed: %v, found (%d) new items to check", j.Name, len(items))
|
||||
|
||||
releases := make([]*domain.Release, 0)
|
||||
|
||||
for _, item := range items {
|
||||
|
@ -93,11 +93,11 @@ func (j *TorznabJob) getFeed() ([]torznab.FeedItem, error) {
|
|||
// get feed
|
||||
feedItems, err := j.Client.GetFeed()
|
||||
if err != nil {
|
||||
j.Log.Error().Err(err).Msgf("torznab.getFeed: error fetching feed items")
|
||||
j.Log.Error().Err(err).Msgf("error fetching feed items")
|
||||
return nil, errors.Wrap(err, "error fetching feed items")
|
||||
}
|
||||
|
||||
j.Log.Trace().Msgf("torznab getFeed: refreshing feed: %v, found (%d) items", j.Name, len(feedItems))
|
||||
j.Log.Debug().Msgf("refreshing feed: %v, found (%d) items", j.Name, len(feedItems))
|
||||
|
||||
items := make([]torznab.FeedItem, 0)
|
||||
if len(feedItems) == 0 {
|
||||
|
@ -113,27 +113,26 @@ func (j *TorznabJob) getFeed() ([]torznab.FeedItem, error) {
|
|||
continue
|
||||
}
|
||||
|
||||
//if cacheValue, err := j.Repo.Get(j.Name, i.GUID); err == nil {
|
||||
// j.Log.Trace().Msgf("torznab getFeed: cacheValue: %v", cacheValue)
|
||||
//}
|
||||
|
||||
if exists, err := j.Repo.Exists(j.Name, i.GUID); err == nil {
|
||||
if exists {
|
||||
j.Log.Trace().Msg("torznab getFeed: cache item exists, skip")
|
||||
continue
|
||||
}
|
||||
exists, err := j.Repo.Exists(j.Name, i.GUID)
|
||||
if err != nil {
|
||||
j.Log.Error().Err(err).Msg("could not check if item exists")
|
||||
continue
|
||||
}
|
||||
if exists {
|
||||
j.Log.Trace().Msgf("cache item exists, skipping release: %v", i.Title)
|
||||
continue
|
||||
}
|
||||
|
||||
// do something more
|
||||
|
||||
items = append(items, i)
|
||||
|
||||
// set ttl to 1 month
|
||||
ttl := time.Now().AddDate(0, 1, 0)
|
||||
|
||||
if err := j.Repo.Put(j.Name, i.GUID, []byte(i.Title), ttl); err != nil {
|
||||
j.Log.Error().Stack().Err(err).Str("guid", i.GUID).Msg("torznab getFeed: cache.Put: error storing item in cache")
|
||||
j.Log.Error().Stack().Err(err).Str("guid", i.GUID).Msg("cache.Put: error storing item in cache")
|
||||
continue
|
||||
}
|
||||
|
||||
// only append if we successfully added to cache
|
||||
items = append(items, i)
|
||||
}
|
||||
|
||||
// send to filters
|
||||
|
|
|
@ -102,7 +102,7 @@ func (s *service) Update(ctx context.Context, indexer domain.Indexer) (*domain.I
|
|||
// add to indexerInstances
|
||||
err = s.updateIndexer(*i)
|
||||
if err != nil {
|
||||
s.log.Error().Stack().Err(err).Msgf("failed to add indexer: %v", indexer.Name)
|
||||
s.log.Error().Err(err).Msgf("failed to add indexer: %v", indexer.Name)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -112,6 +112,8 @@ func (s *service) Update(ctx context.Context, indexer domain.Indexer) (*domain.I
|
|||
}
|
||||
}
|
||||
|
||||
s.log.Debug().Msgf("successfully updated indexer: %v", indexer.Name)
|
||||
|
||||
return i, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -199,6 +199,8 @@ func (s *service) Process(release *domain.Release) {
|
|||
}
|
||||
|
||||
func (s *service) ProcessMultiple(releases []*domain.Release) {
|
||||
s.log.Debug().Msgf("process (%v) new releases from feed", len(releases))
|
||||
|
||||
for _, rls := range releases {
|
||||
if rls == nil {
|
||||
continue
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue