Sample listeners chan (#33)

* switch to sample listeners.

* add streamsink total bytes to metrics.

* on rtcp to chan.

* change logs.

* streamsink add real bitrate.

* add timing information to sample.

* bitrate buckets.

* switch to WriteSample.
This commit is contained in:
Miroslav Šedivý 2023-03-07 00:08:53 +01:00 committed by GitHub
parent 38fc21aabc
commit 17bfd2d58f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 178 additions and 72 deletions

View File

@ -4,6 +4,8 @@ import (
"errors" "errors"
"reflect" "reflect"
"sync" "sync"
"sync/atomic"
"time"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promauto"
@ -23,6 +25,9 @@ type StreamSinkManagerCtx struct {
getBitrate func() (int, error) getBitrate func() (int, error)
waitForKf bool // wait for a keyframe before sending samples waitForKf bool // wait for a keyframe before sending samples
bitrate uint64 // atomic
brBuckets map[int]float64
logger zerolog.Logger logger zerolog.Logger
mu sync.Mutex mu sync.Mutex
wg sync.WaitGroup wg sync.WaitGroup
@ -32,12 +37,13 @@ type StreamSinkManagerCtx struct {
pipelineMu sync.Mutex pipelineMu sync.Mutex
pipelineFn func() (string, error) pipelineFn func() (string, error)
listeners map[uintptr]*func(sample types.Sample) listeners map[uintptr]types.SampleListener
listenersKf map[uintptr]*func(sample types.Sample) // keyframe lobby listenersKf map[uintptr]types.SampleListener // keyframe lobby
listenersMu sync.Mutex listenersMu sync.Mutex
// metrics // metrics
currentListeners prometheus.Gauge currentListeners prometheus.Gauge
totalBytes prometheus.Counter
pipelinesCounter prometheus.Counter pipelinesCounter prometheus.Counter
pipelinesActive prometheus.Gauge pipelinesActive prometheus.Gauge
} }
@ -54,12 +60,14 @@ func streamSinkNew(c codec.RTPCodec, pipelineFn func() (string, error), id strin
// only wait for keyframes if the codec is video // only wait for keyframes if the codec is video
waitForKf: c.IsVideo(), waitForKf: c.IsVideo(),
brBuckets: map[int]float64{},
logger: logger, logger: logger,
codec: c, codec: c,
pipelineFn: pipelineFn, pipelineFn: pipelineFn,
listeners: map[uintptr]*func(sample types.Sample){}, listeners: map[uintptr]types.SampleListener{},
listenersKf: map[uintptr]*func(sample types.Sample){}, listenersKf: map[uintptr]types.SampleListener{},
// metrics // metrics
currentListeners: promauto.NewGauge(prometheus.GaugeOpts{ currentListeners: promauto.NewGauge(prometheus.GaugeOpts{
@ -73,6 +81,17 @@ func streamSinkNew(c codec.RTPCodec, pipelineFn func() (string, error), id strin
"codec_type": c.Type.String(), "codec_type": c.Type.String(),
}, },
}), }),
totalBytes: promauto.NewGauge(prometheus.GaugeOpts{
Name: "streamsink_bytes",
Namespace: "neko",
Subsystem: "capture",
Help: "Total number of bytes created by the pipeline.",
ConstLabels: map[string]string{
"video_id": id,
"codec_name": c.Name,
"codec_type": c.Type.String(),
},
}),
pipelinesCounter: promauto.NewCounter(prometheus.CounterOpts{ pipelinesCounter: promauto.NewCounter(prometheus.CounterOpts{
Name: "pipelines_total", Name: "pipelines_total",
Namespace: "neko", Namespace: "neko",
@ -123,6 +142,14 @@ func (manager *StreamSinkManagerCtx) ID() string {
} }
func (manager *StreamSinkManagerCtx) Bitrate() int { func (manager *StreamSinkManagerCtx) Bitrate() int {
// TODO: fix bitrate switching calculation
// return real bitrate if available
//realBitrate := atomic.LoadUint64(&manager.bitrate)
//if realBitrate != 0 {
// return int(realBitrate)
//}
// if we do not have function to estimate bitrate, return 0
if manager.getBitrate == nil { if manager.getBitrate == nil {
return 0 return 0
} }
@ -161,7 +188,7 @@ func (manager *StreamSinkManagerCtx) stop() {
} }
} }
func (manager *StreamSinkManagerCtx) addListener(listener *func(sample types.Sample)) { func (manager *StreamSinkManagerCtx) addListener(listener types.SampleListener) {
ptr := reflect.ValueOf(listener).Pointer() ptr := reflect.ValueOf(listener).Pointer()
emitKeyframe := false emitKeyframe := false
@ -186,7 +213,7 @@ func (manager *StreamSinkManagerCtx) addListener(listener *func(sample types.Sam
} }
} }
func (manager *StreamSinkManagerCtx) removeListener(listener *func(sample types.Sample)) { func (manager *StreamSinkManagerCtx) removeListener(listener types.SampleListener) {
ptr := reflect.ValueOf(listener).Pointer() ptr := reflect.ValueOf(listener).Pointer()
manager.listenersMu.Lock() manager.listenersMu.Lock()
@ -198,7 +225,7 @@ func (manager *StreamSinkManagerCtx) removeListener(listener *func(sample types.
manager.currentListeners.Set(float64(manager.ListenersCount())) manager.currentListeners.Set(float64(manager.ListenersCount()))
} }
func (manager *StreamSinkManagerCtx) AddListener(listener *func(sample types.Sample)) error { func (manager *StreamSinkManagerCtx) AddListener(listener types.SampleListener) error {
manager.mu.Lock() manager.mu.Lock()
defer manager.mu.Unlock() defer manager.mu.Unlock()
@ -217,7 +244,7 @@ func (manager *StreamSinkManagerCtx) AddListener(listener *func(sample types.Sam
return nil return nil
} }
func (manager *StreamSinkManagerCtx) RemoveListener(listener *func(sample types.Sample)) error { func (manager *StreamSinkManagerCtx) RemoveListener(listener types.SampleListener) error {
manager.mu.Lock() manager.mu.Lock()
defer manager.mu.Unlock() defer manager.mu.Unlock()
@ -236,7 +263,7 @@ func (manager *StreamSinkManagerCtx) RemoveListener(listener *func(sample types.
// moving listeners between streams ensures, that target pipeline is running // moving listeners between streams ensures, that target pipeline is running
// before listener is added, and stops source pipeline if there are 0 listeners // before listener is added, and stops source pipeline if there are 0 listeners
func (manager *StreamSinkManagerCtx) MoveListenerTo(listener *func(sample types.Sample), stream types.StreamSinkManager) error { func (manager *StreamSinkManagerCtx) MoveListenerTo(listener types.SampleListener, stream types.StreamSinkManager) error {
if listener == nil { if listener == nil {
return errors.New("listener cannot be nil") return errors.New("listener cannot be nil")
} }
@ -330,20 +357,7 @@ func (manager *StreamSinkManagerCtx) CreatePipeline() error {
return return
} }
manager.listenersMu.Lock() manager.onSample(sample)
// if is not delta unit -> it can be decoded independently -> it is a keyframe
if manager.waitForKf && !sample.DeltaUnit && len(manager.listenersKf) > 0 {
// if current sample is a keyframe, move listeners from
// keyframe lobby to actual listeners map and clear lobby
for k, v := range manager.listenersKf {
manager.listeners[k] = v
}
manager.listenersKf = make(map[uintptr]*func(sample types.Sample))
}
for _, emit := range manager.listeners {
(*emit)(sample)
}
manager.listenersMu.Unlock()
} }
}() }()
@ -353,6 +367,51 @@ func (manager *StreamSinkManagerCtx) CreatePipeline() error {
return nil return nil
} }
func (manager *StreamSinkManagerCtx) saveSampleBitrate(timestamp time.Time, delta float64) {
// get unix timestamp in seconds
sec := timestamp.Unix()
// last bucket is timestamp rounded to 3 seconds - 1 second
last := int((sec - 1) % 3)
// current bucket is timestamp rounded to 3 seconds
curr := int(sec % 3)
// next bucket is timestamp rounded to 3 seconds + 1 second
next := int((sec + 1) % 3)
if manager.brBuckets[next] != 0 {
// atomic update bitrate
atomic.StoreUint64(&manager.bitrate, uint64(manager.brBuckets[last]))
// empty next bucket
manager.brBuckets[next] = 0
}
// add rate to current bucket
manager.brBuckets[curr] += delta
}
func (manager *StreamSinkManagerCtx) onSample(sample types.Sample) {
manager.listenersMu.Lock()
defer manager.listenersMu.Unlock()
// save to metrics
length := float64(sample.Length)
manager.totalBytes.Add(length)
manager.saveSampleBitrate(sample.Timestamp, length)
// if is not delta unit -> it can be decoded independently -> it is a keyframe
if manager.waitForKf && !sample.DeltaUnit && len(manager.listenersKf) > 0 {
// if current sample is a keyframe, move listeners from
// keyframe lobby to actual listeners map and clear lobby
for k, v := range manager.listenersKf {
manager.listeners[k] = v
}
manager.listenersKf = make(map[uintptr]types.SampleListener)
}
for _, l := range manager.listeners {
l.WriteSample(sample)
}
}
func (manager *StreamSinkManagerCtx) DestroyPipeline() { func (manager *StreamSinkManagerCtx) DestroyPipeline() {
manager.pipelineMu.Lock() manager.pipelineMu.Lock()
defer manager.pipelineMu.Unlock() defer manager.pipelineMu.Unlock()
@ -366,4 +425,7 @@ func (manager *StreamSinkManagerCtx) DestroyPipeline() {
manager.pipeline = nil manager.pipeline = nil
manager.pipelinesActive.Set(0) manager.pipelinesActive.Set(0)
manager.brBuckets = make(map[int]float64)
atomic.StoreUint64(&manager.bitrate, 0)
} }

View File

@ -321,8 +321,13 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, bitrate int,
videoAuto = false videoAuto = false
} }
videoRtcp := make(chan []rtcp.Packet, 1)
// video track // video track
videoTrack, err := NewTrack(logger, videoCodec, connection, WithVideoAuto(videoAuto)) videoTrack, err := NewTrack(logger, videoCodec, connection,
WithVideoAuto(videoAuto),
WithRtcpChan(videoRtcp),
)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -570,7 +575,9 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, bitrate int,
if err = video.RemoveReceiver(videoTrack); err != nil { if err = video.RemoveReceiver(videoTrack); err != nil {
logger.Err(err).Msg("failed to remove video receiver") logger.Err(err).Msg("failed to remove video receiver")
} }
audioTrack.RemoveStream() audioTrack.Shutdown()
videoTrack.Shutdown()
close(videoRtcp)
} }
manager.metrics.SetState(session, state) manager.metrics.SetState(session, state)
@ -651,7 +658,13 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, bitrate int,
}) })
}) })
videoTrack.OnRTCP(func(packets []rtcp.Packet) { go func() {
for {
packets, ok := <-videoRtcp
if !ok {
break
}
for _, p := range packets { for _, p := range packets {
if rtcpPacket, ok := p.(*rtcp.ReceiverReport); ok { if rtcpPacket, ok := p.(*rtcp.ReceiverReport); ok {
l := len(rtcpPacket.Reports) l := len(rtcpPacket.Reports)
@ -661,7 +674,8 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, bitrate int,
} }
} }
} }
}) }
}()
go func() { go func() {
ticker := time.NewTicker(5 * time.Second) ticker := time.NewTicker(5 * time.Second)

View File

@ -18,7 +18,9 @@ import (
type Track struct { type Track struct {
logger zerolog.Logger logger zerolog.Logger
track *webrtc.TrackLocalStaticSample track *webrtc.TrackLocalStaticSample
listener func(sample types.Sample)
rtcpCh chan []rtcp.Packet
sample chan types.Sample
videoAuto bool videoAuto bool
videoAutoMu sync.RWMutex videoAutoMu sync.RWMutex
@ -27,9 +29,6 @@ type Track struct {
stream types.StreamSinkManager stream types.StreamSinkManager
streamMu sync.Mutex streamMu sync.Mutex
onRtcp func([]rtcp.Packet)
onRtcpMu sync.RWMutex
bitrateChange func(int) (bool, error) bitrateChange func(int) (bool, error)
videoChange func(string) (bool, error) videoChange func(string) (bool, error)
} }
@ -42,6 +41,12 @@ func WithVideoAuto(auto bool) option {
} }
} }
func WithRtcpChan(rtcp chan []rtcp.Packet) option {
return func(t *Track) {
t.rtcpCh = rtcp
}
}
func NewTrack(logger zerolog.Logger, codec codec.RTPCodec, connection *webrtc.PeerConnection, opts ...option) (*Track, error) { func NewTrack(logger zerolog.Logger, codec codec.RTPCodec, connection *webrtc.PeerConnection, opts ...option) (*Track, error) {
id := codec.Type.String() id := codec.Type.String()
track, err := webrtc.NewTrackLocalStaticSample(codec.Capability, id, "stream") track, err := webrtc.NewTrackLocalStaticSample(codec.Capability, id, "stream")
@ -54,49 +59,66 @@ func NewTrack(logger zerolog.Logger, codec codec.RTPCodec, connection *webrtc.Pe
t := &Track{ t := &Track{
logger: logger, logger: logger,
track: track, track: track,
rtcpCh: make(chan []rtcp.Packet),
sample: make(chan types.Sample),
} }
for _, opt := range opts { for _, opt := range opts {
opt(t) opt(t)
} }
t.listener = func(sample types.Sample) {
err := track.WriteSample(media.Sample{
Data: sample.Data,
Duration: sample.Duration,
})
if err != nil && !errors.Is(err, io.ErrClosedPipe) {
logger.Warn().Err(err).Msg("failed to write sample to track")
}
}
sender, err := connection.AddTrack(t.track) sender, err := connection.AddTrack(t.track)
if err != nil { if err != nil {
return nil, err return nil, err
} }
go t.rtcpReader(sender) go t.rtcpReader(sender)
go t.sampleReader()
return t, nil return t, nil
} }
func (t *Track) Shutdown() {
t.RemoveStream()
close(t.sample)
}
func (t *Track) rtcpReader(sender *webrtc.RTPSender) { func (t *Track) rtcpReader(sender *webrtc.RTPSender) {
for { for {
packets, _, err := sender.ReadRTCP() packets, _, err := sender.ReadRTCP()
if err != nil { if err != nil {
if err == io.EOF || err == io.ErrClosedPipe { if errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe) {
t.logger.Debug().Msg("track rtcp reader closed")
return return
} }
t.logger.Err(err).Msg("RTCP read error") t.logger.Warn().Err(err).Msg("failed to read track rtcp")
continue continue
} }
t.onRtcpMu.RLock() if t.rtcpCh != nil {
if t.onRtcp != nil { t.rtcpCh <- packets
go t.onRtcp(packets) }
}
}
func (t *Track) sampleReader() {
for {
sample, ok := <-t.sample
if !ok {
t.logger.Debug().Msg("track sample reader closed")
return
}
err := t.track.WriteSample(media.Sample{
Data: sample.Data,
Duration: sample.Duration,
Timestamp: sample.Timestamp,
})
if err != nil && !errors.Is(err, io.ErrClosedPipe) {
t.logger.Warn().Err(err).Msg("failed to write sample to track")
} }
t.onRtcpMu.RUnlock()
} }
} }
@ -117,9 +139,9 @@ func (t *Track) SetStream(stream types.StreamSinkManager) (bool, error) {
var err error var err error
if t.stream != nil { if t.stream != nil {
err = t.stream.MoveListenerTo(&t.listener, stream) err = t.stream.MoveListenerTo(t, stream)
} else { } else {
err = stream.AddListener(&t.listener) err = stream.AddListener(t)
} }
if err != nil { if err != nil {
return false, err return false, err
@ -138,7 +160,7 @@ func (t *Track) RemoveStream() {
return return
} }
err := t.stream.RemoveListener(&t.listener) err := t.stream.RemoveListener(t)
if err != nil { if err != nil {
t.logger.Warn().Err(err).Msg("failed to remove listener from stream") t.logger.Warn().Err(err).Msg("failed to remove listener from stream")
} }
@ -157,9 +179,9 @@ func (t *Track) SetPaused(paused bool) {
var err error var err error
if paused { if paused {
err = t.stream.RemoveListener(&t.listener) err = t.stream.RemoveListener(t)
} else { } else {
err = t.stream.AddListener(&t.listener) err = t.stream.AddListener(t)
} }
if err != nil { if err != nil {
t.logger.Warn().Err(err).Msg("failed to change listener state") t.logger.Warn().Err(err).Msg("failed to change listener state")
@ -169,11 +191,8 @@ func (t *Track) SetPaused(paused bool) {
t.paused = paused t.paused = paused
} }
func (t *Track) OnRTCP(f func([]rtcp.Packet)) { func (t *Track) WriteSample(sample types.Sample) {
t.onRtcpMu.Lock() t.sample <- sample
defer t.onRtcpMu.Unlock()
t.onRtcp = f
} }
func (t *Track) SetBitrate(bitrate int) (bool, error) { func (t *Track) SetBitrate(bitrate int) (bool, error) {

View File

@ -200,8 +200,8 @@ func CheckPlugins(plugins []string) error {
} }
//export goHandlePipelineBuffer //export goHandlePipelineBuffer
func goHandlePipelineBuffer(pipelineID C.int, buf unsafe.Pointer, bufLen C.int, duration C.guint64, deltaUnit C.gboolean) { func goHandlePipelineBuffer(pipelineID C.int, buf C.gpointer, bufLen C.int, duration C.guint64, deltaUnit C.gboolean) {
defer C.free(buf) defer C.g_free(buf)
pipelinesLock.Lock() pipelinesLock.Lock()
pipeline, ok := pipelines[int(pipelineID)] pipeline, ok := pipelines[int(pipelineID)]
@ -209,7 +209,9 @@ func goHandlePipelineBuffer(pipelineID C.int, buf unsafe.Pointer, bufLen C.int,
if ok { if ok {
pipeline.sample <- types.Sample{ pipeline.sample <- types.Sample{
Data: C.GoBytes(buf, bufLen), Data: C.GoBytes(unsafe.Pointer(buf), bufLen),
Length: int(bufLen),
Timestamp: time.Now(),
Duration: time.Duration(duration), Duration: time.Duration(duration),
DeltaUnit: deltaUnit == C.TRUE, DeltaUnit: deltaUnit == C.TRUE,
} }

View File

@ -17,11 +17,20 @@ var (
) )
type Sample struct { type Sample struct {
// buffer with encoded media
Data []byte Data []byte
Length int
// timing information
Timestamp time.Time
Duration time.Duration Duration time.Duration
// metadata
DeltaUnit bool // this unit cannot be decoded independently. DeltaUnit bool // this unit cannot be decoded independently.
} }
type SampleListener interface {
WriteSample(Sample)
}
type Receiver interface { type Receiver interface {
SetStream(stream StreamSinkManager) (changed bool, err error) SetStream(stream StreamSinkManager) (changed bool, err error)
RemoveStream() RemoveStream()
@ -60,9 +69,9 @@ type StreamSinkManager interface {
Codec() codec.RTPCodec Codec() codec.RTPCodec
Bitrate() int Bitrate() int
AddListener(listener *func(sample Sample)) error AddListener(listener SampleListener) error
RemoveListener(listener *func(sample Sample)) error RemoveListener(listener SampleListener) error
MoveListenerTo(listener *func(sample Sample), targetStream StreamSinkManager) error MoveListenerTo(listener SampleListener, targetStream StreamSinkManager) error
ListenersCount() int ListenersCount() int
Started() bool Started() bool