diff --git a/server/internal/broadcast/manager.go b/server/internal/broadcast/manager.go index ae5a73d6..bca83cf7 100644 --- a/server/internal/broadcast/manager.go +++ b/server/internal/broadcast/manager.go @@ -13,18 +13,22 @@ type BroadcastManager struct { pipeline *gst.Pipeline remote *config.Remote config *config.Broadcast + enabled bool + url string } func New(remote *config.Remote, config *config.Broadcast) *BroadcastManager { return &BroadcastManager{ - logger: log.With().Str("module", "remote").Logger(), - remote: remote, - config: config, + logger: log.With().Str("module", "remote").Logger(), + remote: remote, + config: config, + enabled: false, + url: "", } } func (manager *BroadcastManager) Start() { - if !manager.config.Enabled { + if !manager.enabled || manager.IsActive() { return } @@ -32,7 +36,7 @@ func (manager *BroadcastManager) Start() { manager.pipeline, err = gst.CreateRTMPPipeline( manager.remote.Device, manager.remote.Display, - manager.config.RTMP, + manager.url, ) manager.logger.Info(). @@ -50,10 +54,29 @@ func (manager *BroadcastManager) Start() { } func (manager *BroadcastManager) Stop() { - if manager.pipeline == nil { + if !manager.IsActive() { return } manager.pipeline.Stop() manager.pipeline = nil } + +func (manager *BroadcastManager) IsActive() bool { + return manager.pipeline != nil +} + +func (manager *BroadcastManager) Create(url string) { + manager.url = url + manager.enabled = true + manager.Start() +} + +func (manager *BroadcastManager) Destroy() { + manager.Stop() + manager.enabled = false +} + +func (manager *BroadcastManager) GetUrl() string { + return manager.url +} diff --git a/server/internal/types/broadcast.go b/server/internal/types/broadcast.go index 7b4d698a..fee3fd52 100644 --- a/server/internal/types/broadcast.go +++ b/server/internal/types/broadcast.go @@ -3,4 +3,8 @@ package types type BroadcastManager interface { Start() Stop() + IsActive() bool + Create(url string) + Destroy() + GetUrl() string } diff --git a/server/internal/types/event/events.go b/server/internal/types/event/events.go index 7b9d31ba..b97c49c8 100644 --- a/server/internal/types/event/events.go +++ b/server/internal/types/event/events.go @@ -36,6 +36,12 @@ const ( SCREEN_SET = "screen/set" ) +const ( + BORADCAST_STATUS = "broadcast/status" + BORADCAST_CREATE = "broadcast/create" + BORADCAST_DESTROY = "broadcast/destroy" +) + const ( ADMIN_BAN = "admin/ban" ADMIN_KICK = "admin/kick" diff --git a/server/internal/types/message/messages.go b/server/internal/types/message/messages.go index 8113e332..fcf04f30 100644 --- a/server/internal/types/message/messages.go +++ b/server/internal/types/message/messages.go @@ -110,3 +110,14 @@ type ScreenConfigurations struct { Event string `json:"event"` Configurations map[int]types.ScreenConfiguration `json:"configurations"` } + +type BroadcastStatus struct { + Event string `json:"event"` + URL string `json:"url"` + IsActive bool `json:"isActive"` +} + +type BroadcastCreate struct { + Event string `json:"event"` + URL string `json:"url"` +} diff --git a/server/internal/websocket/broadcast.go b/server/internal/websocket/broadcast.go new file mode 100644 index 00000000..c2d6a884 --- /dev/null +++ b/server/internal/websocket/broadcast.go @@ -0,0 +1,56 @@ +package websocket + +import ( + "n.eko.moe/neko/internal/types" + "n.eko.moe/neko/internal/types/event" + "n.eko.moe/neko/internal/types/message" +) + +func (h *MessageHandler) boradcastCreate(session types.Session, payload *message.BroadcastCreate) error { + if !session.Admin() { + h.logger.Debug().Msg("user not admin") + return nil + } + + h.broadcast.Create(payload.URL) + + if err := h.boradcastStatus(session); err != nil { + return err + } + + return nil +} + +func (h *MessageHandler) boradcastDestroy(session types.Session) error { + if !session.Admin() { + h.logger.Debug().Msg("user not admin") + return nil + } + + h.broadcast.Destroy() + + if err := h.boradcastStatus(session); err != nil { + return err + } + + return nil +} + +func (h *MessageHandler) boradcastStatus(session types.Session) error { + if !session.Admin() { + h.logger.Debug().Msg("user not admin") + return nil + } + + if err := session.Send( + message.BroadcastStatus{ + Event: event.BORADCAST_STATUS, + IsActive: h.broadcast.IsActive(), + URL: h.broadcast.GetUrl(), + }); err != nil { + h.logger.Warn().Err(err).Msgf("sending event %s has failed", event.BORADCAST_STATUS) + return err + } + + return nil +} diff --git a/server/internal/websocket/handler.go b/server/internal/websocket/handler.go index 5693c970..af829c82 100644 --- a/server/internal/websocket/handler.go +++ b/server/internal/websocket/handler.go @@ -13,12 +13,13 @@ import ( ) type MessageHandler struct { - logger zerolog.Logger - sessions types.SessionManager - webrtc types.WebRTCManager - remote types.RemoteManager - banned map[string]bool - locked bool + logger zerolog.Logger + sessions types.SessionManager + webrtc types.WebRTCManager + remote types.RemoteManager + broadcast types.BroadcastManager + banned map[string]bool + locked bool } func (h *MessageHandler) Connected(id string, socket *WebSocket) (bool, string, error) { @@ -123,6 +124,16 @@ func (h *MessageHandler) Message(id string, raw []byte) error { return h.screenSet(id, session, payload) }), "%s failed", header.Event) + // Boradcast Events + case event.BORADCAST_CREATE: + payload := &message.BroadcastCreate{} + return errors.Wrapf( + utils.Unmarshal(payload, raw, func() error { + return h.boradcastCreate(session, payload) + }), "%s failed", header.Event) + case event.BORADCAST_DESTROY: + return errors.Wrapf(h.boradcastDestroy(session), "%s failed", header.Event) + // Admin Events case event.ADMIN_LOCK: return errors.Wrapf(h.adminLock(id, session), "%s failed", header.Event) diff --git a/server/internal/websocket/session.go b/server/internal/websocket/session.go index 23d71cdb..9151aa23 100644 --- a/server/internal/websocket/session.go +++ b/server/internal/websocket/session.go @@ -17,6 +17,11 @@ func (h *MessageHandler) SessionCreated(id string, session types.Session) error if err := h.screenConfigurations(id, session); err != nil { return err } + + // send broadcast status if admin + if err := h.boradcastStatus(session); err != nil { + return err + } } return nil diff --git a/server/internal/websocket/websocket.go b/server/internal/websocket/websocket.go index c7d85239..77e9651d 100644 --- a/server/internal/websocket/websocket.go +++ b/server/internal/websocket/websocket.go @@ -16,26 +16,27 @@ import ( "n.eko.moe/neko/internal/utils" ) -func New(sessions types.SessionManager, remote types.RemoteManager, webrtc types.WebRTCManager, conf *config.WebSocket) *WebSocketHandler { +func New(sessions types.SessionManager, remote types.RemoteManager, broadcast types.BroadcastManager, webrtc types.WebRTCManager, conf *config.WebSocket) *WebSocketHandler { logger := log.With().Str("module", "websocket").Logger() return &WebSocketHandler{ - logger: logger, - conf: conf, - sessions: sessions, - remote: remote, - upgrader: websocket.Upgrader{ + logger: logger, + conf: conf, + sessions: sessions, + remote: remote, + upgrader: websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, }, handler: &MessageHandler{ - logger: logger.With().Str("subsystem", "handler").Logger(), - remote: remote, - sessions: sessions, - webrtc: webrtc, - banned: make(map[string]bool), - locked: false, + logger: logger.With().Str("subsystem", "handler").Logger(), + remote: remote, + broadcast: broadcast, + sessions: sessions, + webrtc: webrtc, + banned: make(map[string]bool), + locked: false, }, } } @@ -44,13 +45,13 @@ func New(sessions types.SessionManager, remote types.RemoteManager, webrtc types const pingPeriod = 60 * time.Second type WebSocketHandler struct { - logger zerolog.Logger - upgrader websocket.Upgrader - sessions types.SessionManager - remote types.RemoteManager - conf *config.WebSocket - handler *MessageHandler - shutdown chan bool + logger zerolog.Logger + upgrader websocket.Upgrader + sessions types.SessionManager + remote types.RemoteManager + conf *config.WebSocket + handler *MessageHandler + shutdown chan bool } func (ws *WebSocketHandler) Start() error { diff --git a/server/neko.go b/server/neko.go index 447979b8..6c3d6163 100644 --- a/server/neko.go +++ b/server/neko.go @@ -130,7 +130,7 @@ func (neko *Neko) Start() { webRTCManager := webrtc.New(sessionManager, remoteManager, neko.WebRTC) webRTCManager.Start() - webSocketHandler := websocket.New(sessionManager, remoteManager, webRTCManager, neko.WebSocket) + webSocketHandler := websocket.New(sessionManager, remoteManager, broadcastManager, webRTCManager, neko.WebSocket) webSocketHandler.Start() server := http.New(neko.Server, webSocketHandler)