Move the server to the server directory.

This commit is contained in:
Miroslav Šedivý
2024-06-23 17:48:14 +02:00
parent da45f62ca8
commit 5b98344205
211 changed files with 18 additions and 10 deletions

View File

@ -0,0 +1,156 @@
package capture
import (
"sync"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/demodesk/neko/pkg/gst"
"github.com/demodesk/neko/pkg/types"
)
// BroacastManagerCtx manages the lifecycle of a single RTMP broadcast
// pipeline: its gst pipeline, target url and started flag.
// NOTE(review): "Broacast" is a typo for "Broadcast"; renaming would touch
// all callers, so it is left unchanged here.
type BroacastManagerCtx struct {
	logger zerolog.Logger
	mu     sync.Mutex

	// pipeline is the currently running gst pipeline; nil when stopped.
	pipeline   gst.Pipeline
	pipelineMu sync.Mutex
	// pipelineFn builds a pipeline description string for a given url.
	pipelineFn func(url string) (string, error)

	// url is the RTMP target; started tracks whether broadcasting is desired.
	url     string
	started bool

	// metrics
	pipelinesCounter prometheus.Counter
	pipelinesActive  prometheus.Gauge
}
// broadcastNew constructs a broadcast manager. A non-empty defaultUrl marks
// the manager as started, so the pipeline is created at capture startup.
func broadcastNew(pipelineFn func(url string) (string, error), defaultUrl string) *BroacastManagerCtx {
	logger := log.With().
		Str("module", "capture").
		Str("submodule", "broadcast").
		Logger()

	// constant labels shared by both broadcast pipeline metrics
	labels := map[string]string{
		"submodule":  "broadcast",
		"video_id":   "main",
		"codec_name": "-",
		"codec_type": "-",
	}

	created := promauto.NewCounter(prometheus.CounterOpts{
		Name:        "pipelines_total",
		Namespace:   "neko",
		Subsystem:   "capture",
		Help:        "Total number of created pipelines.",
		ConstLabels: labels,
	})

	active := promauto.NewGauge(prometheus.GaugeOpts{
		Name:        "pipelines_active",
		Namespace:   "neko",
		Subsystem:   "capture",
		Help:        "Total number of active pipelines.",
		ConstLabels: labels,
	})

	return &BroacastManagerCtx{
		logger:     logger,
		pipelineFn: pipelineFn,
		url:        defaultUrl,
		// a non-empty default url means broadcasting starts out enabled
		started: defaultUrl != "",
		// metrics
		pipelinesCounter: created,
		pipelinesActive:  active,
	}
}
// shutdown tears down the broadcast pipeline during capture shutdown.
func (manager *BroacastManagerCtx) shutdown() {
	manager.logger.Info().Msgf("shutdown")

	manager.destroyPipeline()
}
// Start begins broadcasting to the given url.
//
// The url must be assigned before the pipeline is created, because
// createPipeline reads manager.url when building the pipeline description;
// previously the pipeline was built against the stale url and only then was
// manager.url updated. On failure the previous url is restored and the
// started flag is left unchanged.
func (manager *BroacastManagerCtx) Start(url string) error {
	manager.mu.Lock()
	defer manager.mu.Unlock()

	// apply the new target url first so createPipeline uses it
	prevUrl := manager.url
	manager.url = url

	if err := manager.createPipeline(); err != nil {
		// roll back so a failed start does not change the target url
		manager.url = prevUrl
		return err
	}

	manager.started = true
	return nil
}
// Stop disables broadcasting and destroys the running pipeline, if any.
func (manager *BroacastManagerCtx) Stop() {
	manager.mu.Lock()
	defer manager.mu.Unlock()

	manager.started = false
	manager.destroyPipeline()
}
// Started reports whether broadcasting is currently enabled.
func (manager *BroacastManagerCtx) Started() bool {
	manager.mu.Lock()
	started := manager.started
	manager.mu.Unlock()
	return started
}
// Url returns the current broadcast target url.
func (manager *BroacastManagerCtx) Url() string {
	manager.mu.Lock()
	url := manager.url
	manager.mu.Unlock()
	return url
}
// createPipeline builds and starts the broadcast pipeline for the current
// url. Returns types.ErrCapturePipelineAlreadyExists when a pipeline is
// already running.
func (manager *BroacastManagerCtx) createPipeline() error {
	manager.pipelineMu.Lock()
	defer manager.pipelineMu.Unlock()

	if manager.pipeline != nil {
		return types.ErrCapturePipelineAlreadyExists
	}

	// build the pipeline description from the current url
	pipelineStr, err := manager.pipelineFn(manager.url)
	if err != nil {
		return err
	}

	manager.logger.Info().
		Str("url", manager.url).
		Str("src", pipelineStr).
		Msgf("starting pipeline")

	manager.pipeline, err = gst.CreatePipeline(pipelineStr)
	if err != nil {
		return err
	}

	manager.pipeline.Play()

	// metrics: one more pipeline created, and exactly one active
	manager.pipelinesCounter.Inc()
	manager.pipelinesActive.Set(1)

	return nil
}
// destroyPipeline stops and releases the current pipeline; no-op when there
// is no pipeline.
func (manager *BroacastManagerCtx) destroyPipeline() {
	manager.pipelineMu.Lock()
	defer manager.pipelineMu.Unlock()

	if manager.pipeline == nil {
		return
	}

	manager.pipeline.Destroy()
	manager.logger.Info().Msgf("destroying pipeline")
	manager.pipeline = nil

	manager.pipelinesActive.Set(0)
}

View File

@ -0,0 +1,269 @@
package capture
import (
"errors"
"fmt"
"strings"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/demodesk/neko/internal/config"
"github.com/demodesk/neko/pkg/types"
"github.com/demodesk/neko/pkg/types/codec"
)
// CaptureManagerCtx aggregates all capture submodules: media sinks
// (broadcast, screencast, audio, video streams) and media sources
// (webcam, microphone).
type CaptureManagerCtx struct {
	logger  zerolog.Logger
	desktop types.DesktopManager
	config  *config.Capture

	// sinks
	broadcast  *BroacastManagerCtx
	screencast *ScreencastManagerCtx
	audio      *StreamSinkManagerCtx
	video      *StreamSelectorManagerCtx

	// sources
	webcam     *StreamSrcManagerCtx
	microphone *StreamSrcManagerCtx
}
// New builds the capture manager and all of its submodules from the given
// configuration. It panics (via logger.Panic) when a configured video
// pipeline cannot be evaluated at startup.
func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCtx {
	logger := log.With().Str("module", "capture").Logger()

	// one stream sink per configured video pipeline, keyed by video id
	videos := map[string]types.StreamSinkManager{}
	for video_id, cnf := range config.VideoPipelines {
		// capture the loop variable for the closure below
		pipelineConf := cnf

		createPipeline := func() (string, error) {
			if pipelineConf.GstPipeline != "" {
				// replace {display} with valid display
				return strings.Replace(pipelineConf.GstPipeline, "{display}", config.Display, 1), nil
			}

			// no literal pipeline configured: derive one from the current
			// screen size via the pipeline config's expression
			screen := desktop.GetScreenSize()
			pipeline, err := pipelineConf.GetPipeline(screen)
			if err != nil {
				return "", err
			}

			return fmt.Sprintf(
				"ximagesrc display-name=%s show-pointer=false use-damage=false "+
					"%s ! appsink name=appsink", config.Display, pipeline,
			), nil
		}

		// trigger function to catch evaluation errors at startup
		pipeline, err := createPipeline()
		if err != nil {
			logger.Panic().Err(err).
				Str("video_id", video_id).
				Msg("failed to create video pipeline")
		}

		logger.Info().
			Str("video_id", video_id).
			Str("pipeline", pipeline).
			Msg("syntax check for video stream pipeline passed")

		// append to videos
		videos[video_id] = streamSinkNew(config.VideoCodec, createPipeline, video_id)
	}

	return &CaptureManagerCtx{
		logger:  logger,
		desktop: desktop,
		config:  config,

		// sinks
		broadcast: broadcastNew(func(url string) (string, error) {
			if config.BroadcastPipeline != "" {
				var pipeline = config.BroadcastPipeline
				// replace {display} with valid display
				pipeline = strings.Replace(pipeline, "{display}", config.Display, 1)
				// replace {device} with valid device
				pipeline = strings.Replace(pipeline, "{device}", config.AudioDevice, 1)
				// replace {url} with valid URL
				return strings.Replace(pipeline, "{url}", url, 1), nil
			}

			// default broadcast pipeline: mux audio (AAC) and video (x264)
			// into FLV and push to the RTMP url
			return fmt.Sprintf(
				"flvmux name=mux ! rtmpsink location='%s live=1' "+
					"pulsesrc device=%s "+
					"! audio/x-raw,channels=2 "+
					"! audioconvert "+
					"! queue "+
					"! voaacenc bitrate=%d "+
					"! mux. "+
					"ximagesrc display-name=%s show-pointer=true use-damage=false "+
					"! video/x-raw "+
					"! videoconvert "+
					"! queue "+
					"! x264enc threads=4 bitrate=%d key-int-max=15 byte-stream=true tune=zerolatency speed-preset=%s "+
					"! mux.", url, config.AudioDevice, config.BroadcastAudioBitrate*1000, config.Display, config.BroadcastVideoBitrate, config.BroadcastPreset,
			), nil
		}, config.BroadcastUrl),
		screencast: screencastNew(config.ScreencastEnabled, func() string {
			if config.ScreencastPipeline != "" {
				// replace {display} with valid display
				return strings.Replace(config.ScreencastPipeline, "{display}", config.Display, 1)
			}

			// default screencast pipeline: periodic JPEG snapshots of the display
			return fmt.Sprintf(
				"ximagesrc display-name=%s show-pointer=true use-damage=false "+
					"! video/x-raw,framerate=%s "+
					"! videoconvert "+
					"! queue "+
					"! jpegenc quality=%s "+
					"! appsink name=appsink", config.Display, config.ScreencastRate, config.ScreencastQuality,
			)
		}()),
		audio: streamSinkNew(config.AudioCodec, func() (string, error) {
			if config.AudioPipeline != "" {
				// replace {device} with valid device
				return strings.Replace(config.AudioPipeline, "{device}", config.AudioDevice, 1), nil
			}

			// default audio pipeline: pulse source encoded by the configured codec
			return fmt.Sprintf(
				"pulsesrc device=%s "+
					"! audio/x-raw,channels=2 "+
					"! audioconvert "+
					"! queue "+
					"! %s "+
					"! appsink name=appsink", config.AudioDevice, config.AudioCodec.Pipeline,
			), nil
		}, "audio"),
		video: streamSelectorNew(config.VideoCodec, videos, config.VideoIDs),

		// sources: RTP depayload + decode into a v4l2 (webcam) or pulse
		// (microphone) device, one pipeline per supported codec
		webcam: streamSrcNew(config.WebcamEnabled, map[string]string{
			codec.VP8().Name: "appsrc format=time is-live=true do-timestamp=true name=appsrc " +
				fmt.Sprintf("! application/x-rtp, payload=%d, encoding-name=VP8-DRAFT-IETF-01 ", codec.VP8().PayloadType) +
				"! rtpvp8depay " +
				"! decodebin " +
				"! videoconvert " +
				"! videorate " +
				"! videoscale " +
				fmt.Sprintf("! video/x-raw,width=%d,height=%d ", config.WebcamWidth, config.WebcamHeight) +
				"! identity drop-allocation=true " +
				fmt.Sprintf("! v4l2sink sync=false device=%s", config.WebcamDevice),
			// TODO: Test this pipeline.
			codec.VP9().Name: "appsrc format=time is-live=true do-timestamp=true name=appsrc " +
				"! application/x-rtp " +
				"! rtpvp9depay " +
				"! decodebin " +
				"! videoconvert " +
				"! videorate " +
				"! videoscale " +
				fmt.Sprintf("! video/x-raw,width=%d,height=%d ", config.WebcamWidth, config.WebcamHeight) +
				"! identity drop-allocation=true " +
				fmt.Sprintf("! v4l2sink sync=false device=%s", config.WebcamDevice),
			// TODO: Test this pipeline.
			codec.H264().Name: "appsrc format=time is-live=true do-timestamp=true name=appsrc " +
				"! application/x-rtp " +
				"! rtph264depay " +
				"! decodebin " +
				"! videoconvert " +
				"! videorate " +
				"! videoscale " +
				fmt.Sprintf("! video/x-raw,width=%d,height=%d ", config.WebcamWidth, config.WebcamHeight) +
				"! identity drop-allocation=true " +
				fmt.Sprintf("! v4l2sink sync=false device=%s", config.WebcamDevice),
		}, "webcam"),
		microphone: streamSrcNew(config.MicrophoneEnabled, map[string]string{
			codec.Opus().Name: "appsrc format=time is-live=true do-timestamp=true name=appsrc " +
				fmt.Sprintf("! application/x-rtp, payload=%d, encoding-name=OPUS ", codec.Opus().PayloadType) +
				"! rtpopusdepay " +
				"! decodebin " +
				fmt.Sprintf("! pulsesink device=%s", config.MicrophoneDevice),
			// TODO: Test this pipeline.
			codec.G722().Name: "appsrc format=time is-live=true do-timestamp=true name=appsrc " +
				"! application/x-rtp clock-rate=8000 " +
				"! rtpg722depay " +
				"! decodebin " +
				fmt.Sprintf("! pulsesink device=%s", config.MicrophoneDevice),
		}, "microphone"),
	}
}
// Start brings up the broadcast pipeline (when enabled) and registers
// desktop hooks that destroy pipelines before a screen size change and
// recreate them afterwards. Panics on unrecoverable pipeline errors.
func (manager *CaptureManagerCtx) Start() {
	if manager.broadcast.Started() {
		if err := manager.broadcast.createPipeline(); err != nil {
			manager.logger.Panic().Err(err).Msg("unable to create broadcast pipeline")
		}
	}

	// pipelines capture the X display, so they must be torn down while the
	// screen geometry changes and recreated once it has settled
	manager.desktop.OnBeforeScreenSizeChange(func() {
		manager.video.destroyPipelines()

		if manager.broadcast.Started() {
			manager.broadcast.destroyPipeline()
		}

		if manager.screencast.Started() {
			manager.screencast.destroyPipeline()
		}
	})

	manager.desktop.OnAfterScreenSizeChange(func() {
		err := manager.video.recreatePipelines()
		if err != nil {
			manager.logger.Panic().Err(err).Msg("unable to recreate video pipelines")
		}

		// ErrCapturePipelineAlreadyExists is benign here: the pipeline may
		// have been recreated already by a concurrent consumer
		if manager.broadcast.Started() {
			err := manager.broadcast.createPipeline()
			if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) {
				manager.logger.Panic().Err(err).Msg("unable to recreate broadcast pipeline")
			}
		}

		if manager.screencast.Started() {
			err := manager.screencast.createPipeline()
			if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) {
				manager.logger.Panic().Err(err).Msg("unable to recreate screencast pipeline")
			}
		}
	})
}
// Shutdown stops every submodule. It always returns nil; the error return
// exists to satisfy the types.CaptureManager interface.
func (manager *CaptureManagerCtx) Shutdown() error {
	manager.logger.Info().Msgf("shutdown")

	manager.broadcast.shutdown()
	manager.screencast.shutdown()

	manager.audio.shutdown()
	manager.video.shutdown()

	manager.webcam.shutdown()
	manager.microphone.shutdown()

	return nil
}
// Broadcast returns the broadcast submodule.
func (manager *CaptureManagerCtx) Broadcast() types.BroadcastManager {
	return manager.broadcast
}
// Screencast returns the screencast submodule.
func (manager *CaptureManagerCtx) Screencast() types.ScreencastManager {
	return manager.screencast
}
// Audio returns the audio stream sink.
func (manager *CaptureManagerCtx) Audio() types.StreamSinkManager {
	return manager.audio
}
// Video returns the video stream selector (multiple quality streams).
func (manager *CaptureManagerCtx) Video() types.StreamSelectorManager {
	return manager.video
}
// Webcam returns the webcam stream source.
func (manager *CaptureManagerCtx) Webcam() types.StreamSrcManager {
	return manager.webcam
}
// Microphone returns the microphone stream source.
func (manager *CaptureManagerCtx) Microphone() types.StreamSrcManager {
	return manager.microphone
}

View File

@ -0,0 +1,257 @@
package capture
import (
"errors"
"sync"
"sync/atomic"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/demodesk/neko/pkg/gst"
"github.com/demodesk/neko/pkg/types"
)
// screencastTimeout is the interval at which the watchdog goroutine checks
// whether the screencast pipeline is still being consumed; a pipeline not
// queried within one interval is stopped.
const screencastTimeout = 5 * time.Second
// ScreencastManagerCtx serves JPEG snapshots of the desktop on demand and
// automatically stops its pipeline when nobody has requested an image for
// a while (see the watchdog goroutine in screencastNew).
type ScreencastManagerCtx struct {
	logger zerolog.Logger
	mu     sync.Mutex
	wg     sync.WaitGroup

	pipeline    gst.Pipeline
	pipelineStr string
	pipelineMu  sync.Mutex

	// image is the most recent sample produced by the pipeline.
	image   types.Sample
	imageMu sync.Mutex

	tickerStop chan struct{}

	enabled bool
	started bool
	// expired is set to 1 by the watchdog and reset to 0 by Image();
	// accessed atomically.
	expired int32

	// metrics
	imagesCounter    prometheus.Counter
	pipelinesCounter prometheus.Counter
	pipelinesActive  prometheus.Gauge
}
// screencastNew constructs a screencast manager and spawns a watchdog
// goroutine that stops the pipeline when no Image() call has arrived within
// one screencastTimeout interval. The goroutine exits when tickerStop is
// closed (see shutdown).
func screencastNew(enabled bool, pipelineStr string) *ScreencastManagerCtx {
	logger := log.With().
		Str("module", "capture").
		Str("submodule", "screencast").
		Logger()

	manager := &ScreencastManagerCtx{
		logger:      logger,
		pipelineStr: pipelineStr,
		tickerStop:  make(chan struct{}),
		enabled:     enabled,
		started:     false,

		// metrics
		imagesCounter: promauto.NewCounter(prometheus.CounterOpts{
			Name:      "screencast_images_total",
			Namespace: "neko",
			Subsystem: "capture",
			Help:      "Total number of created images.",
		}),
		pipelinesCounter: promauto.NewCounter(prometheus.CounterOpts{
			Name:      "pipelines_total",
			Namespace: "neko",
			Subsystem: "capture",
			Help:      "Total number of created pipelines.",
			ConstLabels: map[string]string{
				"submodule":  "screencast",
				"video_id":   "main",
				"codec_name": "-",
				"codec_type": "-",
			},
		}),
		pipelinesActive: promauto.NewGauge(prometheus.GaugeOpts{
			Name:      "pipelines_active",
			Namespace: "neko",
			Subsystem: "capture",
			Help:      "Total number of active pipelines.",
			ConstLabels: map[string]string{
				"submodule":  "screencast",
				"video_id":   "main",
				"codec_name": "-",
				"codec_type": "-",
			},
		}),
	}

	manager.wg.Add(1)

	go func() {
		defer manager.wg.Done()

		ticker := time.NewTicker(screencastTimeout)
		defer ticker.Stop()

		for {
			select {
			case <-manager.tickerStop:
				return
			case <-ticker.C:
				// if the expired flag was already 1 (no Image() call reset
				// it since the last tick), stop the idle pipeline
				if manager.Started() && !atomic.CompareAndSwapInt32(&manager.expired, 0, 1) {
					manager.stop()
				}
			}
		}
	}()

	return manager
}
// shutdown destroys the pipeline, stops the watchdog goroutine and waits
// for all goroutines to finish.
func (manager *ScreencastManagerCtx) shutdown() {
	manager.logger.Info().Msgf("shutdown")

	manager.destroyPipeline()

	close(manager.tickerStop)
	manager.wg.Wait()
}
// Enabled reports whether the screencast feature is configured on.
func (manager *ScreencastManagerCtx) Enabled() bool {
	manager.mu.Lock()
	enabled := manager.enabled
	manager.mu.Unlock()
	return enabled
}
// Started reports whether the screencast pipeline is currently running.
func (manager *ScreencastManagerCtx) Started() bool {
	manager.mu.Lock()
	started := manager.started
	manager.mu.Unlock()
	return started
}
// Image returns the latest JPEG snapshot, lazily starting the pipeline on
// first use. Each call resets the watchdog's expiry flag so the pipeline
// stays alive while images are being consumed.
func (manager *ScreencastManagerCtx) Image() ([]byte, error) {
	// keep-alive: signal the watchdog that the screencast is in use
	atomic.StoreInt32(&manager.expired, 0)

	err := manager.start()
	// an already-existing pipeline is fine: we just read its latest image
	if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) {
		return nil, err
	}

	manager.imageMu.Lock()
	defer manager.imageMu.Unlock()

	if manager.image.Data == nil {
		return nil, errors.New("image data not found")
	}

	return manager.image.Data, nil
}
// start creates the screencast pipeline and marks the manager as started.
// Fails when the feature is disabled in configuration.
func (manager *ScreencastManagerCtx) start() error {
	manager.mu.Lock()
	defer manager.mu.Unlock()

	if !manager.enabled {
		return errors.New("screencast not enabled")
	}

	if err := manager.createPipeline(); err != nil {
		return err
	}

	manager.started = true
	return nil
}
// stop marks the manager as stopped and destroys the pipeline; called by
// the watchdog when the screencast has been idle.
func (manager *ScreencastManagerCtx) stop() {
	manager.mu.Lock()
	defer manager.mu.Unlock()

	manager.started = false
	manager.destroyPipeline()
}
// createPipeline builds and starts the screencast pipeline and waits for the
// first image. Returns types.ErrCapturePipelineAlreadyExists when a pipeline
// is already running.
//
// Fix: previously, when the first image never arrived (channel closed or
// 1s timeout), the freshly created pipeline was left assigned and playing
// with no consumer goroutine — leaking the pipeline and making every retry
// fail with ErrCapturePipelineAlreadyExists. Now the pipeline is destroyed
// on those error paths. Also fixes the "timeouted" typo in the error message.
func (manager *ScreencastManagerCtx) createPipeline() error {
	manager.pipelineMu.Lock()
	defer manager.pipelineMu.Unlock()

	if manager.pipeline != nil {
		return types.ErrCapturePipelineAlreadyExists
	}

	var err error

	manager.logger.Info().
		Str("str", manager.pipelineStr).
		Msgf("creating pipeline")

	manager.pipeline, err = gst.CreatePipeline(manager.pipelineStr)
	if err != nil {
		return err
	}

	manager.pipeline.AttachAppsink("appsink")
	manager.pipeline.Play()

	manager.pipelinesCounter.Inc()
	manager.pipelinesActive.Set(1)

	// cleanup tears down the just-created pipeline on a failed start; we
	// cannot call destroyPipeline here because pipelineMu is already held.
	cleanup := func() {
		manager.pipeline.Destroy()
		manager.pipeline = nil
		manager.pipelinesActive.Set(0)
	}

	// get first image
	select {
	case image, ok := <-manager.pipeline.Sample():
		if !ok {
			cleanup()
			return errors.New("unable to get first image")
		}
		manager.setImage(image)
	case <-time.After(1 * time.Second):
		cleanup()
		return errors.New("timed out while waiting for first image")
	}

	manager.wg.Add(1)
	pipeline := manager.pipeline

	// consume samples until the pipeline's channel closes (on Destroy)
	go func() {
		manager.logger.Debug().Msg("started receiving images")
		defer manager.wg.Done()

		for {
			image, ok := <-pipeline.Sample()
			if !ok {
				manager.logger.Debug().Msg("stopped receiving images")
				return
			}

			manager.setImage(image)
		}
	}()

	return nil
}
// setImage stores the most recent sample and bumps the images metric.
func (manager *ScreencastManagerCtx) setImage(image types.Sample) {
	manager.imageMu.Lock()
	manager.image = image
	manager.imageMu.Unlock()

	manager.imagesCounter.Inc()
}
// destroyPipeline stops and releases the current pipeline; no-op when there
// is no pipeline. Destroying the pipeline closes its sample channel, which
// terminates the image-consuming goroutine.
func (manager *ScreencastManagerCtx) destroyPipeline() {
	manager.pipelineMu.Lock()
	defer manager.pipelineMu.Unlock()

	if manager.pipeline == nil {
		return
	}

	manager.pipeline.Destroy()
	manager.logger.Info().Msgf("destroying pipeline")
	manager.pipeline = nil

	manager.pipelinesActive.Set(0)
}

View File

@ -0,0 +1,206 @@
package capture
import (
"errors"
"sort"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/demodesk/neko/pkg/types"
"github.com/demodesk/neko/pkg/types/codec"
)
// StreamSelectorManagerCtx selects among multiple video stream sinks of the
// same codec (e.g. quality levels). streamIDs preserves the configured
// order from lowest to highest quality; presumably callers rely on that
// ordering for the lower/higher selectors — TODO confirm against config.
type StreamSelectorManagerCtx struct {
	logger    zerolog.Logger
	codec     codec.RTPCodec
	streams   map[string]types.StreamSinkManager
	streamIDs []string
}
// streamSelectorNew constructs a stream selector over the given streams.
// streamIDs defines the ordering used by relative (lower/higher) selection.
func streamSelectorNew(codec codec.RTPCodec, streams map[string]types.StreamSinkManager, streamIDs []string) *StreamSelectorManagerCtx {
	logger := log.With().
		Str("module", "capture").
		Str("submodule", "stream-selector").
		Logger()

	return &StreamSelectorManagerCtx{
		logger:    logger,
		codec:     codec,
		streams:   streams,
		streamIDs: streamIDs,
	}
}
// shutdown destroys all started stream pipelines.
func (manager *StreamSelectorManagerCtx) shutdown() {
	manager.logger.Info().Msgf("shutdown")

	manager.destroyPipelines()
}
// destroyPipelines tears down the pipeline of every stream that is
// currently started; stopped streams are left untouched.
func (manager *StreamSelectorManagerCtx) destroyPipelines() {
	for _, s := range manager.streams {
		if !s.Started() {
			continue
		}
		s.DestroyPipeline()
	}
}
// recreatePipelines recreates the pipeline of every started stream,
// treating an already-existing pipeline as success. Returns the first
// other error encountered.
func (manager *StreamSelectorManagerCtx) recreatePipelines() error {
	for _, s := range manager.streams {
		if !s.Started() {
			continue
		}
		err := s.CreatePipeline()
		if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) {
			return err
		}
	}
	return nil
}
// IDs returns the configured stream IDs in order (lowest to highest).
func (manager *StreamSelectorManagerCtx) IDs() []string {
	return manager.streamIDs
}
// Codec returns the RTP codec shared by all managed streams.
func (manager *StreamSelectorManagerCtx) Codec() codec.RTPCodec {
	return manager.codec
}
// GetStream resolves a stream selector to a concrete stream sink.
//
// Selection by ID supports exact match plus relative "lower"/"higher"
// variants, which pick the nearest existing stream strictly below/above the
// given ID in the configured streamIDs order. Selection by bitrate supports
// nearest, lower, higher and exact matches; only streams that are started
// and report a non-zero bitrate are considered for the relative variants.
// Returns (nil, false) when no stream matches.
func (manager *StreamSelectorManagerCtx) GetStream(selector types.StreamSelector) (types.StreamSinkManager, bool) {
	// select stream by ID
	if selector.ID != "" {
		// select lower stream
		if selector.Type == types.StreamSelectorTypeLower {
			// walk from the highest ID down; the last stream seen before
			// hitting selector.ID is the closest lower stream
			var lastStream types.StreamSinkManager
			for i := len(manager.streamIDs) - 1; i >= 0; i-- {
				streamID := manager.streamIDs[i]
				if streamID == selector.ID {
					return lastStream, lastStream != nil
				}
				stream, ok := manager.streams[streamID]
				if ok {
					lastStream = stream
				}
			}
			// we couldn't find a lower stream
			return nil, false
		}
		// select higher stream
		if selector.Type == types.StreamSelectorTypeHigher {
			// walk from the lowest ID up; the last stream seen before
			// hitting selector.ID is the closest higher stream
			var lastStream types.StreamSinkManager
			for _, streamID := range manager.streamIDs {
				if streamID == selector.ID {
					return lastStream, lastStream != nil
				}
				stream, ok := manager.streams[streamID]
				if ok {
					lastStream = stream
				}
			}
			// we couldn't find a higher stream
			return nil, false
		}
		// select exact stream
		stream, ok := manager.streams[selector.ID]
		return stream, ok
	}
	// select stream by bitrate
	if selector.Bitrate != 0 {
		// select stream by nearest bitrate
		if selector.Type == types.StreamSelectorTypeNearest {
			return manager.nearestBitrate(selector.Bitrate), true
		}
		// select lower stream
		if selector.Type == types.StreamSelectorTypeLower {
			// start from the highest stream, and go down, until we find a lower stream
			for i := len(manager.streamIDs) - 1; i >= 0; i-- {
				streamID := manager.streamIDs[i]
				stream := manager.streams[streamID]
				// if stream should be considered in calculation
				considered := stream.Bitrate() != 0 && stream.Started()
				if considered && stream.Bitrate() < selector.Bitrate {
					return stream, true
				}
			}
			// we couldn't find a lower stream
			return nil, false
		}
		// select higher stream
		if selector.Type == types.StreamSelectorTypeHigher {
			// start from the lowest stream, and go up, until we find a higher stream
			for _, streamID := range manager.streamIDs {
				stream := manager.streams[streamID]
				// if stream should be considered in calculation
				considered := stream.Bitrate() != 0 && stream.Started()
				if considered && stream.Bitrate() > selector.Bitrate {
					return stream, true
				}
			}
			// we couldn't find a higher stream
			return nil, false
		}
		// select stream by exact bitrate
		for _, stream := range manager.streams {
			if stream.Bitrate() == selector.Bitrate {
				return stream, true
			}
		}
	}
	// we couldn't find a stream
	return nil, false
}
// TODO: This is a very naive implementation, we should use a binary search instead.

// nearestBitrate returns the started stream whose bitrate is closest to the
// requested bitrate. Diffs are computed as (requested - stream bitrate), so
// a negative diff means the stream is faster than requested; the comparator
// prefers the smallest non-negative diff (closest stream at or below the
// request) and falls back to the closest faster stream. When no stream
// qualifies, the first (lowest) configured stream is returned.
func (manager *StreamSelectorManagerCtx) nearestBitrate(bitrate uint64) types.StreamSinkManager {
	type streamDiff struct {
		id          string
		bitrateDiff int
	}

	// sortDiff orders diffs so that the best candidate sorts first:
	// non-negative diffs ascending, then negative diffs descending
	// (i.e. closest-to-zero first within each sign class).
	sortDiff := func(a, b int) bool {
		switch {
		case a < 0 && b < 0:
			return a > b
		case a >= 0:
			if b >= 0 {
				return a <= b
			}
			return true
		}
		return false
	}

	var diffs []streamDiff
	for _, stream := range manager.streams {
		// if stream should be considered in calculation
		considered := stream.Bitrate() != 0 && stream.Started()
		if !considered {
			continue
		}
		diffs = append(diffs, streamDiff{
			id:          stream.ID(),
			bitrateDiff: int(bitrate) - int(stream.Bitrate()),
		})
	}

	// no streams available
	if len(diffs) == 0 {
		// return first (lowest) stream
		return manager.streams[manager.streamIDs[0]]
	}

	sort.Slice(diffs, func(i, j int) bool {
		return sortDiff(diffs[i].bitrateDiff, diffs[j].bitrateDiff)
	})

	bestDiff := diffs[0]
	return manager.streams[bestDiff.id]
}

View File

@ -0,0 +1,414 @@
package capture
import (
"errors"
"reflect"
"sync"
"sync/atomic"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/demodesk/neko/pkg/gst"
"github.com/demodesk/neko/pkg/types"
"github.com/demodesk/neko/pkg/types/codec"
)
// moveSinkListenerMu serializes the lock acquisition of two stream sinks in
// MoveListenerTo, so that both sink mutexes are always taken in a globally
// consistent order (prevents deadlock between concurrent moves).
var moveSinkListenerMu = sync.Mutex{}
// StreamSinkManagerCtx fans out samples from one gst pipeline to a set of
// listeners. The pipeline is created when the first listener is added and
// destroyed when the last one leaves. For video codecs, new listeners wait
// in a keyframe "lobby" until a keyframe arrives, so they never start on an
// undecodable delta frame.
type StreamSinkManagerCtx struct {
	id string

	// wait for a keyframe before sending samples
	waitForKf bool

	bitrate   uint64 // atomic
	brBuckets map[int]float64

	logger zerolog.Logger
	mu     sync.Mutex
	wg     sync.WaitGroup

	codec      codec.RTPCodec
	pipeline   gst.Pipeline
	pipelineMu sync.Mutex
	pipelineFn func() (string, error)

	// listeners are keyed by the listener's pointer identity.
	listeners   map[uintptr]types.SampleListener
	listenersKf map[uintptr]types.SampleListener // keyframe lobby
	listenersMu sync.Mutex

	// metrics
	currentListeners prometheus.Gauge
	totalBytes       prometheus.Counter
	pipelinesCounter prometheus.Counter
	pipelinesActive  prometheus.Gauge
}
// streamSinkNew constructs a stream sink for the given codec. pipelineFn is
// evaluated lazily each time a pipeline is (re)created, so it can reflect
// the current screen size.
func streamSinkNew(codec codec.RTPCodec, pipelineFn func() (string, error), id string) *StreamSinkManagerCtx {
	logger := log.With().
		Str("module", "capture").
		Str("submodule", "stream-sink").
		Str("id", id).Logger()

	manager := &StreamSinkManagerCtx{
		id: id,

		// only wait for keyframes if the codec is video
		waitForKf: codec.IsVideo(),

		bitrate:   0,
		brBuckets: map[int]float64{},

		logger:     logger,
		codec:      codec,
		pipelineFn: pipelineFn,

		listeners:   map[uintptr]types.SampleListener{},
		listenersKf: map[uintptr]types.SampleListener{},

		// metrics
		currentListeners: promauto.NewGauge(prometheus.GaugeOpts{
			Name:      "streamsink_listeners",
			Namespace: "neko",
			Subsystem: "capture",
			Help:      "Current number of listeners for a pipeline.",
			ConstLabels: map[string]string{
				"video_id":   id,
				"codec_name": codec.Name,
				"codec_type": codec.Type.String(),
			},
		}),
		totalBytes: promauto.NewCounter(prometheus.CounterOpts{
			Name:      "streamsink_bytes",
			Namespace: "neko",
			Subsystem: "capture",
			Help:      "Total number of bytes created by the pipeline.",
			ConstLabels: map[string]string{
				"video_id":   id,
				"codec_name": codec.Name,
				"codec_type": codec.Type.String(),
			},
		}),
		pipelinesCounter: promauto.NewCounter(prometheus.CounterOpts{
			Name:      "pipelines_total",
			Namespace: "neko",
			Subsystem: "capture",
			Help:      "Total number of created pipelines.",
			ConstLabels: map[string]string{
				"submodule":  "streamsink",
				"video_id":   id,
				"codec_name": codec.Name,
				"codec_type": codec.Type.String(),
			},
		}),
		pipelinesActive: promauto.NewGauge(prometheus.GaugeOpts{
			Name:      "pipelines_active",
			Namespace: "neko",
			Subsystem: "capture",
			Help:      "Total number of active pipelines.",
			ConstLabels: map[string]string{
				"submodule":  "streamsink",
				"video_id":   id,
				"codec_name": codec.Name,
				"codec_type": codec.Type.String(),
			},
		}),
	}

	return manager
}
// shutdown drops all listeners (including the keyframe lobby), destroys the
// pipeline and waits for the sample-emitting goroutine to finish.
func (manager *StreamSinkManagerCtx) shutdown() {
	manager.logger.Info().Msgf("shutdown")

	// replace both maps with empty ones under the listeners lock
	manager.listenersMu.Lock()
	manager.listeners = map[uintptr]types.SampleListener{}
	manager.listenersKf = map[uintptr]types.SampleListener{}
	manager.listenersMu.Unlock()

	manager.DestroyPipeline()
	manager.wg.Wait()
}
// ID returns this sink's identifier (e.g. "audio" or a video id).
func (manager *StreamSinkManagerCtx) ID() string {
	return manager.id
}
// Bitrate returns the most recently measured bitrate (atomic read; updated
// by saveSampleBitrate).
func (manager *StreamSinkManagerCtx) Bitrate() uint64 {
	return atomic.LoadUint64(&manager.bitrate)
}
// Codec returns the RTP codec this sink produces.
func (manager *StreamSinkManagerCtx) Codec() codec.RTPCodec {
	return manager.codec
}
// start creates the pipeline when the first listener is about to be added.
// Must be called with manager.mu held.
//
// Fix: the listener maps were previously read without listenersMu, racing
// with onSample, which moves entries from the keyframe lobby into listeners
// while holding only listenersMu. ListenersCount() performs the same check
// under the proper lock.
func (manager *StreamSinkManagerCtx) start() error {
	if manager.ListenersCount() == 0 {
		err := manager.CreatePipeline()
		if err != nil && !errors.Is(err, types.ErrCapturePipelineAlreadyExists) {
			return err
		}

		manager.logger.Info().Msgf("first listener, starting")
	}

	return nil
}
// stop destroys the pipeline once the last listener has been removed.
// Must be called with manager.mu held.
//
// Fix: reads the listener counts via ListenersCount() under listenersMu
// instead of touching the maps unlocked (racing with onSample's lobby
// promotion, which holds only listenersMu).
func (manager *StreamSinkManagerCtx) stop() {
	if manager.ListenersCount() == 0 {
		manager.DestroyPipeline()
		manager.logger.Info().Msgf("last listener, stopping")
	}
}
// addListener registers a listener, placing it in the keyframe lobby for
// video codecs so it only starts receiving at the next keyframe.
// Must be called with manager.mu held.
func (manager *StreamSinkManagerCtx) addListener(listener types.SampleListener) {
	// listeners are keyed by their pointer identity
	ptr := reflect.ValueOf(listener).Pointer()
	emitKeyframe := false

	manager.listenersMu.Lock()
	if manager.waitForKf {
		// if this is the first listener, we need to emit a keyframe
		emitKeyframe = len(manager.listenersKf) == 0
		// if we're waiting for a keyframe, add it to the keyframe lobby
		manager.listenersKf[ptr] = listener
	} else {
		// otherwise, add it as a regular listener
		manager.listeners[ptr] = listener
	}
	manager.listenersMu.Unlock()

	manager.logger.Debug().Interface("ptr", ptr).Msgf("adding listener")
	manager.currentListeners.Set(float64(manager.ListenersCount()))

	// if we will be waiting for a keyframe, emit one now
	// NOTE(review): manager.pipeline is read here without pipelineMu;
	// presumably safe because callers hold manager.mu — verify against
	// CreatePipeline/DestroyPipeline callers.
	if manager.pipeline != nil && emitKeyframe {
		manager.pipeline.EmitVideoKeyframe()
	}
}
// removeListener unregisters a listener from both the regular map and the
// keyframe lobby. Must be called with manager.mu held.
func (manager *StreamSinkManagerCtx) removeListener(listener types.SampleListener) {
	ptr := reflect.ValueOf(listener).Pointer()

	manager.listenersMu.Lock()
	delete(manager.listeners, ptr)
	delete(manager.listenersKf, ptr) // if it's a keyframe listener, remove it too
	manager.listenersMu.Unlock()

	manager.logger.Debug().Interface("ptr", ptr).Msgf("removing listener")
	manager.currentListeners.Set(float64(manager.ListenersCount()))
}
// AddListener attaches a sample listener, starting the pipeline when this is
// the first listener. A nil listener is rejected.
func (manager *StreamSinkManagerCtx) AddListener(listener types.SampleListener) error {
	manager.mu.Lock()
	defer manager.mu.Unlock()

	if listener == nil {
		return errors.New("listener cannot be nil")
	}

	// start the pipeline for the first listener
	if err := manager.start(); err != nil {
		return err
	}

	manager.addListener(listener)
	return nil
}
// RemoveListener detaches a sample listener, stopping the pipeline when no
// listeners remain. A nil listener is rejected.
func (manager *StreamSinkManagerCtx) RemoveListener(listener types.SampleListener) error {
	manager.mu.Lock()
	defer manager.mu.Unlock()

	if listener == nil {
		return errors.New("listener cannot be nil")
	}

	manager.removeListener(listener)

	// tear the pipeline down if that was the last listener
	manager.stop()
	return nil
}
// moving listeners between streams ensures, that target pipeline is running
// before listener is added, and stops source pipeline if there are 0 listeners

// MoveListenerTo atomically moves a listener from this sink to another one.
// The target must also be a *StreamSinkManagerCtx.
func (manager *StreamSinkManagerCtx) MoveListenerTo(listener types.SampleListener, stream types.StreamSinkManager) error {
	if listener == nil {
		return errors.New("listener cannot be nil")
	}

	targetStream, ok := stream.(*StreamSinkManagerCtx)
	if !ok {
		return errors.New("target stream manager does not support moving listeners")
	}

	// we need to acquire both mutextes, from source stream and from target stream
	// in order to do that safely (without possibility of deadlock) we need third
	// global mutex, that ensures atomic locking

	// lock global mutex
	moveSinkListenerMu.Lock()

	// lock source stream
	manager.mu.Lock()
	defer manager.mu.Unlock()

	// lock target stream
	targetStream.mu.Lock()
	defer targetStream.mu.Unlock()

	// unlock global mutex
	moveSinkListenerMu.Unlock()

	// start if stopped
	if err := targetStream.start(); err != nil {
		return err
	}

	// swap listeners
	manager.removeListener(listener)
	targetStream.addListener(listener)

	// stop if started
	manager.stop()

	return nil
}
// ListenersCount returns the total number of listeners, including those
// still waiting in the keyframe lobby.
func (manager *StreamSinkManagerCtx) ListenersCount() int {
	manager.listenersMu.Lock()
	count := len(manager.listeners) + len(manager.listenersKf)
	manager.listenersMu.Unlock()
	return count
}
// Started reports whether the sink currently has any listeners (and hence
// a running pipeline).
func (manager *StreamSinkManagerCtx) Started() bool {
	return manager.ListenersCount() > 0
}
// CreatePipeline builds and starts the sink pipeline and spawns a goroutine
// that forwards samples to listeners until the pipeline is destroyed.
// Returns types.ErrCapturePipelineAlreadyExists when one is already running.
func (manager *StreamSinkManagerCtx) CreatePipeline() error {
	manager.pipelineMu.Lock()
	defer manager.pipelineMu.Unlock()

	if manager.pipeline != nil {
		return types.ErrCapturePipelineAlreadyExists
	}

	// pipeline description is evaluated lazily (may depend on screen size)
	pipelineStr, err := manager.pipelineFn()
	if err != nil {
		return err
	}

	manager.logger.Info().
		Str("codec", manager.codec.Name).
		Str("src", pipelineStr).
		Msgf("creating pipeline")

	manager.pipeline, err = gst.CreatePipeline(pipelineStr)
	if err != nil {
		return err
	}

	manager.pipeline.AttachAppsink("appsink")
	manager.pipeline.Play()

	manager.wg.Add(1)
	// capture the pipeline locally so the goroutine keeps reading from this
	// pipeline even after manager.pipeline is replaced or nilled
	pipeline := manager.pipeline

	go func() {
		manager.logger.Debug().Msg("started emitting samples")
		defer manager.wg.Done()

		for {
			sample, ok := <-pipeline.Sample()
			if !ok {
				// channel closes when the pipeline is destroyed
				manager.logger.Debug().Msg("stopped emitting samples")
				return
			}

			manager.onSample(sample)
		}
	}()

	manager.pipelinesCounter.Inc()
	manager.pipelinesActive.Set(1)

	return nil
}
// saveSampleBitrate accumulates sample sizes into three rotating one-second
// buckets (indexed by timestamp mod 3) and publishes the previous second's
// total as the bitrate whenever the rotation wraps into a non-empty "next"
// bucket. Callers hold listenersMu (via onSample), which serializes access
// to brBuckets.
func (manager *StreamSinkManagerCtx) saveSampleBitrate(timestamp time.Time, delta float64) {
	// get unix timestamp in seconds
	sec := timestamp.Unix()
	// last bucket is timestamp rounded to 3 seconds - 1 second
	last := int((sec - 1) % 3)
	// current bucket is timestamp rounded to 3 seconds
	curr := int(sec % 3)
	// next bucket is timestamp rounded to 3 seconds + 1 second
	next := int((sec + 1) % 3)

	if manager.brBuckets[next] != 0 {
		// atomic update bitrate
		atomic.StoreUint64(&manager.bitrate, uint64(manager.brBuckets[last]))

		// empty next bucket
		manager.brBuckets[next] = 0
	}

	// add rate to current bucket
	manager.brBuckets[curr] += delta
}
// onSample forwards one pipeline sample to all listeners. On a keyframe it
// first promotes everyone from the keyframe lobby into the regular listener
// map, so newly attached video listeners start on a decodable frame.
func (manager *StreamSinkManagerCtx) onSample(sample types.Sample) {
	manager.listenersMu.Lock()
	defer manager.listenersMu.Unlock()

	// save to metrics
	length := float64(sample.Length)
	manager.totalBytes.Add(length)
	manager.saveSampleBitrate(sample.Timestamp, length)

	// if is not delta unit -> it can be decoded independently -> it is a keyframe
	if manager.waitForKf && !sample.DeltaUnit && len(manager.listenersKf) > 0 {
		// if current sample is a keyframe, move listeners from
		// keyframe lobby to actual listeners map and clear lobby
		for k, v := range manager.listenersKf {
			manager.listeners[k] = v
		}
		manager.listenersKf = make(map[uintptr]types.SampleListener)
	}

	for _, l := range manager.listeners {
		l.WriteSample(sample)
	}
}
// DestroyPipeline stops and releases the pipeline and resets the bitrate
// measurement; no-op when there is no pipeline.
func (manager *StreamSinkManagerCtx) DestroyPipeline() {
	manager.pipelineMu.Lock()
	defer manager.pipelineMu.Unlock()

	if manager.pipeline == nil {
		return
	}

	manager.pipeline.Destroy()
	manager.logger.Info().Msgf("destroying pipeline")
	manager.pipeline = nil

	manager.pipelinesActive.Set(0)

	// reset bitrate measurement: drop the buckets and published value
	manager.brBuckets = make(map[int]float64)
	atomic.StoreUint64(&manager.bitrate, 0)
}

View File

@ -0,0 +1,197 @@
package capture
import (
"errors"
"sync"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/demodesk/neko/pkg/gst"
"github.com/demodesk/neko/pkg/types"
"github.com/demodesk/neko/pkg/types/codec"
)
// StreamSrcManagerCtx pushes externally produced media (webcam, microphone)
// into a gst pipeline selected by codec. Metrics are pre-registered per
// supported codec at construction time.
type StreamSrcManagerCtx struct {
	logger        zerolog.Logger
	enabled       bool
	codecPipeline map[string]string // codec -> pipeline

	codec       codec.RTPCodec
	pipeline    gst.Pipeline
	pipelineMu  sync.Mutex
	pipelineStr string

	// metrics, keyed by codec name
	pushedData       map[string]prometheus.Summary
	pipelinesCounter map[string]prometheus.Counter
	pipelinesActive  map[string]prometheus.Gauge
}
// streamSrcNew constructs a stream source. codecPipeline maps codec names to
// gst pipeline descriptions; an unknown codec name is fatal at startup.
func streamSrcNew(enabled bool, codecPipeline map[string]string, video_id string) *StreamSrcManagerCtx {
	logger := log.With().
		Str("module", "capture").
		Str("submodule", "stream-src").
		Str("video_id", video_id).Logger()

	pushedData := map[string]prometheus.Summary{}
	pipelinesCounter := map[string]prometheus.Counter{}
	pipelinesActive := map[string]prometheus.Gauge{}

	for codecName, pipeline := range codecPipeline {
		// note: local "codec" shadows the imported codec package below
		codec, ok := codec.ParseStr(codecName)
		if !ok {
			logger.Fatal().
				Str("codec", codecName).
				Str("pipeline", pipeline).
				Msg("unknown codec name")
		}

		pushedData[codecName] = promauto.NewSummary(prometheus.SummaryOpts{
			Name:      "streamsrc_data_bytes",
			Namespace: "neko",
			Subsystem: "capture",
			Help:      "Data pushed to a pipeline (in bytes).",
			ConstLabels: map[string]string{
				"video_id":   video_id,
				"codec_name": codec.Name,
				"codec_type": codec.Type.String(),
			},
		})
		pipelinesCounter[codecName] = promauto.NewCounter(prometheus.CounterOpts{
			Name:      "pipelines_total",
			Namespace: "neko",
			Subsystem: "capture",
			Help:      "Total number of created pipelines.",
			ConstLabels: map[string]string{
				"submodule":  "streamsrc",
				"video_id":   video_id,
				"codec_name": codec.Name,
				"codec_type": codec.Type.String(),
			},
		})
		pipelinesActive[codecName] = promauto.NewGauge(prometheus.GaugeOpts{
			Name:      "pipelines_active",
			Namespace: "neko",
			Subsystem: "capture",
			Help:      "Total number of active pipelines.",
			ConstLabels: map[string]string{
				"submodule":  "streamsrc",
				"video_id":   video_id,
				"codec_name": codec.Name,
				"codec_type": codec.Type.String(),
			},
		})
	}

	return &StreamSrcManagerCtx{
		logger:        logger,
		enabled:       enabled,
		codecPipeline: codecPipeline,

		// metrics
		pushedData:       pushedData,
		pipelinesCounter: pipelinesCounter,
		pipelinesActive:  pipelinesActive,
	}
}
// shutdown stops the stream source during capture shutdown.
func (manager *StreamSrcManagerCtx) shutdown() {
	manager.logger.Info().Msgf("shutdown")

	manager.Stop()
}
// Codec returns the codec of the currently running pipeline (zero value
// when no pipeline has been started yet).
func (manager *StreamSrcManagerCtx) Codec() codec.RTPCodec {
	manager.pipelineMu.Lock()
	current := manager.codec
	manager.pipelineMu.Unlock()
	return current
}
// Start creates and plays the pipeline registered for the given codec.
// Returns types.ErrCapturePipelineAlreadyExists when a pipeline is already
// running and an error when the source is disabled or the codec has no
// registered pipeline.
func (manager *StreamSrcManagerCtx) Start(codec codec.RTPCodec) error {
	manager.pipelineMu.Lock()
	defer manager.pipelineMu.Unlock()

	if manager.pipeline != nil {
		return types.ErrCapturePipelineAlreadyExists
	}

	if !manager.enabled {
		return errors.New("stream-src not enabled")
	}

	// direct map lookup instead of a linear scan over codecPipeline;
	// the map is keyed by codec name (see streamSrcNew)
	pipelineStr, ok := manager.codecPipeline[codec.Name]
	if !ok {
		return errors.New("no pipeline found for a codec")
	}

	manager.pipelineStr = pipelineStr
	manager.codec = codec

	var err error

	manager.logger.Info().
		Str("codec", manager.codec.Name).
		Str("src", manager.pipelineStr).
		Msgf("creating pipeline")

	manager.pipeline, err = gst.CreatePipeline(manager.pipelineStr)
	if err != nil {
		return err
	}

	manager.pipeline.AttachAppsrc("appsrc")
	manager.pipeline.Play()

	manager.pipelinesCounter[manager.codec.Name].Inc()
	manager.pipelinesActive[manager.codec.Name].Set(1)

	return nil
}
// Stop destroys the running pipeline; no-op when none is running.
func (manager *StreamSrcManagerCtx) Stop() {
	manager.pipelineMu.Lock()
	defer manager.pipelineMu.Unlock()

	if manager.pipeline == nil {
		return
	}

	manager.pipeline.Destroy()
	manager.pipeline = nil

	manager.logger.Info().
		Str("codec", manager.codec.Name).
		Str("src", manager.pipelineStr).
		Msgf("destroying pipeline")

	manager.pipelinesActive[manager.codec.Name].Set(0)
}
// Push feeds raw encoded bytes into the running pipeline's appsrc and
// records the pushed size; silently drops data while no pipeline is running.
func (manager *StreamSrcManagerCtx) Push(bytes []byte) {
	manager.pipelineMu.Lock()
	defer manager.pipelineMu.Unlock()

	if manager.pipeline == nil {
		return
	}

	manager.pipeline.Push(bytes)
	manager.pushedData[manager.codec.Name].Observe(float64(len(bytes)))
}
// Started reports whether a pipeline is currently running.
func (manager *StreamSrcManagerCtx) Started() bool {
	manager.pipelineMu.Lock()
	running := manager.pipeline != nil
	manager.pipelineMu.Unlock()
	return running
}