add WS handlers

This commit is contained in:
m1k1o 2020-09-27 00:10:34 +02:00
parent b2afd1af6d
commit ea80f07bcd
9 changed files with 149 additions and 32 deletions

View File

@ -13,18 +13,22 @@ type BroadcastManager struct {
pipeline *gst.Pipeline pipeline *gst.Pipeline
remote *config.Remote remote *config.Remote
config *config.Broadcast config *config.Broadcast
enabled bool
url string
} }
func New(remote *config.Remote, config *config.Broadcast) *BroadcastManager { func New(remote *config.Remote, config *config.Broadcast) *BroadcastManager {
return &BroadcastManager{ return &BroadcastManager{
logger: log.With().Str("module", "remote").Logger(), logger: log.With().Str("module", "remote").Logger(),
remote: remote, remote: remote,
config: config, config: config,
enabled: false,
url: "",
} }
} }
func (manager *BroadcastManager) Start() { func (manager *BroadcastManager) Start() {
if !manager.config.Enabled { if !manager.enabled || manager.IsActive() {
return return
} }
@ -32,7 +36,7 @@ func (manager *BroadcastManager) Start() {
manager.pipeline, err = gst.CreateRTMPPipeline( manager.pipeline, err = gst.CreateRTMPPipeline(
manager.remote.Device, manager.remote.Device,
manager.remote.Display, manager.remote.Display,
manager.config.RTMP, manager.url,
) )
manager.logger.Info(). manager.logger.Info().
@ -50,10 +54,29 @@ func (manager *BroadcastManager) Start() {
} }
func (manager *BroadcastManager) Stop() { func (manager *BroadcastManager) Stop() {
if manager.pipeline == nil { if !manager.IsActive() {
return return
} }
manager.pipeline.Stop() manager.pipeline.Stop()
manager.pipeline = nil manager.pipeline = nil
} }
func (manager *BroadcastManager) IsActive() bool {
return manager.pipeline != nil
}
func (manager *BroadcastManager) Create(url string) {
manager.url = url
manager.enabled = true
manager.Start()
}
func (manager *BroadcastManager) Destroy() {
manager.Stop()
manager.enabled = false
}
func (manager *BroadcastManager) GetUrl() string {
return manager.url
}

View File

@ -3,4 +3,8 @@ package types
type BroadcastManager interface { type BroadcastManager interface {
Start() Start()
Stop() Stop()
IsActive() bool
Create(url string)
Destroy()
GetUrl() string
} }

View File

@ -36,6 +36,12 @@ const (
SCREEN_SET = "screen/set" SCREEN_SET = "screen/set"
) )
const (
BORADCAST_STATUS = "broadcast/status"
BORADCAST_CREATE = "broadcast/create"
BORADCAST_DESTROY = "broadcast/destroy"
)
const ( const (
ADMIN_BAN = "admin/ban" ADMIN_BAN = "admin/ban"
ADMIN_KICK = "admin/kick" ADMIN_KICK = "admin/kick"

View File

@ -110,3 +110,14 @@ type ScreenConfigurations struct {
Event string `json:"event"` Event string `json:"event"`
Configurations map[int]types.ScreenConfiguration `json:"configurations"` Configurations map[int]types.ScreenConfiguration `json:"configurations"`
} }
type BroadcastStatus struct {
Event string `json:"event"`
URL string `json:"url"`
IsActive bool `json:"isActive"`
}
type BroadcastCreate struct {
Event string `json:"event"`
URL string `json:"url"`
}

View File

@ -0,0 +1,56 @@
package websocket
import (
"n.eko.moe/neko/internal/types"
"n.eko.moe/neko/internal/types/event"
"n.eko.moe/neko/internal/types/message"
)
func (h *MessageHandler) boradcastCreate(session types.Session, payload *message.BroadcastCreate) error {
if !session.Admin() {
h.logger.Debug().Msg("user not admin")
return nil
}
h.broadcast.Create(payload.URL)
if err := h.boradcastStatus(session); err != nil {
return err
}
return nil
}
func (h *MessageHandler) boradcastDestroy(session types.Session) error {
if !session.Admin() {
h.logger.Debug().Msg("user not admin")
return nil
}
h.broadcast.Destroy()
if err := h.boradcastStatus(session); err != nil {
return err
}
return nil
}
func (h *MessageHandler) boradcastStatus(session types.Session) error {
if !session.Admin() {
h.logger.Debug().Msg("user not admin")
return nil
}
if err := session.Send(
message.BroadcastStatus{
Event: event.BORADCAST_STATUS,
IsActive: h.broadcast.IsActive(),
URL: h.broadcast.GetUrl(),
}); err != nil {
h.logger.Warn().Err(err).Msgf("sending event %s has failed", event.BORADCAST_STATUS)
return err
}
return nil
}

View File

@ -13,12 +13,13 @@ import (
) )
type MessageHandler struct { type MessageHandler struct {
logger zerolog.Logger logger zerolog.Logger
sessions types.SessionManager sessions types.SessionManager
webrtc types.WebRTCManager webrtc types.WebRTCManager
remote types.RemoteManager remote types.RemoteManager
banned map[string]bool broadcast types.BroadcastManager
locked bool banned map[string]bool
locked bool
} }
func (h *MessageHandler) Connected(id string, socket *WebSocket) (bool, string, error) { func (h *MessageHandler) Connected(id string, socket *WebSocket) (bool, string, error) {
@ -123,6 +124,16 @@ func (h *MessageHandler) Message(id string, raw []byte) error {
return h.screenSet(id, session, payload) return h.screenSet(id, session, payload)
}), "%s failed", header.Event) }), "%s failed", header.Event)
// Boradcast Events
case event.BORADCAST_CREATE:
payload := &message.BroadcastCreate{}
return errors.Wrapf(
utils.Unmarshal(payload, raw, func() error {
return h.boradcastCreate(session, payload)
}), "%s failed", header.Event)
case event.BORADCAST_DESTROY:
return errors.Wrapf(h.boradcastDestroy(session), "%s failed", header.Event)
// Admin Events // Admin Events
case event.ADMIN_LOCK: case event.ADMIN_LOCK:
return errors.Wrapf(h.adminLock(id, session), "%s failed", header.Event) return errors.Wrapf(h.adminLock(id, session), "%s failed", header.Event)

