From 2649594c2ea184b556d566ed98291f1f97e4ce0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Sun, 29 Jan 2023 18:18:37 +0100 Subject: [PATCH] strongly typed session events channel. --- server/internal/session/manager.go | 56 +++++++++++--------------- server/internal/session/session.go | 4 +- server/internal/types/session.go | 17 ++++++-- server/internal/websocket/websocket.go | 44 +++++++++++--------- 4 files changed, 62 insertions(+), 59 deletions(-) diff --git a/server/internal/session/manager.go b/server/internal/session/manager.go index 9d48b85..21ccdb7 100644 --- a/server/internal/session/manager.go +++ b/server/internal/session/manager.go @@ -13,23 +13,21 @@ import ( func New(capture types.CaptureManager) *SessionManager { return &SessionManager{ - logger: log.With().Str("module", "session").Logger(), - host: "", - capture: capture, - sessionChannel: make(chan types.SessionInformation, 10), - hostChannel: make(chan types.HostInformation, 10), - members: make(map[string]*Session), + logger: log.With().Str("module", "session").Logger(), + host: "", + capture: capture, + eventsChannel: make(chan types.SessionEvent, 10), + members: make(map[string]*Session), } } type SessionManager struct { - mu sync.Mutex - logger zerolog.Logger - host string - capture types.CaptureManager - members map[string]*Session - sessionChannel chan types.SessionInformation - hostChannel chan types.HostInformation + mu sync.Mutex + logger zerolog.Logger + host string + capture types.CaptureManager + members map[string]*Session + eventsChannel chan types.SessionEvent // TODO: Handle locks in sessions as flags. controlLocked bool } @@ -50,18 +48,12 @@ func (manager *SessionManager) New(id string, admin bool, socket types.WebSocket manager.capture.Video().AddListener() manager.mu.Unlock() - manager.sessionChannel <- types.SessionInformation{ - Type: "created", + manager.eventsChannel <- types.SessionEvent{ + Type: types.SESSION_CREATED, Id: id, Session: session, } - go func() { - for { - // TODO: Unused. - <-manager.hostChannel - } - }() return session } @@ -81,10 +73,11 @@ func (manager *SessionManager) SetHost(id string) error { if ok { manager.host = id - manager.hostChannel <- types.HostInformation{ - Type: "host", + manager.eventsChannel <- types.SessionEvent{ + Type: types.SESSION_HOST_SET, Id: id, } + return nil } @@ -102,8 +95,9 @@ func (manager *SessionManager) GetHost() (types.Session, bool) { func (manager *SessionManager) ClearHost() { id := manager.host manager.host = "" - manager.hostChannel <- types.HostInformation{ - Type: "host_cleared", + + manager.eventsChannel <- types.SessionEvent{ + Type: types.SESSION_HOST_CLEARED, Id: id, } } @@ -182,8 +176,8 @@ func (manager *SessionManager) Destroy(id string) { manager.capture.Video().RemoveListener() manager.mu.Unlock() - manager.sessionChannel <- types.SessionInformation{ - Type: "destroyed", + manager.eventsChannel <- types.SessionEvent{ + Type: types.SESSION_DESTROYED, Id: id, Session: session, } @@ -244,10 +238,6 @@ func (manager *SessionManager) AdminBroadcast(v interface{}, exclude interface{} return nil } -func (manager *SessionManager) GetSessionChannel() chan types.SessionInformation { - return manager.sessionChannel -} - -func (manager *SessionManager) GetHostChannel() chan types.HostInformation { - return manager.hostChannel +func (manager *SessionManager) GetEventsChannel() chan types.SessionEvent { + return manager.eventsChannel } diff --git a/server/internal/session/session.go b/server/internal/session/session.go index d81c16e..1b1e0f7 100644 --- a/server/internal/session/session.go +++ b/server/internal/session/session.go @@ -78,8 +78,8 @@ func (session *Session) SetPeer(peer types.Peer) error { func (session *Session) SetConnected(connected bool) error { session.connected = connected if connected { - session.manager.sessionChannel <- types.SessionInformation{ - Type: "connected", + session.manager.eventsChannel <- types.SessionEvent{ + Type: types.SESSION_CONNECTED, Id: session.id, Session: session, } diff --git a/server/internal/types/session.go b/server/internal/types/session.go index 12e69f2..2969fee 100644 --- a/server/internal/types/session.go +++ b/server/internal/types/session.go @@ -7,8 +7,18 @@ type Member struct { Muted bool `json:"muted"` } -type SessionInformation struct { - Type string +type SessionEventType int + +const ( + SESSION_CREATED SessionEventType = iota + SESSION_CONNECTED + SESSION_DESTROYED + SESSION_HOST_SET + SESSION_HOST_CLEARED +) + +type SessionEvent struct { + Type SessionEventType Id string Session Session } @@ -57,6 +67,5 @@ type SessionManager interface { Clear() error Broadcast(v interface{}, exclude interface{}) error AdminBroadcast(v interface{}, exclude interface{}) error - GetSessionChannel() chan SessionInformation - GetHostChannel() chan HostInformation + GetEventsChannel() chan SessionEvent } diff --git a/server/internal/websocket/websocket.go b/server/internal/websocket/websocket.go index 5130260..6958c6f 100644 --- a/server/internal/websocket/websocket.go +++ b/server/internal/websocket/websocket.go @@ -103,24 +103,24 @@ type WebSocketHandler struct { func (ws *WebSocketHandler) Start() { go func() { for { - channelMessage, ok := <-ws.sessions.GetSessionChannel() + e, ok := <-ws.sessions.GetEventsChannel() if !ok { ws.logger.Info().Msg("session channel was closed") return } - switch channelMessage.Type { - case "created": - if err := ws.handler.SessionCreated(channelMessage.Id, channelMessage.Session); err != nil { - ws.logger.Warn().Str("id", channelMessage.Id).Err(err).Msg("session created with and error") + switch e.Type { + case types.SESSION_CREATED: + if err := ws.handler.SessionCreated(e.Id, e.Session); err != nil { + ws.logger.Warn().Str("id", e.Id).Err(err).Msg("session created with and error") } else { - ws.logger.Debug().Str("id", channelMessage.Id).Msg("session created") + ws.logger.Debug().Str("id", e.Id).Msg("session created") } - case "connected": - if err := ws.handler.SessionConnected(channelMessage.Id, channelMessage.Session); err != nil { - ws.logger.Warn().Str("id", channelMessage.Id).Err(err).Msg("session connected with and error") + case types.SESSION_CONNECTED: + if err := ws.handler.SessionConnected(e.Id, e.Session); err != nil { + ws.logger.Warn().Str("id", e.Id).Err(err).Msg("session connected with and error") } else { - ws.logger.Debug().Str("id", channelMessage.Id).Msg("session connected") + ws.logger.Debug().Str("id", e.Id).Msg("session connected") } // if control protection is enabled and at least one admin @@ -134,7 +134,7 @@ func (ws *WebSocketHandler) Start() { if err := ws.sessions.Broadcast( message.AdminLock{ Event: event.ADMIN_UNLOCK, - ID: channelMessage.Id, + ID: e.Id, Resource: "control", }, nil); err != nil { ws.logger.Warn().Err(err).Msgf("broadcasting event %s has failed", event.ADMIN_UNLOCK) @@ -142,16 +142,16 @@ func (ws *WebSocketHandler) Start() { } // remove outdated stats - if channelMessage.Session.Admin() { + if e.Session.Admin() { ws.lastAdminLeftAt = nil } else { ws.lastUserLeftAt = nil } - case "destroyed": - if err := ws.handler.SessionDestroyed(channelMessage.Id); err != nil { - ws.logger.Warn().Str("id", channelMessage.Id).Err(err).Msg("session destroyed with and error") + case types.SESSION_DESTROYED: + if err := ws.handler.SessionDestroyed(e.Id); err != nil { + ws.logger.Warn().Str("id", e.Id).Err(err).Msg("session destroyed with and error") } else { - ws.logger.Debug().Str("id", channelMessage.Id).Msg("session destroyed") + ws.logger.Debug().Str("id", e.Id).Msg("session destroyed") } membersCount := len(ws.sessions.Members()) @@ -164,12 +164,12 @@ func (ws *WebSocketHandler) Start() { ws.state.Lock("control", CONTROL_PROTECTION_SESSION) ws.sessions.SetControlLocked(true) // TODO: Handle locks in sessions as flags. ws.logger.Info().Msgf("control locked and released on behalf of control protection") - ws.handler.AdminRelease(channelMessage.Id, channelMessage.Session) + ws.handler.AdminRelease(e.Id, e.Session) if err := ws.sessions.Broadcast( message.AdminLock{ Event: event.ADMIN_LOCK, - ID: channelMessage.Id, + ID: e.Id, Resource: "control", }, nil); err != nil { ws.logger.Warn().Err(err).Msgf("broadcasting event %s has failed", event.ADMIN_LOCK) @@ -177,16 +177,20 @@ func (ws *WebSocketHandler) Start() { } // if this was the last admin - if channelMessage.Session.Admin() && adminCount == 0 { + if e.Session.Admin() && adminCount == 0 { now := time.Now() ws.lastAdminLeftAt = &now } // if this was the last user - if !channelMessage.Session.Admin() && membersCount-adminCount == 0 { + if !e.Session.Admin() && membersCount-adminCount == 0 { now := time.Now() ws.lastUserLeftAt = &now } + case types.SESSION_HOST_SET: + // TODO: Unused. + case types.SESSION_HOST_CLEARED: + // TODO: Unused. } } }()