add waitgroups.

This commit is contained in:
Miroslav Šedivý 2021-09-02 00:00:56 +02:00
parent 7e2f173460
commit f18ae2b972
3 changed files with 25 additions and 9 deletions

View File

@ -16,6 +16,7 @@ import (
type ScreencastManagerCtx struct { type ScreencastManagerCtx struct {
logger zerolog.Logger logger zerolog.Logger
mu sync.Mutex mu sync.Mutex
wg sync.WaitGroup
pipelineStr string pipelineStr string
pipeline *gst.Pipeline pipeline *gst.Pipeline
enabled bool enabled bool
@ -39,8 +40,11 @@ func screencastNew(enabled bool, pipelineStr string) *ScreencastManagerCtx {
emitUpdate: make(chan bool), emitUpdate: make(chan bool),
} }
manager.wg.Add(1)
go func() { go func() {
manager.logger.Debug().Msg("started emitting samples") manager.logger.Debug().Msg("started emitting samples")
defer manager.wg.Done()
ticker := time.NewTicker(screencastTimeout) ticker := time.NewTicker(screencastTimeout)
defer ticker.Stop() defer ticker.Stop()
@ -66,10 +70,12 @@ func screencastNew(enabled bool, pipelineStr string) *ScreencastManagerCtx {
} }
func (manager *ScreencastManagerCtx) shutdown() { func (manager *ScreencastManagerCtx) shutdown() {
manager.logger.Info().Msgf("shutting down") manager.logger.Info().Msgf("shutdown")
manager.destroyPipeline() manager.destroyPipeline()
manager.emitStop <- true manager.emitStop <- true
manager.wg.Wait()
} }
func (manager *ScreencastManagerCtx) Enabled() bool { func (manager *ScreencastManagerCtx) Enabled() bool {

View File

@ -15,6 +15,7 @@ import (
type StreamManagerCtx struct { type StreamManagerCtx struct {
logger zerolog.Logger logger zerolog.Logger
mu sync.Mutex mu sync.Mutex
wg sync.WaitGroup
codec codec.RTPCodec codec codec.RTPCodec
pipelineStr func() string pipelineStr func() string
pipeline *gst.Pipeline pipeline *gst.Pipeline
@ -42,8 +43,11 @@ func streamNew(codec codec.RTPCodec, pipelineStr func() string, video_id string)
started: false, started: false,
} }
manager.wg.Add(1)
go func() { go func() {
manager.logger.Debug().Msg("started emitting samples") manager.logger.Debug().Msg("started emitting samples")
defer manager.wg.Done()
for { for {
select { select {
@ -66,7 +70,7 @@ func streamNew(codec codec.RTPCodec, pipelineStr func() string, video_id string)
} }
func (manager *StreamManagerCtx) shutdown() { func (manager *StreamManagerCtx) shutdown() {
manager.logger.Info().Msgf("shutting down") manager.logger.Info().Msgf("shutdown")
manager.emitMu.Lock() manager.emitMu.Lock()
for key := range manager.listeners { for key := range manager.listeners {
@ -75,7 +79,9 @@ func (manager *StreamManagerCtx) shutdown() {
manager.emitMu.Unlock() manager.emitMu.Unlock()
manager.destroyPipeline() manager.destroyPipeline()
manager.emitStop <- true manager.emitStop <- true
manager.wg.Wait()
} }
func (manager *StreamManagerCtx) Codec() codec.RTPCodec { func (manager *StreamManagerCtx) Codec() codec.RTPCodec {

View File

@ -18,6 +18,7 @@ var mu = sync.Mutex{}
type DesktopManagerCtx struct { type DesktopManagerCtx struct {
logger zerolog.Logger logger zerolog.Logger
wg sync.WaitGroup
shutdown chan bool shutdown chan bool
emmiter events.EventEmmiter emmiter events.EventEmmiter
config *config.Desktop config *config.Desktop
@ -39,14 +40,11 @@ func (manager *DesktopManagerCtx) Start() {
xorg.GetScreenConfigurations() xorg.GetScreenConfigurations()
manager.logger.Info(). err := xorg.ChangeScreenSize(manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate)
manager.logger.Err(err).
Str("screen_size", fmt.Sprintf("%dx%d@%d", manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate)). Str("screen_size", fmt.Sprintf("%dx%d@%d", manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate)).
Msgf("setting initial screen size") Msgf("setting initial screen size")
if err := xorg.ChangeScreenSize(manager.config.ScreenWidth, manager.config.ScreenHeight, manager.config.ScreenRate); err != nil {
manager.logger.Err(err).Msg("unable to set initial screen size")
}
go xevent.EventLoop(manager.config.Display) go xevent.EventLoop(manager.config.Display)
// In case it was opened // In case it was opened
@ -61,14 +59,17 @@ func (manager *DesktopManagerCtx) Start() {
Msg("X event error occured") Msg("X event error occured")
}) })
manager.wg.Add(1)
go func() { go func() {
defer manager.wg.Done()
ticker := time.NewTicker(1 * time.Second) ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
case <-manager.shutdown: case <-manager.shutdown:
xorg.DisplayClose()
return return
case <-ticker.C: case <-ticker.C:
xorg.CheckKeys(time.Second * 10) xorg.CheckKeys(time.Second * 10)
@ -90,8 +91,11 @@ func (manager *DesktopManagerCtx) OnAfterScreenSizeChange(listener func()) {
} }
func (manager *DesktopManagerCtx) Shutdown() error { func (manager *DesktopManagerCtx) Shutdown() error {
manager.logger.Info().Msgf("desktop shutting down") manager.logger.Info().Msgf("shutdown")
manager.shutdown <- true manager.shutdown <- true
manager.wg.Wait()
xorg.DisplayClose()
return nil return nil
} }