View File

@ -17,6 +17,11 @@ func (h *MessageHandler) SessionCreated(id string, session types.Session) error
if err := h.screenConfigurations(id, session); err != nil { if err := h.screenConfigurations(id, session); err != nil {
return err return err
} }
// send broadcast status if admin
if err := h.boradcastStatus(session); err != nil {
return err
}
} }
return nil return nil

View File

@ -16,26 +16,27 @@ import (
"n.eko.moe/neko/internal/utils" "n.eko.moe/neko/internal/utils"
) )
func New(sessions types.SessionManager, remote types.RemoteManager, webrtc types.WebRTCManager, conf *config.WebSocket) *WebSocketHandler { func New(sessions types.SessionManager, remote types.RemoteManager, broadcast types.BroadcastManager, webrtc types.WebRTCManager, conf *config.WebSocket) *WebSocketHandler {
logger := log.With().Str("module", "websocket").Logger() logger := log.With().Str("module", "websocket").Logger()
return &WebSocketHandler{ return &WebSocketHandler{
logger: logger, logger: logger,
conf: conf, conf: conf,
sessions: sessions, sessions: sessions,
remote: remote, remote: remote,
upgrader: websocket.Upgrader{ upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { CheckOrigin: func(r *http.Request) bool {
return true return true
}, },
}, },
handler: &MessageHandler{ handler: &MessageHandler{
logger: logger.With().Str("subsystem", "handler").Logger(), logger: logger.With().Str("subsystem", "handler").Logger(),
remote: remote, remote: remote,
sessions: sessions, broadcast: broadcast,
webrtc: webrtc, sessions: sessions,
banned: make(map[string]bool), webrtc: webrtc,
locked: false, banned: make(map[string]bool),
locked: false,
}, },
} }
} }
@ -44,13 +45,13 @@ func New(sessions types.SessionManager, remote types.RemoteManager, webrtc types
const pingPeriod = 60 * time.Second const pingPeriod = 60 * time.Second
type WebSocketHandler struct { type WebSocketHandler struct {
logger zerolog.Logger logger zerolog.Logger
upgrader websocket.Upgrader upgrader websocket.Upgrader
sessions types.SessionManager sessions types.SessionManager
remote types.RemoteManager remote types.RemoteManager
conf *config.WebSocket conf *config.WebSocket
handler *MessageHandler handler *MessageHandler
shutdown chan bool shutdown chan bool
} }
func (ws *WebSocketHandler) Start() error { func (ws *WebSocketHandler) Start() error {

View File

@ -130,7 +130,7 @@ func (neko *Neko) Start() {
webRTCManager := webrtc.New(sessionManager, remoteManager, neko.WebRTC) webRTCManager := webrtc.New(sessionManager, remoteManager, neko.WebRTC)
webRTCManager.Start() webRTCManager.Start()
webSocketHandler := websocket.New(sessionManager, remoteManager, webRTCManager, neko.WebSocket) webSocketHandler := websocket.New(sessionManager, remoteManager, broadcastManager, webRTCManager, neko.WebSocket)
webSocketHandler.Start() webSocketHandler.Start()
server := http.New(neko.Server, webSocketHandler) server := http.New(neko.Server, webSocketHandler)