2022-10-17 13:39:31 +02:00
|
|
|
package capture
|
|
|
|
|
|
|
|
import (
|
|
|
|
"errors"
|
2022-10-25 20:25:00 +02:00
|
|
|
"fmt"
|
|
|
|
"math"
|
2022-10-17 13:39:31 +02:00
|
|
|
|
|
|
|
"github.com/rs/zerolog"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
|
|
|
|
|
|
"github.com/demodesk/neko/pkg/types"
|
|
|
|
"github.com/demodesk/neko/pkg/types/codec"
|
|
|
|
)
|
|
|
|
|
|
|
|
type BucketsManagerCtx struct {
|
|
|
|
logger zerolog.Logger
|
|
|
|
codec codec.RTPCodec
|
|
|
|
streams map[string]*StreamSinkManagerCtx
|
|
|
|
streamIDs []string
|
|
|
|
}
|
|
|
|
|
|
|
|
func bucketsNew(codec codec.RTPCodec, streams map[string]*StreamSinkManagerCtx, streamIDs []string) *BucketsManagerCtx {
|
|
|
|
logger := log.With().
|
|
|
|
Str("module", "capture").
|
|
|
|
Str("submodule", "buckets").
|
|
|
|
Logger()
|
|
|
|
|
|
|
|
return &BucketsManagerCtx{
|
|
|
|
logger: logger,
|
|
|
|
codec: codec,
|
|
|
|
streams: streams,
|
|
|
|
streamIDs: streamIDs,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *BucketsManagerCtx) shutdown() {
|
|
|
|
m.logger.Info().Msgf("shutdown")
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *BucketsManagerCtx) destroyAll() {
|
2022-10-25 20:25:00 +02:00
|
|
|
for _, stream := range m.streams {
|
|
|
|
if stream.Started() {
|
|
|
|
stream.destroyPipeline()
|
2022-10-17 13:39:31 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *BucketsManagerCtx) recreateAll() error {
|
2022-10-25 20:25:00 +02:00
|
|
|
for _, stream := range m.streams {
|
|
|
|
if stream.Started() {
|
|
|
|
err := stream.createPipeline()
|
2022-10-17 13:39:31 +02:00
|
|
|
if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *BucketsManagerCtx) IDs() []string {
|
|
|
|
return m.streamIDs
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *BucketsManagerCtx) Codec() codec.RTPCodec {
|
|
|
|
return m.codec
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *BucketsManagerCtx) SetReceiver(receiver types.Receiver) error {
|
2022-10-25 20:25:00 +02:00
|
|
|
receiver.OnBitrateChange(func(bitrate int) error {
|
|
|
|
stream, ok := m.findNearestStream(bitrate)
|
2022-10-17 13:39:31 +02:00
|
|
|
if !ok {
|
2022-10-25 20:25:00 +02:00
|
|
|
return fmt.Errorf("no stream found for bitrate %d", bitrate)
|
2022-10-17 13:39:31 +02:00
|
|
|
}
|
|
|
|
|
2022-10-25 20:25:00 +02:00
|
|
|
return receiver.SetStream(stream)
|
2022-10-17 13:39:31 +02:00
|
|
|
})
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2022-10-25 20:25:00 +02:00
|
|
|
func (m *BucketsManagerCtx) findNearestStream(bitrate int) (ss *StreamSinkManagerCtx, ok bool) {
|
|
|
|
minDiff := math.MaxInt
|
|
|
|
for _, s := range m.streams {
|
|
|
|
streamBitrate, err := s.Bitrate()
|
|
|
|
if err != nil {
|
|
|
|
m.logger.Error().Err(err).Msgf("failed to get bitrate for stream %s", s.ID())
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
diffAbs := int(math.Abs(float64(bitrate - streamBitrate)))
|
|
|
|
|
|
|
|
if diffAbs < minDiff {
|
|
|
|
minDiff, ss = diffAbs, s
|
|
|
|
}
|
|
|
|
}
|
|
|
|
ok = ss != nil
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2022-10-17 13:39:31 +02:00
|
|
|
func (m *BucketsManagerCtx) RemoveReceiver(receiver types.Receiver) error {
|
2022-10-25 20:25:00 +02:00
|
|
|
receiver.OnBitrateChange(nil)
|
2022-10-17 13:39:31 +02:00
|
|
|
receiver.RemoveStream()
|
|
|
|
return nil
|
|
|
|
}
|