2021-12-01 22:36:45 +01:00
|
|
|
package capture
|
|
|
|
|
|
|
|
import (
|
|
|
|
"errors"
|
|
|
|
"sync"
|
|
|
|
|
2022-06-15 00:23:16 +02:00
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
|
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
2021-12-01 22:36:45 +01:00
|
|
|
"github.com/rs/zerolog"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
|
|
|
2022-07-14 00:58:22 +02:00
|
|
|
"github.com/demodesk/neko/pkg/gst"
|
|
|
|
"github.com/demodesk/neko/pkg/types"
|
|
|
|
"github.com/demodesk/neko/pkg/types/codec"
|
2021-12-01 22:36:45 +01:00
|
|
|
)
|
|
|
|
|
|
|
|
type StreamSrcManagerCtx struct {
|
|
|
|
logger zerolog.Logger
|
2021-12-09 23:22:24 +01:00
|
|
|
enabled bool
|
2021-12-01 22:36:45 +01:00
|
|
|
codecPipeline map[string]string // codec -> pipeline
|
|
|
|
|
|
|
|
codec codec.RTPCodec
|
2022-10-13 19:57:31 +02:00
|
|
|
pipeline gst.Pipeline
|
2021-12-01 22:36:45 +01:00
|
|
|
pipelineMu sync.Mutex
|
|
|
|
pipelineStr string
|
2022-06-15 00:23:16 +02:00
|
|
|
|
|
|
|
// metrics
|
|
|
|
pushedData map[string]prometheus.Summary
|
|
|
|
pipelinesCounter map[string]prometheus.Counter
|
2022-06-19 01:03:16 +02:00
|
|
|
pipelinesActive map[string]prometheus.Gauge
|
2021-12-01 22:36:45 +01:00
|
|
|
}
|
|
|
|
|
2021-12-09 23:22:24 +01:00
|
|
|
func streamSrcNew(enabled bool, codecPipeline map[string]string, video_id string) *StreamSrcManagerCtx {
|
2021-12-01 22:36:45 +01:00
|
|
|
logger := log.With().
|
|
|
|
Str("module", "capture").
|
|
|
|
Str("submodule", "stream-src").
|
|
|
|
Str("video_id", video_id).Logger()
|
|
|
|
|
2022-06-15 00:23:16 +02:00
|
|
|
pushedData := map[string]prometheus.Summary{}
|
|
|
|
pipelinesCounter := map[string]prometheus.Counter{}
|
2022-06-19 01:03:16 +02:00
|
|
|
pipelinesActive := map[string]prometheus.Gauge{}
|
2022-06-25 23:53:19 +02:00
|
|
|
|
2022-06-19 00:51:17 +02:00
|
|
|
for codecName, pipeline := range codecPipeline {
|
|
|
|
codec, ok := codec.ParseStr(codecName)
|
|
|
|
if !ok {
|
|
|
|
logger.Fatal().
|
|
|
|
Str("codec", codecName).
|
|
|
|
Str("pipeline", pipeline).
|
|
|
|
Msg("unknown codec name")
|
|
|
|
}
|
|
|
|
|
|
|
|
pushedData[codecName] = promauto.NewSummary(prometheus.SummaryOpts{
|
|
|
|
Name: "streamsrc_data_bytes",
|
2022-06-15 00:23:16 +02:00
|
|
|
Namespace: "neko",
|
2022-06-19 00:51:17 +02:00
|
|
|
Subsystem: "capture",
|
2022-06-15 00:23:16 +02:00
|
|
|
Help: "Data pushed to a pipeline (in bytes).",
|
|
|
|
ConstLabels: map[string]string{
|
2022-06-19 00:51:17 +02:00
|
|
|
"video_id": video_id,
|
|
|
|
"codec_name": codec.Name,
|
|
|
|
"codec_type": codec.Type.String(),
|
2022-06-15 00:23:16 +02:00
|
|
|
},
|
|
|
|
})
|
2022-06-19 00:51:17 +02:00
|
|
|
pipelinesCounter[codecName] = promauto.NewCounter(prometheus.CounterOpts{
|
2022-06-15 00:23:16 +02:00
|
|
|
Name: "pipelines_total",
|
|
|
|
Namespace: "neko",
|
2022-06-19 00:51:17 +02:00
|
|
|
Subsystem: "capture",
|
2022-06-15 00:23:16 +02:00
|
|
|
Help: "Total number of created pipelines.",
|
|
|
|
ConstLabels: map[string]string{
|
2022-06-19 00:51:17 +02:00
|
|
|
"submodule": "streamsrc",
|
|
|
|
"video_id": video_id,
|
|
|
|
"codec_name": codec.Name,
|
|
|
|
"codec_type": codec.Type.String(),
|
2022-06-15 00:23:16 +02:00
|
|
|
},
|
|
|
|
})
|
2022-06-19 01:03:16 +02:00
|
|
|
pipelinesActive[codecName] = promauto.NewGauge(prometheus.GaugeOpts{
|
|
|
|
Name: "pipelines_active",
|
|
|
|
Namespace: "neko",
|
|
|
|
Subsystem: "capture",
|
|
|
|
Help: "Total number of active pipelines.",
|
|
|
|
ConstLabels: map[string]string{
|
|
|
|
"submodule": "streamsrc",
|
|
|
|
"video_id": video_id,
|
|
|
|
"codec_name": codec.Name,
|
|
|
|
"codec_type": codec.Type.String(),
|
|
|
|
},
|
|
|
|
})
|
2022-06-15 00:23:16 +02:00
|
|
|
}
|
|
|
|
|
2021-12-01 22:36:45 +01:00
|
|
|
return &StreamSrcManagerCtx{
|
|
|
|
logger: logger,
|
2021-12-09 23:22:24 +01:00
|
|
|
enabled: enabled,
|
2021-12-01 22:36:45 +01:00
|
|
|
codecPipeline: codecPipeline,
|
2022-06-15 00:23:16 +02:00
|
|
|
|
|
|
|
// metrics
|
|
|
|
pushedData: pushedData,
|
|
|
|
pipelinesCounter: pipelinesCounter,
|
2022-07-04 18:03:35 +02:00
|
|
|
pipelinesActive: pipelinesActive,
|
2021-12-01 22:36:45 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (manager *StreamSrcManagerCtx) shutdown() {
|
|
|
|
manager.logger.Info().Msgf("shutdown")
|
|
|
|
|
|
|
|
manager.Stop()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (manager *StreamSrcManagerCtx) Codec() codec.RTPCodec {
|
2022-06-25 23:53:19 +02:00
|
|
|
manager.pipelineMu.Lock()
|
|
|
|
defer manager.pipelineMu.Unlock()
|
|
|
|
|
2021-12-01 22:36:45 +01:00
|
|
|
return manager.codec
|
|
|
|
}
|
|
|
|
|
|
|
|
func (manager *StreamSrcManagerCtx) Start(codec codec.RTPCodec) error {
|
|
|
|
manager.pipelineMu.Lock()
|
|
|
|
defer manager.pipelineMu.Unlock()
|
|
|
|
|
|
|
|
if manager.pipeline != nil {
|
|
|
|
return types.ErrCapturePipelineAlreadyExists
|
|
|
|
}
|
|
|
|
|
2021-12-09 23:22:24 +01:00
|
|
|
if !manager.enabled {
|
|
|
|
return errors.New("stream-src not enabled")
|
|
|
|
}
|
|
|
|
|
2021-12-01 22:36:45 +01:00
|
|
|
found := false
|
|
|
|
for codecName, pipeline := range manager.codecPipeline {
|
|
|
|
if codecName == codec.Name {
|
|
|
|
manager.pipelineStr = pipeline
|
|
|
|
manager.codec = codec
|
|
|
|
found = true
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if !found {
|
|
|
|
return errors.New("no pipeline found for a codec")
|
|
|
|
}
|
|
|
|
|
|
|
|
var err error
|
|
|
|
|
|
|
|
manager.logger.Info().
|
|
|
|
Str("codec", manager.codec.Name).
|
|
|
|
Str("src", manager.pipelineStr).
|
|
|
|
Msgf("creating pipeline")
|
|
|
|
|
|
|
|
manager.pipeline, err = gst.CreatePipeline(manager.pipelineStr)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2021-12-05 22:06:42 +01:00
|
|
|
manager.pipeline.AttachAppsrc("appsrc")
|
2021-12-01 22:36:45 +01:00
|
|
|
manager.pipeline.Play()
|
2022-06-15 00:23:16 +02:00
|
|
|
|
2022-06-19 01:03:16 +02:00
|
|
|
manager.pipelinesCounter[manager.codec.Name].Inc()
|
|
|
|
manager.pipelinesActive[manager.codec.Name].Set(1)
|
|
|
|
|
2021-12-01 22:36:45 +01:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (manager *StreamSrcManagerCtx) Stop() {
|
|
|
|
manager.pipelineMu.Lock()
|
|
|
|
defer manager.pipelineMu.Unlock()
|
|
|
|
|
|
|
|
if manager.pipeline == nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2021-12-05 18:16:26 +01:00
|
|
|
manager.pipeline.Destroy()
|
2021-12-01 22:36:45 +01:00
|
|
|
manager.pipeline = nil
|
2022-06-19 01:03:16 +02:00
|
|
|
|
2022-07-04 18:03:35 +02:00
|
|
|
manager.logger.Info().
|
|
|
|
Str("codec", manager.codec.Name).
|
|
|
|
Str("src", manager.pipelineStr).
|
|
|
|
Msgf("destroying pipeline")
|
|
|
|
|
2022-06-19 01:03:16 +02:00
|
|
|
manager.pipelinesActive[manager.codec.Name].Set(0)
|
2021-12-01 22:36:45 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
func (manager *StreamSrcManagerCtx) Push(bytes []byte) {
|
|
|
|
manager.pipelineMu.Lock()
|
|
|
|
defer manager.pipelineMu.Unlock()
|
|
|
|
|
|
|
|
if manager.pipeline == nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2021-12-05 22:06:42 +01:00
|
|
|
manager.pipeline.Push(bytes)
|
2022-06-15 00:23:16 +02:00
|
|
|
manager.pushedData[manager.codec.Name].Observe(float64(len(bytes)))
|
2021-12-01 22:36:45 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
func (manager *StreamSrcManagerCtx) Started() bool {
|
2022-06-15 00:23:16 +02:00
|
|
|
manager.pipelineMu.Lock()
|
|
|
|
defer manager.pipelineMu.Unlock()
|
|
|
|
|
2021-12-01 22:36:45 +01:00
|
|
|
return manager.pipeline != nil
|
|
|
|
}
|