diff --git a/internal/database/feed_cache.go b/internal/database/feed_cache.go index cd7aefa..4a5d4f2 100644 --- a/internal/database/feed_cache.go +++ b/internal/database/feed_cache.go @@ -1,6 +1,7 @@ package database import ( + "context" "database/sql" "time" @@ -18,7 +19,7 @@ type FeedCacheRepo struct { func NewFeedCacheRepo(log logger.Logger, db *DB) domain.FeedCacheRepo { return &FeedCacheRepo{ - log: log.With().Str("repo", "feed_cache").Logger(), + log: log.With().Str("module", "database").Str("repo", "feed_cache").Logger(), db: db, } } @@ -95,7 +96,48 @@ func (r *FeedCacheRepo) Put(bucket string, key string, val []byte, ttl time.Time return nil } -func (r *FeedCacheRepo) Delete(bucket string, key string) error { - //TODO implement me - panic("implement me") +func (r *FeedCacheRepo) Delete(ctx context.Context, bucket string, key string) error { + queryBuilder := r.db.squirrel. + Delete("feed_cache"). + Where("bucket = ?", bucket). + Where("key = ?", key) + + query, args, err := queryBuilder.ToSql() + if err != nil { + return errors.Wrap(err, "error building query") + } + + _, err = r.db.handler.ExecContext(ctx, query, args...) + if err != nil { + return errors.Wrap(err, "error executing query") + } + + return nil +} + +func (r *FeedCacheRepo) DeleteBucket(ctx context.Context, bucket string) error { + queryBuilder := r.db.squirrel. + Delete("feed_cache"). + Where("bucket = ?", bucket) + + query, args, err := queryBuilder.ToSql() + if err != nil { + return errors.Wrap(err, "error building query") + } + + result, err := r.db.handler.ExecContext(ctx, query, args...) + if err != nil { + return errors.Wrap(err, "error executing query") + } + + rows, err := result.RowsAffected() + if err != nil { + return errors.Wrap(err, "error exec result") + } + + if rows == 0 { + return errors.Wrap(err, "error no rows affected") + } + + return nil } diff --git a/internal/domain/feed.go b/internal/domain/feed.go index 84f06f3..e255a26 100644 --- a/internal/domain/feed.go +++ b/internal/domain/feed.go @@ -9,7 +9,8 @@ type FeedCacheRepo interface { Get(bucket string, key string) ([]byte, error) Exists(bucket string, key string) (bool, error) Put(bucket string, key string, val []byte, ttl time.Time) error - Delete(bucket string, key string) error + Delete(ctx context.Context, bucket string, key string) error + DeleteBucket(ctx context.Context, bucket string) error } type FeedRepo interface { diff --git a/internal/feed/service.go b/internal/feed/service.go index 3db4e8b..42701ac 100644 --- a/internal/feed/service.go +++ b/internal/feed/service.go @@ -92,28 +92,33 @@ func (s *service) Store(ctx context.Context, feed *domain.Feed) error { return err } + s.log.Debug().Msgf("successfully added feed: %+v", feed) + return nil } func (s *service) Update(ctx context.Context, feed *domain.Feed) error { - if err := s.repo.Update(ctx, feed); err != nil { + if err := s.update(ctx, feed); err != nil { s.log.Error().Err(err).Msgf("could not update feed: %+v", feed) return err } + + s.log.Debug().Msgf("successfully updated feed: %+v", feed) + return nil } func (s *service) Delete(ctx context.Context, id int) error { - if err := s.repo.Delete(ctx, id); err != nil { + if err := s.delete(ctx, id); err != nil { s.log.Error().Err(err).Msgf("could not delete feed by id: %v", id) return err } + return nil } func (s *service) ToggleEnabled(ctx context.Context, id int, enabled bool) error { - err := s.repo.ToggleEnabled(ctx, id, enabled) - if err != nil { + if err := s.toggleEnabled(ctx, id, enabled); err != nil { s.log.Error().Err(err).Msgf("could not toggle feed by id: %v", id) return err } @@ -137,17 +142,22 @@ func (s *service) update(ctx context.Context, feed *domain.Feed) error { func (s *service) delete(ctx context.Context, id int) error { f, err := s.repo.FindByID(ctx, id) if err != nil { - s.log.Error().Err(err).Msg("feed.ToggleEnabled: error finding feed") + s.log.Error().Err(err).Msg("error finding feed") return err } if err := s.stopTorznabJob(f.Indexer); err != nil { - s.log.Error().Err(err).Msg("feed.Delete: error stopping torznab job") + s.log.Error().Err(err).Msg("error stopping torznab job") return err } if err := s.repo.Delete(ctx, id); err != nil { - s.log.Error().Err(err).Msg("feed.Delete: error deleting feed") + s.log.Error().Err(err).Msg("error deleting feed") + return err + } + + if err := s.cacheRepo.DeleteBucket(ctx, f.Name); err != nil { + s.log.Error().Err(err).Msgf("could not delete feedCache bucket by id: %v", id) return err } @@ -273,7 +283,7 @@ func (s *service) addTorznabJob(f feedInstance) error { } // setup logger - l := s.log.With().Str("feed_name", f.Name).Logger() + l := s.log.With().Str("feed", f.Name).Logger() // setup torznab Client c := torznab.NewClient(f.URL, f.ApiKey) diff --git a/internal/feed/torznab.go b/internal/feed/torznab.go index 289f4c9..f75a544 100644 --- a/internal/feed/torznab.go +++ b/internal/feed/torznab.go @@ -55,16 +55,16 @@ func (j *TorznabJob) process() error { // get feed items, err := j.getFeed() if err != nil { - j.Log.Error().Err(err).Msgf("torznab.process: error fetching feed items") - return errors.Wrap(err, "torznab.process: error getting feed items") + j.Log.Error().Err(err).Msgf("error fetching feed items") + return errors.Wrap(err, "error getting feed items") } + j.Log.Debug().Msgf("found (%d) new items to process", len(items)) + if len(items) == 0 { return nil } - j.Log.Debug().Msgf("torznab.process: refreshing feed: %v, found (%d) new items to check", j.Name, len(items)) - releases := make([]*domain.Release, 0) for _, item := range items { @@ -93,11 +93,11 @@ func (j *TorznabJob) getFeed() ([]torznab.FeedItem, error) { // get feed feedItems, err := j.Client.GetFeed() if err != nil { - j.Log.Error().Err(err).Msgf("torznab.getFeed: error fetching feed items") + j.Log.Error().Err(err).Msgf("error fetching feed items") return nil, errors.Wrap(err, "error fetching feed items") } - j.Log.Trace().Msgf("torznab getFeed: refreshing feed: %v, found (%d) items", j.Name, len(feedItems)) + j.Log.Debug().Msgf("refreshing feed: %v, found (%d) items", j.Name, len(feedItems)) items := make([]torznab.FeedItem, 0) if len(feedItems) == 0 { @@ -113,27 +113,26 @@ func (j *TorznabJob) getFeed() ([]torznab.FeedItem, error) { continue } - //if cacheValue, err := j.Repo.Get(j.Name, i.GUID); err == nil { - // j.Log.Trace().Msgf("torznab getFeed: cacheValue: %v", cacheValue) - //} - - if exists, err := j.Repo.Exists(j.Name, i.GUID); err == nil { - if exists { - j.Log.Trace().Msg("torznab getFeed: cache item exists, skip") - continue - } + exists, err := j.Repo.Exists(j.Name, i.GUID) + if err != nil { + j.Log.Error().Err(err).Msg("could not check if item exists") + continue + } + if exists { + j.Log.Trace().Msgf("cache item exists, skipping release: %v", i.Title) + continue } - - // do something more - - items = append(items, i) // set ttl to 1 month ttl := time.Now().AddDate(0, 1, 0) if err := j.Repo.Put(j.Name, i.GUID, []byte(i.Title), ttl); err != nil { - j.Log.Error().Stack().Err(err).Str("guid", i.GUID).Msg("torznab getFeed: cache.Put: error storing item in cache") + j.Log.Error().Stack().Err(err).Str("guid", i.GUID).Msg("cache.Put: error storing item in cache") + continue } + + // only append if we successfully added to cache + items = append(items, i) } // send to filters diff --git a/internal/indexer/service.go b/internal/indexer/service.go index 826f676..5756700 100644 --- a/internal/indexer/service.go +++ b/internal/indexer/service.go @@ -102,7 +102,7 @@ func (s *service) Update(ctx context.Context, indexer domain.Indexer) (*domain.I // add to indexerInstances err = s.updateIndexer(*i) if err != nil { - s.log.Error().Stack().Err(err).Msgf("failed to add indexer: %v", indexer.Name) + s.log.Error().Err(err).Msgf("failed to add indexer: %v", indexer.Name) return nil, err } @@ -112,6 +112,8 @@ func (s *service) Update(ctx context.Context, indexer domain.Indexer) (*domain.I } } + s.log.Debug().Msgf("successfully updated indexer: %v", indexer.Name) + return i, nil } diff --git a/internal/release/service.go b/internal/release/service.go index 912d7ec..0251027 100644 --- a/internal/release/service.go +++ b/internal/release/service.go @@ -199,6 +199,8 @@ func (s *service) Process(release *domain.Release) { } func (s *service) ProcessMultiple(releases []*domain.Release) { + s.log.Debug().Msgf("process (%v) new releases from feed", len(releases)) + for _, rls := range releases { if rls == nil { continue