keep appsrc / appsink in ctx.

This commit is contained in:
Miroslav Šedivý 2021-12-05 22:06:42 +01:00
parent 8de5cb2f4a
commit 9a6eb942a4
5 changed files with 50 additions and 22 deletions

View File

@ -114,10 +114,13 @@ static GstFlowReturn gstreamer_send_new_sample_handler(GstElement *object, gpoin
} }
void gstreamer_pipeline_attach_appsink(GstPipelineCtx *ctx, char *sinkName) { void gstreamer_pipeline_attach_appsink(GstPipelineCtx *ctx, char *sinkName) {
GstElement *appsink = gst_bin_get_by_name(GST_BIN(ctx->pipeline), sinkName); ctx->appsink = gst_bin_get_by_name(GST_BIN(ctx->pipeline), sinkName);
g_object_set(appsink, "emit-signals", TRUE, NULL); g_object_set(ctx->appsink, "emit-signals", TRUE, NULL);
g_signal_connect(appsink, "new-sample", G_CALLBACK(gstreamer_send_new_sample_handler), ctx); g_signal_connect(ctx->appsink, "new-sample", G_CALLBACK(gstreamer_send_new_sample_handler), ctx);
gst_object_unref(appsink); }
void gstreamer_pipeline_attach_appsrc(GstPipelineCtx *ctx, char *srcName) {
ctx->appsrc = gst_bin_get_by_name(GST_BIN(ctx->pipeline), srcName);
} }
void gstreamer_pipeline_play(GstPipelineCtx *ctx) { void gstreamer_pipeline_play(GstPipelineCtx *ctx) {
@ -129,17 +132,34 @@ void gstreamer_pipeline_pause(GstPipelineCtx *ctx) {
} }
void gstreamer_pipeline_destory(GstPipelineCtx *ctx) { void gstreamer_pipeline_destory(GstPipelineCtx *ctx) {
// end appsrc, if exists
if (ctx->appsrc) {
gst_app_src_end_of_stream(GST_APP_SRC(ctx->appsrc));
}
// send pipeline eos
gst_element_send_event(GST_ELEMENT(ctx->pipeline), gst_event_new_eos());
// set null state
gst_element_set_state(GST_ELEMENT(ctx->pipeline), GST_STATE_NULL); gst_element_set_state(GST_ELEMENT(ctx->pipeline), GST_STATE_NULL);
if (ctx->appsink) {
gst_object_unref(ctx->appsink);
ctx->appsink = NULL;
}
if (ctx->appsrc) {
gst_object_unref(ctx->appsrc);
ctx->appsrc = NULL;
}
gst_object_unref(ctx->pipeline); gst_object_unref(ctx->pipeline);
} }
void gstreamer_pipeline_push(GstPipelineCtx *ctx, char *srcName, void *buffer, int bufferLen) { void gstreamer_pipeline_push(GstPipelineCtx *ctx, void *buffer, int bufferLen) {
GstElement *src = gst_bin_get_by_name(GST_BIN(ctx->pipeline), srcName); if (ctx->appsrc != NULL) {
if (src != NULL) {
gpointer p = g_memdup(buffer, bufferLen); gpointer p = g_memdup(buffer, bufferLen);
GstBuffer *buffer = gst_buffer_new_wrapped(p, bufferLen); GstBuffer *buffer = gst_buffer_new_wrapped(p, bufferLen);
gst_app_src_push_buffer(GST_APP_SRC(src), buffer); gst_app_src_push_buffer(GST_APP_SRC(ctx->appsrc), buffer);
gst_object_unref(src);
} }
} }

View File

@ -73,6 +73,13 @@ func (p *Pipeline) AttachAppsink(sinkName string) {
C.gstreamer_pipeline_attach_appsink(p.Ctx, sinkNameUnsafe) C.gstreamer_pipeline_attach_appsink(p.Ctx, sinkNameUnsafe)
} }
func (p *Pipeline) AttachAppsrc(srcName string) {
srcNameUnsafe := C.CString(srcName)
defer C.free(unsafe.Pointer(srcNameUnsafe))
C.gstreamer_pipeline_attach_appsrc(p.Ctx, srcNameUnsafe)
}
func (p *Pipeline) Play() { func (p *Pipeline) Play() {
C.gstreamer_pipeline_play(p.Ctx) C.gstreamer_pipeline_play(p.Ctx)
} }
@ -93,14 +100,11 @@ func (p *Pipeline) Destroy() {
p = nil p = nil
} }
func (p *Pipeline) Push(srcName string, buffer []byte) { func (p *Pipeline) Push(buffer []byte) {
srcNameUnsafe := C.CString(srcName)
defer C.free(unsafe.Pointer(srcNameUnsafe))
bytes := C.CBytes(buffer) bytes := C.CBytes(buffer)
defer C.free(bytes) defer C.free(bytes)
C.gstreamer_pipeline_push(p.Ctx, srcNameUnsafe, bytes, C.int(len(buffer))) C.gstreamer_pipeline_push(p.Ctx, bytes, C.int(len(buffer)))
} }
// gst-inspect-1.0 // gst-inspect-1.0

View File

