From 9e466b08cc1e6a545da6ac601e83dceffa42fadb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Sun, 15 Aug 2021 15:37:27 +0200 Subject: [PATCH] catch errors from gst pipeline. --- server/internal/broadcast/manager.go | 34 +++++++++++++++++++------- server/internal/gst/gst.c | 5 ++-- server/internal/gst/gst.go | 9 ++++++- server/internal/gst/gst.h | 2 +- server/internal/remote/manager.go | 8 ++++-- server/internal/types/broadcast.go | 4 +-- server/internal/websocket/broadcast.go | 4 ++- 7 files changed, 47 insertions(+), 19 deletions(-) diff --git a/server/internal/broadcast/manager.go b/server/internal/broadcast/manager.go index d6ebbe0b..5f7ac834 100644 --- a/server/internal/broadcast/manager.go +++ b/server/internal/broadcast/manager.go @@ -1,6 +1,8 @@ package broadcast import ( + "sync" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -9,6 +11,7 @@ import ( ) type BroadcastManager struct { + mu sync.Mutex logger zerolog.Logger pipeline *gst.Pipeline remote *config.Remote @@ -27,9 +30,9 @@ func New(remote *config.Remote, config *config.Broadcast) *BroadcastManager { } } -func (manager *BroadcastManager) Start() { +func (manager *BroadcastManager) Start() error { if !manager.enabled || manager.IsActive() { - return + return nil } var err error @@ -40,18 +43,19 @@ func (manager *BroadcastManager) Start() { manager.url, ) + if err != nil { + manager.pipeline = nil + return err + } + manager.logger.Info(). Str("audio_device", manager.remote.Device). Str("video_display", manager.remote.Display). Str("rtmp_pipeline_src", manager.pipeline.Src). Msgf("RTMP pipeline is starting...") - if err != nil { - manager.logger.Panic().Err(err).Msg("unable to create rtmp pipeline") - return - } - manager.pipeline.Play() + return nil } func (manager *BroadcastManager) Stop() { @@ -67,13 +71,25 @@ func (manager *BroadcastManager) IsActive() bool { return manager.pipeline != nil } -func (manager *BroadcastManager) Create(url string) { +func (manager *BroadcastManager) Create(url string) error { + manager.mu.Lock() + defer manager.mu.Unlock() + manager.url = url manager.enabled = true - manager.Start() + + err := manager.Start() + if err != nil { + manager.enabled = false + } + + return err } func (manager *BroadcastManager) Destroy() { + manager.mu.Lock() + defer manager.mu.Unlock() + manager.Stop() manager.enabled = false } diff --git a/server/internal/gst/gst.c b/server/internal/gst/gst.c index 924787fb..0c4c4ff3 100644 --- a/server/internal/gst/gst.c +++ b/server/internal/gst/gst.c @@ -63,9 +63,8 @@ GstFlowReturn gstreamer_send_new_sample_handler(GstElement *object, gpointer use return GST_FLOW_OK; } -GstElement *gstreamer_send_create_pipeline(char *pipeline) { - GError *error = NULL; - return gst_parse_launch(pipeline, &error); +GstElement *gstreamer_send_create_pipeline(char *pipeline, GError **error) { + return gst_parse_launch(pipeline, error); } void gstreamer_send_start_pipeline(GstElement *pipeline, int pipelineId) { diff --git a/server/internal/gst/gst.go b/server/internal/gst/gst.go index ed207ea4..fe23bcb4 100644 --- a/server/internal/gst/gst.go +++ b/server/internal/gst/gst.go @@ -208,8 +208,15 @@ func CreatePipeline(pipelineStr string) (*Pipeline, error) { pipelinesLock.Lock() defer pipelinesLock.Unlock() + var err *C.GError + gstPipeline := C.gstreamer_send_create_pipeline(pipelineStrUnsafe, &err) + if err != nil { + defer C.g_error_free(err) + return nil, fmt.Errorf("%s", C.GoString(err.message)) + } + p := &Pipeline{ - Pipeline: C.gstreamer_send_create_pipeline(pipelineStrUnsafe), + Pipeline: gstPipeline, Sample: make(chan types.Sample), Src: pipelineStr, id: len(pipelines), diff --git a/server/internal/gst/gst.h b/server/internal/gst/gst.h index 7c6965f4..b07f3e8e 100644 --- a/server/internal/gst/gst.h +++ b/server/internal/gst/gst.h @@ -8,7 +8,7 @@ extern void goHandlePipelineBuffer(void *buffer, int bufferLen, int samples, int pipelineId); -GstElement *gstreamer_send_create_pipeline(char *pipeline); +GstElement *gstreamer_send_create_pipeline(char *pipeline, GError **error); void gstreamer_send_start_pipeline(GstElement *pipeline, int pipelineId); void gstreamer_send_play_pipeline(GstElement *pipeline); diff --git a/server/internal/remote/manager.go b/server/internal/remote/manager.go index 02db80fd..e78f9444 100644 --- a/server/internal/remote/manager.go +++ b/server/internal/remote/manager.go @@ -56,7 +56,9 @@ func (manager *RemoteManager) Start() { } manager.createPipelines() - manager.broadcast.Start() + if err := manager.broadcast.Start(); err != nil { + manager.logger.Panic().Err(err).Msg("unable to create rtmp pipeline") + } go func() { defer func() { @@ -171,7 +173,9 @@ func (manager *RemoteManager) ChangeResolution(width int, height int, rate int) defer func() { manager.video.Start() - manager.broadcast.Start() + if err := manager.broadcast.Start(); err != nil { + manager.logger.Panic().Err(err).Msg("unable to create rtmp pipeline") + } manager.logger.Info().Msg("starting video pipeline...") }() diff --git a/server/internal/types/broadcast.go b/server/internal/types/broadcast.go index fee3fd52..896bed17 100644 --- a/server/internal/types/broadcast.go +++ b/server/internal/types/broadcast.go @@ -1,10 +1,10 @@ package types type BroadcastManager interface { - Start() + Start() error Stop() IsActive() bool - Create(url string) + Create(url string) error Destroy() GetUrl() string } diff --git a/server/internal/websocket/broadcast.go b/server/internal/websocket/broadcast.go index c2d6a884..fc7259ad 100644 --- a/server/internal/websocket/broadcast.go +++ b/server/internal/websocket/broadcast.go @@ -12,7 +12,9 @@ func (h *MessageHandler) boradcastCreate(session types.Session, payload *message return nil } - h.broadcast.Create(payload.URL) + if err := h.broadcast.Create(payload.URL); err != nil { + return err + } if err := h.boradcastStatus(session); err != nil { return err