From bc064e15e25c534820da69b44b99f1399e79b621 Mon Sep 17 00:00:00 2001 From: Ludvig Lundgren Date: Sat, 25 Dec 2021 17:44:50 +0100 Subject: [PATCH] fix: announce parse queue (#50) --- internal/{irc => announce}/announce.go | 12 ++++++++---- internal/irc/handler.go | 7 ++++--- 2 files changed, 12 insertions(+), 7 deletions(-) rename internal/{irc => announce}/announce.go (96%) diff --git a/internal/irc/announce.go b/internal/announce/announce.go similarity index 96% rename from internal/irc/announce.go rename to internal/announce/announce.go index c797196..e6a114f 100644 --- a/internal/irc/announce.go +++ b/internal/announce/announce.go @@ -1,4 +1,4 @@ -package irc +package announce import ( "bytes" @@ -50,6 +50,7 @@ func (a *announceProcessor) setupQueues() { channel = strings.ToLower(channel) queues[channel] = make(chan string, 128) + log.Trace().Msgf("announce: setup queue: %v", channel) } a.queues = queues @@ -58,7 +59,9 @@ func (a *announceProcessor) setupQueues() { func (a *announceProcessor) setupQueueConsumers() { for queueName, queue := range a.queues { go func(name string, q chan string) { + log.Trace().Msgf("announce: setup queue consumer: %v", name) a.processQueue(q) + log.Trace().Msgf("announce: queue consumer stopped: %v", name) }(queueName, queue) } } @@ -95,6 +98,7 @@ func (a *announceProcessor) processQueue(queue chan string) { } if parseFailed { + log.Trace().Msg("announce: parse failed") continue } @@ -151,12 +155,12 @@ func (a *announceProcessor) processQueue(queue chan string) { log.Info().Msgf("Matched '%v' (%v) for %v", newRelease.TorrentName, newRelease.Filter.Name, newRelease.Indexer) // process release - go func() { - err = a.releaseSvc.Process(*newRelease) + go func(rel *domain.Release) { + err = a.releaseSvc.Process(*rel) if err != nil { log.Error().Err(err).Msgf("could not process release: %+v", newRelease) } - }() + }(newRelease) } } diff --git a/internal/irc/handler.go b/internal/irc/handler.go index f0257b4..95e5df5 100644 --- a/internal/irc/handler.go +++ b/internal/irc/handler.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/autobrr/autobrr/internal/announce" "github.com/autobrr/autobrr/internal/domain" "github.com/autobrr/autobrr/internal/filter" "github.com/autobrr/autobrr/internal/release" @@ -34,7 +35,7 @@ type Handler struct { network *domain.IrcNetwork filterService filter.Service releaseService release.Service - announceProcessors map[string]Processor + announceProcessors map[string]announce.Processor definitions []domain.IndexerDefinition client *irc.Client @@ -60,7 +61,7 @@ func NewHandler(network domain.IrcNetwork, filterService filter.Service, release filterService: filterService, releaseService: releaseService, definitions: definitions, - announceProcessors: map[string]Processor{}, + announceProcessors: map[string]announce.Processor{}, validAnnouncers: map[string]struct{}{}, } @@ -71,7 +72,7 @@ func NewHandler(network domain.IrcNetwork, filterService filter.Service, release for _, channel := range definition.IRC.Channels { channel = strings.ToLower(channel) - h.announceProcessors[channel] = NewAnnounceProcessor(definition, filterService, releaseService) + h.announceProcessors[channel] = announce.NewAnnounceProcessor(definition, filterService, releaseService) } // create map of valid announcers