Feature: Support multiple action status per release (#69)

* feat: move release actions to separate table

* chore: update sqlite driver
This commit is contained in:
Ludvig Lundgren 2022-01-08 15:40:31 +01:00 committed by GitHub
parent 2ea2293745
commit e03eac24ba
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 284 additions and 91 deletions

View file

@ -4,6 +4,7 @@ import (
"io"
"os"
"path"
"time"
"github.com/rs/zerolog/log"
@ -24,10 +25,20 @@ func (s *service) RunActions(actions []domain.Action, release domain.Release) er
log.Debug().Msgf("process action: %v for '%v'", action.Name, release.TorrentName)
actionStatus := domain.ReleaseActionStatus{
ReleaseID: release.ID,
Status: domain.ReleasePushStatusPending,
Action: action.Name,
Type: action.Type,
Rejections: []string{},
Timestamp: time.Now(),
}
s.bus.Publish("release:store-action-status", &actionStatus)
switch action.Type {
case domain.ActionTypeTest:
s.test(action.Name)
s.bus.Publish("release:update-push-status", release.ID, domain.ReleasePushStatusApproved)
case domain.ActionTypeExec:
if release.TorrentTmpFile == "" {
@ -42,7 +53,6 @@ func (s *service) RunActions(actions []domain.Action, release domain.Release) er
go func(release domain.Release, action domain.Action, tmpFile string) {
s.execCmd(release, action, tmpFile)
s.bus.Publish("release:update-push-status", release.ID, domain.ReleasePushStatusApproved)
}(release, action, tmpFile)
case domain.ActionTypeWatchFolder:
@ -56,7 +66,6 @@ func (s *service) RunActions(actions []domain.Action, release domain.Release) er
tmpFile = t.TmpFileName
}
s.watchFolder(action.WatchFolder, tmpFile)
s.bus.Publish("release:update-push-status", release.ID, domain.ReleasePushStatusApproved)
case domain.ActionTypeDelugeV1, domain.ActionTypeDelugeV2:
canDownload, err := s.delugeCheckRulesCanDownload(action)
@ -65,7 +74,14 @@ func (s *service) RunActions(actions []domain.Action, release domain.Release) er
continue
}
if !canDownload {
s.bus.Publish("release:update-push-status-rejected", release.ID, "deluge busy")
s.bus.Publish("release:store-action-status", &domain.ReleaseActionStatus{
ID: actionStatus.ID,
ReleaseID: release.ID,
Status: domain.ReleasePushStatusRejected,
Action: action.Name,
Type: action.Type,
Rejections: []string{"deluge busy"},
})
continue
}
if release.TorrentTmpFile == "" {
@ -83,7 +99,6 @@ func (s *service) RunActions(actions []domain.Action, release domain.Release) er
if err != nil {
log.Error().Stack().Err(err).Msg("error sending torrent to Deluge")
}
s.bus.Publish("release:update-push-status", release.ID, domain.ReleasePushStatusApproved)
}(action, tmpFile)
case domain.ActionTypeQbittorrent:
@ -93,7 +108,14 @@ func (s *service) RunActions(actions []domain.Action, release domain.Release) er
continue
}
if !canDownload {
s.bus.Publish("release:update-push-status-rejected", release.ID, "qbittorrent busy")
s.bus.Publish("release:store-action-status", &domain.ReleaseActionStatus{
ID: actionStatus.ID,
ReleaseID: release.ID,
Status: domain.ReleasePushStatusRejected,
Action: action.Name,
Type: action.Type,
Rejections: []string{"qbittorrent busy"},
})
continue
}
@ -113,7 +135,6 @@ func (s *service) RunActions(actions []domain.Action, release domain.Release) er
if err != nil {
log.Error().Stack().Err(err).Msg("error sending torrent to qBittorrent")
}
s.bus.Publish("release:update-push-status", release.ID, domain.ReleasePushStatusApproved)
}(action, hash, tmpFile)
case domain.ActionTypeRadarr:
@ -146,6 +167,15 @@ func (s *service) RunActions(actions []domain.Action, release domain.Release) er
default:
log.Warn().Msgf("unsupported action: %v type: %v", action.Name, action.Type)
}
s.bus.Publish("release:store-action-status", &domain.ReleaseActionStatus{
ID: actionStatus.ID,
ReleaseID: release.ID,
Status: domain.ReleasePushStatusApproved,
Action: action.Name,
Type: action.Type,
Rejections: []string{},
})
}
// safe to delete tmp file

View file

