mirror of
https://github.com/idanoo/autobrr
synced 2025-07-23 16:59:12 +00:00
fix(releases): max downloads per hour (#883)
* fix(releases): max downloads per hour * refactor: release processing * compare apples to apples (#884) * from rocketships back to apples * Update internal/database/filter.go * cast me to the * keep your eye on the case --------- Co-authored-by: Kyle Sanderson <kyle.leet@gmail.com>
This commit is contained in:
parent
da5492febb
commit
ef3445cbed
7 changed files with 139 additions and 107 deletions
|
@ -93,18 +93,6 @@ func (s *service) RunAction(ctx context.Context, action *domain.Action, release
|
||||||
return rejections, err
|
return rejections, err
|
||||||
}
|
}
|
||||||
|
|
||||||
rlsActionStatus := &domain.ReleaseActionStatus{
|
|
||||||
ReleaseID: release.ID,
|
|
||||||
Status: domain.ReleasePushStatusApproved,
|
|
||||||
Action: action.Name,
|
|
||||||
Type: action.Type,
|
|
||||||
Client: action.Client.Name,
|
|
||||||
Filter: release.Filter.Name,
|
|
||||||
FilterID: int64(release.Filter.ID),
|
|
||||||
Rejections: []string{},
|
|
||||||
Timestamp: time.Now(),
|
|
||||||
}
|
|
||||||
|
|
||||||
payload := &domain.NotificationPayload{
|
payload := &domain.NotificationPayload{
|
||||||
Event: domain.NotificationEventPushApproved,
|
Event: domain.NotificationEventPushApproved,
|
||||||
ReleaseName: release.TorrentName,
|
ReleaseName: release.TorrentName,
|
||||||
|
@ -126,26 +114,17 @@ func (s *service) RunAction(ctx context.Context, action *domain.Action, release
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Error().Err(err).Msgf("process action failed: %v for '%v'", action.Name, release.TorrentName)
|
s.log.Error().Err(err).Msgf("process action failed: %v for '%v'", action.Name, release.TorrentName)
|
||||||
|
|
||||||
rlsActionStatus.Status = domain.ReleasePushStatusErr
|
|
||||||
rlsActionStatus.Rejections = []string{err.Error()}
|
|
||||||
|
|
||||||
payload.Event = domain.NotificationEventPushError
|
payload.Event = domain.NotificationEventPushError
|
||||||
payload.Status = domain.ReleasePushStatusErr
|
payload.Status = domain.ReleasePushStatusErr
|
||||||
payload.Rejections = []string{err.Error()}
|
payload.Rejections = []string{err.Error()}
|
||||||
}
|
}
|
||||||
|
|
||||||
if rejections != nil {
|
if rejections != nil {
|
||||||
rlsActionStatus.Status = domain.ReleasePushStatusRejected
|
|
||||||
rlsActionStatus.Rejections = rejections
|
|
||||||
|
|
||||||
payload.Event = domain.NotificationEventPushRejected
|
payload.Event = domain.NotificationEventPushRejected
|
||||||
payload.Status = domain.ReleasePushStatusRejected
|
payload.Status = domain.ReleasePushStatusRejected
|
||||||
payload.Rejections = rejections
|
payload.Rejections = rejections
|
||||||
}
|
}
|
||||||
|
|
||||||
// send event for actions
|
|
||||||
s.bus.Publish("release:push", rlsActionStatus)
|
|
||||||
|
|
||||||
// send separate event for notifications
|
// send separate event for notifications
|
||||||
s.bus.Publish("events:notification", &payload.Event, payload)
|
s.bus.Publish("events:notification", &payload.Event, payload)
|
||||||
|
|
||||||
|
|
|
@ -306,35 +306,11 @@ func (r *FilterRepo) FindByID(ctx context.Context, filterID int) (*domain.Filter
|
||||||
}
|
}
|
||||||
|
|
||||||
// FindByIndexerIdentifier find active filters with active indexer only
|
// FindByIndexerIdentifier find active filters with active indexer only
|
||||||
func (r *FilterRepo) FindByIndexerIdentifier(indexer string) ([]domain.Filter, error) {
|
func (r *FilterRepo) FindByIndexerIdentifier(ctx context.Context, indexer string) ([]domain.Filter, error) {
|
||||||
ctx := context.TODO()
|
return r.findByIndexerIdentifier(ctx, indexer)
|
||||||
tx, err := r.db.BeginTx(ctx, nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "error begin transaction")
|
|
||||||
}
|
|
||||||
defer tx.Rollback()
|
|
||||||
|
|
||||||
filters, err := r.findByIndexerIdentifier(ctx, tx, indexer)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
for i, filter := range filters {
|
|
||||||
downloads, err := r.attachDownloadsByFilter(ctx, tx, filter.ID)
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
filters[i].Downloads = downloads
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := tx.Commit(); err != nil {
|
|
||||||
return nil, errors.Wrap(err, "error finding filter by identifier")
|
|
||||||
}
|
|
||||||
|
|
||||||
return filters, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *FilterRepo) findByIndexerIdentifier(ctx context.Context, tx *Tx, indexer string) ([]domain.Filter, error) {
|
func (r *FilterRepo) findByIndexerIdentifier(ctx context.Context, indexer string) ([]domain.Filter, error) {
|
||||||
queryBuilder := r.db.squirrel.
|
queryBuilder := r.db.squirrel.
|
||||||
Select(
|
Select(
|
||||||
"f.id",
|
"f.id",
|
||||||
|
@ -416,7 +392,7 @@ func (r *FilterRepo) findByIndexerIdentifier(ctx context.Context, tx *Tx, indexe
|
||||||
return nil, errors.Wrap(err, "error building query")
|
return nil, errors.Wrap(err, "error building query")
|
||||||
}
|
}
|
||||||
|
|
||||||
rows, err := tx.QueryContext(ctx, query, args...)
|
rows, err := r.db.handler.QueryContext(ctx, query, args...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "error executing query")
|
return nil, errors.Wrap(err, "error executing query")
|
||||||
}
|
}
|
||||||
|
@ -1052,25 +1028,25 @@ func (r *FilterRepo) Delete(ctx context.Context, filterID int) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *FilterRepo) attachDownloadsByFilter(ctx context.Context, tx *Tx, filterID int) (*domain.FilterDownloads, error) {
|
func (r *FilterRepo) GetDownloadsByFilterId(ctx context.Context, filterID int) (*domain.FilterDownloads, error) {
|
||||||
if r.db.Driver == "sqlite" {
|
if r.db.Driver == "sqlite" {
|
||||||
return r.downloadsByFilterSqlite(ctx, tx, filterID)
|
return r.downloadsByFilterSqlite(ctx, filterID)
|
||||||
}
|
}
|
||||||
|
|
||||||
return r.downloadsByFilterPostgres(ctx, tx, filterID)
|
return r.downloadsByFilterPostgres(ctx, filterID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *FilterRepo) downloadsByFilterSqlite(ctx context.Context, tx *Tx, filterID int) (*domain.FilterDownloads, error) {
|
func (r *FilterRepo) downloadsByFilterSqlite(ctx context.Context, filterID int) (*domain.FilterDownloads, error) {
|
||||||
query := `SELECT
|
query := `SELECT
|
||||||
IFNULL(SUM(CASE WHEN release_action_status.timestamp >= strftime('%Y-%m-%d %H:00:00', datetime('now','localtime')) THEN 1 ELSE 0 END),0) as "hour_count",
|
COUNT(CASE WHEN CAST(strftime('%s', datetime(release_action_status.timestamp, 'localtime')) AS INTEGER) >= CAST(strftime('%s', strftime('%Y-%m-%dT%H:00:00', datetime('now','localtime'))) AS INTEGER) THEN 1 END) as "hour_count",
|
||||||
IFNULL(SUM(CASE WHEN release_action_status.timestamp >= datetime('now', 'localtime', 'start of day') THEN 1 ELSE 0 END),0) as "day_count",
|
COUNT(CASE WHEN CAST(strftime('%s', datetime(release_action_status.timestamp, 'localtime')) AS INTEGER) >= CAST(strftime('%s', datetime('now', 'localtime', 'start of day')) AS INTEGER) THEN 1 END) as "day_count",
|
||||||
IFNULL(SUM(CASE WHEN release_action_status.timestamp >= datetime('now', 'localtime', 'weekday 0', '-7 days') THEN 1 ELSE 0 END),0) as "week_count",
|
COUNT(CASE WHEN CAST(strftime('%s', datetime(release_action_status.timestamp, 'localtime')) AS INTEGER) >= CAST(strftime('%s', datetime('now', 'localtime', 'weekday 0', '-7 days', 'start of day')) AS INTEGER) THEN 1 END) as "week_count",
|
||||||
IFNULL(SUM(CASE WHEN release_action_status.timestamp >= datetime('now', 'localtime', 'start of month') THEN 1 ELSE 0 END),0) as "month_count",
|
COUNT(CASE WHEN CAST(strftime('%s', datetime(release_action_status.timestamp, 'localtime')) AS INTEGER) >= CAST(strftime('%s', datetime('now', 'localtime', 'start of month')) AS INTEGER) THEN 1 END) as "month_count",
|
||||||
count(*) as "total_count"
|
COUNT(*) as "total_count"
|
||||||
FROM release_action_status
|
FROM release_action_status
|
||||||
WHERE release_action_status.status = 'PUSH_APPROVED' AND release_action_status.filter_id = ?;`
|
WHERE (release_action_status.status = 'PUSH_APPROVED' OR release_action_status.status = 'PENDING') AND release_action_status.filter_id = ?;`
|
||||||
|
|
||||||
row := tx.QueryRowContext(ctx, query, filterID)
|
row := r.db.handler.QueryRowContext(ctx, query, filterID)
|
||||||
if err := row.Err(); err != nil {
|
if err := row.Err(); err != nil {
|
||||||
return nil, errors.Wrap(err, "error executing query")
|
return nil, errors.Wrap(err, "error executing query")
|
||||||
}
|
}
|
||||||
|
@ -1086,7 +1062,7 @@ WHERE release_action_status.status = 'PUSH_APPROVED' AND release_action_status.f
|
||||||
return &f, nil
|
return &f, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *FilterRepo) downloadsByFilterPostgres(ctx context.Context, tx *Tx, filterID int) (*domain.FilterDownloads, error) {
|
func (r *FilterRepo) downloadsByFilterPostgres(ctx context.Context, filterID int) (*domain.FilterDownloads, error) {
|
||||||
query := `SELECT
|
query := `SELECT
|
||||||
COALESCE(SUM(CASE WHEN release_action_status.timestamp >= date_trunc('hour', CURRENT_TIMESTAMP) THEN 1 ELSE 0 END),0) as "hour_count",
|
COALESCE(SUM(CASE WHEN release_action_status.timestamp >= date_trunc('hour', CURRENT_TIMESTAMP) THEN 1 ELSE 0 END),0) as "hour_count",
|
||||||
COALESCE(SUM(CASE WHEN release_action_status.timestamp >= date_trunc('day', CURRENT_DATE) THEN 1 ELSE 0 END),0) as "day_count",
|
COALESCE(SUM(CASE WHEN release_action_status.timestamp >= date_trunc('day', CURRENT_DATE) THEN 1 ELSE 0 END),0) as "day_count",
|
||||||
|
@ -1096,7 +1072,7 @@ func (r *FilterRepo) downloadsByFilterPostgres(ctx context.Context, tx *Tx, filt
|
||||||
FROM release_action_status
|
FROM release_action_status
|
||||||
WHERE release_action_status.status = 'PUSH_APPROVED' AND release_action_status.filter_id = $1;`
|
WHERE release_action_status.status = 'PUSH_APPROVED' AND release_action_status.filter_id = $1;`
|
||||||
|
|
||||||
row := tx.QueryRowContext(ctx, query, filterID)
|
row := r.db.handler.QueryRowContext(ctx, query, filterID)
|
||||||
if err := row.Err(); err != nil {
|
if err := row.Err(); err != nil {
|
||||||
return nil, errors.Wrap(err, "error executing query")
|
return nil, errors.Wrap(err, "error executing query")
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,23 +54,22 @@ func (repo *ReleaseRepo) Store(ctx context.Context, r *domain.Release) (*domain.
|
||||||
return r, nil
|
return r, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repo *ReleaseRepo) StoreReleaseActionStatus(ctx context.Context, a *domain.ReleaseActionStatus) error {
|
func (repo *ReleaseRepo) StoreReleaseActionStatus(ctx context.Context, status *domain.ReleaseActionStatus) error {
|
||||||
if a.ID != 0 {
|
if status.ID != 0 {
|
||||||
queryBuilder := repo.db.squirrel.
|
queryBuilder := repo.db.squirrel.
|
||||||
Update("release_action_status").
|
Update("release_action_status").
|
||||||
Set("status", a.Status).
|
Set("status", status.Status).
|
||||||
Set("rejections", pq.Array(a.Rejections)).
|
Set("rejections", pq.Array(status.Rejections)).
|
||||||
Set("timestamp", a.Timestamp).
|
Set("timestamp", status.Timestamp.Format(time.RFC3339)).
|
||||||
Where(sq.Eq{"id": a.ID}).
|
Where(sq.Eq{"id": status.ID}).
|
||||||
Where(sq.Eq{"release_id": a.ReleaseID})
|
Where(sq.Eq{"release_id": status.ReleaseID})
|
||||||
|
|
||||||
query, args, err := queryBuilder.ToSql()
|
query, args, err := queryBuilder.ToSql()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "error building query")
|
return errors.Wrap(err, "error building query")
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = repo.db.handler.ExecContext(ctx, query, args...)
|
if _, err = repo.db.handler.ExecContext(ctx, query, args...); err != nil {
|
||||||
if err != nil {
|
|
||||||
return errors.Wrap(err, "error executing query")
|
return errors.Wrap(err, "error executing query")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,21 +77,20 @@ func (repo *ReleaseRepo) StoreReleaseActionStatus(ctx context.Context, a *domain
|
||||||
queryBuilder := repo.db.squirrel.
|
queryBuilder := repo.db.squirrel.
|
||||||
Insert("release_action_status").
|
Insert("release_action_status").
|
||||||
Columns("status", "action", "type", "client", "filter", "filter_id", "rejections", "timestamp", "release_id").
|
Columns("status", "action", "type", "client", "filter", "filter_id", "rejections", "timestamp", "release_id").
|
||||||
Values(a.Status, a.Action, a.Type, a.Client, a.Filter, a.FilterID, pq.Array(a.Rejections), a.Timestamp, a.ReleaseID).
|
Values(status.Status, status.Action, status.Type, status.Client, status.Filter, status.FilterID, pq.Array(status.Rejections), status.Timestamp.Format(time.RFC3339), status.ReleaseID).
|
||||||
Suffix("RETURNING id").RunWith(repo.db.handler)
|
Suffix("RETURNING id").RunWith(repo.db.handler)
|
||||||
|
|
||||||
// return values
|
// return values
|
||||||
var retID int64
|
var retID int64
|
||||||
|
|
||||||
err := queryBuilder.QueryRowContext(ctx).Scan(&retID)
|
if err := queryBuilder.QueryRowContext(ctx).Scan(&retID); err != nil {
|
||||||
if err != nil {
|
|
||||||
return errors.Wrap(err, "error executing query")
|
return errors.Wrap(err, "error executing query")
|
||||||
}
|
}
|
||||||
|
|
||||||
a.ID = retID
|
status.ID = retID
|
||||||
}
|
}
|
||||||
|
|
||||||
repo.log.Trace().Msgf("release.store_release_action_status: %+v", a)
|
repo.log.Trace().Msgf("release.store_release_action_status: %+v", status)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,7 @@ https://autodl-community.github.io/autodl-irssi/configuration/filter/
|
||||||
|
|
||||||
type FilterRepo interface {
|
type FilterRepo interface {
|
||||||
FindByID(ctx context.Context, filterID int) (*Filter, error)
|
FindByID(ctx context.Context, filterID int) (*Filter, error)
|
||||||
FindByIndexerIdentifier(indexer string) ([]Filter, error)
|
FindByIndexerIdentifier(ctx context.Context, indexer string) ([]Filter, error)
|
||||||
Find(ctx context.Context, params FilterQueryParams) ([]Filter, error)
|
Find(ctx context.Context, params FilterQueryParams) ([]Filter, error)
|
||||||
ListFilters(ctx context.Context) ([]Filter, error)
|
ListFilters(ctx context.Context) ([]Filter, error)
|
||||||
Store(ctx context.Context, filter Filter) (*Filter, error)
|
Store(ctx context.Context, filter Filter) (*Filter, error)
|
||||||
|
@ -30,6 +30,7 @@ type FilterRepo interface {
|
||||||
StoreIndexerConnection(ctx context.Context, filterID int, indexerID int) error
|
StoreIndexerConnection(ctx context.Context, filterID int, indexerID int) error
|
||||||
StoreIndexerConnections(ctx context.Context, filterID int, indexers []Indexer) error
|
StoreIndexerConnections(ctx context.Context, filterID int, indexers []Indexer) error
|
||||||
DeleteIndexerConnections(ctx context.Context, filterID int) error
|
DeleteIndexerConnections(ctx context.Context, filterID int) error
|
||||||
|
GetDownloadsByFilterId(ctx context.Context, filterID int) (*FilterDownloads, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type FilterDownloads struct {
|
type FilterDownloads struct {
|
||||||
|
|
|
@ -30,7 +30,7 @@ type ReleaseRepo interface {
|
||||||
GetIndexerOptions(ctx context.Context) ([]string, error)
|
GetIndexerOptions(ctx context.Context) ([]string, error)
|
||||||
GetActionStatusByReleaseID(ctx context.Context, releaseID int64) ([]ReleaseActionStatus, error)
|
GetActionStatusByReleaseID(ctx context.Context, releaseID int64) ([]ReleaseActionStatus, error)
|
||||||
Stats(ctx context.Context) (*ReleaseStats, error)
|
Stats(ctx context.Context) (*ReleaseStats, error)
|
||||||
StoreReleaseActionStatus(ctx context.Context, actionStatus *ReleaseActionStatus) error
|
StoreReleaseActionStatus(ctx context.Context, status *ReleaseActionStatus) error
|
||||||
Delete(ctx context.Context) error
|
Delete(ctx context.Context) error
|
||||||
CanDownloadShow(ctx context.Context, title string, season int, episode int) (bool, error)
|
CanDownloadShow(ctx context.Context, title string, season int, episode int) (bool, error)
|
||||||
}
|
}
|
||||||
|
@ -105,6 +105,21 @@ type ReleaseActionStatus struct {
|
||||||
ReleaseID int64 `json:"-"`
|
ReleaseID int64 `json:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewReleaseActionStatus(action *Action, release *Release) *ReleaseActionStatus {
|
||||||
|
return &ReleaseActionStatus{
|
||||||
|
ID: 0,
|
||||||
|
Status: ReleasePushStatusPending,
|
||||||
|
Action: action.Name,
|
||||||
|
Type: action.Type,
|
||||||
|
Client: action.Client.Name,
|
||||||
|
Filter: release.Filter.Name,
|
||||||
|
FilterID: int64(release.Filter.ID),
|
||||||
|
Rejections: []string{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
ReleaseID: release.ID,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type DownloadTorrentFileResponse struct {
|
type DownloadTorrentFileResponse struct {
|
||||||
MetaInfo *metainfo.MetaInfo
|
MetaInfo *metainfo.MetaInfo
|
||||||
TmpFileName string
|
TmpFileName string
|
||||||
|
@ -121,11 +136,10 @@ type ReleaseStats struct {
|
||||||
type ReleasePushStatus string
|
type ReleasePushStatus string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
ReleasePushStatusPending ReleasePushStatus = "PENDING" // Initial status
|
||||||
ReleasePushStatusApproved ReleasePushStatus = "PUSH_APPROVED"
|
ReleasePushStatusApproved ReleasePushStatus = "PUSH_APPROVED"
|
||||||
ReleasePushStatusRejected ReleasePushStatus = "PUSH_REJECTED"
|
ReleasePushStatusRejected ReleasePushStatus = "PUSH_REJECTED"
|
||||||
ReleasePushStatusErr ReleasePushStatus = "PUSH_ERROR"
|
ReleasePushStatusErr ReleasePushStatus = "PUSH_ERROR"
|
||||||
|
|
||||||
//ReleasePushStatusPending ReleasePushStatus = "PENDING" // Initial status
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func (r ReleasePushStatus) String() string {
|
func (r ReleasePushStatus) String() string {
|
||||||
|
@ -571,7 +585,7 @@ func (r *Release) MapVars(def *IndexerDefinition, varMap map[string]string) erro
|
||||||
//log.Debug().Msgf("bad freeleechPercent var: %v", year)
|
//log.Debug().Msgf("bad freeleechPercent var: %v", year)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (freeleechPercentInt > 0) {
|
if freeleechPercentInt > 0 {
|
||||||
r.Freeleech = true
|
r.Freeleech = true
|
||||||
r.FreeleechPercent = freeleechPercentInt
|
r.FreeleechPercent = freeleechPercentInt
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,7 @@ import (
|
||||||
|
|
||||||
type Service interface {
|
type Service interface {
|
||||||
FindByID(ctx context.Context, filterID int) (*domain.Filter, error)
|
FindByID(ctx context.Context, filterID int) (*domain.Filter, error)
|
||||||
FindByIndexerIdentifier(indexer string) ([]domain.Filter, error)
|
FindByIndexerIdentifier(ctx context.Context, indexer string) ([]domain.Filter, error)
|
||||||
Find(ctx context.Context, params domain.FilterQueryParams) ([]domain.Filter, error)
|
Find(ctx context.Context, params domain.FilterQueryParams) ([]domain.Filter, error)
|
||||||
CheckFilter(ctx context.Context, f domain.Filter, release *domain.Release) (bool, error)
|
CheckFilter(ctx context.Context, f domain.Filter, release *domain.Release) (bool, error)
|
||||||
ListFilters(ctx context.Context) ([]domain.Filter, error)
|
ListFilters(ctx context.Context) ([]domain.Filter, error)
|
||||||
|
@ -35,6 +35,7 @@ type Service interface {
|
||||||
Delete(ctx context.Context, filterID int) error
|
Delete(ctx context.Context, filterID int) error
|
||||||
AdditionalSizeCheck(ctx context.Context, f domain.Filter, release *domain.Release) (bool, error)
|
AdditionalSizeCheck(ctx context.Context, f domain.Filter, release *domain.Release) (bool, error)
|
||||||
CanDownloadShow(ctx context.Context, release *domain.Release) (bool, error)
|
CanDownloadShow(ctx context.Context, release *domain.Release) (bool, error)
|
||||||
|
GetDownloadsByFilterId(ctx context.Context, filterID int) (*domain.FilterDownloads, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type service struct {
|
type service struct {
|
||||||
|
@ -128,9 +129,9 @@ func (s *service) FindByID(ctx context.Context, filterID int) (*domain.Filter, e
|
||||||
return filter, nil
|
return filter, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) FindByIndexerIdentifier(indexer string) ([]domain.Filter, error) {
|
func (s *service) FindByIndexerIdentifier(ctx context.Context, indexer string) ([]domain.Filter, error) {
|
||||||
// get filters for indexer
|
// get filters for indexer
|
||||||
filters, err := s.repo.FindByIndexerIdentifier(indexer)
|
filters, err := s.repo.FindByIndexerIdentifier(ctx, indexer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Error().Err(err).Msgf("could not find filters for indexer: %v", indexer)
|
s.log.Error().Err(err).Msgf("could not find filters for indexer: %v", indexer)
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -139,6 +140,10 @@ func (s *service) FindByIndexerIdentifier(indexer string) ([]domain.Filter, erro
|
||||||
return filters, nil
|
return filters, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *service) GetDownloadsByFilterId(ctx context.Context, filterID int) (*domain.FilterDownloads, error) {
|
||||||
|
return s.GetDownloadsByFilterId(ctx, filterID)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *service) Store(ctx context.Context, filter domain.Filter) (*domain.Filter, error) {
|
func (s *service) Store(ctx context.Context, filter domain.Filter) (*domain.Filter, error) {
|
||||||
// validate data
|
// validate data
|
||||||
|
|
||||||
|
@ -302,6 +307,16 @@ func (s *service) CheckFilter(ctx context.Context, f domain.Filter, release *dom
|
||||||
s.log.Trace().Msgf("filter.Service.CheckFilter: checking filter: %v %+v", f.Name, f)
|
s.log.Trace().Msgf("filter.Service.CheckFilter: checking filter: %v %+v", f.Name, f)
|
||||||
s.log.Trace().Msgf("filter.Service.CheckFilter: checking filter: %v for release: %+v", f.Name, release)
|
s.log.Trace().Msgf("filter.Service.CheckFilter: checking filter: %v for release: %+v", f.Name, release)
|
||||||
|
|
||||||
|
// do additional fetch to get download counts for filter
|
||||||
|
if f.MaxDownloads > 0 {
|
||||||
|
downloadCounts, err := s.repo.GetDownloadsByFilterId(ctx, f.ID)
|
||||||
|
if err != nil {
|
||||||
|
s.log.Error().Err(err).Msg("filter.Service.CheckFilter: error getting download counters for filter")
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
f.Downloads = downloadCounts
|
||||||
|
}
|
||||||
|
|
||||||
rejections, matchedFilter := f.CheckFilter(release)
|
rejections, matchedFilter := f.CheckFilter(release)
|
||||||
if len(rejections) > 0 {
|
if len(rejections) > 0 {
|
||||||
s.log.Debug().Msgf("filter.Service.CheckFilter: (%v) for release: %v rejections: (%v)", f.Name, release.TorrentName, release.RejectionsString())
|
s.log.Debug().Msgf("filter.Service.CheckFilter: (%v) for release: %v rejections: (%v)", f.Name, release.TorrentName, release.RejectionsString())
|
||||||
|
@ -380,7 +395,7 @@ func (s *service) CheckFilter(ctx context.Context, f domain.Filter, release *dom
|
||||||
}
|
}
|
||||||
|
|
||||||
// found matching filter, lets find the filter actions and attach
|
// found matching filter, lets find the filter actions and attach
|
||||||
actions, err := s.actionRepo.FindByFilterID(context.TODO(), f.ID)
|
actions, err := s.actionRepo.FindByFilterID(ctx, f.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Error().Err(err).Msgf("filter.Service.CheckFilter: error finding actions for filter: %+v", f.Name)
|
s.log.Error().Err(err).Msgf("filter.Service.CheckFilter: error finding actions for filter: %+v", f.Name)
|
||||||
return false, err
|
return false, err
|
||||||
|
|
|
@ -73,8 +73,8 @@ func (s *service) Store(ctx context.Context, release *domain.Release) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) StoreReleaseActionStatus(ctx context.Context, actionStatus *domain.ReleaseActionStatus) error {
|
func (s *service) StoreReleaseActionStatus(ctx context.Context, status *domain.ReleaseActionStatus) error {
|
||||||
return s.repo.StoreReleaseActionStatus(ctx, actionStatus)
|
return s.repo.StoreReleaseActionStatus(ctx, status)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) Delete(ctx context.Context) error {
|
func (s *service) Delete(ctx context.Context) error {
|
||||||
|
@ -86,7 +86,16 @@ func (s *service) Process(release *domain.Release) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
s.log.Error().Msgf("recovering from panic in release process %s error: %v", release.TorrentName, r)
|
||||||
|
//err := errors.New("panic in release process: %s", release.TorrentName)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
defer release.CleanupTemporaryFiles()
|
defer release.CleanupTemporaryFiles()
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
// TODO check in config for "Save all releases"
|
// TODO check in config for "Save all releases"
|
||||||
|
@ -94,13 +103,14 @@ func (s *service) Process(release *domain.Release) {
|
||||||
// TODO dupe checks
|
// TODO dupe checks
|
||||||
|
|
||||||
// get filters by priority
|
// get filters by priority
|
||||||
filters, err := s.filterSvc.FindByIndexerIdentifier(release.Indexer)
|
filters, err := s.filterSvc.FindByIndexerIdentifier(ctx, release.Indexer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Error().Err(err).Msgf("release.Process: error finding filters for indexer: %v", release.Indexer)
|
s.log.Error().Err(err).Msgf("release.Process: error finding filters for indexer: %s", release.Indexer)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(filters) == 0 {
|
if len(filters) == 0 {
|
||||||
|
s.log.Warn().Msgf("no active filters found for indexer: %s", release.Indexer)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -125,13 +135,13 @@ func (s *service) Process(release *domain.Release) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if !match {
|
if !match {
|
||||||
l.Trace().Msgf("release.Process: indexer: %v, filter: %v release: %v, no match. rejections: %v", release.Indexer, release.Filter.Name, release.TorrentName, release.RejectionsString())
|
l.Trace().Msgf("release.Process: indexer: %s, filter: %s release: %s, no match. rejections: %s", release.Indexer, release.Filter.Name, release.TorrentName, release.RejectionsString())
|
||||||
|
|
||||||
l.Debug().Msgf("release rejected: %v", release.RejectionsString())
|
l.Debug().Msgf("release rejected: %s", release.RejectionsString())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
l.Info().Msgf("Matched '%v' (%v) for %v", release.TorrentName, release.Filter.Name, release.Indexer)
|
l.Info().Msgf("Matched '%s' (%s) for %s", release.TorrentName, release.Filter.Name, release.Indexer)
|
||||||
|
|
||||||
// save release here to only save those with rejections from actions instead of all releases
|
// save release here to only save those with rejections from actions instead of all releases
|
||||||
if release.ID == 0 {
|
if release.ID == 0 {
|
||||||
|
@ -145,7 +155,7 @@ func (s *service) Process(release *domain.Release) {
|
||||||
// sleep for the delay period specified in the filter before running actions
|
// sleep for the delay period specified in the filter before running actions
|
||||||
delay := release.Filter.Delay
|
delay := release.Filter.Delay
|
||||||
if delay > 0 {
|
if delay > 0 {
|
||||||
l.Debug().Msgf("Delaying processing of '%v' (%v) for %v by %d seconds as specified in the filter", release.TorrentName, release.Filter.Name, release.Indexer, delay)
|
l.Debug().Msgf("Delaying processing of '%s' (%s) for %s by %d seconds as specified in the filter", release.TorrentName, release.Filter.Name, release.Indexer, delay)
|
||||||
time.Sleep(time.Duration(delay) * time.Second)
|
time.Sleep(time.Duration(delay) * time.Second)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -153,33 +163,42 @@ func (s *service) Process(release *domain.Release) {
|
||||||
|
|
||||||
// run actions (watchFolder, test, exec, qBittorrent, Deluge, arr etc.)
|
// run actions (watchFolder, test, exec, qBittorrent, Deluge, arr etc.)
|
||||||
for _, a := range release.Filter.Actions {
|
for _, a := range release.Filter.Actions {
|
||||||
|
act := a
|
||||||
|
|
||||||
// only run enabled actions
|
// only run enabled actions
|
||||||
if !a.Enabled {
|
if !act.Enabled {
|
||||||
l.Trace().Msgf("release.Process: indexer: %v, filter: %v release: %v action '%v' not enabled, skip", release.Indexer, release.Filter.Name, release.TorrentName, a.Name)
|
l.Trace().Msgf("release.Process: indexer: %s, filter: %s release: %s action '%s' not enabled, skip", release.Indexer, release.Filter.Name, release.TorrentName, act.Name)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
l.Trace().Msgf("release.Process: indexer: %v, filter: %v release: %v , run action: %v", release.Indexer, release.Filter.Name, release.TorrentName, a.Name)
|
l.Trace().Msgf("release.Process: indexer: %s, filter: %s release: %s , run action: %s", release.Indexer, release.Filter.Name, release.TorrentName, act.Name)
|
||||||
|
|
||||||
// keep track of action clients to avoid sending the same thing all over again
|
// keep track of actiom clients to avoid sending the same thing all over again
|
||||||
_, tried := triedActionClients[actionClientTypeKey{Type: a.Type, ClientID: a.ClientID}]
|
_, tried := triedActionClients[actionClientTypeKey{Type: act.Type, ClientID: act.ClientID}]
|
||||||
if tried {
|
if tried {
|
||||||
l.Trace().Msgf("release.Process: indexer: %v, filter: %v release: %v action client already tried, skip", release.Indexer, release.Filter.Name, release.TorrentName)
|
l.Trace().Msgf("release.Process: indexer: %s, filter: %s release: %s action client already tried, skip", release.Indexer, release.Filter.Name, release.TorrentName)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
rejections, err = s.actionSvc.RunAction(ctx, a, release)
|
// run action
|
||||||
|
status, err := s.runAction(ctx, act, release)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l.Error().Stack().Err(err).Msgf("release.Process: error running actions for filter: %v", release.Filter.Name)
|
l.Error().Stack().Err(err).Msgf("release.Process: error running actions for filter: %s", release.Filter.Name)
|
||||||
continue
|
//continue
|
||||||
|
}
|
||||||
|
|
||||||
|
rejections = status.Rejections
|
||||||
|
|
||||||
|
if err := s.StoreReleaseActionStatus(ctx, status); err != nil {
|
||||||
|
s.log.Error().Err(err).Msgf("release.Process: error storing action status for filter: %s", release.Filter.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(rejections) > 0 {
|
if len(rejections) > 0 {
|
||||||
// if we get a rejection, remember which action client it was from
|
// if we get action rejection, remember which action client it was from
|
||||||
triedActionClients[actionClientTypeKey{Type: a.Type, ClientID: a.ClientID}] = struct{}{}
|
triedActionClients[actionClientTypeKey{Type: act.Type, ClientID: act.ClientID}] = struct{}{}
|
||||||
|
|
||||||
// log something and fire events
|
// log something and fire events
|
||||||
l.Debug().Str("action", a.Name).Str("action_type", string(a.Type)).Msgf("release rejected: %v", strings.Join(rejections, ", "))
|
l.Debug().Str("action", act.Name).Str("action_type", string(act.Type)).Msgf("release rejected: %s", strings.Join(rejections, ", "))
|
||||||
}
|
}
|
||||||
|
|
||||||
// if no rejections consider action approved, run next
|
// if no rejections consider action approved, run next
|
||||||
|
@ -209,3 +228,33 @@ func (s *service) ProcessMultiple(releases []*domain.Release) {
|
||||||
s.Process(rls)
|
s.Process(rls)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *service) runAction(ctx context.Context, action *domain.Action, release *domain.Release) (*domain.ReleaseActionStatus, error) {
|
||||||
|
// add action status as pending
|
||||||
|
status := domain.NewReleaseActionStatus(action, release)
|
||||||
|
|
||||||
|
if err := s.StoreReleaseActionStatus(ctx, status); err != nil {
|
||||||
|
s.log.Error().Err(err).Msgf("release.runAction: error storing action for filter: %s", release.Filter.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
rejections, err := s.actionSvc.RunAction(ctx, action, release)
|
||||||
|
if err != nil {
|
||||||
|
s.log.Error().Stack().Err(err).Msgf("release.runAction: error running actions for filter: %s", release.Filter.Name)
|
||||||
|
|
||||||
|
status.Status = domain.ReleasePushStatusErr
|
||||||
|
status.Rejections = []string{err.Error()}
|
||||||
|
|
||||||
|
return status, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if rejections != nil {
|
||||||
|
status.Status = domain.ReleasePushStatusRejected
|
||||||
|
status.Rejections = rejections
|
||||||
|
|
||||||
|
return status, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
status.Status = domain.ReleasePushStatusApproved
|
||||||
|
|
||||||
|
return status, nil
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue