add broadcast endpoint & broadcast pipeline return error.

This commit is contained in:
Miroslav Šedivý 2020-11-18 21:34:39 +01:00
parent 6fbb1a2cc7
commit db820806a4
8 changed files with 124 additions and 15 deletions

View File

@ -1 +1,70 @@
package room
import (
"net/http"
"demodesk/neko/internal/utils"
"demodesk/neko/internal/types/event"
"demodesk/neko/internal/types/message"
)
type BroadcastStatusPayload struct {
URL string `json:"url,omitempty"`
IsActive bool `json:"is_active"`
}
func (h *RoomHandler) BroadcastStatus(w http.ResponseWriter, r *http.Request) {
utils.HttpSuccess(w, BroadcastStatusPayload{
IsActive: h.capture.BroadcastEnabled(),
URL: h.capture.BroadcastUrl(),
})
}
func (h *RoomHandler) BoradcastStart(w http.ResponseWriter, r *http.Request) {
data := &BroadcastStatusPayload{}
if !utils.HttpJsonRequest(w, r, data) {
return
}
if data.URL == "" {
utils.HttpBadRequest(w, "Missing broadcast URL.")
return
}
if h.capture.BroadcastEnabled() {
utils.HttpBadRequest(w, "Server is already broadcasting.")
return
}
if err := h.capture.StartBroadcast(data.URL); err != nil {
utils.HttpInternalServer(w, err)
return
}
h.sessions.AdminBroadcast(
message.BroadcastStatus{
Event: event.BORADCAST_STATUS,
IsActive: h.capture.BroadcastEnabled(),
URL: h.capture.BroadcastUrl(),
}, nil)
utils.HttpSuccess(w)
}
func (h *RoomHandler) BoradcastStop(w http.ResponseWriter, r *http.Request) {
if !h.capture.BroadcastEnabled() {
utils.HttpBadRequest(w, "Server is not broadcasting.")
return
}
h.capture.StopBroadcast()
h.sessions.AdminBroadcast(
message.BroadcastStatus{
Event: event.BORADCAST_STATUS,
IsActive: h.capture.BroadcastEnabled(),
URL: h.capture.BroadcastUrl(),
}, nil)
utils.HttpSuccess(w)
}

View File

@ -54,4 +54,10 @@ func (h *RoomHandler) Route(r chi.Router) {
r.With(auth.AdminsOnly).Post("/take", h.ControlTake)
r.With(auth.AdminsOnly).Post("/give", h.ControlGive)
})
r.With(auth.AdminsOnly).Route("/broadcast", func(r chi.Router) {
r.Get("/", h.BroadcastStatus)
r.Post("/start", h.BoradcastStart)
r.Post("/stop", h.BoradcastStop)
})
}

View File

@ -4,10 +4,10 @@ import (
"demodesk/neko/internal/capture/gst"
)
func (manager *CaptureManagerCtx) StartBroadcast(url string) {
func (manager *CaptureManagerCtx) StartBroadcast(url string) error {
manager.broadcast_url = url
manager.broadcasting = true
manager.createBroadcastPipeline()
return manager.createBroadcastPipeline()
}
func (manager *CaptureManagerCtx) StopBroadcast() {
@ -23,13 +23,9 @@ func (manager *CaptureManagerCtx) BroadcastUrl() string {
return manager.broadcast_url
}
func (manager *CaptureManagerCtx) createBroadcastPipeline() {
func (manager *CaptureManagerCtx) createBroadcastPipeline() error {
var err error
if manager.broadcast != nil || !manager.BroadcastEnabled() {
return
}
manager.logger.Info().
Str("audio_device", manager.config.Device).
Str("video_display", manager.config.Display).
@ -44,11 +40,12 @@ func (manager *CaptureManagerCtx) createBroadcastPipeline() {
)
if err != nil {
manager.logger.Panic().Err(err).Msg("unable to create broadcast pipeline")
return err
}
manager.broadcast.Play()
manager.logger.Info().Msgf("starting broadcast pipeline")
return nil
}
func (manager *CaptureManagerCtx) destroyBroadcastPipeline() {

View File

@ -46,14 +46,18 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt
}
func (manager *CaptureManagerCtx) Start() {
manager.createBroadcastPipeline()
if manager.BroadcastEnabled() {
manager.createBroadcastPipeline()
}
manager.desktop.OnBeforeScreenSizeChange(func() {
if manager.Streaming() {
manager.destroyVideoPipeline()
}
manager.destroyBroadcastPipeline()
if manager.BroadcastEnabled() {
manager.destroyBroadcastPipeline()
}
})
manager.desktop.OnAfterScreenSizeChange(func() {
@ -61,7 +65,9 @@ func (manager *CaptureManagerCtx) Start() {
manager.createVideoPipeline()
}
manager.createBroadcastPipeline()
if manager.BroadcastEnabled() {
manager.createBroadcastPipeline()
}
})
go func() {
@ -85,9 +91,15 @@ func (manager *CaptureManagerCtx) Start() {
func (manager *CaptureManagerCtx) Shutdown() error {
manager.logger.Info().Msgf("capture shutting down")
manager.StopStream()
if manager.Streaming() {
manager.StopStream()
}
if manager.BroadcastEnabled() {
manager.createBroadcastPipeline()
}
manager.destroyBroadcastPipeline()
manager.emit_stop <- true
return nil
}

View File

@ -163,6 +163,27 @@ func (manager *SessionManagerCtx) Broadcast(v interface{}, exclude interface{})
}
}
func (manager *SessionManagerCtx) AdminBroadcast(v interface{}, exclude interface{}) {
manager.membersMu.Lock()
defer manager.membersMu.Unlock()
for id, session := range manager.members {
if !session.connected || !session.Admin() {
continue
}
if exclude != nil {
if in, _ := utils.ArrayIn(id, exclude); in {
continue
}
}
if err := session.Send(v); err != nil {
manager.logger.Warn().Err(err).Msgf("broadcasting admin event has failed")
}
}
}
// ---
// events
// ---

View File

@ -20,7 +20,7 @@ type CaptureManager interface {
Streaming() bool
// broacast
StartBroadcast(url string)
StartBroadcast(url string) error
StopBroadcast()
BroadcastEnabled() bool
BroadcastUrl() string

View File

@ -30,6 +30,7 @@ type SessionManager interface {
Admins() []Session
Members() []Session
Broadcast(v interface{}, exclude interface{})
AdminBroadcast(v interface{}, exclude interface{})
OnHost(listener func(session Session))
OnHostCleared(listener func(session Session))

View File

@ -12,7 +12,10 @@ func (h *MessageHandlerCtx) boradcastCreate(session types.Session, payload *mess
return nil
}
h.capture.StartBroadcast(payload.URL)
if err := h.capture.StartBroadcast(payload.URL); err != nil {
return err
}
return h.boradcastStatus(session)
}