diff --git a/internal/capture/broadcast.go b/internal/capture/broadcast.go index ef7e35b5..3f9ce0a5 100644 --- a/internal/capture/broadcast.go +++ b/internal/capture/broadcast.go @@ -37,7 +37,7 @@ func (manager *CaptureManagerCtx) StopBroadcastPipeline() { return } - manager.broadcast.DestroyPipeline() + manager.broadcast.Stop() manager.logger.Info().Msgf("Stopping broadcast pipeline...") manager.broadcast = nil } diff --git a/internal/capture/gst/gst.c b/internal/capture/gst/gst.c index b68dccee..4e78974d 100644 --- a/internal/capture/gst/gst.c +++ b/internal/capture/gst/gst.c @@ -68,11 +68,6 @@ GstElement *gstreamer_send_create_pipeline(char *pipeline) { return gst_parse_launch(pipeline, &error); } -void gstreamer_send_destroy_pipeline(GstElement *pipeline) { - gst_element_set_state(pipeline, GST_STATE_NULL); - gst_object_unref(pipeline); -} - void gstreamer_send_start_pipeline(GstElement *pipeline, int pipelineId) { SampleHandlerUserData *s = calloc(1, sizeof(SampleHandlerUserData)); s->pipelineId = pipelineId; @@ -95,4 +90,5 @@ void gstreamer_send_play_pipeline(GstElement *pipeline) { void gstreamer_send_stop_pipeline(GstElement *pipeline) { gst_element_set_state(pipeline, GST_STATE_NULL); + gst_object_unref(pipeline); } diff --git a/internal/capture/gst/gst.go b/internal/capture/gst/gst.go index 2a138a2d..a2c884b9 100644 --- a/internal/capture/gst/gst.go +++ b/internal/capture/gst/gst.go @@ -210,11 +210,6 @@ func CreatePipeline(pipelineStr string, codecName string, clockRate float32) (*P return p, nil } -// Destroy GStreamer Pipeline -func (p *Pipeline) DestroyPipeline() { - C.gstreamer_send_destroy_pipeline(p.Pipeline) -} - // Start starts the GStreamer Pipeline func (p *Pipeline) Start() { C.gstreamer_send_start_pipeline(p.Pipeline, C.int(p.id)) diff --git a/internal/capture/gst/gst.h b/internal/capture/gst/gst.h index 19b3b5b7..7c6965f4 100644 --- a/internal/capture/gst/gst.h +++ b/internal/capture/gst/gst.h @@ -9,7 +9,6 @@ extern void goHandlePipelineBuffer(void *buffer, int bufferLen, int samples, int pipelineId); GstElement *gstreamer_send_create_pipeline(char *pipeline); -void gstreamer_send_destroy_pipeline(GstElement *pipeline); void gstreamer_send_start_pipeline(GstElement *pipeline, int pipelineId); void gstreamer_send_play_pipeline(GstElement *pipeline); diff --git a/internal/capture/manager.go b/internal/capture/manager.go index 678d0ecf..cdf9d067 100644 --- a/internal/capture/manager.go +++ b/internal/capture/manager.go @@ -18,7 +18,8 @@ type CaptureManagerCtx struct { audio *gst.Pipeline broadcast *gst.Pipeline config *config.Capture - shutdown chan bool + audio_stop chan bool + video_stop chan bool emmiter events.EventEmmiter streaming bool broadcasting bool @@ -29,7 +30,8 @@ type CaptureManagerCtx struct { func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCtx { return &CaptureManagerCtx{ logger: log.With().Str("module", "capture").Logger(), - shutdown: make(chan bool), + audio_stop: make(chan bool), + video_stop: make(chan bool), emmiter: events.New(), config: config, streaming: false, @@ -48,35 +50,15 @@ func (manager *CaptureManagerCtx) Start() { manager.logger.Warn().Err(err).Msg("unable to change screen size") } - manager.CreateVideoPipeline() - manager.CreateAudioPipeline() manager.StartBroadcastPipeline() - - go func() { - defer func() { - manager.logger.Info().Msg("shutdown") - }() - - for { - select { - case <-manager.shutdown: - return - case sample := <-manager.video.Sample: - manager.emmiter.Emit("video", sample) - case sample := <-manager.audio.Sample: - manager.emmiter.Emit("audio", sample) - } - } - }() } func (manager *CaptureManagerCtx) Shutdown() error { manager.logger.Info().Msgf("capture shutting down") - manager.video.DestroyPipeline() - manager.audio.DestroyPipeline() + manager.audio_stop <- true + manager.video_stop <- true manager.StopBroadcastPipeline() - manager.shutdown <- true return nil } @@ -103,16 +85,16 @@ func (manager *CaptureManagerCtx) OnAudioFrame(listener func(sample types.Sample func (manager *CaptureManagerCtx) StartStream() { manager.logger.Info().Msgf("Pipelines starting...") - manager.video.Start() - manager.audio.Start() + manager.createVideoPipeline() + manager.createAudioPipeline() manager.streaming = true } func (manager *CaptureManagerCtx) StopStream() { - manager.logger.Info().Msgf("Pipelines shutting down...") + manager.logger.Info().Msgf("Pipelines stopping...") - manager.video.Stop() - manager.audio.Stop() + manager.audio_stop <- true + manager.video_stop <- true manager.streaming = false } @@ -120,7 +102,19 @@ func (manager *CaptureManagerCtx) Streaming() bool { return manager.streaming } -func (manager *CaptureManagerCtx) CreateVideoPipeline() { +func (manager *CaptureManagerCtx) ChangeResolution(width int, height int, rate int) error { + manager.video_stop <- true + manager.StopBroadcastPipeline() + + defer func() { + manager.createVideoPipeline() + manager.StartBroadcastPipeline() + }() + + return manager.desktop.ChangeScreenSize(width, height, rate) +} + +func (manager *CaptureManagerCtx) createVideoPipeline() { var err error manager.logger.Info(). @@ -138,9 +132,34 @@ func (manager *CaptureManagerCtx) CreateVideoPipeline() { if err != nil { manager.logger.Panic().Err(err).Msg("unable to create video pipeline") } + + manager.logger.Info(). + Str("pipeline", manager.video.Src). + Msgf("Starting video pipeline...") + + manager.video.Start() + + go func() { + manager.logger.Debug().Msg("started emitting video data") + + defer func() { + manager.logger.Debug().Msg("stopped emitting video data") + }() + + for { + select { + case <-manager.video_stop: + manager.logger.Info().Msgf("Stopping video pipeline...") + manager.video.Stop() + return + case sample := <-manager.video.Sample: + manager.emmiter.Emit("video", sample) + } + } + }() } -func (manager *CaptureManagerCtx) CreateAudioPipeline() { +func (manager *CaptureManagerCtx) createAudioPipeline() { var err error manager.logger.Info(). @@ -158,20 +177,29 @@ func (manager *CaptureManagerCtx) CreateAudioPipeline() { if err != nil { manager.logger.Panic().Err(err).Msg("unable to create audio pipeline") } -} -func (manager *CaptureManagerCtx) ChangeResolution(width int, height int, rate int) error { - manager.video.DestroyPipeline() - manager.StopBroadcastPipeline() + manager.logger.Info(). + Str("pipeline", manager.audio.Src). + Msgf("Starting audio pipeline...") - defer func() { - manager.CreateVideoPipeline() - - manager.video.Start() - manager.logger.Info().Msg("starting video pipeline...") - - manager.StartBroadcastPipeline() + manager.audio.Start() + + go func() { + manager.logger.Debug().Msg("started emitting audio data") + + defer func() { + manager.logger.Debug().Msg("stopped emitting audio data") + }() + + for { + select { + case <-manager.audio_stop: + manager.logger.Info().Msgf("Stopping audio pipeline...") + manager.audio.Stop() + return + case sample := <-manager.audio.Sample: + manager.emmiter.Emit("audio", sample) + } + } }() - - return manager.desktop.ChangeScreenSize(width, height, rate) }