@ -183,7 +183,8 @@ func (r *FilterRepo) FindByIndexerIdentifier(indexer string) ([]domain.Filter, e
WHERE i.identifier = ?
AND f.enabled = true`, indexer)
if err != nil {
log.Fatal().Err(err)
log.Error().Stack().Err(err).Msg("error querying filter row")
return nil, err
}
defer rows.Close()
@ -199,8 +200,6 @@ func (r *FilterRepo) FindByIndexerIdentifier(indexer string) ([]domain.Filter, e
if err := rows.Scan(&f.ID, &f.Enabled, &f.Name, &minSize, &maxSize, &delay, &matchReleases, &exceptReleases, &useRegex, &matchReleaseGroups, &exceptReleaseGroups, &scene, &freeleech, &freeleechPercent, &shows, &seasons, &episodes, pq.Array(&f.Resolutions), pq.Array(&f.Codecs), pq.Array(&f.Sources), pq.Array(&f.Containers), &years, &matchCategories, &exceptCategories, &matchUploaders, &exceptUploaders, &tags, &exceptTags, &createdAt, &updatedAt); err != nil {
log.Error().Stack().Err(err).Msg("error scanning data to struct")
}
if err != nil {
return nil, err
}

View file

@ -143,7 +143,6 @@ CREATE TABLE "release"
(
id INTEGER PRIMARY KEY,
filter_status TEXT,
push_status TEXT,
rejections TEXT [] DEFAULT '{}' NOT NULL,
indexer TEXT,
filter TEXT,
@ -190,6 +189,20 @@ CREATE TABLE "release"
uploader TEXT,
pre_time TEXT
);
CREATE TABLE release_action_status
(
id INTEGER PRIMARY KEY,
status TEXT,
action TEXT NOT NULL,
type TEXT NOT NULL,
rejections TEXT [] DEFAULT '{}' NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
raw TEXT,
log TEXT,
release_id INTEGER NOT NULL,
FOREIGN KEY (release_id) REFERENCES "release"(id)
);
`
var migrations = []string{
@ -247,6 +260,27 @@ var migrations = []string{
pre_time TEXT
);
`,
`
CREATE TABLE release_action_status
(
id INTEGER PRIMARY KEY,
status TEXT,
action TEXT NOT NULL,
type TEXT NOT NULL,
rejections TEXT [] DEFAULT '{}' NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
raw TEXT,
log TEXT,
release_id INTEGER NOT NULL,
FOREIGN KEY (release_id) REFERENCES "release"(id)
);
INSERT INTO "release_action_status" (status, action, type, timestamp, release_id)
SELECT push_status, 'DEFAULT', 'QBITTORRENT', timestamp, id FROM "release";
ALTER TABLE "release"
DROP COLUMN push_status;
`,
}
func Migrate(db *sql.DB) error {

View file

@ -23,8 +23,8 @@ func NewReleaseRepo(db *sql.DB) domain.ReleaseRepo {
func (repo *ReleaseRepo) Store(ctx context.Context, r *domain.Release) (*domain.Release, error) {
query, args, err := sq.
Insert("release").
Columns("filter_status", "push_status", "rejections", "indexer", "filter", "protocol", "implementation", "timestamp", "group_id", "torrent_id", "torrent_name", "size", "raw", "title", "category", "season", "episode", "year", "resolution", "source", "codec", "container", "hdr", "audio", "release_group", "region", "language", "edition", "unrated", "hybrid", "proper", "repack", "website", "artists", "type", "format", "bitrate", "log_score", "has_log", "has_cue", "is_scene", "origin", "tags", "freeleech", "freeleech_percent", "uploader", "pre_time").
Values(r.FilterStatus, r.PushStatus, pq.Array(r.Rejections), r.Indexer, r.FilterName, r.Protocol, r.Implementation, r.Timestamp, r.GroupID, r.TorrentID, r.TorrentName, r.Size, r.Raw, r.Title, r.Category, r.Season, r.Episode, r.Year, r.Resolution, r.Source, r.Codec, r.Container, r.HDR, r.Audio, r.Group, r.Region, r.Language, r.Edition, r.Unrated, r.Hybrid, r.Proper, r.Repack, r.Website, pq.Array(r.Artists), r.Type, r.Format, r.Bitrate, r.LogScore, r.HasLog, r.HasCue, r.IsScene, r.Origin, pq.Array(r.Tags), r.Freeleech, r.FreeleechPercent, r.Uploader, r.PreTime).
Columns("filter_status", "rejections", "indexer", "filter", "protocol", "implementation", "timestamp", "group_id", "torrent_id", "torrent_name", "size", "raw", "title", "category", "season", "episode", "year", "resolution", "source", "codec", "container", "hdr", "audio", "release_group", "region", "language", "edition", "unrated", "hybrid", "proper", "repack", "website", "artists", "type", "format", "bitrate", "log_score", "has_log", "has_cue", "is_scene", "origin", "tags", "freeleech", "freeleech_percent", "uploader", "pre_time").
Values(r.FilterStatus, pq.Array(r.Rejections), r.Indexer, r.FilterName, r.Protocol, r.Implementation, r.Timestamp, r.GroupID, r.TorrentID, r.TorrentName, r.Size, r.Raw, r.Title, r.Category, r.Season, r.Episode, r.Year, r.Resolution, r.Source, r.Codec, r.Container, r.HDR, r.Audio, r.Group, r.Region, r.Language, r.Edition, r.Unrated, r.Hybrid, r.Proper, r.Repack, r.Website, pq.Array(r.Artists), r.Type, r.Format, r.Bitrate, r.LogScore, r.HasLog, r.HasCue, r.IsScene, r.Origin, pq.Array(r.Tags), r.Freeleech, r.FreeleechPercent, r.Uploader, r.PreTime).
ToSql()
res, err := repo.db.ExecContext(ctx, query, args...)
@ -41,37 +41,42 @@ func (repo *ReleaseRepo) Store(ctx context.Context, r *domain.Release) (*domain.
return r, nil
}
func (repo *ReleaseRepo) UpdatePushStatus(ctx context.Context, id int64, status domain.ReleasePushStatus) error {
query, args, err := sq.Update("release").Set("push_status", status).Where("id = ?", id).ToSql()
func (repo *ReleaseRepo) StoreReleaseActionStatus(ctx context.Context, a *domain.ReleaseActionStatus) error {
_, err = repo.db.ExecContext(ctx, query, args...)
if err != nil {
log.Error().Stack().Err(err).Msg("error updating status of release")
return err
if a.ID != 0 {
query, args, err := sq.
Update("release_action_status").
Set("status", a.Status).
Set("rejections", pq.Array(a.Rejections)).
Set("timestamp", time.Now().Format(time.RFC3339)).
Where("id = ?", a.ID).
Where("release_id = ?", a.ReleaseID).
ToSql()
_, err = repo.db.ExecContext(ctx, query, args...)
if err != nil {
log.Error().Stack().Err(err).Msg("error updating status of release")
return err
}
} else {
query, args, err := sq.
Insert("release_action_status").
Columns("status", "action", "type", "rejections", "timestamp", "release_id").
Values(a.Status, a.Action, a.Type, pq.Array(a.Rejections), a.Timestamp, a.ReleaseID).
ToSql()
res, err := repo.db.ExecContext(ctx, query, args...)
if err != nil {
log.Error().Stack().Err(err).Msg("error inserting status of release")
return err
}
resId, _ := res.LastInsertId()
a.ID = resId
}
log.Trace().Msgf("release.update_push_status: id %+v", id)
return nil
}
func (repo *ReleaseRepo) UpdatePushStatusRejected(ctx context.Context, id int64, rejections string) error {
r := []string{rejections}
query, args, err := sq.
Update("release").
Set("push_status", domain.ReleasePushStatusRejected).
Set("rejections", pq.Array(r)).
Where("id = ?", id).
ToSql()
_, err = repo.db.ExecContext(ctx, query, args...)
if err != nil {
log.Error().Stack().Err(err).Msg("error updating status of release")
return err
}
log.Trace().Msgf("release.update_push_status_rejected: id %+v", id)
log.Trace().Msgf("release.store_release_action_status: %+v", a)
return nil
}
@ -79,7 +84,7 @@ func (repo *ReleaseRepo) UpdatePushStatusRejected(ctx context.Context, id int64,
func (repo *ReleaseRepo) Find(ctx context.Context, params domain.QueryParams) ([]domain.Release, int64, int64, error) {
queryBuilder := sq.
Select("id", "filter_status", "push_status", "rejections", "indexer", "filter", "protocol", "title", "torrent_name", "size", "timestamp", "COUNT() OVER() AS total_count").
Select("id", "filter_status", "rejections", "indexer", "filter", "protocol", "title", "torrent_name", "size", "timestamp", "COUNT() OVER() AS total_count").
From("release").
OrderBy("timestamp DESC")
@ -132,7 +137,7 @@ func (repo *ReleaseRepo) Find(ctx context.Context, params domain.QueryParams) ([
var indexer, filter sql.NullString
var timestamp string
if err := rows.Scan(&rls.ID, &rls.FilterStatus, &rls.PushStatus, pq.Array(&rls.Rejections), &indexer, &filter, &rls.Protocol, &rls.Title, &rls.TorrentName, &rls.Size, &timestamp, &countItems); err != nil {
if err := rows.Scan(&rls.ID, &rls.FilterStatus, pq.Array(&rls.Rejections), &indexer, &filter, &rls.Protocol, &rls.Title, &rls.TorrentName, &rls.Size, &timestamp, &countItems); err != nil {
log.Error().Stack().Err(err).Msg("release.find: error scanning data to struct")
return res, 0, 0, err
}
@ -143,6 +148,15 @@ func (repo *ReleaseRepo) Find(ctx context.Context, params domain.QueryParams) ([
ca, _ := time.Parse(time.RFC3339, timestamp)
rls.Timestamp = ca
// get action status
actionStatus, err := repo.GetActionStatusByReleaseID(ctx, rls.ID)
if err != nil {
log.Error().Stack().Err(err).Msg("release.find: error getting action status")
return res, 0, 0, err
}
rls.ActionStatus = actionStatus
res = append(res, rls)
}
@ -156,13 +170,57 @@ func (repo *ReleaseRepo) Find(ctx context.Context, params domain.QueryParams) ([
return res, nextCursor, countItems, nil
}
func (repo *ReleaseRepo) GetActionStatusByReleaseID(ctx context.Context, releaseID int64) ([]domain.ReleaseActionStatus, error) {
queryBuilder := sq.
Select("id", "status", "action", "type", "rejections", "timestamp").
From("release_action_status").
Where("release_id = ?", releaseID)
query, args, err := queryBuilder.ToSql()
res := make([]domain.ReleaseActionStatus, 0)
rows, err := repo.db.QueryContext(ctx, query, args...)
if err != nil {
log.Error().Stack().Err(err).Msg("error fetching releases")
return res, nil
}
defer rows.Close()
if err := rows.Err(); err != nil {
log.Error().Stack().Err(err)
return res, err
}
for rows.Next() {
var rls domain.ReleaseActionStatus
var timestamp string
if err := rows.Scan(&rls.ID, &rls.Status, &rls.Action, &rls.Type, pq.Array(&rls.Rejections), &timestamp); err != nil {
log.Error().Stack().Err(err).Msg("release.find: error scanning data to struct")
return res, err
}
ca, _ := time.Parse(time.RFC3339, timestamp)
rls.Timestamp = ca
res = append(res, rls)
}
return res, nil
}
func (repo *ReleaseRepo) Stats(ctx context.Context) (*domain.ReleaseStats, error) {
query := `SELECT
COUNT(*) total,
IFNULL(SUM(CASE WHEN push_status = 'PUSH_APPROVED' THEN 1 ELSE 0 END), 0) push_approved_count,
IFNULL(SUM(CASE WHEN push_status = 'PUSH_REJECTED' THEN 1 ELSE 0 END), 0) push_rejected_count,
IFNULL(SUM(CASE WHEN filter_status = 'FILTER_APPROVED' THEN 1 ELSE 0 END), 0) filtered_count,
IFNULL(SUM(CASE WHEN filter_status = 'FILTER_REJECTED' THEN 1 ELSE 0 END), 0) filter_rejected_count
query := `SELECT COUNT(*) total,
IFNULL(SUM(CASE WHEN filter_status = 'FILTER_APPROVED' THEN 1 ELSE 0 END), 0) filtered_count,
IFNULL(SUM(CASE WHEN filter_status = 'FILTER_REJECTED' THEN 1 ELSE 0 END), 0) filter_rejected_count,
(SELECT IFNULL(SUM(CASE WHEN status = 'PUSH_APPROVED' THEN 1 ELSE 0 END), 0)
FROM "release_action_status") AS push_approved_count,
(SELECT IFNULL(SUM(CASE WHEN status = 'PUSH_REJECTED' THEN 1 ELSE 0 END), 0)
FROM "release_action_status") AS push_rejected_count
FROM "release";`
row := repo.db.QueryRowContext(ctx, query)

View file

@ -27,15 +27,14 @@ import (
type ReleaseRepo interface {
Store(ctx context.Context, release *Release) (*Release, error)
Find(ctx context.Context, params QueryParams) (res []Release, nextCursor int64, count int64, err error)
GetActionStatusByReleaseID(ctx context.Context, releaseID int64) ([]ReleaseActionStatus, error)
Stats(ctx context.Context) (*ReleaseStats, error)
UpdatePushStatus(ctx context.Context, id int64, status ReleasePushStatus) error
UpdatePushStatusRejected(ctx context.Context, id int64, rejections string) error
StoreReleaseActionStatus(ctx context.Context, actionStatus *ReleaseActionStatus) error
}
type Release struct {
ID int64 `json:"id"`
FilterStatus ReleaseFilterStatus `json:"filter_status"`
PushStatus ReleasePushStatus `json:"push_status"`
Rejections []string `json:"rejections"`
Indexer string `json:"indexer"`
FilterName string `json:"filter"`
@ -89,6 +88,17 @@ type Release struct {
AdditionalSizeCheckRequired bool `json:"-"`
FilterID int `json:"-"`
Filter *Filter `json:"-"`
ActionStatus []ReleaseActionStatus `json:"action_status"`
}
type ReleaseActionStatus struct {
ID int64 `json:"id"`
Status ReleasePushStatus `json:"status"`
Action string `json:"action"`
Type ActionType `json:"type"`
Rejections []string `json:"rejections"`
Timestamp time.Time `json:"timestamp"`
ReleaseID int64 `json:"-"`
}
func NewRelease(indexer string, line string) (*Release, error) {
@ -96,7 +106,6 @@ func NewRelease(indexer string, line string) (*Release, error) {
Indexer: indexer,
Raw: line,
FilterStatus: ReleaseStatusFilterPending,
PushStatus: ReleasePushStatusPending,
Rejections: []string{},
Protocol: ReleaseProtocolTorrent,
Implementation: ReleaseImplementationIRC,

View file

@ -24,23 +24,14 @@ func NewSubscribers(eventbus EventBus.Bus, releaseSvc release.Service) Subscribe
}
func (s Subscriber) Register() {
s.eventbus.Subscribe("release:update-push-status", s.releaseUpdatePushStatus)
s.eventbus.Subscribe("release:update-push-status-rejected", s.releaseUpdatePushStatusRejected)
s.eventbus.Subscribe("release:store-action-status", s.releaseActionStatus)
}
func (s Subscriber) releaseUpdatePushStatus(id int64, status domain.ReleasePushStatus) {
log.Trace().Msgf("event: 'release:update-push-status' release ID '%v' update push status: '%v'", id, status)
func (s Subscriber) releaseActionStatus(actionStatus *domain.ReleaseActionStatus) {
log.Trace().Msgf("events: 'release:store-action-status' '%+v'", actionStatus)
err := s.releaseSvc.UpdatePushStatus(context.Background(), id, status)
err := s.releaseSvc.StoreReleaseActionStatus(context.Background(), actionStatus)
if err != nil {
log.Error().Err(err).Msgf("events: error")
}
}
func (s Subscriber) releaseUpdatePushStatusRejected(id int64, rejections string) {
log.Trace().Msgf("event: 'release:update-push-status-rejected' release ID '%v' update push status rejected rejections: '%v'", id, rejections)
err := s.releaseSvc.UpdatePushStatusRejected(context.Background(), id, rejections)
if err != nil {
log.Error().Err(err).Msgf("events: error")
log.Error().Err(err).Msgf("events: 'release:store-action-status' error")
}
}

View file

@ -14,8 +14,7 @@ type Service interface {
Find(ctx context.Context, query domain.QueryParams) (res []domain.Release, nextCursor int64, count int64, err error)
Stats(ctx context.Context) (*domain.ReleaseStats, error)
Store(ctx context.Context, release *domain.Release) error
UpdatePushStatus(ctx context.Context, id int64, status domain.ReleasePushStatus) error
UpdatePushStatusRejected(ctx context.Context, id int64, rejections string) error
StoreReleaseActionStatus(ctx context.Context, actionStatus *domain.ReleaseActionStatus) error
Process(release domain.Release) error
}
@ -36,6 +35,7 @@ func (s *service) Find(ctx context.Context, query domain.QueryParams) (res []dom
if err != nil {
return
}
return
}
@ -57,17 +57,8 @@ func (s *service) Store(ctx context.Context, release *domain.Release) error {
return nil
}
func (s *service) UpdatePushStatus(ctx context.Context, id int64, status domain.ReleasePushStatus) error {
err := s.repo.UpdatePushStatus(ctx, id, status)
if err != nil {
return err
}
return nil
}
func (s *service) UpdatePushStatusRejected(ctx context.Context, id int64, rejections string) error {
err := s.repo.UpdatePushStatusRejected(ctx, id, rejections)
func (s *service) StoreReleaseActionStatus(ctx context.Context, actionStatus *domain.ReleaseActionStatus) error {
err := s.repo.StoreReleaseActionStatus(ctx, actionStatus)
if err != nil {
return err
}