mirror of
https://github.com/m1k1o/neko.git
synced 2024-07-24 14:40:50 +12:00
Wait for keyframe on switching streams (#28)
* stream sink add keyframe lobby. * change streamsink keyframe identifier. * add h264. * use gstreamers is delta unit for sample. * use delta unit.
This commit is contained in:
parent
f8b128e1e9
commit
124c5ae117
@ -21,6 +21,7 @@ var moveSinkListenerMu = sync.Mutex{}
|
|||||||
type StreamSinkManagerCtx struct {
|
type StreamSinkManagerCtx struct {
|
||||||
id string
|
id string
|
||||||
getBitrate func() (int, error)
|
getBitrate func() (int, error)
|
||||||
|
waitForKf bool // wait for a keyframe before sending samples
|
||||||
|
|
||||||
logger zerolog.Logger
|
logger zerolog.Logger
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
@ -32,6 +33,7 @@ type StreamSinkManagerCtx struct {
|
|||||||
pipelineFn func() (string, error)
|
pipelineFn func() (string, error)
|
||||||
|
|
||||||
listeners map[uintptr]*func(sample types.Sample)
|
listeners map[uintptr]*func(sample types.Sample)
|
||||||
|
listenersKf map[uintptr]*func(sample types.Sample) // keyframe lobby
|
||||||
listenersMu sync.Mutex
|
listenersMu sync.Mutex
|
||||||
|
|
||||||
// metrics
|
// metrics
|
||||||
@ -40,7 +42,7 @@ type StreamSinkManagerCtx struct {
|
|||||||
pipelinesActive prometheus.Gauge
|
pipelinesActive prometheus.Gauge
|
||||||
}
|
}
|
||||||
|
|
||||||
func streamSinkNew(codec codec.RTPCodec, pipelineFn func() (string, error), id string, getBitrate func() (int, error)) *StreamSinkManagerCtx {
|
func streamSinkNew(c codec.RTPCodec, pipelineFn func() (string, error), id string, getBitrate func() (int, error)) *StreamSinkManagerCtx {
|
||||||
logger := log.With().
|
logger := log.With().
|
||||||
Str("module", "capture").
|
Str("module", "capture").
|
||||||
Str("submodule", "stream-sink").
|
Str("submodule", "stream-sink").
|
||||||
@ -49,11 +51,15 @@ func streamSinkNew(codec codec.RTPCodec, pipelineFn func() (string, error), id s
|
|||||||
manager := &StreamSinkManagerCtx{
|
manager := &StreamSinkManagerCtx{
|
||||||
id: id,
|
id: id,
|
||||||
getBitrate: getBitrate,
|
getBitrate: getBitrate,
|
||||||
|
// only wait for keyframes if the codec is video
|
||||||
|
waitForKf: c.IsVideo(),
|
||||||
|
|
||||||
logger: logger,
|
logger: logger,
|
||||||
codec: codec,
|
codec: c,
|
||||||
pipelineFn: pipelineFn,
|
pipelineFn: pipelineFn,
|
||||||
listeners: map[uintptr]*func(sample types.Sample){},
|
|
||||||
|
listeners: map[uintptr]*func(sample types.Sample){},
|
||||||
|
listenersKf: map[uintptr]*func(sample types.Sample){},
|
||||||
|
|
||||||
// metrics
|
// metrics
|
||||||
currentListeners: promauto.NewGauge(prometheus.GaugeOpts{
|
currentListeners: promauto.NewGauge(prometheus.GaugeOpts{
|
||||||
@ -63,8 +69,8 @@ func streamSinkNew(codec codec.RTPCodec, pipelineFn func() (string, error), id s
|
|||||||
Help: "Current number of listeners for a pipeline.",
|
Help: "Current number of listeners for a pipeline.",
|
||||||
ConstLabels: map[string]string{
|
ConstLabels: map[string]string{
|
||||||
"video_id": id,
|
"video_id": id,
|
||||||
"codec_name": codec.Name,
|
"codec_name": c.Name,
|
||||||
"codec_type": codec.Type.String(),
|
"codec_type": c.Type.String(),
|
||||||
},
|
},
|
||||||
}),
|
}),
|
||||||
pipelinesCounter: promauto.NewCounter(prometheus.CounterOpts{
|
pipelinesCounter: promauto.NewCounter(prometheus.CounterOpts{
|
||||||
@ -75,8 +81,8 @@ func streamSinkNew(codec codec.RTPCodec, pipelineFn func() (string, error), id s
|
|||||||
ConstLabels: map[string]string{
|
ConstLabels: map[string]string{
|
||||||
"submodule": "streamsink",
|
"submodule": "streamsink",
|
||||||
"video_id": id,
|
"video_id": id,
|
||||||
"codec_name": codec.Name,
|
"codec_name": c.Name,
|
||||||
"codec_type": codec.Type.String(),
|
"codec_type": c.Type.String(),
|
||||||
},
|
},
|
||||||
}),
|
}),
|
||||||
pipelinesActive: promauto.NewGauge(prometheus.GaugeOpts{
|
pipelinesActive: promauto.NewGauge(prometheus.GaugeOpts{
|
||||||
@ -87,8 +93,8 @@ func streamSinkNew(codec codec.RTPCodec, pipelineFn func() (string, error), id s
|
|||||||
ConstLabels: map[string]string{
|
ConstLabels: map[string]string{
|
||||||
"submodule": "streamsink",
|
"submodule": "streamsink",
|
||||||
"video_id": id,
|
"video_id": id,
|
||||||
"codec_name": codec.Name,
|
"codec_name": c.Name,
|
||||||
"codec_type": codec.Type.String(),
|
"codec_type": c.Type.String(),
|
||||||
},
|
},
|
||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
@ -103,6 +109,9 @@ func (manager *StreamSinkManagerCtx) shutdown() {
|
|||||||
for key := range manager.listeners {
|
for key := range manager.listeners {
|
||||||
delete(manager.listeners, key)
|
delete(manager.listeners, key)
|
||||||
}
|
}
|
||||||
|
for key := range manager.listenersKf {
|
||||||
|
delete(manager.listenersKf, key)
|
||||||
|
}
|
||||||
manager.listenersMu.Unlock()
|
manager.listenersMu.Unlock()
|
||||||
|
|
||||||
manager.DestroyPipeline()
|
manager.DestroyPipeline()
|
||||||
@ -133,7 +142,7 @@ func (manager *StreamSinkManagerCtx) Codec() codec.RTPCodec {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (manager *StreamSinkManagerCtx) start() error {
|
func (manager *StreamSinkManagerCtx) start() error {
|
||||||
if len(manager.listeners) == 0 {
|
if len(manager.listeners)+len(manager.listenersKf) == 0 {
|
||||||
err := manager.CreatePipeline()
|
err := manager.CreatePipeline()
|
||||||
if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) {
|
if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) {
|
||||||
return err
|
return err
|
||||||
@ -146,7 +155,7 @@ func (manager *StreamSinkManagerCtx) start() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (manager *StreamSinkManagerCtx) stop() {
|
func (manager *StreamSinkManagerCtx) stop() {
|
||||||
if len(manager.listeners) == 0 {
|
if len(manager.listeners)+len(manager.listenersKf) == 0 {
|
||||||
manager.DestroyPipeline()
|
manager.DestroyPipeline()
|
||||||
manager.logger.Info().Msgf("last listener, stopping")
|
manager.logger.Info().Msgf("last listener, stopping")
|
||||||
}
|
}
|
||||||
@ -156,11 +165,22 @@ func (manager *StreamSinkManagerCtx) addListener(listener *func(sample types.Sam
|
|||||||
ptr := reflect.ValueOf(listener).Pointer()
|
ptr := reflect.ValueOf(listener).Pointer()
|
||||||
|
|
||||||
manager.listenersMu.Lock()
|
manager.listenersMu.Lock()
|
||||||
manager.listeners[ptr] = listener
|
if manager.waitForKf {
|
||||||
|
// if we're waiting for a keyframe, add it to the keyframe lobby
|
||||||
|
manager.listenersKf[ptr] = listener
|
||||||
|
} else {
|
||||||
|
// otherwise, add it as a regular listener
|
||||||
|
manager.listeners[ptr] = listener
|
||||||
|
}
|
||||||
manager.listenersMu.Unlock()
|
manager.listenersMu.Unlock()
|
||||||
|
|
||||||
manager.logger.Debug().Interface("ptr", ptr).Msgf("adding listener")
|
manager.logger.Debug().Interface("ptr", ptr).Msgf("adding listener")
|
||||||
manager.currentListeners.Set(float64(manager.ListenersCount()))
|
manager.currentListeners.Set(float64(manager.ListenersCount()))
|
||||||
|
|
||||||
|
// if we will be waiting for a keyframe, emit one now
|
||||||
|
if manager.pipeline != nil && manager.waitForKf {
|
||||||
|
manager.pipeline.EmitVideoKeyframe()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (manager *StreamSinkManagerCtx) removeListener(listener *func(sample types.Sample)) {
|
func (manager *StreamSinkManagerCtx) removeListener(listener *func(sample types.Sample)) {
|
||||||
@ -168,6 +188,7 @@ func (manager *StreamSinkManagerCtx) removeListener(listener *func(sample types.
|
|||||||
|
|
||||||
manager.listenersMu.Lock()
|
manager.listenersMu.Lock()
|
||||||
delete(manager.listeners, ptr)
|
delete(manager.listeners, ptr)
|
||||||
|
delete(manager.listenersKf, ptr) // if it's a keyframe listener, remove it too
|
||||||
manager.listenersMu.Unlock()
|
manager.listenersMu.Unlock()
|
||||||
|
|
||||||
manager.logger.Debug().Interface("ptr", ptr).Msgf("removing listener")
|
manager.logger.Debug().Interface("ptr", ptr).Msgf("removing listener")
|
||||||
@ -259,7 +280,7 @@ func (manager *StreamSinkManagerCtx) ListenersCount() int {
|
|||||||
manager.listenersMu.Lock()
|
manager.listenersMu.Lock()
|
||||||
defer manager.listenersMu.Unlock()
|
defer manager.listenersMu.Unlock()
|
||||||
|
|
||||||
return len(manager.listeners)
|
return len(manager.listeners) + len(manager.listenersKf)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (manager *StreamSinkManagerCtx) Started() bool {
|
func (manager *StreamSinkManagerCtx) Started() bool {
|
||||||
@ -307,6 +328,15 @@ func (manager *StreamSinkManagerCtx) CreatePipeline() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
manager.listenersMu.Lock()
|
manager.listenersMu.Lock()
|
||||||
|
// if is not delta unit -> it can be decoded independently -> it is a keyframe
|
||||||
|
if manager.waitForKf && !sample.DeltaUnit && len(manager.listenersKf) > 0 {
|
||||||
|
// if current sample is a keyframe, move listeners from
|
||||||
|
// keyframe lobby to actual listeners map and clear lobby
|
||||||
|
for k, v := range manager.listenersKf {
|
||||||
|
manager.listeners[k] = v
|
||||||
|
}
|
||||||
|
manager.listenersKf = make(map[uintptr]*func(sample types.Sample))
|
||||||
|
}
|
||||||
for _, emit := range manager.listeners {
|
for _, emit := range manager.listeners {
|
||||||
(*emit)(sample)
|
(*emit)(sample)
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,6 @@ package config
|
|||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"github.com/pion/webrtc/v3"
|
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
@ -171,7 +170,7 @@ func (s *Capture) Set() {
|
|||||||
// video
|
// video
|
||||||
videoCodec := viper.GetString("capture.video.codec")
|
videoCodec := viper.GetString("capture.video.codec")
|
||||||
s.VideoCodec, ok = codec.ParseStr(videoCodec)
|
s.VideoCodec, ok = codec.ParseStr(videoCodec)
|
||||||
if !ok || s.VideoCodec.Type != webrtc.RTPCodecTypeVideo {
|
if !ok || !s.VideoCodec.IsVideo() {
|
||||||
log.Warn().Str("codec", videoCodec).Msgf("unknown video codec, using Vp8")
|
log.Warn().Str("codec", videoCodec).Msgf("unknown video codec, using Vp8")
|
||||||
s.VideoCodec = codec.VP8()
|
s.VideoCodec = codec.VP8()
|
||||||
}
|
}
|
||||||
@ -217,7 +216,7 @@ func (s *Capture) Set() {
|
|||||||
|
|
||||||
audioCodec := viper.GetString("capture.audio.codec")
|
audioCodec := viper.GetString("capture.audio.codec")
|
||||||
s.AudioCodec, ok = codec.ParseStr(audioCodec)
|
s.AudioCodec, ok = codec.ParseStr(audioCodec)
|
||||||
if !ok || s.AudioCodec.Type != webrtc.RTPCodecTypeAudio {
|
if !ok || !s.AudioCodec.IsAudio() {
|
||||||
log.Warn().Str("codec", audioCodec).Msgf("unknown audio codec, using Opus")
|
log.Warn().Str("codec", audioCodec).Msgf("unknown audio codec, using Opus")
|
||||||
s.AudioCodec = codec.Opus()
|
s.AudioCodec = codec.Opus()
|
||||||
}
|
}
|
||||||
|
@ -64,7 +64,10 @@ func NewTrack(logger zerolog.Logger, codec codec.RTPCodec, connection *webrtc.Pe
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err := track.WriteSample(media.Sample(sample))
|
err := track.WriteSample(media.Sample{
|
||||||
|
Data: sample.Data,
|
||||||
|
Duration: sample.Duration,
|
||||||
|
})
|
||||||
if err != nil && !errors.Is(err, io.ErrClosedPipe) {
|
if err != nil && !errors.Is(err, io.ErrClosedPipe) {
|
||||||
logger.Warn().Err(err).Msg("failed to write sample to track")
|
logger.Warn().Err(err).Msg("failed to write sample to track")
|
||||||
}
|
}
|
||||||
|
@ -6,7 +6,7 @@ static void gstreamer_pipeline_log(GstPipelineCtx *ctx, char* level, const char*
|
|||||||
char buffer[100];
|
char buffer[100];
|
||||||
vsprintf(buffer, format, argptr);
|
vsprintf(buffer, format, argptr);
|
||||||
va_end(argptr);
|
va_end(argptr);
|
||||||
goPipelineLog(level, buffer, ctx->pipelineId);
|
goPipelineLog(ctx->pipelineId, level, buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
static gboolean gstreamer_bus_call(GstBus *bus, GstMessage *msg, gpointer user_data) {
|
static gboolean gstreamer_bus_call(GstBus *bus, GstMessage *msg, gpointer user_data) {
|
||||||
@ -95,7 +95,10 @@ static GstFlowReturn gstreamer_send_new_sample_handler(GstElement *object, gpoin
|
|||||||
buffer = gst_sample_get_buffer(sample);
|
buffer = gst_sample_get_buffer(sample);
|
||||||
if (buffer) {
|
if (buffer) {
|
||||||
gst_buffer_extract_dup(buffer, 0, gst_buffer_get_size(buffer), ©, ©_size);
|
gst_buffer_extract_dup(buffer, 0, gst_buffer_get_size(buffer), ©, ©_size);
|
||||||
goHandlePipelineBuffer(copy, copy_size, GST_BUFFER_DURATION(buffer), ctx->pipelineId);
|
goHandlePipelineBuffer(ctx->pipelineId, copy, copy_size,
|
||||||
|
GST_BUFFER_DURATION(buffer),
|
||||||
|
GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
gst_sample_unref(sample);
|
gst_sample_unref(sample);
|
||||||
}
|
}
|
||||||
|
@ -200,8 +200,8 @@ func CheckPlugins(plugins []string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
//export goHandlePipelineBuffer
|
//export goHandlePipelineBuffer
|
||||||
func goHandlePipelineBuffer(buffer unsafe.Pointer, bufferLen C.int, duration C.int, pipelineID C.int) {
|
func goHandlePipelineBuffer(pipelineID C.int, buf unsafe.Pointer, bufLen C.int, duration C.guint64, deltaUnit C.gboolean) {
|
||||||
defer C.free(buffer)
|
defer C.free(buf)
|
||||||
|
|
||||||
pipelinesLock.Lock()
|
pipelinesLock.Lock()
|
||||||
pipeline, ok := pipelines[int(pipelineID)]
|
pipeline, ok := pipelines[int(pipelineID)]
|
||||||
@ -209,8 +209,9 @@ func goHandlePipelineBuffer(buffer unsafe.Pointer, bufferLen C.int, duration C.i
|
|||||||
|
|
||||||
if ok {
|
if ok {
|
||||||
pipeline.sample <- types.Sample{
|
pipeline.sample <- types.Sample{
|
||||||
Data: C.GoBytes(buffer, bufferLen),
|
Data: C.GoBytes(buf, bufLen),
|
||||||
Duration: time.Duration(duration),
|
Duration: time.Duration(duration),
|
||||||
|
DeltaUnit: deltaUnit == C.TRUE,
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Warn().
|
log.Warn().
|
||||||
@ -222,7 +223,7 @@ func goHandlePipelineBuffer(buffer unsafe.Pointer, bufferLen C.int, duration C.i
|
|||||||
}
|
}
|
||||||
|
|
||||||
//export goPipelineLog
|
//export goPipelineLog
|
||||||
func goPipelineLog(levelUnsafe *C.char, msgUnsafe *C.char, pipelineID C.int) {
|
func goPipelineLog(pipelineID C.int, levelUnsafe *C.char, msgUnsafe *C.char) {
|
||||||
levelStr := C.GoString(levelUnsafe)
|
levelStr := C.GoString(levelUnsafe)
|
||||||
msg := C.GoString(msgUnsafe)
|
msg := C.GoString(msgUnsafe)
|
||||||
|
|
||||||
|
@ -12,8 +12,8 @@ typedef struct GstPipelineCtx {
|
|||||||
GstElement *appsrc;
|
GstElement *appsrc;
|
||||||
} GstPipelineCtx;
|
} GstPipelineCtx;
|
||||||
|
|
||||||
extern void goHandlePipelineBuffer(void *buffer, int bufferLen, int samples, int pipelineId);
|
extern void goHandlePipelineBuffer(int pipelineId, void *buffer, int bufferLen, guint64 duration, gboolean deltaUnit);
|
||||||
extern void goPipelineLog(char *level, char *msg, int pipelineId);
|
extern void goPipelineLog(int pipelineId, char *level, char *msg);
|
||||||
|
|
||||||
GstPipelineCtx *gstreamer_pipeline_create(char *pipelineStr, int pipelineId, GError **error);
|
GstPipelineCtx *gstreamer_pipeline_create(char *pipelineStr, int pipelineId, GError **error);
|
||||||
void gstreamer_pipeline_attach_appsink(GstPipelineCtx *ctx, char *sinkName);
|
void gstreamer_pipeline_attach_appsink(GstPipelineCtx *ctx, char *sinkName);
|
||||||
|
@ -6,17 +6,21 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/PaesslerAG/gval"
|
"github.com/PaesslerAG/gval"
|
||||||
"github.com/demodesk/neko/pkg/types/codec"
|
"github.com/demodesk/neko/pkg/types/codec"
|
||||||
"github.com/pion/webrtc/v3/pkg/media"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ErrCapturePipelineAlreadyExists = errors.New("capture pipeline already exists")
|
ErrCapturePipelineAlreadyExists = errors.New("capture pipeline already exists")
|
||||||
)
|
)
|
||||||
|
|
||||||
type Sample media.Sample
|
type Sample struct {
|
||||||
|
Data []byte
|
||||||
|
Duration time.Duration
|
||||||
|
DeltaUnit bool // this unit cannot be decoded independently.
|
||||||
|
}
|
||||||
|
|
||||||
type Receiver interface {
|
type Receiver interface {
|
||||||
SetStream(stream StreamSinkManager) (changed bool, err error)
|
SetStream(stream StreamSinkManager) (changed bool, err error)
|
||||||
|
@ -63,6 +63,18 @@ func (codec *RTPCodec) Register(engine *webrtc.MediaEngine) error {
|
|||||||
}, codec.Type)
|
}, codec.Type)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (codec *RTPCodec) IsVideo() bool {
|
||||||
|
return codec.Type == webrtc.RTPCodecTypeVideo
|
||||||
|
}
|
||||||
|
|
||||||
|
func (codec *RTPCodec) IsAudio() bool {
|
||||||
|
return codec.Type == webrtc.RTPCodecTypeAudio
|
||||||
|
}
|
||||||
|
|
||||||
|
func (codec *RTPCodec) String() string {
|
||||||
|
return codec.Type.String() + "/" + codec.Name
|
||||||
|
}
|
||||||
|
|
||||||
func VP8() RTPCodec {
|
func VP8() RTPCodec {
|
||||||
return RTPCodec{
|
return RTPCodec{
|
||||||
Name: "vp8",
|
Name: "vp8",
|
||||||
|
Loading…
Reference in New Issue
Block a user