From b932e94f772e25f29fa37404a2993cd50fc75e47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Sun, 5 Dec 2021 18:16:26 +0100 Subject: [PATCH] gst refactor to use ctx. --- internal/capture/broadcast.go | 2 +- internal/capture/gst/gst.c | 82 ++++++++++++++++++---------------- internal/capture/gst/gst.go | 60 ++++++++++++++++--------- internal/capture/gst/gst.h | 20 ++++++--- internal/capture/screencast.go | 2 +- internal/capture/streamsink.go | 2 +- internal/capture/streamsrc.go | 2 +- 7 files changed, 99 insertions(+), 71 deletions(-) diff --git a/internal/capture/broadcast.go b/internal/capture/broadcast.go index d5510ca0..22dfe2f2 100644 --- a/internal/capture/broadcast.go +++ b/internal/capture/broadcast.go @@ -113,7 +113,7 @@ func (manager *BroacastManagerCtx) destroyPipeline() { return } - manager.pipeline.Stop() + manager.pipeline.Destroy() manager.logger.Info().Msgf("destroying pipeline") manager.pipeline = nil } diff --git a/internal/capture/gst/gst.c b/internal/capture/gst/gst.c index 49aa3700..60e675c1 100644 --- a/internal/capture/gst/gst.c +++ b/internal/capture/gst/gst.c @@ -1,9 +1,5 @@ #include "gst.h" -typedef struct SampleHandlerUserData { - int pipelineId; -} SampleHandlerUserData; - void gstreamer_init(void) { gst_init(NULL, NULL); } @@ -14,27 +10,29 @@ void gstreamer_loop(void) { g_main_loop_run(gstreamer_main_loop); } -static void gstreamer_pipeline_log(char* level, const char* format, ...) { +static void gstreamer_pipeline_log(GstPipelineCtx *ctx, char* level, const char* format, ...) { va_list argptr; va_start(argptr, format); char buffer[100]; vsprintf(buffer, format, argptr); va_end(argptr); - goPipelineLog(level, buffer); + goPipelineLog(level, buffer, ctx->pipelineId); } -static gboolean gstreamer_bus_call(GstBus *bus, GstMessage *msg, gpointer data) { +static gboolean gstreamer_bus_call(GstBus *bus, GstMessage *msg, gpointer user_data) { + GstPipelineCtx *ctx = (GstPipelineCtx *)user_data; + switch (GST_MESSAGE_TYPE(msg)) { case GST_MESSAGE_EOS: { - gstreamer_pipeline_log("panic", "end of stream"); - exit(1); + gstreamer_pipeline_log(ctx, "fatal", "end of stream"); + break; } case GST_MESSAGE_STATE_CHANGED: { GstState old_state, new_state; gst_message_parse_state_changed(msg, &old_state, &new_state, NULL); - gstreamer_pipeline_log("debug", + gstreamer_pipeline_log(ctx, "debug", "element %s changed state from %s to %s", GST_OBJECT_NAME(msg->src), gst_element_state_get_name(old_state), @@ -46,7 +44,7 @@ static gboolean gstreamer_bus_call(GstBus *bus, GstMessage *msg, gpointer data) GstTagList *tags = NULL; gst_message_parse_tag(msg, &tags); - gstreamer_pipeline_log("debug", + gstreamer_pipeline_log(ctx, "debug", "got tags from element %s", GST_OBJECT_NAME(msg->src)); @@ -59,10 +57,10 @@ static gboolean gstreamer_bus_call(GstBus *bus, GstMessage *msg, gpointer data) gchar *dbg_info = NULL; gst_message_parse_error(msg, &err, &dbg_info); - gstreamer_pipeline_log("error", + gstreamer_pipeline_log(ctx, "error", "error from element %s: %s", GST_OBJECT_NAME(msg->src), err->message); - gstreamer_pipeline_log("warn", + gstreamer_pipeline_log(ctx, "warn", "debugging info: %s", (dbg_info) ? dbg_info : "none"); @@ -72,26 +70,42 @@ static gboolean gstreamer_bus_call(GstBus *bus, GstMessage *msg, gpointer data) } default: - gstreamer_pipeline_log("trace", "unknown message"); + gstreamer_pipeline_log(ctx, "trace", "unknown message"); break; } return TRUE; } +GstPipelineCtx *gstreamer_pipeline_create(char *pipelineStr, int pipelineId, GError **error) { + GstElement *pipeline = gst_parse_launch(pipelineStr, error); + if (pipeline == NULL) return NULL; + + // create gstreamer pipeline context + GstPipelineCtx *ctx = calloc(1, sizeof(GstPipelineCtx)); + ctx->pipelineId = pipelineId; + ctx->pipeline = pipeline; + + GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline)); + gst_bus_add_watch(bus, gstreamer_bus_call, ctx); + gst_object_unref(bus); + + return ctx; +} + static GstFlowReturn gstreamer_send_new_sample_handler(GstElement *object, gpointer user_data) { + GstPipelineCtx *ctx = (GstPipelineCtx *)user_data; GstSample *sample = NULL; GstBuffer *buffer = NULL; gpointer copy = NULL; gsize copy_size = 0; - SampleHandlerUserData *s = (SampleHandlerUserData *)user_data; g_signal_emit_by_name(object, "pull-sample", &sample); if (sample) { buffer = gst_sample_get_buffer(sample); if (buffer) { gst_buffer_extract_dup(buffer, 0, gst_buffer_get_size(buffer), ©, ©_size); - goHandlePipelineBuffer(copy, copy_size, GST_BUFFER_DURATION(buffer), s->pipelineId); + goHandlePipelineBuffer(copy, copy_size, GST_BUFFER_DURATION(buffer), ctx->pipelineId); } gst_sample_unref(sample); } @@ -99,39 +113,29 @@ static GstFlowReturn gstreamer_send_new_sample_handler(GstElement *object, gpoin return GST_FLOW_OK; } -void gstreamer_pipeline_attach_appsink(GstElement *pipeline, char *sinkName, int pipelineId) { - SampleHandlerUserData *s = calloc(1, sizeof(SampleHandlerUserData)); - s->pipelineId = pipelineId; - - GstElement *appsink = gst_bin_get_by_name(GST_BIN(pipeline), sinkName); +void gstreamer_pipeline_attach_appsink(GstPipelineCtx *ctx, char *sinkName) { + GstElement *appsink = gst_bin_get_by_name(GST_BIN(ctx->pipeline), sinkName); g_object_set(appsink, "emit-signals", TRUE, NULL); - g_signal_connect(appsink, "new-sample", G_CALLBACK(gstreamer_send_new_sample_handler), s); + g_signal_connect(appsink, "new-sample", G_CALLBACK(gstreamer_send_new_sample_handler), ctx); gst_object_unref(appsink); } -GstElement *gstreamer_pipeline_create(char *pipelineStr, GError **error) { - GstElement *pipeline = gst_parse_launch(pipelineStr, error); - - if (pipeline != NULL) { - GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline)); - gst_bus_add_watch(bus, gstreamer_bus_call, NULL); - gst_object_unref(bus); - } - - return pipeline; +void gstreamer_pipeline_play(GstPipelineCtx *ctx) { + gst_element_set_state(GST_ELEMENT(ctx->pipeline), GST_STATE_PLAYING); } -void gstreamer_pipeline_play(GstElement *pipeline) { - gst_element_set_state(pipeline, GST_STATE_PLAYING); +void gstreamer_pipeline_pause(GstPipelineCtx *ctx) { + gst_element_set_state(GST_ELEMENT(ctx->pipeline), GST_STATE_PAUSED); } -void gstreamer_pipeline_stop(GstElement *pipeline) { - gst_element_set_state(pipeline, GST_STATE_NULL); - gst_object_unref(pipeline); +void gstreamer_pipeline_destory(GstPipelineCtx *ctx) { + gst_element_set_state(GST_ELEMENT(ctx->pipeline), GST_STATE_NULL); + gst_object_unref(ctx->pipeline); + free(ctx); } -void gstreamer_pipeline_push(GstElement *pipeline, char *srcName, void *buffer, int bufferLen) { - GstElement *src = gst_bin_get_by_name(GST_BIN(pipeline), srcName); +void gstreamer_pipeline_push(GstPipelineCtx *ctx, char *srcName, void *buffer, int bufferLen) { + GstElement *src = gst_bin_get_by_name(GST_BIN(ctx->pipeline), srcName); if (src != NULL) { gpointer p = g_memdup(buffer, bufferLen); diff --git a/internal/capture/gst/gst.go b/internal/capture/gst/gst.go index 2a92d691..158fa458 100644 --- a/internal/capture/gst/gst.go +++ b/internal/capture/gst/gst.go @@ -19,10 +19,10 @@ import ( ) type Pipeline struct { - Pipeline *C.GstElement - Sample chan types.Sample - Src string - id int + id int + Src string + Ctx *C.GstPipelineCtx + Sample chan types.Sample } var pipelines = make(map[int]*Pipeline) @@ -43,10 +43,10 @@ func CreatePipeline(pipelineStr string) (*Pipeline, error) { pipelinesLock.Lock() defer pipelinesLock.Unlock() - var gstPipeline *C.GstElement - var gstError *C.GError + id := len(pipelines) - gstPipeline = C.gstreamer_pipeline_create(pipelineStrUnsafe, &gstError) + var gstError *C.GError + ctx := C.gstreamer_pipeline_create(pipelineStrUnsafe, C.int(id), &gstError) if gstError != nil { defer C.g_error_free(gstError) @@ -54,10 +54,10 @@ func CreatePipeline(pipelineStr string) (*Pipeline, error) { } p := &Pipeline{ - Pipeline: gstPipeline, - Sample: make(chan types.Sample), - Src: pipelineStr, - id: len(pipelines), + id: id, + Src: pipelineStr, + Ctx: ctx, + Sample: make(chan types.Sample), } pipelines[p.id] = p @@ -68,15 +68,21 @@ func (p *Pipeline) AttachAppsink(sinkName string) { sinkNameUnsafe := C.CString(sinkName) defer C.free(unsafe.Pointer(sinkNameUnsafe)) - C.gstreamer_pipeline_attach_appsink(p.Pipeline, sinkNameUnsafe, C.int(p.id)) + C.gstreamer_pipeline_attach_appsink(p.Ctx, sinkNameUnsafe) } func (p *Pipeline) Play() { - C.gstreamer_pipeline_play(p.Pipeline) + C.gstreamer_pipeline_play(p.Ctx) } -func (p *Pipeline) Stop() { - C.gstreamer_pipeline_stop(p.Pipeline) +func (p *Pipeline) Pause() { + C.gstreamer_pipeline_pause(p.Ctx) +} + +func (p *Pipeline) Destroy() { + C.gstreamer_pipeline_destory(p.Ctx) + p.Ctx = nil + close(p.Sample) } func (p *Pipeline) Push(srcName string, buffer []byte) { @@ -86,7 +92,7 @@ func (p *Pipeline) Push(srcName string, buffer []byte) { bytes := C.CBytes(buffer) defer C.free(bytes) - C.gstreamer_pipeline_push(p.Pipeline, srcNameUnsafe, bytes, C.int(len(buffer))) + C.gstreamer_pipeline_push(p.Ctx, srcNameUnsafe, bytes, C.int(len(buffer))) } // gst-inspect-1.0 @@ -121,18 +127,30 @@ func goHandlePipelineBuffer(buffer unsafe.Pointer, bufferLen C.int, duration C.i log.Warn(). Str("module", "capture"). Str("submodule", "gstreamer"). - Msgf("discarding buffer, no pipeline with id %d", int(pipelineID)) + Msgf("discarding sample, no pipeline with id %d", int(pipelineID)) } } //export goPipelineLog -func goPipelineLog(levelUnsafe *C.char, msgUnsafe *C.char) { +func goPipelineLog(levelUnsafe *C.char, msgUnsafe *C.char, pipelineID C.int) { levelStr := C.GoString(levelUnsafe) msg := C.GoString(msgUnsafe) - level, _ := zerolog.ParseLevel(levelStr) - log.WithLevel(level). + logger := log.With(). Str("module", "capture"). Str("submodule", "gstreamer"). - Msg(msg) + Logger() + + pipelinesLock.Lock() + pipeline, ok := pipelines[int(pipelineID)] + pipelinesLock.Unlock() + + if ok { + logger = logger.With(). + Int("id", pipeline.id). + Logger() + } + + level, _ := zerolog.ParseLevel(levelStr) + logger.WithLevel(level).Msg(msg) } diff --git a/internal/capture/gst/gst.h b/internal/capture/gst/gst.h index fc72c4a0..7475290e 100644 --- a/internal/capture/gst/gst.h +++ b/internal/capture/gst/gst.h @@ -4,14 +4,20 @@ #include #include -extern void goHandlePipelineBuffer(void *buffer, int bufferLen, int samples, int pipelineId); -extern void goPipelineLog(char *level, char *msg); +typedef struct GstPipelineCtx { + int pipelineId; + GstElement *pipeline; +} GstPipelineCtx; -void gstreamer_pipeline_attach_appsink(GstElement *pipeline, char *sinkName, int pipelineId); -GstElement *gstreamer_pipeline_create(char *pipelineStr, GError **error); -void gstreamer_pipeline_play(GstElement *pipeline); -void gstreamer_pipeline_stop(GstElement *pipeline); -void gstreamer_pipeline_push(GstElement *pipeline, char *srcName, void *buffer, int bufferLen); +extern void goHandlePipelineBuffer(void *buffer, int bufferLen, int samples, int pipelineId); +extern void goPipelineLog(char *level, char *msg, int pipelineId); + +GstPipelineCtx *gstreamer_pipeline_create(char *pipelineStr, int pipelineId, GError **error); +void gstreamer_pipeline_attach_appsink(GstPipelineCtx *ctx, char *sinkName); +void gstreamer_pipeline_play(GstPipelineCtx *ctx); +void gstreamer_pipeline_pause(GstPipelineCtx *ctx); +void gstreamer_pipeline_destory(GstPipelineCtx *ctx); +void gstreamer_pipeline_push(GstPipelineCtx *ctx, char *srcName, void *buffer, int bufferLen); void gstreamer_init(void); void gstreamer_loop(void); diff --git a/internal/capture/screencast.go b/internal/capture/screencast.go index 528ac83b..8a78f35f 100644 --- a/internal/capture/screencast.go +++ b/internal/capture/screencast.go @@ -186,7 +186,7 @@ func (manager *ScreencastManagerCtx) destroyPipeline() { return } - manager.pipeline.Stop() + manager.pipeline.Destroy() manager.logger.Info().Msgf("destroying pipeline") manager.pipeline = nil } diff --git a/internal/capture/streamsink.go b/internal/capture/streamsink.go index dfb40152..39b164a6 100644 --- a/internal/capture/streamsink.go +++ b/internal/capture/streamsink.go @@ -262,7 +262,7 @@ func (manager *StreamSinkManagerCtx) destroyPipeline() { return } - manager.pipeline.Stop() + manager.pipeline.Destroy() manager.logger.Info().Msgf("destroying pipeline") manager.pipeline = nil } diff --git a/internal/capture/streamsrc.go b/internal/capture/streamsrc.go index 7630b8da..871bf6c9 100644 --- a/internal/capture/streamsrc.go +++ b/internal/capture/streamsrc.go @@ -90,7 +90,7 @@ func (manager *StreamSrcManagerCtx) Stop() { return } - manager.pipeline.Stop() + manager.pipeline.Destroy() manager.logger.Info().Msgf("destroying pipeline") manager.pipeline = nil }