feat(irc): view announces per channel (#948)

* feat(irc): add sse to handler

* feat(irc): view and send irc messages per network

* refactor(irc): use id as handlerkey

* refactor(irc): use id as handlerkey

* feat(web): add irc context

* refactor: create sse stream per network channel

* fix(irc): remove non-working wildcard callback handler

* feat: use fork of sse

* chore(deps): update ergo/irc-go to v0.3.0

* fix: clean irc msg before sse publish

* feat: add view channel button

* feat: styling improvements

* feat: show time
This commit is contained in:
ze0s 2023-05-21 15:51:40 +02:00 committed by GitHub
parent bbfcf303ef
commit ccabe96bdf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 446 additions and 125 deletions

View file

@ -20,6 +20,7 @@ import (
"github.com/ergochat/irc-go/ircevent"
"github.com/ergochat/irc-go/ircfmt"
"github.com/ergochat/irc-go/ircmsg"
"github.com/r3labs/sse/v2"
"github.com/rs/zerolog"
"github.com/sasha-s/go-deadlock"
)
@ -59,6 +60,7 @@ func (ch *channelHealth) resetMonitoring() {
type Handler struct {
log zerolog.Logger
sse *sse.Server
network *domain.IrcNetwork
releaseSvc release.Service
notificationService notification.Service
@ -83,9 +85,10 @@ type Handler struct {
saslauthed bool
}
func NewHandler(log zerolog.Logger, network domain.IrcNetwork, definitions []*domain.IndexerDefinition, releaseSvc release.Service, notificationSvc notification.Service) *Handler {
func NewHandler(log zerolog.Logger, sse *sse.Server, network domain.IrcNetwork, definitions []*domain.IndexerDefinition, releaseSvc release.Service, notificationSvc notification.Service) *Handler {
h := &Handler{
log: log.With().Str("network", network.Server).Logger(),
sse: sse,
client: nil,
network: &network,
releaseSvc: releaseSvc,
@ -186,6 +189,7 @@ func (h *Handler) Run() error {
h.client.AddConnectCallback(h.onConnect)
h.client.AddDisconnectCallback(h.onDisconnect)
h.client.AddCallback("MODE", h.handleMode)
h.client.AddCallback("INVITE", h.handleInvite)
h.client.AddCallback("366", h.handleJoined)
@ -538,30 +542,41 @@ func (h *Handler) onNick(msg ircmsg.Message) {
}
}
func (h *Handler) publishSSEMsg(msg domain.IrcMessage) {
h.sse.Publish(fmt.Sprintf("%d%s", h.network.ID, strings.TrimPrefix(msg.Channel, "#")), &sse.Event{
Data: msg.Bytes(),
})
}
// onMessage handles PRIVMSG events
func (h *Handler) onMessage(msg ircmsg.Message) {
if len(msg.Params) < 2 {
return
}
// parse announce
announcer := msg.Nick()
nick := msg.Nick()
channel := msg.Params[0]
message := msg.Params[1]
// clean message
cleanedMsg := h.cleanMessage(message)
h.log.Debug().Str("channel", channel).Str("nick", nick).Msg(cleanedMsg)
// publish to SSE stream
h.publishSSEMsg(domain.IrcMessage{Channel: channel, Nick: nick, Message: cleanedMsg, Time: time.Now()})
// check if message is from a valid channel, if not return
if validChannel := h.isValidChannel(channel); !validChannel {
return
}
// check if message is from announce bot, if not return
if validAnnouncer := h.isValidAnnouncer(announcer); !validAnnouncer {
if validAnnouncer := h.isValidAnnouncer(nick); !validAnnouncer {
return
}
// clean message
cleanedMsg := h.cleanMessage(message)
h.log.Debug().Str("channel", channel).Str("user", announcer).Msgf("%v", cleanedMsg)
if err := h.sendToAnnounceProcessor(channel, cleanedMsg); err != nil {
h.log.Error().Stack().Err(err).Msgf("could not queue line: %v", cleanedMsg)
return
@ -819,6 +834,17 @@ func (h *Handler) handleMode(msg ircmsg.Message) {
return
}
func (h *Handler) SendMsg(channel, msg string) error {
h.log.Debug().Msgf("sending msg command: %s", msg)
if err := h.client.Privmsg(channel, msg); err != nil {
h.log.Error().Stack().Err(err).Msgf("error sending msg: %v", msg)
return err
}
return nil
}
// check if announcer is one from the list in the definition
func (h *Handler) isValidAnnouncer(nick string) bool {
h.m.RLock()

View file

@ -5,6 +5,7 @@ package irc
import (
"context"
"fmt"
"strings"
"sync"
"time"
@ -16,13 +17,16 @@ import (
"github.com/autobrr/autobrr/internal/release"
"github.com/autobrr/autobrr/pkg/errors"
"github.com/r3labs/sse/v2"
"github.com/rs/zerolog"
)
type Service interface {
StartHandlers()
StopHandlers()
StopNetwork(key handlerKey) error
StopNetwork(id int64) error
StopAndRemoveNetwork(id int64) error
StopNetworkIfRunning(id int64) error
RestartNetwork(ctx context.Context, id int64) error
ListNetworks(ctx context.Context) ([]domain.IrcNetwork, error)
GetNetworksWithHealth(ctx context.Context) ([]domain.IrcNetworkWithHealth, error)
@ -30,38 +34,39 @@ type Service interface {
DeleteNetwork(ctx context.Context, id int64) error
StoreNetwork(ctx context.Context, network *domain.IrcNetwork) error
UpdateNetwork(ctx context.Context, network *domain.IrcNetwork) error
StoreChannel(networkID int64, channel *domain.IrcChannel) error
StoreChannel(ctx context.Context, networkID int64, channel *domain.IrcChannel) error
SendCmd(ctx context.Context, req *domain.SendIrcCmdRequest) error
}
type service struct {
stopWG sync.WaitGroup
lock sync.RWMutex
log zerolog.Logger
sse *sse.Server
log zerolog.Logger
repo domain.IrcRepo
releaseService release.Service
indexerService indexer.Service
notificationService notification.Service
indexerMap map[string]string
handlers map[handlerKey]*Handler
handlers map[int64]*Handler
stopWG sync.WaitGroup
lock sync.RWMutex
}
func NewService(log logger.Logger, repo domain.IrcRepo, releaseSvc release.Service, indexerSvc indexer.Service, notificationSvc notification.Service) Service {
const sseMaxEntries = 1000
func NewService(log logger.Logger, sse *sse.Server, repo domain.IrcRepo, releaseSvc release.Service, indexerSvc indexer.Service, notificationSvc notification.Service) Service {
return &service{
log: log.With().Str("module", "irc").Logger(),
sse: sse,
repo: repo,
releaseService: releaseSvc,
indexerService: indexerSvc,
notificationService: notificationSvc,
handlers: make(map[handlerKey]*Handler),
handlers: make(map[int64]*Handler),
}
}
type handlerKey struct {
server string
nick string
}
func (s *service) StartHandlers() {
networks, err := s.repo.FindActiveNetworks(context.Background())
if err != nil {
@ -73,25 +78,28 @@ func (s *service) StartHandlers() {
continue
}
// check if already in handlers
//v, ok := s.handlers[network.Name]
s.lock.Lock()
channels, err := s.repo.ListChannels(network.ID)
if err != nil {
s.log.Error().Err(err).Msgf("failed to list channels for network %q", network.Server)
}
network.Channels = channels
for _, channel := range channels {
// setup SSE stream per channel
s.createSSEStream(network.ID, channel.Name)
}
// find indexer definitions for network and add
definitions := s.indexerService.GetIndexersByIRCNetwork(network.Server)
s.lock.Lock()
network.Channels = channels
// init new irc handler
handler := NewHandler(s.log, network, definitions, s.releaseService, s.notificationService)
handler := NewHandler(s.log, s.sse, network, definitions, s.releaseService, s.notificationService)
// use network.Server + nick to use multiple indexers with different nick per network
// this allows for multiple handlers to one network
s.handlers[handlerKey{network.Server, network.Nick}] = handler
s.handlers[network.ID] = handler
s.lock.Unlock()
s.log.Debug().Msgf("starting network: %+v", network.Name)
@ -115,7 +123,7 @@ func (s *service) StopHandlers() {
func (s *service) startNetwork(network domain.IrcNetwork) error {
// look if we have the network in handlers already, if so start it
if existingHandler, found := s.handlers[handlerKey{network.Server, network.Nick}]; found {
if existingHandler, found := s.handlers[network.ID]; found {
s.log.Debug().Msgf("starting network: %+v", network.Name)
if !existingHandler.client.Connected() {
@ -127,21 +135,26 @@ func (s *service) startNetwork(network domain.IrcNetwork) error {
}
} else {
// if not found in handlers, lets add it and run it
s.lock.Lock()
channels, err := s.repo.ListChannels(network.ID)
if err != nil {
s.log.Error().Err(err).Msgf("failed to list channels for network %q", network.Server)
}
network.Channels = channels
for _, channel := range channels {
// setup SSE stream per channel
s.createSSEStream(network.ID, channel.Name)
}
// find indexer definitions for network and add
definitions := s.indexerService.GetIndexersByIRCNetwork(network.Server)
// init new irc handler
handler := NewHandler(s.log, network, definitions, s.releaseService, s.notificationService)
s.lock.Lock()
network.Channels = channels
s.handlers[handlerKey{network.Server, network.Nick}] = handler
// init new irc handler
handler := NewHandler(s.log, s.sse, network, definitions, s.releaseService, s.notificationService)
s.handlers[network.ID] = handler
s.lock.Unlock()
s.log.Debug().Msgf("starting network: %+v", network.Name)
@ -158,7 +171,7 @@ func (s *service) startNetwork(network domain.IrcNetwork) error {
func (s *service) checkIfNetworkRestartNeeded(network *domain.IrcNetwork) error {
// look if we have the network in handlers, if so restart it
if existingHandler, found := s.handlers[handlerKey{network.Server, network.Nick}]; found {
if existingHandler, found := s.handlers[network.ID]; found {
s.log.Debug().Msgf("irc: decide if irc network handler needs restart or updating: %+v", network.Server)
// if server, tls, invite command, port : changed - restart
@ -254,6 +267,9 @@ func (s *service) checkIfNetworkRestartNeeded(network *domain.IrcNetwork) error
if err := existingHandler.PartChannel(leaveChannel); err != nil {
s.log.Error().Stack().Err(err).Msgf("failed to leave channel: %q", leaveChannel)
}
// create SSE stream for new channel
s.removeSSEStream(network.ID, leaveChannel)
}
// join channels
@ -263,6 +279,9 @@ func (s *service) checkIfNetworkRestartNeeded(network *domain.IrcNetwork) error
if err := existingHandler.JoinChannel(joinChannel.Name, joinChannel.Password); err != nil {
s.log.Error().Stack().Err(err).Msgf("failed to join channel: %q", joinChannel.Name)
}
// create SSE stream for new channel
s.createSSEStream(network.ID, joinChannel.Name)
}
// update network for handler
@ -294,39 +313,43 @@ func (s *service) RestartNetwork(ctx context.Context, id int64) error {
func (s *service) restartNetwork(network domain.IrcNetwork) error {
// look if we have the network in handlers, if so restart it
hk := handlerKey{network.Server, network.Nick}
if err := s.StopNetworkIfRunning(hk); err != nil {
if err := s.StopNetworkIfRunning(network.ID); err != nil {
return err
}
return s.startNetwork(network)
}
func (s *service) StopNetwork(key handlerKey) error {
if handler, found := s.handlers[key]; found {
func (s *service) StopNetwork(id int64) error {
if handler, found := s.handlers[id]; found {
handler.Stop()
s.log.Debug().Msgf("stopped network: %+v", key.server)
s.log.Debug().Msgf("stopped network: %+v", handler.network.Server)
}
return nil
}
func (s *service) StopAndRemoveNetwork(key handlerKey) error {
if handler, found := s.handlers[key]; found {
func (s *service) StopAndRemoveNetwork(id int64) error {
if handler, found := s.handlers[id]; found {
// remove SSE streams
for _, channel := range handler.network.Channels {
s.removeSSEStream(handler.network.ID, channel.Name)
}
handler.Stop()
// remove from handlers
delete(s.handlers, key)
s.log.Debug().Msgf("stopped network: %+v", key)
delete(s.handlers, id)
s.log.Debug().Msgf("stopped network: %+v", id)
}
return nil
}
func (s *service) StopNetworkIfRunning(key handlerKey) error {
if handler, found := s.handlers[key]; found {
func (s *service) StopNetworkIfRunning(id int64) error {
if handler, found := s.handlers[id]; found {
handler.Stop()
s.log.Debug().Msgf("stopped network: %+v", key.server)
s.log.Debug().Msgf("stopped network: %+v", handler.network.Server)
}
return nil
@ -398,7 +421,7 @@ func (s *service) GetNetworksWithHealth(ctx context.Context) ([]domain.IrcNetwor
ConnectionErrors: []string{},
}
handler, ok := s.handlers[handlerKey{n.Server, n.Nick}]
handler, ok := s.handlers[n.ID]
if ok {
handler.m.RLock()
@ -462,7 +485,6 @@ func (s *service) GetNetworksWithHealth(ctx context.Context) ([]domain.IrcNetwor
}
ret = append(ret, netw)
}
return ret, nil
@ -479,7 +501,7 @@ func (s *service) DeleteNetwork(ctx context.Context, id int64) error {
// Remove network and handler
//if err = s.StopNetwork(network.Server); err != nil {
if err = s.StopAndRemoveNetwork(handlerKey{network.Server, network.Nick}); err != nil {
if err = s.StopAndRemoveNetwork(network.ID); err != nil {
s.log.Error().Stack().Err(err).Msgf("could not stop and delete network: %v", network.Name)
return err
}
@ -519,7 +541,7 @@ func (s *service) UpdateNetwork(ctx context.Context, network *domain.IrcNetwork)
} else {
// take into account multiple channels per network
err := s.StopAndRemoveNetwork(handlerKey{network.Server, network.Nick})
err := s.StopAndRemoveNetwork(network.ID)
if err != nil {
s.log.Error().Stack().Err(err).Msgf("could not stop network: %+v", network.Name)
return errors.New("could not stop network: %v", network.Name)
@ -544,7 +566,7 @@ func (s *service) StoreNetwork(ctx context.Context, network *domain.IrcNetwork)
if network.Channels != nil {
for _, channel := range network.Channels {
if err := s.repo.StoreChannel(network.ID, &channel); err != nil {
if err := s.repo.StoreChannel(nil, network.ID, &channel); err != nil {
s.log.Error().Stack().Err(err).Msg("irc.storeChannel: error executing query")
return errors.Wrap(err, "error storing channel on network")
//return err
@ -565,7 +587,7 @@ func (s *service) StoreNetwork(ctx context.Context, network *domain.IrcNetwork)
if network.Channels != nil {
for _, channel := range network.Channels {
// add channels. Make sure it doesn't delete before
if err := s.repo.StoreChannel(existingNetwork.ID, &channel); err != nil {
if err := s.repo.StoreChannel(nil, existingNetwork.ID, &channel); err != nil {
return err
}
}
@ -597,10 +619,35 @@ func (s *service) StoreNetwork(ctx context.Context, network *domain.IrcNetwork)
return nil
}
func (s *service) StoreChannel(networkID int64, channel *domain.IrcChannel) error {
if err := s.repo.StoreChannel(networkID, channel); err != nil {
func (s *service) StoreChannel(ctx context.Context, networkID int64, channel *domain.IrcChannel) error {
if err := s.repo.StoreChannel(ctx, networkID, channel); err != nil {
return err
}
return nil
}
func (s *service) SendCmd(ctx context.Context, req *domain.SendIrcCmdRequest) error {
if handler, found := s.handlers[req.NetworkId]; found {
if err := handler.SendMsg(req.Channel, req.Message); err != nil {
s.log.Error().Err(err).Msgf("could not send message to channel: %s %s", req.Channel, req.Message)
}
}
return nil
}
func (s *service) createSSEStream(networkId int64, channel string) {
key := fmt.Sprintf("%d%s", networkId, strings.TrimPrefix(channel, "#"))
s.sse.CreateStreamWithOpts(key, sse.StreamOpts{
MaxEntries: sseMaxEntries,
AutoReplay: true,
})
}
func (s *service) removeSSEStream(networkId int64, channel string) {
key := fmt.Sprintf("%d%s", networkId, strings.TrimPrefix(channel, "#"))
s.sse.RemoveStream(key)
}