From ccabe96bdff844bcc58c29ace9449131a8d7a1aa Mon Sep 17 00:00:00 2001 From: ze0s <43699394+zze0s@users.noreply.github.com> Date: Sun, 21 May 2023 15:51:40 +0200 Subject: [PATCH] feat(irc): view announces per channel (#948) * feat(irc): add sse to handler * feat(irc): view and send irc messages per network * refactor(irc): use id as handlerkey * refactor(irc): use id as handlerkey * feat(web): add irc context * refactor: create sse stream per network channel * fix(irc): remove non-working wildcard callback handler * feat: use fork of sse * chore(deps): update ergo/irc-go to v0.3.0 * fix: clean irc msg before sse publish * feat: add view channel button * feat: styling improvements * feat: show time --- cmd/autobrr/main.go | 5 +- go.mod | 4 +- go.sum | 9 +- internal/database/irc.go | 2 +- internal/domain/irc.go | 34 +++++- internal/http/irc.go | 82 ++++++++++--- internal/http/server.go | 2 +- internal/irc/handler.go | 40 +++++-- internal/irc/service.go | 143 ++++++++++++++-------- test/mockindexer/irc/handlers.go | 2 + web/src/api/APIClient.ts | 4 +- web/src/screens/settings/Irc.tsx | 198 +++++++++++++++++++++++++------ web/src/types/Irc.d.ts | 8 ++ web/src/utils/Context.ts | 38 +++++- 14 files changed, 446 insertions(+), 125 deletions(-) diff --git a/cmd/autobrr/main.go b/cmd/autobrr/main.go index da3be2a..9c6c022 100644 --- a/cmd/autobrr/main.go +++ b/cmd/autobrr/main.go @@ -56,8 +56,7 @@ func main() { // setup server-sent-events serverEvents := sse.New() - serverEvents.AutoReplay = false - serverEvents.CreateStream("logs") + serverEvents.CreateStreamWithOpts("logs", sse.StreamOpts{MaxEntries: 1000, AutoReplay: true}) // register SSE hook on logger log.RegisterSSEHook(serverEvents) @@ -107,7 +106,7 @@ func main() { indexerService = indexer.NewService(log, cfg.Config, indexerRepo, indexerAPIService, schedulingService) filterService = filter.NewService(log, filterRepo, actionRepo, releaseRepo, indexerAPIService, indexerService) releaseService = release.NewService(log, releaseRepo, actionService, filterService) - ircService = irc.NewService(log, ircRepo, releaseService, indexerService, notificationService) + ircService = irc.NewService(log, serverEvents, ircRepo, releaseService, indexerService, notificationService) feedService = feed.NewService(log, feedRepo, feedCacheRepo, releaseService, schedulingService) ) diff --git a/go.mod b/go.mod index 155fb83..803e939 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/autobrr/autobrr go 1.20 +replace github.com/r3labs/sse/v2 => github.com/autobrr/sse/v2 v2.0.0-20230520125637-530e06346d7d + require ( github.com/Masterminds/sprig/v3 v3.2.2 github.com/Masterminds/squirrel v1.5.3 @@ -13,7 +15,7 @@ require ( github.com/avast/retry-go v3.0.0+incompatible github.com/dcarbone/zadapters/zstdlog v0.3.1 github.com/dustin/go-humanize v1.0.0 - github.com/ergochat/irc-go v0.2.0 + github.com/ergochat/irc-go v0.3.0 github.com/fsnotify/fsnotify v1.6.0 github.com/go-chi/chi/v5 v5.0.7 github.com/go-chi/render v1.0.2 diff --git a/go.sum b/go.sum index ab5ade9..a9bc241 100644 --- a/go.sum +++ b/go.sum @@ -89,14 +89,14 @@ github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9Pq github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef h1:2JGTg6JapxP9/R33ZaagQtAM4EkkSYnIAlOG5EI8gkM= github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef/go.mod h1:JS7hed4L1fj0hXcyEejnW57/7LCetXggd+vwrRnYeII= -github.com/autobrr/go-deluge v1.0.0 h1:dGpA3elktcH8eQ5ctzpU5dTjzR0eOBOnrhTZEGPVv6E= -github.com/autobrr/go-deluge v1.0.0/go.mod h1:ndiXT1eHWv/ATNk9TpE8GHIs8OSSUnsImt4Syk+y5LM= github.com/autobrr/go-deluge v1.0.1 h1:JH2RJnWWaYvN/29KuajtRXS/HxQhAxOk4yq54pjnn68= github.com/autobrr/go-deluge v1.0.1/go.mod h1:ndiXT1eHWv/ATNk9TpE8GHIs8OSSUnsImt4Syk+y5LM= github.com/autobrr/go-qbittorrent v1.3.2 h1:Jk6W4bGUAlmFmSefiK1jwAvITy6sGe6GG3zIP8gOJVg= github.com/autobrr/go-qbittorrent v1.3.2/go.mod h1:z88B3+O/1/3doQABErvIOOxE4hjpmIpulu6XzDG/q78= github.com/autobrr/go-rtorrent v1.0.1 h1:KbSBGcgsThYs4qHBYyFlgSOhDhfRXkJoAxVkB0atIzg= github.com/autobrr/go-rtorrent v1.0.1/go.mod h1:1CyQ2tcLOGP+p9drOqFiVPb/+QvfExMPCHnEGQd0BmM= +github.com/autobrr/sse/v2 v2.0.0-20230520125637-530e06346d7d h1:9EGCYgeugAVWLBAtjHC7AFnXSwUdYfCB98WaOgdDREE= +github.com/autobrr/sse/v2 v2.0.0-20230520125637-530e06346d7d/go.mod h1:zCozZ9lp4DE340T2+wfMPL/eoQwLVIGDOCKCDEFwTQU= github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0= github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY= github.com/benbjohnson/immutable v0.2.0/go.mod h1:uc6OHo6PN2++n98KHLxW8ef4W42ylHiQSENghE1ezxI= @@ -140,6 +140,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/ergochat/irc-go v0.2.0 h1:3vHdy4c56UTY6+/rTBrQc1fmt32N5G8PrEZacJDOr+E= github.com/ergochat/irc-go v0.2.0/go.mod h1:2vi7KNpIPWnReB5hmLpl92eMywQvuIeIIGdt/FQCph0= +github.com/ergochat/irc-go v0.3.0 h1:qgvb2knh8d6yIVsHX+PRQ2CiRj1NGG5x88ABmR1lWng= +github.com/ergochat/irc-go v0.3.0/go.mod h1:2vi7KNpIPWnReB5hmLpl92eMywQvuIeIIGdt/FQCph0= github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= @@ -360,8 +362,6 @@ github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= -github.com/r3labs/sse/v2 v2.8.1 h1:lZH+W4XOLIq88U5MIHOsLec7+R62uhz3bIi2yn0Sg8o= -github.com/r3labs/sse/v2 v2.8.1/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= @@ -498,7 +498,6 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= diff --git a/internal/database/irc.go b/internal/database/irc.go index 5afe666..bf10e1a 100644 --- a/internal/database/irc.go +++ b/internal/database/irc.go @@ -447,7 +447,7 @@ func (r *IrcRepo) StoreNetworkChannels(ctx context.Context, networkID int64, cha return nil } -func (r *IrcRepo) StoreChannel(networkID int64, channel *domain.IrcChannel) error { +func (r *IrcRepo) StoreChannel(ctx context.Context, networkID int64, channel *domain.IrcChannel) error { pass := toNullString(channel.Password) var err error diff --git a/internal/domain/irc.go b/internal/domain/irc.go index 4b80fdc..5c6a83d 100644 --- a/internal/domain/irc.go +++ b/internal/domain/irc.go @@ -5,6 +5,7 @@ package domain import ( "context" + "encoding/json" "time" ) @@ -85,10 +86,41 @@ type ChannelHealth struct { LastAnnounce time.Time `json:"last_announce"` } +type SendIrcCmdRequest struct { + NetworkId int64 `json:"network_id"` + Server string `json:"server"` + Channel string `json:"channel"` + Nick string `json:"nick"` + Message string `json:"msg"` +} + +type IrcMessage struct { + Channel string `json:"channel"` + Nick string `json:"nick"` + Message string `json:"msg"` + Time time.Time `json:"time"` +} + +func (m IrcMessage) ToJsonString() string { + j, err := json.Marshal(m) + if err != nil { + return "" + } + return string(j) +} + +func (m IrcMessage) Bytes() []byte { + j, err := json.Marshal(m) + if err != nil { + return nil + } + return j +} + type IrcRepo interface { StoreNetwork(network *IrcNetwork) error UpdateNetwork(ctx context.Context, network *IrcNetwork) error - StoreChannel(networkID int64, channel *IrcChannel) error + StoreChannel(ctx context.Context, networkID int64, channel *IrcChannel) error UpdateChannel(channel *IrcChannel) error UpdateInviteCommand(networkID int64, invite string) error StoreNetworkChannels(ctx context.Context, networkID int64, channels []IrcChannel) error diff --git a/internal/http/irc.go b/internal/http/irc.go index 03ad78a..a62c040 100644 --- a/internal/http/irc.go +++ b/internal/http/irc.go @@ -9,9 +9,10 @@ import ( "net/http" "strconv" - "github.com/go-chi/chi/v5" - "github.com/autobrr/autobrr/internal/domain" + + "github.com/go-chi/chi/v5" + "github.com/r3labs/sse/v2" ) type ircService interface { @@ -21,18 +22,22 @@ type ircService interface { GetNetworkByID(ctx context.Context, id int64) (*domain.IrcNetwork, error) StoreNetwork(ctx context.Context, network *domain.IrcNetwork) error UpdateNetwork(ctx context.Context, network *domain.IrcNetwork) error - StoreChannel(networkID int64, channel *domain.IrcChannel) error + StoreChannel(ctx context.Context, networkID int64, channel *domain.IrcChannel) error RestartNetwork(ctx context.Context, id int64) error + SendCmd(ctx context.Context, req *domain.SendIrcCmdRequest) error } type ircHandler struct { encoder encoder + sse *sse.Server + service ircService } -func newIrcHandler(encoder encoder, service ircService) *ircHandler { +func newIrcHandler(encoder encoder, sse *sse.Server, service ircService) *ircHandler { return &ircHandler{ encoder: encoder, + sse: sse, service: service, } } @@ -40,11 +45,29 @@ func newIrcHandler(encoder encoder, service ircService) *ircHandler { func (h ircHandler) Routes(r chi.Router) { r.Get("/", h.listNetworks) r.Post("/", h.storeNetwork) - r.Put("/network/{networkID}", h.updateNetwork) - r.Post("/network/{networkID}/channel", h.storeChannel) - r.Get("/network/{networkID}/restart", h.restartNetwork) - r.Get("/network/{networkID}", h.getNetworkByID) - r.Delete("/network/{networkID}", h.deleteNetwork) + + r.Route("/network/{networkID}", func(r chi.Router) { + r.Put("/", h.updateNetwork) + r.Get("/", h.getNetworkByID) + r.Delete("/", h.deleteNetwork) + + r.Post("/cmd", h.sendCmd) + r.Post("/channel", h.storeChannel) + r.Get("/restart", h.restartNetwork) + }) + + r.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) { + + // inject CORS headers to bypass checks + h.sse.Headers = map[string]string{ + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + } + + h.sse.ServeHTTP(w, r) + }) } func (h ircHandler) listNetworks(w http.ResponseWriter, r *http.Request) { @@ -97,8 +120,7 @@ func (h ircHandler) storeNetwork(w http.ResponseWriter, r *http.Request) { return } - err := h.service.StoreNetwork(r.Context(), &data) - if err != nil { + if err := h.service.StoreNetwork(r.Context(), &data); err != nil { h.encoder.Error(w, err) return } @@ -117,8 +139,7 @@ func (h ircHandler) updateNetwork(w http.ResponseWriter, r *http.Request) { return } - err := h.service.UpdateNetwork(ctx, &data) - if err != nil { + if err := h.service.UpdateNetwork(ctx, &data); err != nil { h.encoder.Error(w, err) return } @@ -126,10 +147,11 @@ func (h ircHandler) updateNetwork(w http.ResponseWriter, r *http.Request) { h.encoder.NoContent(w) } -func (h ircHandler) storeChannel(w http.ResponseWriter, r *http.Request) { +func (h ircHandler) sendCmd(w http.ResponseWriter, r *http.Request) { var ( - data domain.IrcChannel + ctx = r.Context() networkID = chi.URLParam(r, "networkID") + data domain.SendIrcCmdRequest ) id, _ := strconv.Atoi(networkID) @@ -139,8 +161,31 @@ func (h ircHandler) storeChannel(w http.ResponseWriter, r *http.Request) { return } - err := h.service.StoreChannel(int64(id), &data) - if err != nil { + data.NetworkId = int64(id) + + if err := h.service.SendCmd(ctx, &data); err != nil { + h.encoder.Error(w, err) + return + } + + h.encoder.NoContent(w) +} + +func (h ircHandler) storeChannel(w http.ResponseWriter, r *http.Request) { + var ( + ctx = r.Context() + networkID = chi.URLParam(r, "networkID") + data domain.IrcChannel + ) + + id, _ := strconv.Atoi(networkID) + + if err := json.NewDecoder(r.Body).Decode(&data); err != nil { + h.encoder.Error(w, err) + return + } + + if err := h.service.StoreChannel(ctx, int64(id), &data); err != nil { h.encoder.Error(w, err) return } @@ -156,8 +201,7 @@ func (h ircHandler) deleteNetwork(w http.ResponseWriter, r *http.Request) { id, _ := strconv.Atoi(networkID) - err := h.service.DeleteNetwork(ctx, int64(id)) - if err != nil { + if err := h.service.DeleteNetwork(ctx, int64(id)); err != nil { h.encoder.Error(w, err) return } diff --git a/internal/http/server.go b/internal/http/server.go index 80e9956..3fef0c6 100644 --- a/internal/http/server.go +++ b/internal/http/server.go @@ -121,7 +121,7 @@ func (s Server) Handler() http.Handler { r.Route("/download_clients", newDownloadClientHandler(encoder, s.downloadClientService).Routes) r.Route("/filters", newFilterHandler(encoder, s.filterService).Routes) r.Route("/feeds", newFeedHandler(encoder, s.feedService).Routes) - r.Route("/irc", newIrcHandler(encoder, s.ircService).Routes) + r.Route("/irc", newIrcHandler(encoder, s.sse, s.ircService).Routes) r.Route("/indexer", newIndexerHandler(encoder, s.indexerService, s.ircService).Routes) r.Route("/keys", newAPIKeyHandler(encoder, s.apiService).Routes) r.Route("/logs", newLogsHandler(s.config).Routes) diff --git a/internal/irc/handler.go b/internal/irc/handler.go index 875b5f7..9e6aacf 100644 --- a/internal/irc/handler.go +++ b/internal/irc/handler.go @@ -20,6 +20,7 @@ import ( "github.com/ergochat/irc-go/ircevent" "github.com/ergochat/irc-go/ircfmt" "github.com/ergochat/irc-go/ircmsg" + "github.com/r3labs/sse/v2" "github.com/rs/zerolog" "github.com/sasha-s/go-deadlock" ) @@ -59,6 +60,7 @@ func (ch *channelHealth) resetMonitoring() { type Handler struct { log zerolog.Logger + sse *sse.Server network *domain.IrcNetwork releaseSvc release.Service notificationService notification.Service @@ -83,9 +85,10 @@ type Handler struct { saslauthed bool } -func NewHandler(log zerolog.Logger, network domain.IrcNetwork, definitions []*domain.IndexerDefinition, releaseSvc release.Service, notificationSvc notification.Service) *Handler { +func NewHandler(log zerolog.Logger, sse *sse.Server, network domain.IrcNetwork, definitions []*domain.IndexerDefinition, releaseSvc release.Service, notificationSvc notification.Service) *Handler { h := &Handler{ log: log.With().Str("network", network.Server).Logger(), + sse: sse, client: nil, network: &network, releaseSvc: releaseSvc, @@ -186,6 +189,7 @@ func (h *Handler) Run() error { h.client.AddConnectCallback(h.onConnect) h.client.AddDisconnectCallback(h.onDisconnect) + h.client.AddCallback("MODE", h.handleMode) h.client.AddCallback("INVITE", h.handleInvite) h.client.AddCallback("366", h.handleJoined) @@ -538,30 +542,41 @@ func (h *Handler) onNick(msg ircmsg.Message) { } } +func (h *Handler) publishSSEMsg(msg domain.IrcMessage) { + h.sse.Publish(fmt.Sprintf("%d%s", h.network.ID, strings.TrimPrefix(msg.Channel, "#")), &sse.Event{ + Data: msg.Bytes(), + }) +} + // onMessage handles PRIVMSG events func (h *Handler) onMessage(msg ircmsg.Message) { + if len(msg.Params) < 2 { return } // parse announce - announcer := msg.Nick() + nick := msg.Nick() channel := msg.Params[0] message := msg.Params[1] + // clean message + cleanedMsg := h.cleanMessage(message) + + h.log.Debug().Str("channel", channel).Str("nick", nick).Msg(cleanedMsg) + + // publish to SSE stream + h.publishSSEMsg(domain.IrcMessage{Channel: channel, Nick: nick, Message: cleanedMsg, Time: time.Now()}) + // check if message is from a valid channel, if not return if validChannel := h.isValidChannel(channel); !validChannel { return } // check if message is from announce bot, if not return - if validAnnouncer := h.isValidAnnouncer(announcer); !validAnnouncer { + if validAnnouncer := h.isValidAnnouncer(nick); !validAnnouncer { return } - // clean message - cleanedMsg := h.cleanMessage(message) - h.log.Debug().Str("channel", channel).Str("user", announcer).Msgf("%v", cleanedMsg) - if err := h.sendToAnnounceProcessor(channel, cleanedMsg); err != nil { h.log.Error().Stack().Err(err).Msgf("could not queue line: %v", cleanedMsg) return @@ -819,6 +834,17 @@ func (h *Handler) handleMode(msg ircmsg.Message) { return } +func (h *Handler) SendMsg(channel, msg string) error { + h.log.Debug().Msgf("sending msg command: %s", msg) + + if err := h.client.Privmsg(channel, msg); err != nil { + h.log.Error().Stack().Err(err).Msgf("error sending msg: %v", msg) + return err + } + + return nil +} + // check if announcer is one from the list in the definition func (h *Handler) isValidAnnouncer(nick string) bool { h.m.RLock() diff --git a/internal/irc/service.go b/internal/irc/service.go index ff4c5bf..12c0b5b 100644 --- a/internal/irc/service.go +++ b/internal/irc/service.go @@ -5,6 +5,7 @@ package irc import ( "context" + "fmt" "strings" "sync" "time" @@ -16,13 +17,16 @@ import ( "github.com/autobrr/autobrr/internal/release" "github.com/autobrr/autobrr/pkg/errors" + "github.com/r3labs/sse/v2" "github.com/rs/zerolog" ) type Service interface { StartHandlers() StopHandlers() - StopNetwork(key handlerKey) error + StopNetwork(id int64) error + StopAndRemoveNetwork(id int64) error + StopNetworkIfRunning(id int64) error RestartNetwork(ctx context.Context, id int64) error ListNetworks(ctx context.Context) ([]domain.IrcNetwork, error) GetNetworksWithHealth(ctx context.Context) ([]domain.IrcNetworkWithHealth, error) @@ -30,38 +34,39 @@ type Service interface { DeleteNetwork(ctx context.Context, id int64) error StoreNetwork(ctx context.Context, network *domain.IrcNetwork) error UpdateNetwork(ctx context.Context, network *domain.IrcNetwork) error - StoreChannel(networkID int64, channel *domain.IrcChannel) error + StoreChannel(ctx context.Context, networkID int64, channel *domain.IrcChannel) error + SendCmd(ctx context.Context, req *domain.SendIrcCmdRequest) error } type service struct { - stopWG sync.WaitGroup - lock sync.RWMutex + log zerolog.Logger + sse *sse.Server - log zerolog.Logger repo domain.IrcRepo releaseService release.Service indexerService indexer.Service notificationService notification.Service indexerMap map[string]string - handlers map[handlerKey]*Handler + handlers map[int64]*Handler + + stopWG sync.WaitGroup + lock sync.RWMutex } -func NewService(log logger.Logger, repo domain.IrcRepo, releaseSvc release.Service, indexerSvc indexer.Service, notificationSvc notification.Service) Service { +const sseMaxEntries = 1000 + +func NewService(log logger.Logger, sse *sse.Server, repo domain.IrcRepo, releaseSvc release.Service, indexerSvc indexer.Service, notificationSvc notification.Service) Service { return &service{ log: log.With().Str("module", "irc").Logger(), + sse: sse, repo: repo, releaseService: releaseSvc, indexerService: indexerSvc, notificationService: notificationSvc, - handlers: make(map[handlerKey]*Handler), + handlers: make(map[int64]*Handler), } } -type handlerKey struct { - server string - nick string -} - func (s *service) StartHandlers() { networks, err := s.repo.FindActiveNetworks(context.Background()) if err != nil { @@ -73,25 +78,28 @@ func (s *service) StartHandlers() { continue } - // check if already in handlers - //v, ok := s.handlers[network.Name] - - s.lock.Lock() channels, err := s.repo.ListChannels(network.ID) if err != nil { s.log.Error().Err(err).Msgf("failed to list channels for network %q", network.Server) } - network.Channels = channels + + for _, channel := range channels { + // setup SSE stream per channel + s.createSSEStream(network.ID, channel.Name) + } // find indexer definitions for network and add definitions := s.indexerService.GetIndexersByIRCNetwork(network.Server) + s.lock.Lock() + network.Channels = channels + // init new irc handler - handler := NewHandler(s.log, network, definitions, s.releaseService, s.notificationService) + handler := NewHandler(s.log, s.sse, network, definitions, s.releaseService, s.notificationService) // use network.Server + nick to use multiple indexers with different nick per network // this allows for multiple handlers to one network - s.handlers[handlerKey{network.Server, network.Nick}] = handler + s.handlers[network.ID] = handler s.lock.Unlock() s.log.Debug().Msgf("starting network: %+v", network.Name) @@ -115,7 +123,7 @@ func (s *service) StopHandlers() { func (s *service) startNetwork(network domain.IrcNetwork) error { // look if we have the network in handlers already, if so start it - if existingHandler, found := s.handlers[handlerKey{network.Server, network.Nick}]; found { + if existingHandler, found := s.handlers[network.ID]; found { s.log.Debug().Msgf("starting network: %+v", network.Name) if !existingHandler.client.Connected() { @@ -127,21 +135,26 @@ func (s *service) startNetwork(network domain.IrcNetwork) error { } } else { // if not found in handlers, lets add it and run it - - s.lock.Lock() channels, err := s.repo.ListChannels(network.ID) if err != nil { s.log.Error().Err(err).Msgf("failed to list channels for network %q", network.Server) } - network.Channels = channels + + for _, channel := range channels { + // setup SSE stream per channel + s.createSSEStream(network.ID, channel.Name) + } // find indexer definitions for network and add definitions := s.indexerService.GetIndexersByIRCNetwork(network.Server) - // init new irc handler - handler := NewHandler(s.log, network, definitions, s.releaseService, s.notificationService) + s.lock.Lock() + network.Channels = channels - s.handlers[handlerKey{network.Server, network.Nick}] = handler + // init new irc handler + handler := NewHandler(s.log, s.sse, network, definitions, s.releaseService, s.notificationService) + + s.handlers[network.ID] = handler s.lock.Unlock() s.log.Debug().Msgf("starting network: %+v", network.Name) @@ -158,7 +171,7 @@ func (s *service) startNetwork(network domain.IrcNetwork) error { func (s *service) checkIfNetworkRestartNeeded(network *domain.IrcNetwork) error { // look if we have the network in handlers, if so restart it - if existingHandler, found := s.handlers[handlerKey{network.Server, network.Nick}]; found { + if existingHandler, found := s.handlers[network.ID]; found { s.log.Debug().Msgf("irc: decide if irc network handler needs restart or updating: %+v", network.Server) // if server, tls, invite command, port : changed - restart @@ -254,6 +267,9 @@ func (s *service) checkIfNetworkRestartNeeded(network *domain.IrcNetwork) error if err := existingHandler.PartChannel(leaveChannel); err != nil { s.log.Error().Stack().Err(err).Msgf("failed to leave channel: %q", leaveChannel) } + + // create SSE stream for new channel + s.removeSSEStream(network.ID, leaveChannel) } // join channels @@ -263,6 +279,9 @@ func (s *service) checkIfNetworkRestartNeeded(network *domain.IrcNetwork) error if err := existingHandler.JoinChannel(joinChannel.Name, joinChannel.Password); err != nil { s.log.Error().Stack().Err(err).Msgf("failed to join channel: %q", joinChannel.Name) } + + // create SSE stream for new channel + s.createSSEStream(network.ID, joinChannel.Name) } // update network for handler @@ -294,39 +313,43 @@ func (s *service) RestartNetwork(ctx context.Context, id int64) error { func (s *service) restartNetwork(network domain.IrcNetwork) error { // look if we have the network in handlers, if so restart it - hk := handlerKey{network.Server, network.Nick} - if err := s.StopNetworkIfRunning(hk); err != nil { + if err := s.StopNetworkIfRunning(network.ID); err != nil { return err } return s.startNetwork(network) } -func (s *service) StopNetwork(key handlerKey) error { - if handler, found := s.handlers[key]; found { +func (s *service) StopNetwork(id int64) error { + if handler, found := s.handlers[id]; found { handler.Stop() - s.log.Debug().Msgf("stopped network: %+v", key.server) + s.log.Debug().Msgf("stopped network: %+v", handler.network.Server) } return nil } -func (s *service) StopAndRemoveNetwork(key handlerKey) error { - if handler, found := s.handlers[key]; found { +func (s *service) StopAndRemoveNetwork(id int64) error { + if handler, found := s.handlers[id]; found { + // remove SSE streams + for _, channel := range handler.network.Channels { + s.removeSSEStream(handler.network.ID, channel.Name) + } + handler.Stop() // remove from handlers - delete(s.handlers, key) - s.log.Debug().Msgf("stopped network: %+v", key) + delete(s.handlers, id) + s.log.Debug().Msgf("stopped network: %+v", id) } return nil } -func (s *service) StopNetworkIfRunning(key handlerKey) error { - if handler, found := s.handlers[key]; found { +func (s *service) StopNetworkIfRunning(id int64) error { + if handler, found := s.handlers[id]; found { handler.Stop() - s.log.Debug().Msgf("stopped network: %+v", key.server) + s.log.Debug().Msgf("stopped network: %+v", handler.network.Server) } return nil @@ -398,7 +421,7 @@ func (s *service) GetNetworksWithHealth(ctx context.Context) ([]domain.IrcNetwor ConnectionErrors: []string{}, } - handler, ok := s.handlers[handlerKey{n.Server, n.Nick}] + handler, ok := s.handlers[n.ID] if ok { handler.m.RLock() @@ -462,7 +485,6 @@ func (s *service) GetNetworksWithHealth(ctx context.Context) ([]domain.IrcNetwor } ret = append(ret, netw) - } return ret, nil @@ -479,7 +501,7 @@ func (s *service) DeleteNetwork(ctx context.Context, id int64) error { // Remove network and handler //if err = s.StopNetwork(network.Server); err != nil { - if err = s.StopAndRemoveNetwork(handlerKey{network.Server, network.Nick}); err != nil { + if err = s.StopAndRemoveNetwork(network.ID); err != nil { s.log.Error().Stack().Err(err).Msgf("could not stop and delete network: %v", network.Name) return err } @@ -519,7 +541,7 @@ func (s *service) UpdateNetwork(ctx context.Context, network *domain.IrcNetwork) } else { // take into account multiple channels per network - err := s.StopAndRemoveNetwork(handlerKey{network.Server, network.Nick}) + err := s.StopAndRemoveNetwork(network.ID) if err != nil { s.log.Error().Stack().Err(err).Msgf("could not stop network: %+v", network.Name) return errors.New("could not stop network: %v", network.Name) @@ -544,7 +566,7 @@ func (s *service) StoreNetwork(ctx context.Context, network *domain.IrcNetwork) if network.Channels != nil { for _, channel := range network.Channels { - if err := s.repo.StoreChannel(network.ID, &channel); err != nil { + if err := s.repo.StoreChannel(nil, network.ID, &channel); err != nil { s.log.Error().Stack().Err(err).Msg("irc.storeChannel: error executing query") return errors.Wrap(err, "error storing channel on network") //return err @@ -565,7 +587,7 @@ func (s *service) StoreNetwork(ctx context.Context, network *domain.IrcNetwork) if network.Channels != nil { for _, channel := range network.Channels { // add channels. Make sure it doesn't delete before - if err := s.repo.StoreChannel(existingNetwork.ID, &channel); err != nil { + if err := s.repo.StoreChannel(nil, existingNetwork.ID, &channel); err != nil { return err } } @@ -597,10 +619,35 @@ func (s *service) StoreNetwork(ctx context.Context, network *domain.IrcNetwork) return nil } -func (s *service) StoreChannel(networkID int64, channel *domain.IrcChannel) error { - if err := s.repo.StoreChannel(networkID, channel); err != nil { +func (s *service) StoreChannel(ctx context.Context, networkID int64, channel *domain.IrcChannel) error { + if err := s.repo.StoreChannel(ctx, networkID, channel); err != nil { return err } return nil } + +func (s *service) SendCmd(ctx context.Context, req *domain.SendIrcCmdRequest) error { + if handler, found := s.handlers[req.NetworkId]; found { + if err := handler.SendMsg(req.Channel, req.Message); err != nil { + s.log.Error().Err(err).Msgf("could not send message to channel: %s %s", req.Channel, req.Message) + } + } + + return nil +} + +func (s *service) createSSEStream(networkId int64, channel string) { + key := fmt.Sprintf("%d%s", networkId, strings.TrimPrefix(channel, "#")) + + s.sse.CreateStreamWithOpts(key, sse.StreamOpts{ + MaxEntries: sseMaxEntries, + AutoReplay: true, + }) +} + +func (s *service) removeSSEStream(networkId int64, channel string) { + key := fmt.Sprintf("%d%s", networkId, strings.TrimPrefix(channel, "#")) + + s.sse.RemoveStream(key) +} diff --git a/test/mockindexer/irc/handlers.go b/test/mockindexer/irc/handlers.go index b14b240..e4f057b 100644 --- a/test/mockindexer/irc/handlers.go +++ b/test/mockindexer/irc/handlers.go @@ -37,5 +37,7 @@ func CommandHandler(c *Client, cmd []string) { c.writer <- fmt.Sprintf("366 %s %s :End", c.nick, c.channelName) case "PING": c.writer <- fmt.Sprintf("PONG n %s", strings.Join(cmd[1:], " ")) + case "PRIVMSG": + c.writer <- fmt.Sprintf("%s PRIVMSG %s %s", fmt.Sprintf(":%s", c.nick), cmd[1], fmt.Sprintf("%s", strings.Join(cmd[2:], " "))) } } diff --git a/web/src/api/APIClient.ts b/web/src/api/APIClient.ts index 23b3a3e..98b28de 100644 --- a/web/src/api/APIClient.ts +++ b/web/src/api/APIClient.ts @@ -150,7 +150,9 @@ export const APIClient = { createNetwork: (network: IrcNetworkCreate) => appClient.Post("api/irc", network), updateNetwork: (network: IrcNetwork) => appClient.Put(`api/irc/network/${network.id}`, network), deleteNetwork: (id: number) => appClient.Delete(`api/irc/network/${id}`), - restartNetwork: (id: number) => appClient.Get(`api/irc/network/${id}/restart`) + restartNetwork: (id: number) => appClient.Get(`api/irc/network/${id}/restart`), + sendCmd: (cmd: SendIrcCmdRequest) => appClient.Post(`api/irc/network/${cmd.network_id}/cmd`, cmd), + events: (network: string) => new EventSource(`${sseBaseUrl()}api/irc/events?stream=${network}`, { withCredentials: true }) }, logs: { files: () => appClient.Get("api/logs/files"), diff --git a/web/src/screens/settings/Irc.tsx b/web/src/screens/settings/Irc.tsx index 7af1e8b..9d8b563 100644 --- a/web/src/screens/settings/Irc.tsx +++ b/web/src/screens/settings/Irc.tsx @@ -3,7 +3,7 @@ * SPDX-License-Identifier: GPL-2.0-or-later */ -import { Fragment, useRef, useState, useMemo } from "react"; +import { Fragment, useRef, useState, useMemo, useEffect } from "react"; import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query"; import { LockClosedIcon, LockOpenIcon } from "@heroicons/react/24/solid"; import { Menu, Switch, Transition } from "@headlessui/react"; @@ -24,6 +24,8 @@ import { APIClient } from "@api/APIClient"; import { EmptySimple } from "@components/emptystates"; import { DeleteModal } from "@components/modals"; import Toast from "@components/notifications/Toast"; +import { SettingsContext } from "@utils/Context"; +import { useForm } from "react-hook-form"; export const ircKeys = { all: ["irc_networks"] as const, @@ -235,7 +237,7 @@ const ListItem = ({ idx, network, expanded }: ListItemProps) => { return (
  • -
    +
    {
    Monitoring since
    -
    +
    Last announce
  • {network.channels.map((c) => ( -
  • -
    -
    - - {network.enabled ? ( - c.monitoring ? ( - - - - - ) : ( - - ) - ) : ( - - )} - {c.name} - -
    -
    - - {IsEmptyDate(c.monitoring_since)} - -
    -
    - - {IsEmptyDate(c.last_announce)} - -
    -
    -
  • + ))} ) : ( @@ -397,6 +366,59 @@ const ListItem = ({ idx, network, expanded }: ListItemProps) => { ); }; +interface ChannelItemProps { + network: IrcNetwork; + channel: IrcChannelWithHealth; +} + +const ChannelItem = ({ network, channel }: ChannelItemProps) => { + const [viewChannel, toggleView] = useToggle(false); + + return ( +
  • +
    +
    + + {network.enabled ? ( + channel.monitoring ? ( + + + + + ) : ( + + ) + ) : ( + + )} + {channel.name} + +
    +
    + + {IsEmptyDate(channel.monitoring_since)} + +
    +
    + + {IsEmptyDate(channel.last_announce)} + +
    +
    + +
    +
    + {viewChannel && ( + + )} +
  • + ); +}; + interface ListItemDropdownProps { network: IrcNetwork; toggleUpdate: () => void; @@ -557,4 +579,106 @@ const ListItemDropdown = ({ ); }; +type IrcEvent = { + channel: string; + nick: string; + msg: string; + time: string; +}; + +type IrcMsg = { + msg: string; +}; + +interface EventsProps { + network: IrcNetwork; + channel: string; +} + +export const Events = ({ network, channel }: EventsProps) => { + const [settings] = SettingsContext.use(); + + const messagesEndRef = useRef(null); + + const [logs, setLogs] = useState([]); + + // const scrollToBottom = () => { + // messagesEndRef.current?.scrollIntoView({ behavior: "smooth", block: "end", inline: "end" }); + // }; + + const { handleSubmit, register , resetField } = useForm({ + defaultValues: { msg: "" }, + mode: "onBlur" + }); + + const cmdMutation = useMutation({ + mutationFn: (data: SendIrcCmdRequest) => APIClient.irc.sendCmd(data), + onSuccess: (_, variables) => { + resetField("msg"); + }, + onError: () => { + toast.custom((t) => ( + + )); + } + }); + + const onSubmit = (msg: IrcMsg) => { + const payload = { network_id: network.id, nick: network.nick, server: network.server, channel: channel, msg: msg.msg }; + cmdMutation.mutate(payload); + }; + + useEffect(() => { + const key = `${network.id}${channel.replace("#", "")}`; + const es = APIClient.irc.events(key); + + es.onmessage = (event) => { + const newData = JSON.parse(event.data) as IrcEvent; + setLogs((prevState) => [...prevState, newData]); + + // if (settings.scrollOnNewLog) + // scrollToBottom(); + }; + + return () => es.close(); + }, [settings]); + + return ( +
    +
    +
    + +
    + {logs.map((entry, idx) => ( +
    + {entry.nick}: {entry.msg} +
    + ))} +
    +
    + + {/*
    */} + {/*
    */} + {/* */} + {/* */} + {/*
    */} +
    + ); +}; + export default IrcSettings; diff --git a/web/src/types/Irc.d.ts b/web/src/types/Irc.d.ts index fc047a6..b614ef0 100644 --- a/web/src/types/Irc.d.ts +++ b/web/src/types/Irc.d.ts @@ -72,3 +72,11 @@ interface IrcAuth { account?: string; // optional password?: string; // optional } + +interface SendIrcCmdRequest { + network_id: number; + server: string; + channel: string; + nick: string; + msg: string; +} diff --git a/web/src/utils/Context.ts b/web/src/utils/Context.ts index 5123554..05e36e9 100644 --- a/web/src/utils/Context.ts +++ b/web/src/utils/Context.ts @@ -28,6 +28,7 @@ export const InitializeGlobalContext = () => { FilterListContext.set(JSON.parse(filterList_ctx)); } }; + interface AuthInfo { username: string; isLoggedIn: boolean; @@ -108,4 +109,39 @@ export const FilterListContext = newRidgeState( } } } -); \ No newline at end of file +); + +export type IrcNetworkState = { + id: number; + name: string; + status: string; +}; + +export type IrcBufferType = "NICK" | "CHANNEL" | "SERVER"; + +export type IrcBufferState = { + id: number; + name: string; + type: IrcBufferType; + messages: string[]; +}; + +export type IrcState = { + networks: Map; + buffers: Map +}; +export const IrcContext = newRidgeState( + { + networks: new Map(), + buffers: new Map() + }, + { + onSet: (new_state) => { + try { + console.log("set irc state", new_state); + } catch (e) { + console.log("Error:", e); + } + } + } +);