mirror of
https://github.com/idanoo/autobrr
synced 2025-07-23 00:39:13 +00:00
feat(releases): replay actions (#932)
* feat(releases): replay actions * feat(releases): replay actions component * fix: update filter actions * fix: select filter_id from ras
This commit is contained in:
parent
97333d334f
commit
6898ad8315
16 changed files with 752 additions and 189 deletions
|
@ -19,6 +19,8 @@ import (
|
|||
type Service interface {
|
||||
Find(ctx context.Context, query domain.ReleaseQueryParams) (res []*domain.Release, nextCursor int64, count int64, err error)
|
||||
FindRecent(ctx context.Context) ([]*domain.Release, error)
|
||||
Get(ctx context.Context, req *domain.GetReleaseRequest) (*domain.Release, error)
|
||||
GetActionStatus(ctx context.Context, req *domain.GetReleaseActionStatusRequest) (*domain.ReleaseActionStatus, error)
|
||||
GetIndexerOptions(ctx context.Context) ([]string, error)
|
||||
Stats(ctx context.Context) (*domain.ReleaseStats, error)
|
||||
Store(ctx context.Context, release *domain.Release) error
|
||||
|
@ -27,6 +29,7 @@ type Service interface {
|
|||
|
||||
Process(release *domain.Release)
|
||||
ProcessMultiple(releases []*domain.Release)
|
||||
Retry(ctx context.Context, req *domain.ReleaseActionRetryReq) error
|
||||
}
|
||||
|
||||
type actionClientTypeKey struct {
|
||||
|
@ -59,6 +62,14 @@ func (s *service) FindRecent(ctx context.Context) (res []*domain.Release, err er
|
|||
return s.repo.FindRecent(ctx)
|
||||
}
|
||||
|
||||
func (s *service) Get(ctx context.Context, req *domain.GetReleaseRequest) (*domain.Release, error) {
|
||||
return s.repo.Get(ctx, req)
|
||||
}
|
||||
|
||||
func (s *service) GetActionStatus(ctx context.Context, req *domain.GetReleaseActionStatusRequest) (*domain.ReleaseActionStatus, error) {
|
||||
return s.repo.GetActionStatus(ctx, req)
|
||||
}
|
||||
|
||||
func (s *service) GetIndexerOptions(ctx context.Context) ([]string, error) {
|
||||
return s.repo.GetIndexerOptions(ctx)
|
||||
}
|
||||
|
@ -138,13 +149,13 @@ func (s *service) Process(release *domain.Release) {
|
|||
}
|
||||
|
||||
if !match {
|
||||
l.Trace().Msgf("release.Process: indexer: %s, filter: %s release: %s, no match. rejections: %s", release.Indexer, release.Filter.Name, release.TorrentName, release.RejectionsString())
|
||||
l.Trace().Msgf("release.Process: indexer: %s, filter: %s release: %s, no match. rejections: %s", release.Indexer, release.FilterName, release.TorrentName, release.RejectionsString())
|
||||
|
||||
l.Debug().Msgf("release rejected: %s", release.RejectionsString())
|
||||
continue
|
||||
}
|
||||
|
||||
l.Info().Msgf("Matched '%s' (%s) for %s", release.TorrentName, release.Filter.Name, release.Indexer)
|
||||
l.Info().Msgf("Matched '%s' (%s) for %s", release.TorrentName, release.FilterName, release.Indexer)
|
||||
|
||||
// save release here to only save those with rejections from actions instead of all releases
|
||||
if release.ID == 0 {
|
||||
|
@ -158,7 +169,7 @@ func (s *service) Process(release *domain.Release) {
|
|||
// sleep for the delay period specified in the filter before running actions
|
||||
delay := release.Filter.Delay
|
||||
if delay > 0 {
|
||||
l.Debug().Msgf("Delaying processing of '%s' (%s) for %s by %d seconds as specified in the filter", release.TorrentName, release.Filter.Name, release.Indexer, delay)
|
||||
l.Debug().Msgf("Delaying processing of '%s' (%s) for %s by %d seconds as specified in the filter", release.TorrentName, release.FilterName, release.Indexer, delay)
|
||||
time.Sleep(time.Duration(delay) * time.Second)
|
||||
}
|
||||
|
||||
|
@ -170,30 +181,30 @@ func (s *service) Process(release *domain.Release) {
|
|||
|
||||
// only run enabled actions
|
||||
if !act.Enabled {
|
||||
l.Trace().Msgf("release.Process: indexer: %s, filter: %s release: %s action '%s' not enabled, skip", release.Indexer, release.Filter.Name, release.TorrentName, act.Name)
|
||||
l.Trace().Msgf("release.Process: indexer: %s, filter: %s release: %s action '%s' not enabled, skip", release.Indexer, release.FilterName, release.TorrentName, act.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
l.Trace().Msgf("release.Process: indexer: %s, filter: %s release: %s , run action: %s", release.Indexer, release.Filter.Name, release.TorrentName, act.Name)
|
||||
l.Trace().Msgf("release.Process: indexer: %s, filter: %s release: %s , run action: %s", release.Indexer, release.FilterName, release.TorrentName, act.Name)
|
||||
|
||||
// keep track of actiom clients to avoid sending the same thing all over again
|
||||
_, tried := triedActionClients[actionClientTypeKey{Type: act.Type, ClientID: act.ClientID}]
|
||||
if tried {
|
||||
l.Trace().Msgf("release.Process: indexer: %s, filter: %s release: %s action client already tried, skip", release.Indexer, release.Filter.Name, release.TorrentName)
|
||||
l.Trace().Msgf("release.Process: indexer: %s, filter: %s release: %s action client already tried, skip", release.Indexer, release.FilterName, release.TorrentName)
|
||||
continue
|
||||
}
|
||||
|
||||
// run action
|
||||
status, err := s.runAction(ctx, act, release)
|
||||
if err != nil {
|
||||
l.Error().Stack().Err(err).Msgf("release.Process: error running actions for filter: %s", release.Filter.Name)
|
||||
l.Error().Stack().Err(err).Msgf("release.Process: error running actions for filter: %s", release.FilterName)
|
||||
//continue
|
||||
}
|
||||
|
||||
rejections = status.Rejections
|
||||
|
||||
if err := s.StoreReleaseActionStatus(ctx, status); err != nil {
|
||||
s.log.Error().Err(err).Msgf("release.Process: error storing action status for filter: %s", release.Filter.Name)
|
||||
s.log.Error().Err(err).Msgf("release.Process: error storing action status for filter: %s", release.FilterName)
|
||||
}
|
||||
|
||||
if len(rejections) > 0 {
|
||||
|
@ -237,12 +248,12 @@ func (s *service) runAction(ctx context.Context, action *domain.Action, release
|
|||
status := domain.NewReleaseActionStatus(action, release)
|
||||
|
||||
if err := s.StoreReleaseActionStatus(ctx, status); err != nil {
|
||||
s.log.Error().Err(err).Msgf("release.runAction: error storing action for filter: %s", release.Filter.Name)
|
||||
s.log.Error().Err(err).Msgf("release.runAction: error storing action for filter: %s", release.FilterName)
|
||||
}
|
||||
|
||||
rejections, err := s.actionSvc.RunAction(ctx, action, release)
|
||||
if err != nil {
|
||||
s.log.Error().Stack().Err(err).Msgf("release.runAction: error running actions for filter: %s", release.Filter.Name)
|
||||
s.log.Error().Stack().Err(err).Msgf("release.runAction: error running actions for filter: %s", release.FilterName)
|
||||
|
||||
status.Status = domain.ReleasePushStatusErr
|
||||
status.Rejections = []string{err.Error()}
|
||||
|
@ -261,3 +272,54 @@ func (s *service) runAction(ctx context.Context, action *domain.Action, release
|
|||
|
||||
return status, nil
|
||||
}
|
||||
|
||||
func (s *service) retryAction(ctx context.Context, action *domain.Action, release *domain.Release) error {
|
||||
actionStatus, err := s.runAction(ctx, action, release)
|
||||
if err != nil {
|
||||
s.log.Error().Err(err).Msgf("release.retryAction: error running actions for filter: %s", release.FilterName)
|
||||
|
||||
if err := s.StoreReleaseActionStatus(ctx, actionStatus); err != nil {
|
||||
s.log.Error().Err(err).Msgf("release.retryAction: error storing filterAction status for filter: %s", release.FilterName)
|
||||
return err
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.StoreReleaseActionStatus(ctx, actionStatus); err != nil {
|
||||
s.log.Error().Err(err).Msgf("release.retryAction: error storing filterAction status for filter: %s", release.FilterName)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *service) Retry(ctx context.Context, req *domain.ReleaseActionRetryReq) error {
|
||||
// get release
|
||||
release, err := s.Get(ctx, &domain.GetReleaseRequest{Id: req.ReleaseId})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// get release filter action status
|
||||
status, err := s.GetActionStatus(ctx, &domain.GetReleaseActionStatusRequest{Id: req.ActionStatusId})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// get filter action with action id from status
|
||||
filterAction, err := s.actionSvc.Get(ctx, &domain.GetActionRequest{Id: int(status.ActionID)})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// run filterAction
|
||||
if err := s.retryAction(ctx, filterAction, release); err != nil {
|
||||
s.log.Error().Err(err).Msgf("release.Retry: error re-running action: %s", filterAction.Name)
|
||||
return err
|
||||
}
|
||||
|
||||
s.log.Info().Msgf("successfully replayed action %s for release %s", filterAction.Name, release.TorrentName)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue