decouple BroadcastManager from CaptureManager.

This commit is contained in:
Miroslav Šedivý 2021-01-22 14:09:47 +01:00
parent afd3dd2f56
commit 407853eeb1
5 changed files with 82 additions and 53 deletions

View File

@ -14,9 +14,10 @@ type BroadcastStatusPayload struct {
}
func (h *RoomHandler) broadcastStatus(w http.ResponseWriter, r *http.Request) {
broadcast := h.capture.Broadcast()
utils.HttpSuccess(w, BroadcastStatusPayload{
IsActive: h.capture.BroadcastEnabled(),
URL: h.capture.BroadcastUrl(),
IsActive: broadcast.Enabled(),
URL: broadcast.Url(),
})
}
@ -31,12 +32,13 @@ func (h *RoomHandler) boradcastStart(w http.ResponseWriter, r *http.Request) {
return
}
if h.capture.BroadcastEnabled() {
broadcast := h.capture.Broadcast()
if broadcast.Enabled() {
utils.HttpUnprocessableEntity(w, "Server is already broadcasting.")
return
}
if err := h.capture.StartBroadcast(data.URL); err != nil {
if err := broadcast.Start(data.URL); err != nil {
utils.HttpInternalServerError(w, err)
return
}
@ -44,26 +46,27 @@ func (h *RoomHandler) boradcastStart(w http.ResponseWriter, r *http.Request) {
h.sessions.AdminBroadcast(
message.BroadcastStatus{
Event: event.BORADCAST_STATUS,
IsActive: h.capture.BroadcastEnabled(),
URL: h.capture.BroadcastUrl(),
IsActive: broadcast.Enabled(),
URL: broadcast.Url(),
}, nil)
utils.HttpSuccess(w)
}
func (h *RoomHandler) boradcastStop(w http.ResponseWriter, r *http.Request) {
if !h.capture.BroadcastEnabled() {
broadcast := h.capture.Broadcast()
if !broadcast.Enabled() {
utils.HttpUnprocessableEntity(w, "Server is not broadcasting.")
return
}
h.capture.StopBroadcast()
broadcast.Stop()
h.sessions.AdminBroadcast(
message.BroadcastStatus{
Event: event.BORADCAST_STATUS,
IsActive: h.capture.BroadcastEnabled(),
URL: h.capture.BroadcastUrl(),
IsActive: broadcast.Enabled(),
URL: broadcast.Url(),
}, nil)
utils.HttpSuccess(w)

View File

@ -1,59 +1,80 @@
package capture
import (
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"demodesk/neko/internal/config"
"demodesk/neko/internal/capture/gst"
)
func (manager *CaptureManagerCtx) StartBroadcast(url string) error {
manager.broadcast_url = url
manager.broadcasting = true
return manager.createBroadcastPipeline()
type BroacastManagerCtx struct {
logger zerolog.Logger
config *config.Capture
pipeline *gst.Pipeline
enabled bool
url string
}
func (manager *CaptureManagerCtx) StopBroadcast() {
manager.broadcasting = false
manager.destroyBroadcastPipeline()
func broadcastNew(config *config.Capture) *BroacastManagerCtx {
return &BroacastManagerCtx{
logger: log.With().Str("module", "capture").Str("submodule", "broadcast").Logger(),
config: config,
enabled: false,
url: "",
}
}
func (manager *CaptureManagerCtx) BroadcastEnabled() bool {
return manager.broadcasting
func (manager *BroacastManagerCtx) Start(url string) error {
manager.url = url
manager.enabled = true
return manager.createPipeline()
}
func (manager *CaptureManagerCtx) BroadcastUrl() string {
return manager.broadcast_url
func (manager *BroacastManagerCtx) Stop() {
manager.enabled = false
manager.destroyPipeline()
}
func (manager *CaptureManagerCtx) createBroadcastPipeline() error {
func (manager *BroacastManagerCtx) Enabled() bool {
return manager.enabled
}
func (manager *BroacastManagerCtx) Url() string {
return manager.url
}
func (manager *BroacastManagerCtx) createPipeline() error {
var err error
manager.logger.Info().
Str("audio_device", manager.config.Device).
Str("video_display", manager.config.Display).
Str("broadcast_pipeline", manager.config.BroadcastPipeline).
Msgf("creating broadcast pipeline")
Msgf("creating pipeline")
manager.broadcast, err = gst.CreateRTMPPipeline(
manager.pipeline, err = gst.CreateRTMPPipeline(
manager.config.Device,
manager.config.Display,
manager.config.BroadcastPipeline,
manager.broadcast_url,
manager.url,
)
if err != nil {
return err
}
manager.broadcast.Play()
manager.logger.Info().Msgf("starting broadcast pipeline")
manager.pipeline.Play()
manager.logger.Info().Msgf("starting pipeline")
return nil
}
func (manager *CaptureManagerCtx) destroyBroadcastPipeline() {
if manager.broadcast == nil {
func (manager *BroacastManagerCtx) destroyPipeline() {
if manager.pipeline == nil {
return
}
manager.broadcast.Stop()
manager.logger.Info().Msgf("stopping broadcast pipeline")
manager.broadcast = nil
manager.pipeline.Stop()
manager.logger.Info().Msgf("stopping pipeline")
manager.pipeline = nil
}

View File

@ -17,7 +17,6 @@ type CaptureManagerCtx struct {
mu sync.Mutex
video *gst.Pipeline
audio *gst.Pipeline
broadcast *gst.Pipeline
config *config.Capture
emit_update chan bool
emit_stop chan bool
@ -25,9 +24,8 @@ type CaptureManagerCtx struct {
audio_sample chan types.Sample
emmiter events.EventEmmiter
streaming bool
broadcasting bool
broadcast_url string
desktop types.DesktopManager
broadcast *BroacastManagerCtx
}
func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCtx {
@ -39,15 +37,14 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt
emmiter: events.New(),
config: config,
streaming: false,
broadcasting: false,
broadcast_url: "",
desktop: desktop,
broadcast: broadcastNew(config),
}
}
func (manager *CaptureManagerCtx) Start() {
if manager.BroadcastEnabled() {
if err := manager.createBroadcastPipeline(); err != nil {
if manager.broadcast.Enabled() {
if err := manager.broadcast.createPipeline(); err != nil {
manager.logger.Panic().Err(err).Msg("unable to create broadcast pipeline")
}
}
@ -57,8 +54,8 @@ func (manager *CaptureManagerCtx) Start() {
manager.destroyVideoPipeline()
}
if manager.BroadcastEnabled() {
manager.destroyBroadcastPipeline()
if manager.broadcast.Enabled() {
manager.broadcast.destroyPipeline()
}
})
@ -67,8 +64,8 @@ func (manager *CaptureManagerCtx) Start() {
manager.createVideoPipeline()
}
if manager.BroadcastEnabled() {
if err := manager.createBroadcastPipeline(); err != nil {
if manager.broadcast.Enabled() {
if err := manager.broadcast.createPipeline(); err != nil {
manager.logger.Panic().Err(err).Msg("unable to create broadcast pipeline")
}
}
@ -100,14 +97,18 @@ func (manager *CaptureManagerCtx) Shutdown() error {
manager.StopStream()
}
if manager.BroadcastEnabled() {
manager.destroyBroadcastPipeline()
if manager.broadcast.Enabled() {
manager.broadcast.destroyPipeline()
}
manager.emit_stop <- true
return nil
}
func (manager *CaptureManagerCtx) Broadcast() types.BroadcastManager {
return manager.broadcast
}
func (manager *CaptureManagerCtx) VideoCodec() string {
return manager.config.VideoCodec
}

View File

@ -5,10 +5,19 @@ type Sample struct {
Samples uint32
}
type BroadcastManager interface {
Start(url string) error
Stop()
Enabled() bool
Url() string
}
type CaptureManager interface {
Start()
Shutdown() error
Broadcast() BroadcastManager
VideoCodec() string
AudioCodec() string
@ -18,10 +27,4 @@ type CaptureManager interface {
StartStream()
StopStream()
Streaming() bool
// broacast
StartBroadcast(url string) error
StopBroadcast()
BroadcastEnabled() bool
BroadcastUrl() string
}

View File

@ -75,13 +75,14 @@ func (h *MessageHandlerCtx) systemAdmin(session types.Session) error {
}
}
broadcast := h.capture.Broadcast()
return session.Send(
message.SystemAdmin{
Event: event.SYSTEM_ADMIN,
ScreenSizesList: screenSizesList,
BroadcastStatus: message.BroadcastStatus{
IsActive: h.capture.BroadcastEnabled(),
URL: h.capture.BroadcastUrl(),
IsActive: broadcast.Enabled(),
URL: broadcast.Url(),
},
})
}