catch errors from gst pipeline.
This commit is contained in:
parent
6708ce2caf
commit
9e466b08cc
@ -1,6 +1,8 @@
|
||||
package broadcast
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
@ -9,6 +11,7 @@ import (
|
||||
)
|
||||
|
||||
type BroadcastManager struct {
|
||||
mu sync.Mutex
|
||||
logger zerolog.Logger
|
||||
pipeline *gst.Pipeline
|
||||
remote *config.Remote
|
||||
@ -27,9 +30,9 @@ func New(remote *config.Remote, config *config.Broadcast) *BroadcastManager {
|
||||
}
|
||||
}
|
||||
|
||||
func (manager *BroadcastManager) Start() {
|
||||
func (manager *BroadcastManager) Start() error {
|
||||
if !manager.enabled || manager.IsActive() {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
var err error
|
||||
@ -40,18 +43,19 @@ func (manager *BroadcastManager) Start() {
|
||||
manager.url,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
manager.pipeline = nil
|
||||
return err
|
||||
}
|
||||
|
||||
manager.logger.Info().
|
||||
Str("audio_device", manager.remote.Device).
|
||||
Str("video_display", manager.remote.Display).
|
||||
Str("rtmp_pipeline_src", manager.pipeline.Src).
|
||||
Msgf("RTMP pipeline is starting...")
|
||||
|
||||
if err != nil {
|
||||
manager.logger.Panic().Err(err).Msg("unable to create rtmp pipeline")
|
||||
return
|
||||
}
|
||||
|
||||
manager.pipeline.Play()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (manager *BroadcastManager) Stop() {
|
||||
@ -67,13 +71,25 @@ func (manager *BroadcastManager) IsActive() bool {
|
||||
return manager.pipeline != nil
|
||||
}
|
||||
|
||||
func (manager *BroadcastManager) Create(url string) {
|
||||
func (manager *BroadcastManager) Create(url string) error {
|
||||
manager.mu.Lock()
|
||||
defer manager.mu.Unlock()
|
||||
|
||||
manager.url = url
|
||||
manager.enabled = true
|
||||
manager.Start()
|
||||
|
||||
err := manager.Start()
|
||||
if err != nil {
|
||||
manager.enabled = false
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (manager *BroadcastManager) Destroy() {
|
||||
manager.mu.Lock()
|
||||
defer manager.mu.Unlock()
|
||||
|
||||
manager.Stop()
|
||||
manager.enabled = false
|
||||
}
|
||||
|
@ -63,9 +63,8 @@ GstFlowReturn gstreamer_send_new_sample_handler(GstElement *object, gpointer use
|
||||
return GST_FLOW_OK;
|
||||
}
|
||||
|
||||
GstElement *gstreamer_send_create_pipeline(char *pipeline) {
|
||||
GError *error = NULL;
|
||||
return gst_parse_launch(pipeline, &error);
|
||||
GstElement *gstreamer_send_create_pipeline(char *pipeline, GError **error) {
|
||||
return gst_parse_launch(pipeline, error);
|
||||
}
|
||||
|
||||
void gstreamer_send_start_pipeline(GstElement *pipeline, int pipelineId) {
|
||||
|
@ -208,8 +208,15 @@ func CreatePipeline(pipelineStr string) (*Pipeline, error) {
|
||||
pipelinesLock.Lock()
|
||||
defer pipelinesLock.Unlock()
|
||||
|
||||
var err *C.GError
|
||||
gstPipeline := C.gstreamer_send_create_pipeline(pipelineStrUnsafe, &err)
|
||||
if err != nil {
|
||||
defer C.g_error_free(err)
|
||||
return nil, fmt.Errorf("%s", C.GoString(err.message))
|
||||
}
|
||||
|
||||
p := &Pipeline{
|
||||
Pipeline: C.gstreamer_send_create_pipeline(pipelineStrUnsafe),
|
||||
Pipeline: gstPipeline,
|
||||
Sample: make(chan types.Sample),
|
||||
Src: pipelineStr,
|
||||
id: len(pipelines),
|
||||
|
@ -8,7 +8,7 @@
|
||||
|
||||
extern void goHandlePipelineBuffer(void *buffer, int bufferLen, int samples, int pipelineId);
|
||||
|
||||
GstElement *gstreamer_send_create_pipeline(char *pipeline);
|
||||
GstElement *gstreamer_send_create_pipeline(char *pipeline, GError **error);
|
||||
|
||||
void gstreamer_send_start_pipeline(GstElement *pipeline, int pipelineId);
|
||||
void gstreamer_send_play_pipeline(GstElement *pipeline);
|
||||
|
@ -56,7 +56,9 @@ func (manager *RemoteManager) Start() {
|
||||
}
|
||||
|
||||
manager.createPipelines()
|
||||
manager.broadcast.Start()
|
||||
if err := manager.broadcast.Start(); err != nil {
|
||||
manager.logger.Panic().Err(err).Msg("unable to create rtmp pipeline")
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
@ -171,7 +173,9 @@ func (manager *RemoteManager) ChangeResolution(width int, height int, rate int)
|
||||
|
||||
defer func() {
|
||||
manager.video.Start()
|
||||
manager.broadcast.Start()
|
||||
if err := manager.broadcast.Start(); err != nil {
|
||||
manager.logger.Panic().Err(err).Msg("unable to create rtmp pipeline")
|
||||
}
|
||||
|
||||
manager.logger.Info().Msg("starting video pipeline...")
|
||||
}()
|
||||
|
@ -1,10 +1,10 @@
|
||||
package types
|
||||
|
||||
type BroadcastManager interface {
|
||||
Start()
|
||||
Start() error
|
||||
Stop()
|
||||
IsActive() bool
|
||||
Create(url string)
|
||||
Create(url string) error
|
||||
Destroy()
|
||||
GetUrl() string
|
||||
}
|
||||
|
@ -12,7 +12,9 @@ func (h *MessageHandler) boradcastCreate(session types.Session, payload *message
|
||||
return nil
|
||||
}
|
||||
|
||||
h.broadcast.Create(payload.URL)
|
||||
if err := h.broadcast.Create(payload.URL); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := h.boradcastStatus(session); err != nil {
|
||||
return err
|
||||
|
Reference in New Issue
Block a user