From 40fe3e9f54d8b0f526c0d79d826737d61acdf91d Mon Sep 17 00:00:00 2001 From: ze0s <43699394+zze0s@users.noreply.github.com> Date: Wed, 6 Nov 2024 20:48:46 +0100 Subject: [PATCH] fix(indexers): delete feed with indexer (#1810) * fix(indexers): delete feed in one go * fix(indexers): sort imports * refactor(feeds): db methods for find --- cmd/autobrr/main.go | 4 +- internal/action/run.go | 2 +- internal/database/feed.go | 150 +++++++++++++++++++-------------- internal/database/feed_test.go | 6 +- internal/domain/event.go | 7 ++ internal/domain/feed.go | 8 +- internal/domain/indexer.go | 4 + internal/events/subscribers.go | 55 +++++++++--- internal/feed/service.go | 16 ++-- internal/indexer/service.go | 7 +- 10 files changed, 167 insertions(+), 92 deletions(-) diff --git a/cmd/autobrr/main.go b/cmd/autobrr/main.go index d7f55c5..bab486d 100644 --- a/cmd/autobrr/main.go +++ b/cmd/autobrr/main.go @@ -121,7 +121,7 @@ func main() { downloadService = releasedownload.NewDownloadService(log, releaseRepo, indexerRepo, proxyService) downloadClientService = download_client.NewService(log, downloadClientRepo) actionService = action.NewService(log, actionRepo, downloadClientService, downloadService, bus) - indexerService = indexer.NewService(log, cfg.Config, indexerRepo, releaseRepo, indexerAPIService, schedulingService) + indexerService = indexer.NewService(log, cfg.Config, bus, indexerRepo, releaseRepo, indexerAPIService, schedulingService) filterService = filter.NewService(log, filterRepo, actionService, releaseRepo, indexerAPIService, indexerService, downloadService) releaseService = release.NewService(log, releaseRepo, actionService, filterService, indexerService) ircService = irc.NewService(log, serverEvents, ircRepo, releaseService, indexerService, notificationService, proxyService) @@ -129,7 +129,7 @@ func main() { ) // register event subscribers - events.NewSubscribers(log, bus, notificationService, releaseService) + events.NewSubscribers(log, bus, feedService, notificationService, releaseService) errorChannel := make(chan error) diff --git a/internal/action/run.go b/internal/action/run.go index 51ac6ea..dd6d0c4 100644 --- a/internal/action/run.go +++ b/internal/action/run.go @@ -127,7 +127,7 @@ func (s *service) RunAction(ctx context.Context, action *domain.Action, release } // send separate event for notifications - s.bus.Publish("events:notification", &payload.Event, payload) + s.bus.Publish(domain.EventNotificationSend, &payload.Event, payload) return rejections, err } diff --git a/internal/database/feed.go b/internal/database/feed.go index 6ffd9e0..2ea7dd5 100644 --- a/internal/database/feed.go +++ b/internal/database/feed.go @@ -28,6 +28,92 @@ type FeedRepo struct { db *DB } +func (r *FeedRepo) FindOne(ctx context.Context, params domain.FindOneParams) (*domain.Feed, error) { + queryBuilder := r.db.squirrel. + Select( + "f.id", + "i.id", + "i.identifier", + "i.identifier_external", + "i.name", + "i.use_proxy", + "i.proxy_id", + "f.name", + "f.type", + "f.enabled", + "f.url", + "f.interval", + "f.timeout", + "f.max_age", + "f.api_key", + "f.cookie", + "f.settings", + "f.created_at", + "f.updated_at", + "f.indexer_id", + ). + From("feed f"). + LeftJoin("indexer i ON f.indexer_id = i.id") + + if params.FeedID != 0 { + queryBuilder = queryBuilder.Where(sq.Eq{"f.id": params.FeedID}) + } else if params.IndexerID != 0 { + queryBuilder = queryBuilder.Where(sq.Eq{"f.indexer_id": params.IndexerID}) + } else if params.IndexerIdentifier != "" { + queryBuilder = queryBuilder.Where(sq.Eq{"i.identifier": params.IndexerIdentifier}) + } else { + return nil, errors.New("invalid params") + } + + query, args, err := queryBuilder.ToSql() + if err != nil { + return nil, errors.Wrap(err, "error building query") + } + + row := r.db.handler.QueryRowContext(ctx, query, args...) + if err := row.Err(); err != nil { + return nil, errors.Wrap(err, "error executing query") + } + + var f domain.Feed + + var apiKey, cookie, settings sql.NullString + var indexerID, indexerProxyID sql.NullInt64 + var indexerIdentifier, indexerIdentifierExternal, indexerName sql.NullString + var indexerUseProxy sql.NullBool + + if err := row.Scan(&f.ID, &indexerID, &indexerIdentifier, &indexerIdentifierExternal, &indexerName, &indexerUseProxy, &indexerProxyID, &f.Name, &f.Type, &f.Enabled, &f.URL, &f.Interval, &f.Timeout, &f.MaxAge, &apiKey, &cookie, &settings, &f.CreatedAt, &f.UpdatedAt, &f.IndexerID); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, domain.ErrRecordNotFound + } + + return nil, errors.Wrap(err, "error scanning row") + } + + if indexerID.Valid { + f.Indexer.ID = int(indexerID.Int64) + f.Indexer.Identifier = indexerIdentifier.String + f.Indexer.IdentifierExternal = indexerIdentifierExternal.String + f.Indexer.Name = indexerName.String + f.UseProxy = indexerUseProxy.Bool + f.ProxyID = indexerProxyID.Int64 + } + + f.ApiKey = apiKey.String + f.Cookie = cookie.String + + if settings.Valid { + var settingsJson domain.FeedSettingsJSON + if err = json.Unmarshal([]byte(settings.String), &settingsJson); err != nil { + return nil, errors.Wrap(err, "error unmarshal settings") + } + + f.Settings = &settingsJson + } + + return &f, nil +} + func (r *FeedRepo) FindByID(ctx context.Context, id int) (*domain.Feed, error) { queryBuilder := r.db.squirrel. Select( @@ -94,70 +180,6 @@ func (r *FeedRepo) FindByID(ctx context.Context, id int) (*domain.Feed, error) { return &f, nil } -func (r *FeedRepo) FindByIndexerIdentifier(ctx context.Context, indexer string) (*domain.Feed, error) { - queryBuilder := r.db.squirrel. - Select( - "f.id", - "i.id", - "i.identifier", - "i.identifier_external", - "i.name", - "i.use_proxy", - "i.proxy_id", - "f.name", - "f.type", - "f.enabled", - "f.url", - "f.interval", - "f.timeout", - "f.max_age", - "f.api_key", - "f.cookie", - "f.settings", - "f.created_at", - "f.updated_at", - ). - From("feed f"). - Join("indexer i ON f.indexer_id = i.id"). - Where(sq.Eq{"i.name": indexer}) - - query, args, err := queryBuilder.ToSql() - if err != nil { - return nil, errors.Wrap(err, "error building query") - } - - row := r.db.handler.QueryRowContext(ctx, query, args...) - if err := row.Err(); err != nil { - return nil, errors.Wrap(err, "error executing query") - } - - var f domain.Feed - - var apiKey, cookie, settings sql.NullString - var proxyID sql.NullInt64 - - if err := row.Scan(&f.ID, &f.Indexer.ID, &f.Indexer.Identifier, &f.Indexer.IdentifierExternal, &f.Indexer.Name, &f.UseProxy, &proxyID, &f.Name, &f.Type, &f.Enabled, &f.URL, &f.Interval, &f.Timeout, &f.MaxAge, &apiKey, &cookie, &settings, &f.CreatedAt, &f.UpdatedAt); err != nil { - if errors.Is(err, sql.ErrNoRows) { - return nil, domain.ErrRecordNotFound - } - - return nil, errors.Wrap(err, "error scanning row") - } - - f.ProxyID = proxyID.Int64 - f.ApiKey = apiKey.String - f.Cookie = cookie.String - - var settingsJson domain.FeedSettingsJSON - if err = json.Unmarshal([]byte(settings.String), &settingsJson); err != nil { - return nil, errors.Wrap(err, "error unmarshal settings") - } - - f.Settings = &settingsJson - - return &f, nil -} - func (r *FeedRepo) Find(ctx context.Context) ([]domain.Feed, error) { queryBuilder := r.db.squirrel. Select( diff --git a/internal/database/feed_test.go b/internal/database/feed_test.go index 521fe13..ba41b7d 100644 --- a/internal/database/feed_test.go +++ b/internal/database/feed_test.go @@ -205,7 +205,7 @@ func TestFeedRepo_FindByID(t *testing.T) { } } -func TestFeedRepo_FindByIndexerIdentifier(t *testing.T) { +func TestFeedRepo_FindOne(t *testing.T) { for dbType, db := range testDBs { log := setupLoggerForTest() repo := NewFeedRepo(log, db) @@ -222,7 +222,7 @@ func TestFeedRepo_FindByIndexerIdentifier(t *testing.T) { assert.NoError(t, err) // Execute - feed, err := repo.FindByIndexerIdentifier(context.Background(), indexer.Identifier) + feed, err := repo.FindOne(context.Background(), domain.FindOneParams{IndexerIdentifier: indexer.Identifier}) assert.NoError(t, err) // Verify @@ -240,7 +240,7 @@ func TestFeedRepo_FindByIndexerIdentifier(t *testing.T) { t.Run(fmt.Sprintf("FindByIndexerIdentifier_Fails_Wrong_Identifier [%s]", dbType), func(t *testing.T) { // Execute - feed, err := repo.FindByIndexerIdentifier(context.Background(), "wrong-identifier") + feed, err := repo.FindOne(context.Background(), domain.FindOneParams{IndexerIdentifier: "wrong-identifier"}) assert.Error(t, err) assert.Nil(t, feed) }) diff --git a/internal/domain/event.go b/internal/domain/event.go index 13a0612..d9d3be3 100644 --- a/internal/domain/event.go +++ b/internal/domain/event.go @@ -5,6 +5,13 @@ package domain import "time" +const ( + EventReleaseStoreActionStatus = "release:store-action-status" + EventReleasePushStatus = "release:push" + EventNotificationSend = "events:notification" + EventIndexerDelete = "indexer:delete" +) + type EventsReleasePushed struct { ReleaseName string Filter string diff --git a/internal/domain/feed.go b/internal/domain/feed.go index 29a4b23..81a40a8 100644 --- a/internal/domain/feed.go +++ b/internal/domain/feed.go @@ -21,8 +21,8 @@ type FeedCacheRepo interface { } type FeedRepo interface { + FindOne(ctx context.Context, params FindOneParams) (*Feed, error) FindByID(ctx context.Context, id int) (*Feed, error) - FindByIndexerIdentifier(ctx context.Context, indexer string) (*Feed, error) Find(ctx context.Context) ([]Feed, error) GetLastRunDataByID(ctx context.Context, id int) (string, error) Store(ctx context.Context, feed *Feed) error @@ -91,3 +91,9 @@ type FeedCacheItem struct { Value []byte `json:"value"` TTL time.Time `json:"ttl"` } + +type FindOneParams struct { + FeedID int + IndexerID int + IndexerIdentifier string +} diff --git a/internal/domain/indexer.go b/internal/domain/indexer.go index 63d410d..54a08ee 100644 --- a/internal/domain/indexer.go +++ b/internal/domain/indexer.go @@ -40,6 +40,10 @@ type Indexer struct { Settings map[string]string `json:"settings,omitempty"` } +func (i Indexer) ImplementationIsFeed() bool { + return i.Implementation == "rss" || i.Implementation == "torznab" || i.Implementation == "newznab" +} + type IndexerMinimal struct { ID int `json:"id"` Name string `json:"name"` diff --git a/internal/events/subscribers.go b/internal/events/subscribers.go index 3dbae42..45af114 100644 --- a/internal/events/subscribers.go +++ b/internal/events/subscribers.go @@ -7,25 +7,30 @@ import ( "context" "github.com/autobrr/autobrr/internal/domain" + "github.com/autobrr/autobrr/internal/feed" "github.com/autobrr/autobrr/internal/logger" "github.com/autobrr/autobrr/internal/notification" "github.com/autobrr/autobrr/internal/release" + "github.com/autobrr/autobrr/pkg/errors" "github.com/asaskevich/EventBus" "github.com/rs/zerolog" ) type Subscriber struct { - log zerolog.Logger - eventbus EventBus.Bus + log zerolog.Logger + eventbus EventBus.Bus + + feedSvc feed.Service notificationSvc notification.Service releaseSvc release.Service } -func NewSubscribers(log logger.Logger, eventbus EventBus.Bus, notificationSvc notification.Service, releaseSvc release.Service) Subscriber { +func NewSubscribers(log logger.Logger, eventbus EventBus.Bus, feedSvc feed.Service, notificationSvc notification.Service, releaseSvc release.Service) Subscriber { s := Subscriber{ log: log.With().Str("module", "events").Logger(), eventbus: eventbus, + feedSvc: feedSvc, notificationSvc: notificationSvc, releaseSvc: releaseSvc, } @@ -36,13 +41,14 @@ func NewSubscribers(log logger.Logger, eventbus EventBus.Bus, notificationSvc no } func (s Subscriber) Register() { - s.eventbus.Subscribe("release:store-action-status", s.releaseActionStatus) - s.eventbus.Subscribe("release:push", s.releasePushStatus) - s.eventbus.Subscribe("events:notification", s.sendNotification) + s.eventbus.Subscribe(domain.EventReleaseStoreActionStatus, s.handleReleaseActionStatus) + s.eventbus.Subscribe(domain.EventReleasePushStatus, s.handleReleasePushStatus) + s.eventbus.Subscribe(domain.EventNotificationSend, s.handleSendNotification) + s.eventbus.Subscribe(domain.EventIndexerDelete, s.handleIndexerDelete) } -func (s Subscriber) releaseActionStatus(actionStatus *domain.ReleaseActionStatus) { - s.log.Trace().Msgf("events: 'release:store-action-status' '%+v'", actionStatus) +func (s Subscriber) handleReleaseActionStatus(actionStatus *domain.ReleaseActionStatus) { + s.log.Trace().Str("event", domain.EventReleaseStoreActionStatus).Msgf("store action status: '%+v'", actionStatus) err := s.releaseSvc.StoreReleaseActionStatus(context.Background(), actionStatus) if err != nil { @@ -50,16 +56,41 @@ func (s Subscriber) releaseActionStatus(actionStatus *domain.ReleaseActionStatus } } -func (s Subscriber) releasePushStatus(actionStatus *domain.ReleaseActionStatus) { - s.log.Trace().Msgf("events: 'release:push' '%+v'", actionStatus) +func (s Subscriber) handleReleasePushStatus(actionStatus *domain.ReleaseActionStatus) { + s.log.Trace().Str("event", domain.EventReleasePushStatus).Msgf("events: 'release:push' '%+v'", actionStatus) if err := s.releaseSvc.StoreReleaseActionStatus(context.Background(), actionStatus); err != nil { s.log.Error().Err(err).Msgf("events: 'release:push' error") } } -func (s Subscriber) sendNotification(event *domain.NotificationEvent, payload *domain.NotificationPayload) { - s.log.Trace().Msgf("events: '%v' '%+v'", *event, payload) +func (s Subscriber) handleSendNotification(event *domain.NotificationEvent, payload *domain.NotificationPayload) { + s.log.Trace().Str("event", domain.EventNotificationSend).Msgf("send notification events: '%v' '%+v'", *event, payload) s.notificationSvc.Send(*event, *payload) } + +// handleIndexerDelete handle feed cleanup via event because feed service can't be imported in indexer service +func (s Subscriber) handleIndexerDelete(indexer *domain.Indexer) { + s.log.Trace().Str("event", domain.EventIndexerDelete).Msgf("events: 'indexer:delete' '%d'", indexer.ID) + + ctx := context.Background() + + if indexer.ImplementationIsFeed() { + feedItem, err := s.feedSvc.FindOne(ctx, domain.FindOneParams{IndexerID: int(indexer.ID)}) + if err != nil { + if errors.Is(err, domain.ErrRecordNotFound) { + return + } + + s.log.Error().Err(err).Msgf("events: 'indexer:delete' error, could not find feed with indexer id: %d", indexer.ID) + return + } + + if err := s.feedSvc.Delete(ctx, feedItem.ID); err != nil { + s.log.Error().Err(err).Msgf("events: 'indexer:delete' error, could not delete feed with id: %d", feedItem.ID) + } + + s.log.Debug().Msgf("successfully removed feed: %s", feedItem.Name) + } +} diff --git a/internal/feed/service.go b/internal/feed/service.go index 1f819c2..1236851 100644 --- a/internal/feed/service.go +++ b/internal/feed/service.go @@ -24,8 +24,8 @@ import ( ) type Service interface { + FindOne(ctx context.Context, params domain.FindOneParams) (*domain.Feed, error) FindByID(ctx context.Context, id int) (*domain.Feed, error) - FindByIndexerIdentifier(ctx context.Context, indexer string) (*domain.Feed, error) Find(ctx context.Context) ([]domain.Feed, error) GetCacheByID(ctx context.Context, feedId int) ([]domain.FeedCacheItem, error) Store(ctx context.Context, feed *domain.Feed) error @@ -85,12 +85,12 @@ func NewService(log logger.Logger, repo domain.FeedRepo, cacheRepo domain.FeedCa } } -func (s *service) FindByID(ctx context.Context, id int) (*domain.Feed, error) { - return s.repo.FindByID(ctx, id) +func (s *service) FindOne(ctx context.Context, params domain.FindOneParams) (*domain.Feed, error) { + return s.repo.FindOne(ctx, params) } -func (s *service) FindByIndexerIdentifier(ctx context.Context, indexer string) (*domain.Feed, error) { - return s.repo.FindByIndexerIdentifier(ctx, indexer) +func (s *service) FindByID(ctx context.Context, id int) (*domain.Feed, error) { + return s.repo.FindByID(ctx, id) } func (s *service) Find(ctx context.Context) ([]domain.Feed, error) { @@ -154,7 +154,7 @@ func (s *service) update(ctx context.Context, feed *domain.Feed) error { } // get Feed again for ProxyID and UseProxy to be correctly populated - feed, err := s.repo.FindByID(ctx, feed.ID) + feed, err := s.repo.FindOne(ctx, domain.FindOneParams{FeedID: feed.ID}) if err != nil { s.log.Error().Err(err).Msg("error finding feed") return err @@ -169,7 +169,7 @@ func (s *service) update(ctx context.Context, feed *domain.Feed) error { } func (s *service) delete(ctx context.Context, id int) error { - f, err := s.repo.FindByID(ctx, id) + f, err := s.repo.FindOne(ctx, domain.FindOneParams{FeedID: id}) if err != nil { s.log.Error().Err(err).Msg("error finding feed") return err @@ -192,7 +192,7 @@ func (s *service) delete(ctx context.Context, id int) error { } func (s *service) toggleEnabled(ctx context.Context, id int, enabled bool) error { - f, err := s.repo.FindByID(ctx, id) + f, err := s.repo.FindOne(ctx, domain.FindOneParams{FeedID: id}) if err != nil { s.log.Error().Err(err).Msg("error finding feed") return err diff --git a/internal/indexer/service.go b/internal/indexer/service.go index 40085b6..7526154 100644 --- a/internal/indexer/service.go +++ b/internal/indexer/service.go @@ -19,6 +19,7 @@ import ( "github.com/autobrr/autobrr/pkg/errors" "github.com/autobrr/autobrr/pkg/sanitize" + "github.com/asaskevich/EventBus" "github.com/gosimple/slug" "github.com/rs/zerolog" "gopkg.in/yaml.v3" @@ -50,6 +51,7 @@ type service struct { releaseRepo domain.ReleaseRepo ApiService APIService scheduler scheduler.Service + bus EventBus.Bus // contains all raw indexer definitions definitions map[string]domain.IndexerDefinition @@ -65,7 +67,7 @@ type service struct { rssIndexers map[string]*domain.IndexerDefinition } -func NewService(log logger.Logger, config *domain.Config, repo domain.IndexerRepo, releaseRepo domain.ReleaseRepo, apiService APIService, scheduler scheduler.Service) Service { +func NewService(log logger.Logger, config *domain.Config, bus EventBus.Bus, repo domain.IndexerRepo, releaseRepo domain.ReleaseRepo, apiService APIService, scheduler scheduler.Service) Service { return &service{ log: log.With().Str("module", "indexer").Logger(), config: config, @@ -73,6 +75,7 @@ func NewService(log logger.Logger, config *domain.Config, repo domain.IndexerRep releaseRepo: releaseRepo, ApiService: apiService, scheduler: scheduler, + bus: bus, lookupIRCServerDefinition: make(map[string]map[string]*domain.IndexerDefinition), torznabIndexers: make(map[string]*domain.IndexerDefinition), newznabIndexers: make(map[string]*domain.IndexerDefinition), @@ -189,6 +192,8 @@ func (s *service) Delete(ctx context.Context, id int) error { s.log.Error().Err(err).Msgf("could not delete indexer api client: %s", indexer.Identifier) } + s.bus.Publish(domain.EventIndexerDelete, indexer) + return nil }