channel direct from the pipeline
This commit is contained in:
parent
5690a849e2
commit
161d121e59
@ -17,7 +17,6 @@ import (
|
|||||||
type StreamSinkManagerCtx struct {
|
type StreamSinkManagerCtx struct {
|
||||||
logger zerolog.Logger
|
logger zerolog.Logger
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
wg sync.WaitGroup
|
|
||||||
|
|
||||||
codec codec.RTPCodec
|
codec codec.RTPCodec
|
||||||
pipeline *gst.Pipeline
|
pipeline *gst.Pipeline
|
||||||
@ -29,7 +28,6 @@ type StreamSinkManagerCtx struct {
|
|||||||
listenersMu sync.Mutex
|
listenersMu sync.Mutex
|
||||||
|
|
||||||
changeFramerate int16
|
changeFramerate int16
|
||||||
sampleChannel chan types.Sample
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func streamSinkNew(codec codec.RTPCodec, pipelineFn func() (string, error), video_id string) *StreamSinkManagerCtx {
|
func streamSinkNew(codec codec.RTPCodec, pipelineFn func() (string, error), video_id string) *StreamSinkManagerCtx {
|
||||||
@ -44,7 +42,6 @@ func streamSinkNew(codec codec.RTPCodec, pipelineFn func() (string, error), vide
|
|||||||
pipelineFn: pipelineFn,
|
pipelineFn: pipelineFn,
|
||||||
changeFramerate: 0,
|
changeFramerate: 0,
|
||||||
adaptiveFramerate: false,
|
adaptiveFramerate: false,
|
||||||
sampleChannel: make(chan types.Sample, 100),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return manager
|
return manager
|
||||||
@ -54,7 +51,6 @@ func (manager *StreamSinkManagerCtx) shutdown() {
|
|||||||
manager.logger.Info().Msgf("shutdown")
|
manager.logger.Info().Msgf("shutdown")
|
||||||
|
|
||||||
manager.destroyPipeline()
|
manager.destroyPipeline()
|
||||||
manager.wg.Wait()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (manager *StreamSinkManagerCtx) Codec() codec.RTPCodec {
|
func (manager *StreamSinkManagerCtx) Codec() codec.RTPCodec {
|
||||||
@ -168,24 +164,6 @@ func (manager *StreamSinkManagerCtx) createPipeline() error {
|
|||||||
manager.pipeline.AttachAppsink("appsink" + appsinkSubfix)
|
manager.pipeline.AttachAppsink("appsink" + appsinkSubfix)
|
||||||
manager.pipeline.Play()
|
manager.pipeline.Play()
|
||||||
|
|
||||||
manager.wg.Add(1)
|
|
||||||
pipeline := manager.pipeline
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
manager.logger.Debug().Msg("started emitting samples")
|
|
||||||
defer manager.wg.Done()
|
|
||||||
|
|
||||||
for {
|
|
||||||
sample, ok := <-pipeline.Sample
|
|
||||||
if !ok {
|
|
||||||
manager.logger.Debug().Msg("stopped emitting samples")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
manager.sampleChannel <- sample
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -203,7 +181,11 @@ func (manager *StreamSinkManagerCtx) destroyPipeline() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (manager *StreamSinkManagerCtx) GetSampleChannel() (chan types.Sample) {
|
func (manager *StreamSinkManagerCtx) GetSampleChannel() (chan types.Sample) {
|
||||||
return manager.sampleChannel
|
if manager.pipeline != nil {
|
||||||
|
return manager.pipeline.Sample
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (manager *StreamSinkManagerCtx) SetChangeFramerate(rate int16) {
|
func (manager *StreamSinkManagerCtx) SetChangeFramerate(rate int16) {
|
||||||
|
@ -57,6 +57,11 @@ func (manager *WebRTCManager) Start() {
|
|||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
|
if manager.capture.Audio().GetSampleChannel() == nil {
|
||||||
|
// Pipeline not yet initialized
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
continue
|
||||||
|
}
|
||||||
newSample := <- manager.capture.Audio().GetSampleChannel()
|
newSample := <- manager.capture.Audio().GetSampleChannel()
|
||||||
err := manager.audioTrack.WriteSample(media.Sample(newSample))
|
err := manager.audioTrack.WriteSample(media.Sample(newSample))
|
||||||
if err != nil && errors.Is(err, io.ErrClosedPipe) {
|
if err != nil && errors.Is(err, io.ErrClosedPipe) {
|
||||||
@ -77,6 +82,11 @@ func (manager *WebRTCManager) Start() {
|
|||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
|
if manager.capture.Video().GetSampleChannel() == nil {
|
||||||
|
// Pipeline not yet initialized
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
continue
|
||||||
|
}
|
||||||
newSample := <- manager.capture.Video().GetSampleChannel()
|
newSample := <- manager.capture.Video().GetSampleChannel()
|
||||||
err := manager.videoTrack.WriteSample(media.Sample(newSample))
|
err := manager.videoTrack.WriteSample(media.Sample(newSample))
|
||||||
if err != nil && errors.Is(err, io.ErrClosedPipe) {
|
if err != nil && errors.Is(err, io.ErrClosedPipe) {
|
||||||
|
Reference in New Issue
Block a user