From 595259b30c7ac61c740646a9b341c4af4d6fa4ed Mon Sep 17 00:00:00 2001 From: m1k1o Date: Mon, 15 Feb 2021 15:41:08 +0100 Subject: [PATCH] fix sessions manager thread safety. --- README.md | 1 + server/internal/session/manager.go | 33 ++++++++++++++++++++++++++++-- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 9949ec7c..7d077880 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,7 @@ This app uses Web RTC to stream a desktop inside of a docker container. This is - Fixed minor gst pipeline bug. - Locked screen only for users, admins can still join. - Fixed h264 pipelines bugs (by @mbattista). +- Fixed sessions manager thread safety by adding mutexes (caused panic in rare edge cases). ### Misc - Custom docker workflow. diff --git a/server/internal/session/manager.go b/server/internal/session/manager.go index 8e51bc5a..de8ab8c8 100644 --- a/server/internal/session/manager.go +++ b/server/internal/session/manager.go @@ -2,6 +2,7 @@ package session import ( "fmt" + "sync" "github.com/kataras/go-events" "github.com/rs/zerolog" @@ -22,6 +23,7 @@ func New(remote types.RemoteManager) *SessionManager { } type SessionManager struct { + mu sync.Mutex logger zerolog.Logger host string remote types.RemoteManager @@ -39,13 +41,14 @@ func (manager *SessionManager) New(id string, admin bool, socket types.WebSocket connected: false, } + manager.mu.Lock() manager.members[id] = session - manager.emmiter.Emit("created", id, session) - if manager.remote.Streaming() != true && len(manager.members) > 0 { manager.remote.StartStream() } + manager.mu.Unlock() + manager.emmiter.Emit("created", id, session) return session } @@ -58,16 +61,23 @@ func (manager *SessionManager) IsHost(id string) bool { } func (manager *SessionManager) SetHost(id string) error { + manager.mu.Lock() _, ok := manager.members[id] + manager.mu.Unlock() + if ok { manager.host = id manager.emmiter.Emit("host", id) return nil } + return fmt.Errorf("invalid session id %s", id) } func (manager *SessionManager) GetHost() (types.Session, bool) { + manager.mu.Lock() + defer manager.mu.Unlock() + host, ok := manager.members[manager.host] return host, ok } @@ -79,16 +89,25 @@ func (manager *SessionManager) ClearHost() { } func (manager *SessionManager) Has(id string) bool { + manager.mu.Lock() + defer manager.mu.Unlock() + _, ok := manager.members[id] return ok } func (manager *SessionManager) Get(id string) (types.Session, bool) { + manager.mu.Lock() + defer manager.mu.Unlock() + session, ok := manager.members[id] return session, ok } func (manager *SessionManager) Admins() []*types.Member { + manager.mu.Lock() + defer manager.mu.Unlock() + members := []*types.Member{} for _, session := range manager.members { if !session.connected || !session.admin { @@ -100,10 +119,14 @@ func (manager *SessionManager) Admins() []*types.Member { members = append(members, member) } } + return members } func (manager *SessionManager) Members() []*types.Member { + manager.mu.Lock() + defer manager.mu.Unlock() + members := []*types.Member{} for _, session := range manager.members { if !session.connected { @@ -119,6 +142,7 @@ func (manager *SessionManager) Members() []*types.Member { } func (manager *SessionManager) Destroy(id string) error { + manager.mu.Lock() session, ok := manager.members[id] if ok { err := session.destroy() @@ -127,11 +151,13 @@ func (manager *SessionManager) Destroy(id string) error { if manager.remote.Streaming() != false && len(manager.members) <= 0 { manager.remote.StopStream() } + manager.mu.Unlock() manager.emmiter.Emit("destroyed", id, session) return err } + manager.mu.Unlock() return nil } @@ -140,6 +166,9 @@ func (manager *SessionManager) Clear() error { } func (manager *SessionManager) Broadcast(v interface{}, exclude interface{}) error { + manager.mu.Lock() + defer manager.mu.Unlock() + for id, session := range manager.members { if !session.connected { continue