@ -7,6 +7,8 @@
typedef struct GstPipelineCtx { typedef struct GstPipelineCtx {
int pipelineId; int pipelineId;
GstElement *pipeline; GstElement *pipeline;
GstElement *appsink;
GstElement *appsrc;
} GstPipelineCtx; } GstPipelineCtx;
extern void goHandlePipelineBuffer(void *buffer, int bufferLen, int samples, int pipelineId); extern void goHandlePipelineBuffer(void *buffer, int bufferLen, int samples, int pipelineId);
@ -14,10 +16,11 @@ extern void goPipelineLog(char *level, char *msg, int pipelineId);
GstPipelineCtx *gstreamer_pipeline_create(char *pipelineStr, int pipelineId, GError **error); GstPipelineCtx *gstreamer_pipeline_create(char *pipelineStr, int pipelineId, GError **error);
void gstreamer_pipeline_attach_appsink(GstPipelineCtx *ctx, char *sinkName); void gstreamer_pipeline_attach_appsink(GstPipelineCtx *ctx, char *sinkName);
void gstreamer_pipeline_attach_appsrc(GstPipelineCtx *ctx, char *srcName);
void gstreamer_pipeline_play(GstPipelineCtx *ctx); void gstreamer_pipeline_play(GstPipelineCtx *ctx);
void gstreamer_pipeline_pause(GstPipelineCtx *ctx); void gstreamer_pipeline_pause(GstPipelineCtx *ctx);
void gstreamer_pipeline_destory(GstPipelineCtx *ctx); void gstreamer_pipeline_destory(GstPipelineCtx *ctx);
void gstreamer_pipeline_push(GstPipelineCtx *ctx, char *srcName, void *buffer, int bufferLen); void gstreamer_pipeline_push(GstPipelineCtx *ctx, void *buffer, int bufferLen);
void gstreamer_init(void); void gstreamer_init(void);
void gstreamer_loop(void); void gstreamer_loop(void);

View File

@ -123,19 +123,19 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt
// sources // sources
webcam: streamSrcNew(map[string]string{ webcam: streamSrcNew(map[string]string{
codec.VP8().Name: "appsrc format=time is-live=true do-timestamp=true name=src " + codec.VP8().Name: "appsrc format=time is-live=true do-timestamp=true name=appsrc " +
fmt.Sprintf("! application/x-rtp, payload=%d, encoding-name=VP8-DRAFT-IETF-01 ", codec.VP8().PayloadType) + fmt.Sprintf("! application/x-rtp, payload=%d, encoding-name=VP8-DRAFT-IETF-01 ", codec.VP8().PayloadType) +
"! rtpvp8depay " + "! rtpvp8depay " +
"! decodebin " + "! decodebin " +
"! videoconvert " + "! videoconvert " +
"! v4l2sink device=/dev/video0", "! v4l2sink device=/dev/video0",
codec.VP9().Name: "appsrc format=time is-live=true do-timestamp=true name=src " + codec.VP9().Name: "appsrc format=time is-live=true do-timestamp=true name=appsrc " +
"! application/x-rtp " + "! application/x-rtp " +
"! rtpvp9depay " + "! rtpvp9depay " +
"! decodebin " + "! decodebin " +
"! videoconvert " + "! videoconvert " +
"! v4l2sink device=/dev/video0", "! v4l2sink device=/dev/video0",
codec.H264().Name: "appsrc format=time is-live=true do-timestamp=true name=src " + codec.H264().Name: "appsrc format=time is-live=true do-timestamp=true name=appsrc " +
"! application/x-rtp " + "! application/x-rtp " +
"! rtph264depay " + "! rtph264depay " +
"! decodebin " + "! decodebin " +
@ -143,12 +143,12 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt
"! v4l2sink device=/dev/video0", "! v4l2sink device=/dev/video0",
}, "webcam"), }, "webcam"),
microphone: streamSrcNew(map[string]string{ microphone: streamSrcNew(map[string]string{
codec.Opus().Name: "appsrc format=time is-live=true do-timestamp=true name=src " + codec.Opus().Name: "appsrc format=time is-live=true do-timestamp=true name=appsrc " +
fmt.Sprintf("! application/x-rtp, payload=%d, encoding-name=OPUS ", codec.Opus().PayloadType) + fmt.Sprintf("! application/x-rtp, payload=%d, encoding-name=OPUS ", codec.Opus().PayloadType) +
"! rtpopusdepay " + "! rtpopusdepay " +
"! decodebin " + "! decodebin " +
"! pulsesink device=audio_input", "! pulsesink device=audio_input",
codec.G722().Name: "appsrc format=time is-live=true do-timestamp=true name=src " + codec.G722().Name: "appsrc format=time is-live=true do-timestamp=true name=appsrc " +
"! application/x-rtp clock-rate=8000 " + "! application/x-rtp clock-rate=8000 " +
"! rtpg722depay " + "! rtpg722depay " +
"! decodebin " + "! decodebin " +

View File

@ -78,6 +78,7 @@ func (manager *StreamSrcManagerCtx) Start(codec codec.RTPCodec) error {
return err return err
} }
manager.pipeline.AttachAppsrc("appsrc")
manager.pipeline.Play() manager.pipeline.Play()
return nil return nil
} }
@ -103,7 +104,7 @@ func (manager *StreamSrcManagerCtx) Push(bytes []byte) {
return return
} }
manager.pipeline.Push("src", bytes) manager.pipeline.Push(bytes)
} }
func (manager *StreamSrcManagerCtx) Started() bool { func (manager *StreamSrcManagerCtx) Started() bool {