mirror of
https://github.com/m1k1o/neko.git
synced 2024-07-24 14:40:50 +12:00
2364facd60
* Add congestion control * Improve stream matching, add manual stream selection, add metrics * Use a ticker for bitrate estimation and make bandwidth drops switch to lower streams more aggressively * Missing signal response, fix video auto bug * Remove redundant mutex * Bitrate history queue * Get bitrate fn support h264 & float64 --------- Co-authored-by: Aleksandar Sukovic <aleksandar.sukovic@gmail.com>
146 lines
3.1 KiB
Go
146 lines
3.1 KiB
Go
package buckets
|
|
|
|
import (
|
|
"errors"
|
|
"sort"
|
|
|
|
"github.com/rs/zerolog"
|
|
"github.com/rs/zerolog/log"
|
|
|
|
"github.com/demodesk/neko/pkg/types"
|
|
"github.com/demodesk/neko/pkg/types/codec"
|
|
)
|
|
|
|
type BucketsManagerCtx struct {
|
|
logger zerolog.Logger
|
|
codec codec.RTPCodec
|
|
streams map[string]types.StreamSinkManager
|
|
streamIDs []string
|
|
}
|
|
|
|
func BucketsNew(codec codec.RTPCodec, streams map[string]types.StreamSinkManager, streamIDs []string) *BucketsManagerCtx {
|
|
logger := log.With().
|
|
Str("module", "capture").
|
|
Str("submodule", "buckets").
|
|
Logger()
|
|
|
|
return &BucketsManagerCtx{
|
|
logger: logger,
|
|
codec: codec,
|
|
streams: streams,
|
|
streamIDs: streamIDs,
|
|
}
|
|
}
|
|
|
|
func (manager *BucketsManagerCtx) Shutdown() {
|
|
manager.logger.Info().Msgf("shutdown")
|
|
|
|
manager.DestroyAll()
|
|
}
|
|
|
|
func (manager *BucketsManagerCtx) DestroyAll() {
|
|
for _, stream := range manager.streams {
|
|
if stream.Started() {
|
|
stream.DestroyPipeline()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (manager *BucketsManagerCtx) RecreateAll() error {
|
|
for _, stream := range manager.streams {
|
|
if stream.Started() {
|
|
err := stream.CreatePipeline()
|
|
if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (manager *BucketsManagerCtx) IDs() []string {
|
|
return manager.streamIDs
|
|
}
|
|
|
|
func (manager *BucketsManagerCtx) Codec() codec.RTPCodec {
|
|
return manager.codec
|
|
}
|
|
|
|
func (manager *BucketsManagerCtx) SetReceiver(receiver types.Receiver) {
|
|
// bitrate history is per receiver
|
|
bitrateHistory := &queue{}
|
|
|
|
receiver.OnBitrateChange(func(peerBitrate int) (bool, error) {
|
|
bitrate := peerBitrate
|
|
if receiver.VideoAuto() {
|
|
bitrate = bitrateHistory.normaliseBitrate(bitrate)
|
|
}
|
|
|
|
stream := manager.findNearestStream(bitrate)
|
|
streamID := stream.ID()
|
|
|
|
// TODO: make this less noisy in logs
|
|
manager.logger.Debug().
|
|
Str("video_id", streamID).
|
|
Int("len", bitrateHistory.len()).
|
|
Int("peer_bitrate", peerBitrate).
|
|
Int("bitrate", bitrate).
|
|
Msg("change video bitrate")
|
|
|
|
return receiver.SetStream(stream)
|
|
})
|
|
|
|
receiver.OnVideoChange(func(videoID string) (bool, error) {
|
|
stream := manager.streams[videoID]
|
|
manager.logger.Info().
|
|
Str("video_id", videoID).
|
|
Msg("video change")
|
|
|
|
return receiver.SetStream(stream)
|
|
})
|
|
}
|
|
|
|
func (manager *BucketsManagerCtx) findNearestStream(peerBitrate int) types.StreamSinkManager {
|
|
type streamDiff struct {
|
|
id string
|
|
bitrateDiff int
|
|
}
|
|
|
|
sortDiff := func(a, b int) bool {
|
|
switch {
|
|
case a < 0 && b < 0:
|
|
return a > b
|
|
case a >= 0:
|
|
if b >= 0 {
|
|
return a <= b
|
|
}
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
var diffs []streamDiff
|
|
|
|
for _, stream := range manager.streams {
|
|
diffs = append(diffs, streamDiff{
|
|
id: stream.ID(),
|
|
bitrateDiff: peerBitrate - stream.Bitrate(),
|
|
})
|
|
}
|
|
|
|
sort.Slice(diffs, func(i, j int) bool {
|
|
return sortDiff(diffs[i].bitrateDiff, diffs[j].bitrateDiff)
|
|
})
|
|
|
|
bestDiff := diffs[0]
|
|
|
|
return manager.streams[bestDiff.id]
|
|
}
|
|
|
|
func (manager *BucketsManagerCtx) RemoveReceiver(receiver types.Receiver) error {
|
|
receiver.OnBitrateChange(nil)
|
|
receiver.OnVideoChange(nil)
|
|
receiver.RemoveStream()
|
|
return nil
|
|
}
|