diff --git a/internal/capture/gst/gst.c b/internal/capture/gst/gst.c index 22cf1f97..1bc9cab8 100644 --- a/internal/capture/gst/gst.c +++ b/internal/capture/gst/gst.c @@ -114,10 +114,13 @@ static GstFlowReturn gstreamer_send_new_sample_handler(GstElement *object, gpoin } void gstreamer_pipeline_attach_appsink(GstPipelineCtx *ctx, char *sinkName) { - GstElement *appsink = gst_bin_get_by_name(GST_BIN(ctx->pipeline), sinkName); - g_object_set(appsink, "emit-signals", TRUE, NULL); - g_signal_connect(appsink, "new-sample", G_CALLBACK(gstreamer_send_new_sample_handler), ctx); - gst_object_unref(appsink); + ctx->appsink = gst_bin_get_by_name(GST_BIN(ctx->pipeline), sinkName); + g_object_set(ctx->appsink, "emit-signals", TRUE, NULL); + g_signal_connect(ctx->appsink, "new-sample", G_CALLBACK(gstreamer_send_new_sample_handler), ctx); +} + +void gstreamer_pipeline_attach_appsrc(GstPipelineCtx *ctx, char *srcName) { + ctx->appsrc = gst_bin_get_by_name(GST_BIN(ctx->pipeline), srcName); } void gstreamer_pipeline_play(GstPipelineCtx *ctx) { @@ -129,17 +132,34 @@ void gstreamer_pipeline_pause(GstPipelineCtx *ctx) { } void gstreamer_pipeline_destory(GstPipelineCtx *ctx) { + // end appsrc, if exists + if (ctx->appsrc) { + gst_app_src_end_of_stream(GST_APP_SRC(ctx->appsrc)); + } + + // send pipeline eos + gst_element_send_event(GST_ELEMENT(ctx->pipeline), gst_event_new_eos()); + + // set null state gst_element_set_state(GST_ELEMENT(ctx->pipeline), GST_STATE_NULL); + + if (ctx->appsink) { + gst_object_unref(ctx->appsink); + ctx->appsink = NULL; + } + + if (ctx->appsrc) { + gst_object_unref(ctx->appsrc); + ctx->appsrc = NULL; + } + gst_object_unref(ctx->pipeline); } -void gstreamer_pipeline_push(GstPipelineCtx *ctx, char *srcName, void *buffer, int bufferLen) { - GstElement *src = gst_bin_get_by_name(GST_BIN(ctx->pipeline), srcName); - - if (src != NULL) { +void gstreamer_pipeline_push(GstPipelineCtx *ctx, void *buffer, int bufferLen) { + if (ctx->appsrc != NULL) { gpointer p = g_memdup(buffer, bufferLen); GstBuffer *buffer = gst_buffer_new_wrapped(p, bufferLen); - gst_app_src_push_buffer(GST_APP_SRC(src), buffer); - gst_object_unref(src); + gst_app_src_push_buffer(GST_APP_SRC(ctx->appsrc), buffer); } } diff --git a/internal/capture/gst/gst.go b/internal/capture/gst/gst.go index b4b0238e..e0a50ae9 100644 --- a/internal/capture/gst/gst.go +++ b/internal/capture/gst/gst.go @@ -73,6 +73,13 @@ func (p *Pipeline) AttachAppsink(sinkName string) { C.gstreamer_pipeline_attach_appsink(p.Ctx, sinkNameUnsafe) } +func (p *Pipeline) AttachAppsrc(srcName string) { + srcNameUnsafe := C.CString(srcName) + defer C.free(unsafe.Pointer(srcNameUnsafe)) + + C.gstreamer_pipeline_attach_appsrc(p.Ctx, srcNameUnsafe) +} + func (p *Pipeline) Play() { C.gstreamer_pipeline_play(p.Ctx) } @@ -93,14 +100,11 @@ func (p *Pipeline) Destroy() { p = nil } -func (p *Pipeline) Push(srcName string, buffer []byte) { - srcNameUnsafe := C.CString(srcName) - defer C.free(unsafe.Pointer(srcNameUnsafe)) - +func (p *Pipeline) Push(buffer []byte) { bytes := C.CBytes(buffer) defer C.free(bytes) - C.gstreamer_pipeline_push(p.Ctx, srcNameUnsafe, bytes, C.int(len(buffer))) + C.gstreamer_pipeline_push(p.Ctx, bytes, C.int(len(buffer))) } // gst-inspect-1.0 diff --git a/internal/capture/gst/gst.h b/internal/capture/gst/gst.h index 7475290e..f2af6a2e 100644 --- a/internal/capture/gst/gst.h +++ b/internal/capture/gst/gst.h @@ -7,6 +7,8 @@ typedef struct GstPipelineCtx { int pipelineId; GstElement *pipeline; + GstElement *appsink; + GstElement *appsrc; } GstPipelineCtx; extern void goHandlePipelineBuffer(void *buffer, int bufferLen, int samples, int pipelineId); @@ -14,10 +16,11 @@ extern void goPipelineLog(char *level, char *msg, int pipelineId); GstPipelineCtx *gstreamer_pipeline_create(char *pipelineStr, int pipelineId, GError **error); void gstreamer_pipeline_attach_appsink(GstPipelineCtx *ctx, char *sinkName); +void gstreamer_pipeline_attach_appsrc(GstPipelineCtx *ctx, char *srcName); void gstreamer_pipeline_play(GstPipelineCtx *ctx); void gstreamer_pipeline_pause(GstPipelineCtx *ctx); void gstreamer_pipeline_destory(GstPipelineCtx *ctx); -void gstreamer_pipeline_push(GstPipelineCtx *ctx, char *srcName, void *buffer, int bufferLen); +void gstreamer_pipeline_push(GstPipelineCtx *ctx, void *buffer, int bufferLen); void gstreamer_init(void); void gstreamer_loop(void); diff --git a/internal/capture/manager.go b/internal/capture/manager.go index 8837319f..19b078d8 100644 --- a/internal/capture/manager.go +++ b/internal/capture/manager.go @@ -123,19 +123,19 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt // sources webcam: streamSrcNew(map[string]string{ - codec.VP8().Name: "appsrc format=time is-live=true do-timestamp=true name=src " + + codec.VP8().Name: "appsrc format=time is-live=true do-timestamp=true name=appsrc " + fmt.Sprintf("! application/x-rtp, payload=%d, encoding-name=VP8-DRAFT-IETF-01 ", codec.VP8().PayloadType) + "! rtpvp8depay " + "! decodebin " + "! videoconvert " + "! v4l2sink device=/dev/video0", - codec.VP9().Name: "appsrc format=time is-live=true do-timestamp=true name=src " + + codec.VP9().Name: "appsrc format=time is-live=true do-timestamp=true name=appsrc " + "! application/x-rtp " + "! rtpvp9depay " + "! decodebin " + "! videoconvert " + "! v4l2sink device=/dev/video0", - codec.H264().Name: "appsrc format=time is-live=true do-timestamp=true name=src " + + codec.H264().Name: "appsrc format=time is-live=true do-timestamp=true name=appsrc " + "! application/x-rtp " + "! rtph264depay " + "! decodebin " + @@ -143,12 +143,12 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt "! v4l2sink device=/dev/video0", }, "webcam"), microphone: streamSrcNew(map[string]string{ - codec.Opus().Name: "appsrc format=time is-live=true do-timestamp=true name=src " + + codec.Opus().Name: "appsrc format=time is-live=true do-timestamp=true name=appsrc " + fmt.Sprintf("! application/x-rtp, payload=%d, encoding-name=OPUS ", codec.Opus().PayloadType) + "! rtpopusdepay " + "! decodebin " + "! pulsesink device=audio_input", - codec.G722().Name: "appsrc format=time is-live=true do-timestamp=true name=src " + + codec.G722().Name: "appsrc format=time is-live=true do-timestamp=true name=appsrc " + "! application/x-rtp clock-rate=8000 " + "! rtpg722depay " + "! decodebin " + diff --git a/internal/capture/streamsrc.go b/internal/capture/streamsrc.go index 871bf6c9..bbb5ce30 100644 --- a/internal/capture/streamsrc.go +++ b/internal/capture/streamsrc.go @@ -78,6 +78,7 @@ func (manager *StreamSrcManagerCtx) Start(codec codec.RTPCodec) error { return err } + manager.pipeline.AttachAppsrc("appsrc") manager.pipeline.Play() return nil } @@ -103,7 +104,7 @@ func (manager *StreamSrcManagerCtx) Push(bytes []byte) { return } - manager.pipeline.Push("src", bytes) + manager.pipeline.Push(bytes) } func (manager *StreamSrcManagerCtx) Started() bool {