mirror of
https://github.com/idanoo/autobrr
synced 2025-07-22 16:29:12 +00:00
feat: add postgres support (#215)
* feat: add postgres support and refactor * feat: improve releases find * fix: autobrrctl create user
This commit is contained in:
parent
f6873e932e
commit
3185832708
30 changed files with 1708 additions and 831 deletions
|
@ -4,10 +4,9 @@ import (
|
|||
"context"
|
||||
"database/sql"
|
||||
sq "github.com/Masterminds/squirrel"
|
||||
"github.com/autobrr/autobrr/internal/domain"
|
||||
"github.com/lib/pq"
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"github.com/autobrr/autobrr/internal/domain"
|
||||
)
|
||||
|
||||
type ReleaseRepo struct {
|
||||
|
@ -19,42 +18,43 @@ func NewReleaseRepo(db *DB) domain.ReleaseRepo {
|
|||
}
|
||||
|
||||
func (repo *ReleaseRepo) Store(ctx context.Context, r *domain.Release) (*domain.Release, error) {
|
||||
//r.db.lock.RLock()
|
||||
//defer r.db.lock.RUnlock()
|
||||
|
||||
query, args, err := sq.
|
||||
queryBuilder := repo.db.squirrel.
|
||||
Insert("release").
|
||||
Columns("filter_status", "rejections", "indexer", "filter", "protocol", "implementation", "timestamp", "group_id", "torrent_id", "torrent_name", "size", "raw", "title", "category", "season", "episode", "year", "resolution", "source", "codec", "container", "hdr", "audio", "release_group", "region", "language", "edition", "unrated", "hybrid", "proper", "repack", "website", "artists", "type", "format", "quality", "log_score", "has_log", "has_cue", "is_scene", "origin", "tags", "freeleech", "freeleech_percent", "uploader", "pre_time").
|
||||
Values(r.FilterStatus, pq.Array(r.Rejections), r.Indexer, r.FilterName, r.Protocol, r.Implementation, r.Timestamp, r.GroupID, r.TorrentID, r.TorrentName, r.Size, r.Raw, r.Title, r.Category, r.Season, r.Episode, r.Year, r.Resolution, r.Source, r.Codec, r.Container, r.HDR, r.Audio, r.Group, r.Region, r.Language, r.Edition, r.Unrated, r.Hybrid, r.Proper, r.Repack, r.Website, pq.Array(r.Artists), r.Type, r.Format, r.Quality, r.LogScore, r.HasLog, r.HasCue, r.IsScene, r.Origin, pq.Array(r.Tags), r.Freeleech, r.FreeleechPercent, r.Uploader, r.PreTime).
|
||||
ToSql()
|
||||
Suffix("RETURNING id").RunWith(repo.db.handler)
|
||||
|
||||
res, err := repo.db.handler.ExecContext(ctx, query, args...)
|
||||
// return values
|
||||
var retID int64
|
||||
|
||||
err := queryBuilder.QueryRowContext(ctx).Scan(&retID)
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msg("error inserting release")
|
||||
log.Error().Stack().Err(err).Msg("release.store: error executing query")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resId, _ := res.LastInsertId()
|
||||
r.ID = resId
|
||||
r.ID = retID
|
||||
|
||||
log.Trace().Msgf("release.store: %+v", r)
|
||||
log.Debug().Msgf("release.store: %+v", r)
|
||||
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func (repo *ReleaseRepo) StoreReleaseActionStatus(ctx context.Context, a *domain.ReleaseActionStatus) error {
|
||||
//r.db.lock.RLock()
|
||||
//defer r.db.lock.RUnlock()
|
||||
|
||||
if a.ID != 0 {
|
||||
query, args, err := sq.
|
||||
queryBuilder := repo.db.squirrel.
|
||||
Update("release_action_status").
|
||||
Set("status", a.Status).
|
||||
Set("rejections", pq.Array(a.Rejections)).
|
||||
Set("timestamp", a.Timestamp).
|
||||
Where("id = ?", a.ID).
|
||||
Where("release_id = ?", a.ReleaseID).
|
||||
ToSql()
|
||||
Where("release_id = ?", a.ReleaseID)
|
||||
|
||||
query, args, err := queryBuilder.ToSql()
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msg("release.store: error building query")
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = repo.db.handler.ExecContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
|
@ -63,20 +63,22 @@ func (repo *ReleaseRepo) StoreReleaseActionStatus(ctx context.Context, a *domain
|
|||
}
|
||||
|
||||
} else {
|
||||
query, args, err := sq.
|
||||
queryBuilder := repo.db.squirrel.
|
||||
Insert("release_action_status").
|
||||
Columns("status", "action", "type", "rejections", "timestamp", "release_id").
|
||||
Values(a.Status, a.Action, a.Type, pq.Array(a.Rejections), a.Timestamp, a.ReleaseID).
|
||||
ToSql()
|
||||
Suffix("RETURNING id").RunWith(repo.db.handler)
|
||||
|
||||
res, err := repo.db.handler.ExecContext(ctx, query, args...)
|
||||
// return values
|
||||
var retID int64
|
||||
|
||||
err := queryBuilder.QueryRowContext(ctx).Scan(&retID)
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msg("error inserting status of release")
|
||||
log.Error().Stack().Err(err).Msg("release.storeReleaseActionStatus: error executing query")
|
||||
return err
|
||||
}
|
||||
|
||||
resId, _ := res.LastInsertId()
|
||||
a.ID = resId
|
||||
a.ID = retID
|
||||
}
|
||||
|
||||
log.Trace().Msgf("release.store_release_action_status: %+v", a)
|
||||
|
@ -84,12 +86,32 @@ func (repo *ReleaseRepo) StoreReleaseActionStatus(ctx context.Context, a *domain
|
|||
return nil
|
||||
}
|
||||
|
||||
func (repo *ReleaseRepo) Find(ctx context.Context, params domain.ReleaseQueryParams) ([]domain.Release, int64, int64, error) {
|
||||
//r.db.lock.RLock()
|
||||
//defer r.db.lock.RUnlock()
|
||||
func (repo *ReleaseRepo) Find(ctx context.Context, params domain.ReleaseQueryParams) ([]*domain.Release, int64, int64, error) {
|
||||
tx, err := repo.db.BeginTx(ctx, &sql.TxOptions{})
|
||||
if err != nil {
|
||||
return nil, 0, 0, err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
queryBuilder := sq.
|
||||
Select("r.id", "r.filter_status", "r.rejections", "r.indexer", "r.filter", "r.protocol", "r.title", "r.torrent_name", "r.size", "r.timestamp", "COUNT() OVER() AS total_count").
|
||||
releases, nextCursor, total, err := repo.findReleases(ctx, tx, params)
|
||||
if err != nil {
|
||||
return nil, nextCursor, total, err
|
||||
}
|
||||
|
||||
for _, release := range releases {
|
||||
statuses, err := repo.attachActionStatus(ctx, tx, release.ID)
|
||||
if err != nil {
|
||||
return releases, nextCursor, total, err
|
||||
}
|
||||
release.ActionStatus = statuses
|
||||
}
|
||||
|
||||
return releases, nextCursor, total, nil
|
||||
}
|
||||
|
||||
func (repo *ReleaseRepo) findReleases(ctx context.Context, tx *Tx, params domain.ReleaseQueryParams) ([]*domain.Release, int64, int64, error) {
|
||||
queryBuilder := repo.db.squirrel.
|
||||
Select("r.id", "r.filter_status", "r.rejections", "r.indexer", "r.filter", "r.protocol", "r.title", "r.torrent_name", "r.size", "r.timestamp", "COUNT(*) OVER() AS total_count").
|
||||
From("release r").
|
||||
OrderBy("r.timestamp DESC")
|
||||
|
||||
|
@ -123,9 +145,9 @@ func (repo *ReleaseRepo) Find(ctx context.Context, params domain.ReleaseQueryPar
|
|||
query, args, err := queryBuilder.ToSql()
|
||||
log.Trace().Str("database", "release.find").Msgf("query: '%v', args: '%v'", query, args)
|
||||
|
||||
res := make([]domain.Release, 0)
|
||||
res := make([]*domain.Release, 0)
|
||||
|
||||
rows, err := repo.db.handler.QueryContext(ctx, query, args...)
|
||||
rows, err := tx.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msg("error fetching releases")
|
||||
return res, 0, 0, nil
|
||||
|
@ -153,36 +175,21 @@ func (repo *ReleaseRepo) Find(ctx context.Context, params domain.ReleaseQueryPar
|
|||
rls.Indexer = indexer.String
|
||||
rls.FilterName = filter.String
|
||||
|
||||
// get action status
|
||||
actionStatus, err := repo.GetActionStatusByReleaseID(ctx, rls.ID)
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msg("release.find: error getting action status")
|
||||
return res, 0, 0, err
|
||||
}
|
||||
|
||||
rls.ActionStatus = actionStatus
|
||||
|
||||
res = append(res, rls)
|
||||
res = append(res, &rls)
|
||||
}
|
||||
|
||||
nextCursor := int64(0)
|
||||
if len(res) > 0 {
|
||||
lastID := res[len(res)-1].ID
|
||||
nextCursor = lastID
|
||||
//nextCursor, _ = strconv.ParseInt(lastID, 10, 64)
|
||||
}
|
||||
|
||||
return res, nextCursor, countItems, nil
|
||||
}
|
||||
|
||||
func (repo *ReleaseRepo) GetIndexerOptions(ctx context.Context) ([]string, error) {
|
||||
//r.db.lock.RLock()
|
||||
//defer r.db.lock.RUnlock()
|
||||
|
||||
query := `
|
||||
SELECT DISTINCT indexer FROM "release"
|
||||
UNION
|
||||
SELECT DISTINCT identifier indexer FROM indexer;`
|
||||
query := `SELECT DISTINCT indexer FROM "release" UNION SELECT DISTINCT identifier indexer FROM indexer;`
|
||||
|
||||
log.Trace().Str("database", "release.get_indexers").Msgf("query: '%v'", query)
|
||||
|
||||
|
@ -216,10 +223,8 @@ func (repo *ReleaseRepo) GetIndexerOptions(ctx context.Context) ([]string, error
|
|||
}
|
||||
|
||||
func (repo *ReleaseRepo) GetActionStatusByReleaseID(ctx context.Context, releaseID int64) ([]domain.ReleaseActionStatus, error) {
|
||||
//r.db.lock.RLock()
|
||||
//defer r.db.lock.RUnlock()
|
||||
|
||||
queryBuilder := sq.
|
||||
queryBuilder := repo.db.squirrel.
|
||||
Select("id", "status", "action", "type", "rejections", "timestamp").
|
||||
From("release_action_status").
|
||||
Where("release_id = ?", releaseID)
|
||||
|
@ -255,16 +260,52 @@ func (repo *ReleaseRepo) GetActionStatusByReleaseID(ctx context.Context, release
|
|||
return res, nil
|
||||
}
|
||||
|
||||
func (repo *ReleaseRepo) attachActionStatus(ctx context.Context, tx *Tx, releaseID int64) ([]domain.ReleaseActionStatus, error) {
|
||||
|
||||
queryBuilder := repo.db.squirrel.
|
||||
Select("id", "status", "action", "type", "rejections", "timestamp").
|
||||
From("release_action_status").
|
||||
Where("release_id = ?", releaseID)
|
||||
|
||||
query, args, err := queryBuilder.ToSql()
|
||||
|
||||
res := make([]domain.ReleaseActionStatus, 0)
|
||||
|
||||
rows, err := tx.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
log.Error().Stack().Err(err).Msg("error fetching releases")
|
||||
return res, nil
|
||||
}
|
||||
|
||||
defer rows.Close()
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Error().Stack().Err(err)
|
||||
return res, err
|
||||
}
|
||||
|
||||
for rows.Next() {
|
||||
var rls domain.ReleaseActionStatus
|
||||
|
||||
if err := rows.Scan(&rls.ID, &rls.Status, &rls.Action, &rls.Type, pq.Array(&rls.Rejections), &rls.Timestamp); err != nil {
|
||||
log.Error().Stack().Err(err).Msg("release.find: error scanning data to struct")
|
||||
return res, err
|
||||
}
|
||||
|
||||
res = append(res, rls)
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (repo *ReleaseRepo) Stats(ctx context.Context) (*domain.ReleaseStats, error) {
|
||||
//r.db.lock.RLock()
|
||||
//defer r.db.lock.RUnlock()
|
||||
|
||||
query := `SELECT COUNT(*) total,
|
||||
IFNULL(SUM(CASE WHEN filter_status = 'FILTER_APPROVED' THEN 1 ELSE 0 END), 0) filtered_count,
|
||||
IFNULL(SUM(CASE WHEN filter_status = 'FILTER_REJECTED' THEN 1 ELSE 0 END), 0) filter_rejected_count,
|
||||
(SELECT IFNULL(SUM(CASE WHEN status = 'PUSH_APPROVED' THEN 1 ELSE 0 END), 0)
|
||||
COALESCE(SUM(CASE WHEN filter_status = 'FILTER_APPROVED' THEN 1 ELSE 0 END), 0) AS filtered_count,
|
||||
COALESCE(SUM(CASE WHEN filter_status = 'FILTER_REJECTED' THEN 1 ELSE 0 END), 0) AS filter_rejected_count,
|
||||
(SELECT COALESCE(SUM(CASE WHEN status = 'PUSH_APPROVED' THEN 1 ELSE 0 END), 0)
|
||||
FROM "release_action_status") AS push_approved_count,
|
||||
(SELECT IFNULL(SUM(CASE WHEN status = 'PUSH_REJECTED' THEN 1 ELSE 0 END), 0)
|
||||
(SELECT COALESCE(SUM(CASE WHEN status = 'PUSH_REJECTED' THEN 1 ELSE 0 END), 0)
|
||||
FROM "release_action_status") AS push_rejected_count
|
||||
FROM "release";`
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue