neko/internal/capture/streamsrc.go

198 lines
4.6 KiB
Go
Raw Normal View History

2021-12-02 10:36:45 +13:00
package capture
import (
"errors"
"sync"
2022-06-15 10:23:16 +12:00
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
2021-12-02 10:36:45 +13:00
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/demodesk/neko/pkg/gst"
"github.com/demodesk/neko/pkg/types"
"github.com/demodesk/neko/pkg/types/codec"
2021-12-02 10:36:45 +13:00
)
type StreamSrcManagerCtx struct {
logger zerolog.Logger
2021-12-10 11:22:24 +13:00
enabled bool
2021-12-02 10:36:45 +13:00
codecPipeline map[string]string // codec -> pipeline
codec codec.RTPCodec
pipeline *gst.Pipeline
pipelineMu sync.Mutex
pipelineStr string
2022-06-15 10:23:16 +12:00
// metrics
pushedData map[string]prometheus.Summary
pipelinesCounter map[string]prometheus.Counter
2022-06-19 11:03:16 +12:00
pipelinesActive map[string]prometheus.Gauge
2021-12-02 10:36:45 +13:00
}
2021-12-10 11:22:24 +13:00
func streamSrcNew(enabled bool, codecPipeline map[string]string, video_id string) *StreamSrcManagerCtx {
2021-12-02 10:36:45 +13:00
logger := log.With().
Str("module", "capture").
Str("submodule", "stream-src").
Str("video_id", video_id).Logger()
2022-06-15 10:23:16 +12:00
pushedData := map[string]prometheus.Summary{}
pipelinesCounter := map[string]prometheus.Counter{}
2022-06-19 11:03:16 +12:00
pipelinesActive := map[string]prometheus.Gauge{}
2022-06-26 09:53:19 +12:00
2022-06-19 10:51:17 +12:00
for codecName, pipeline := range codecPipeline {
codec, ok := codec.ParseStr(codecName)
if !ok {
logger.Fatal().
Str("codec", codecName).
Str("pipeline", pipeline).
Msg("unknown codec name")
}
pushedData[codecName] = promauto.NewSummary(prometheus.SummaryOpts{
Name: "streamsrc_data_bytes",
2022-06-15 10:23:16 +12:00
Namespace: "neko",
2022-06-19 10:51:17 +12:00
Subsystem: "capture",
2022-06-15 10:23:16 +12:00
Help: "Data pushed to a pipeline (in bytes).",
ConstLabels: map[string]string{
2022-06-19 10:51:17 +12:00
"video_id": video_id,
"codec_name": codec.Name,
"codec_type": codec.Type.String(),
2022-06-15 10:23:16 +12:00
},
})
2022-06-19 10:51:17 +12:00
pipelinesCounter[codecName] = promauto.NewCounter(prometheus.CounterOpts{
2022-06-15 10:23:16 +12:00
Name: "pipelines_total",
Namespace: "neko",
2022-06-19 10:51:17 +12:00
Subsystem: "capture",
2022-06-15 10:23:16 +12:00
Help: "Total number of created pipelines.",
ConstLabels: map[string]string{
2022-06-19 10:51:17 +12:00
"submodule": "streamsrc",
"video_id": video_id,
"codec_name": codec.Name,
"codec_type": codec.Type.String(),
2022-06-15 10:23:16 +12:00
},
})
2022-06-19 11:03:16 +12:00
pipelinesActive[codecName] = promauto.NewGauge(prometheus.GaugeOpts{
Name: "pipelines_active",
Namespace: "neko",
Subsystem: "capture",
Help: "Total number of active pipelines.",
ConstLabels: map[string]string{
"submodule": "streamsrc",
"video_id": video_id,
"codec_name": codec.Name,
"codec_type": codec.Type.String(),
},
})
2022-06-15 10:23:16 +12:00
}
2021-12-02 10:36:45 +13:00
return &StreamSrcManagerCtx{
logger: logger,
2021-12-10 11:22:24 +13:00
enabled: enabled,
2021-12-02 10:36:45 +13:00
codecPipeline: codecPipeline,
2022-06-15 10:23:16 +12:00
// metrics
pushedData: pushedData,
pipelinesCounter: pipelinesCounter,
2022-07-05 04:03:35 +12:00
pipelinesActive: pipelinesActive,
2021-12-02 10:36:45 +13:00
}
}
// shutdown logs the shutdown and stops the current pipeline, if there is one.
func (manager *StreamSrcManagerCtx) shutdown() {
	// Msg rather than Msgf: the message has no format arguments.
	manager.logger.Info().Msg("shutdown")

	manager.Stop()
}
func (manager *StreamSrcManagerCtx) Codec() codec.RTPCodec {
2022-06-26 09:53:19 +12:00
manager.pipelineMu.Lock()
defer manager.pipelineMu.Unlock()
2021-12-02 10:36:45 +13:00
return manager.codec
}
func (manager *StreamSrcManagerCtx) Start(codec codec.RTPCodec) error {
manager.pipelineMu.Lock()
defer manager.pipelineMu.Unlock()
if manager.pipeline != nil {
return types.ErrCapturePipelineAlreadyExists
}
2021-12-10 11:22:24 +13:00
if !manager.enabled {
return errors.New("stream-src not enabled")
}
2021-12-02 10:36:45 +13:00
found := false
for codecName, pipeline := range manager.codecPipeline {
if codecName == codec.Name {
manager.pipelineStr = pipeline
manager.codec = codec
found = true
break
}
}
if !found {
return errors.New("no pipeline found for a codec")
}
var err error
manager.logger.Info().
Str("codec", manager.codec.Name).
Str("src", manager.pipelineStr).
Msgf("creating pipeline")
manager.pipeline, err = gst.CreatePipeline(manager.pipelineStr)
if err != nil {
return err
}
2021-12-06 10:06:42 +13:00
manager.pipeline.AttachAppsrc("appsrc")
2021-12-02 10:36:45 +13:00
manager.pipeline.Play()
2022-06-15 10:23:16 +12:00
2022-06-19 11:03:16 +12:00
manager.pipelinesCounter[manager.codec.Name].Inc()
manager.pipelinesActive[manager.codec.Name].Set(1)
2021-12-02 10:36:45 +13:00
return nil
}
func (manager *StreamSrcManagerCtx) Stop() {
manager.pipelineMu.Lock()
defer manager.pipelineMu.Unlock()
if manager.pipeline == nil {
return
}
2021-12-06 06:16:26 +13:00
manager.pipeline.Destroy()
2021-12-02 10:36:45 +13:00
manager.pipeline = nil
2022-06-19 11:03:16 +12:00
2022-07-05 04:03:35 +12:00
manager.logger.Info().
Str("codec", manager.codec.Name).
Str("src", manager.pipelineStr).
Msgf("destroying pipeline")
2022-06-19 11:03:16 +12:00
manager.pipelinesActive[manager.codec.Name].Set(0)
2021-12-02 10:36:45 +13:00
}
func (manager *StreamSrcManagerCtx) Push(bytes []byte) {
manager.pipelineMu.Lock()
defer manager.pipelineMu.Unlock()
if manager.pipeline == nil {
return
}
2021-12-06 10:06:42 +13:00
manager.pipeline.Push(bytes)
2022-06-15 10:23:16 +12:00
manager.pushedData[manager.codec.Name].Observe(float64(len(bytes)))
2021-12-02 10:36:45 +13:00
}
func (manager *StreamSrcManagerCtx) Started() bool {
2022-06-15 10:23:16 +12:00
manager.pipelineMu.Lock()
defer manager.pipelineMu.Unlock()
2021-12-02 10:36:45 +13:00
return manager.pipeline != nil
}