strongly typed session events channel.

This commit is contained in:
Miroslav Šedivý 2023-01-29 18:18:37 +01:00
parent f3080713ce
commit 2649594c2e
4 changed files with 62 additions and 59 deletions

View File

@ -13,23 +13,21 @@ import (
func New(capture types.CaptureManager) *SessionManager { func New(capture types.CaptureManager) *SessionManager {
return &SessionManager{ return &SessionManager{
logger: log.With().Str("module", "session").Logger(), logger: log.With().Str("module", "session").Logger(),
host: "", host: "",
capture: capture, capture: capture,
sessionChannel: make(chan types.SessionInformation, 10), eventsChannel: make(chan types.SessionEvent, 10),
hostChannel: make(chan types.HostInformation, 10), members: make(map[string]*Session),
members: make(map[string]*Session),
} }
} }
type SessionManager struct { type SessionManager struct {
mu sync.Mutex mu sync.Mutex
logger zerolog.Logger logger zerolog.Logger
host string host string
capture types.CaptureManager capture types.CaptureManager
members map[string]*Session members map[string]*Session
sessionChannel chan types.SessionInformation eventsChannel chan types.SessionEvent
hostChannel chan types.HostInformation
// TODO: Handle locks in sessions as flags. // TODO: Handle locks in sessions as flags.
controlLocked bool controlLocked bool
} }
@ -50,18 +48,12 @@ func (manager *SessionManager) New(id string, admin bool, socket types.WebSocket
manager.capture.Video().AddListener() manager.capture.Video().AddListener()
manager.mu.Unlock() manager.mu.Unlock()
manager.sessionChannel <- types.SessionInformation{ manager.eventsChannel <- types.SessionEvent{
Type: "created", Type: types.SESSION_CREATED,
Id: id, Id: id,
Session: session, Session: session,
} }
go func() {
for {
// TODO: Unused.
<-manager.hostChannel
}
}()
return session return session
} }
@ -81,10 +73,11 @@ func (manager *SessionManager) SetHost(id string) error {
if ok { if ok {
manager.host = id manager.host = id
manager.hostChannel <- types.HostInformation{ manager.eventsChannel <- types.SessionEvent{
Type: "host", Type: types.SESSION_HOST_SET,
Id: id, Id: id,
} }
return nil return nil
} }
@ -102,8 +95,9 @@ func (manager *SessionManager) GetHost() (types.Session, bool) {
func (manager *SessionManager) ClearHost() { func (manager *SessionManager) ClearHost() {
id := manager.host id := manager.host
manager.host = "" manager.host = ""
manager.hostChannel <- types.HostInformation{
Type: "host_cleared", manager.eventsChannel <- types.SessionEvent{
Type: types.SESSION_HOST_CLEARED,
Id: id, Id: id,
} }
} }
@ -182,8 +176,8 @@ func (manager *SessionManager) Destroy(id string) {
manager.capture.Video().RemoveListener() manager.capture.Video().RemoveListener()
manager.mu.Unlock() manager.mu.Unlock()
manager.sessionChannel <- types.SessionInformation{ manager.eventsChannel <- types.SessionEvent{
Type: "destroyed", Type: types.SESSION_DESTROYED,
Id: id, Id: id,
Session: session, Session: session,
} }
@ -244,10 +238,6 @@ func (manager *SessionManager) AdminBroadcast(v interface{}, exclude interface{}
return nil return nil
} }
func (manager *SessionManager) GetSessionChannel() chan types.SessionInformation { func (manager *SessionManager) GetEventsChannel() chan types.SessionEvent {
return manager.sessionChannel return manager.eventsChannel
}
func (manager *SessionManager) GetHostChannel() chan types.HostInformation {
return manager.hostChannel
} }

View File

@ -78,8 +78,8 @@ func (session *Session) SetPeer(peer types.Peer) error {
func (session *Session) SetConnected(connected bool) error { func (session *Session) SetConnected(connected bool) error {
session.connected = connected session.connected = connected
if connected { if connected {
session.manager.sessionChannel <- types.SessionInformation{ session.manager.eventsChannel <- types.SessionEvent{
Type: "connected", Type: types.SESSION_CONNECTED,
Id: session.id, Id: session.id,
Session: session, Session: session,
} }

View File

@ -7,8 +7,18 @@ type Member struct {
Muted bool `json:"muted"` Muted bool `json:"muted"`
} }
type SessionInformation struct { type SessionEventType int
Type string
const (
SESSION_CREATED SessionEventType = iota
SESSION_CONNECTED
SESSION_DESTROYED
SESSION_HOST_SET
SESSION_HOST_CLEARED
)
type SessionEvent struct {
Type SessionEventType
Id string Id string
Session Session Session Session
} }
@ -57,6 +67,5 @@ type SessionManager interface {
Clear() error Clear() error
Broadcast(v interface{}, exclude interface{}) error Broadcast(v interface{}, exclude interface{}) error
AdminBroadcast(v interface{}, exclude interface{}) error AdminBroadcast(v interface{}, exclude interface{}) error
GetSessionChannel() chan SessionInformation GetEventsChannel() chan SessionEvent
GetHostChannel() chan HostInformation
} }

View File

@ -103,24 +103,24 @@ type WebSocketHandler struct {
func (ws *WebSocketHandler) Start() { func (ws *WebSocketHandler) Start() {
go func() { go func() {
for { for {
channelMessage, ok := <-ws.sessions.GetSessionChannel() e, ok := <-ws.sessions.GetEventsChannel()
if !ok { if !ok {
ws.logger.Info().Msg("session channel was closed") ws.logger.Info().Msg("session channel was closed")
return return
} }
switch channelMessage.Type { switch e.Type {
case "created": case types.SESSION_CREATED:
if err := ws.handler.SessionCreated(channelMessage.Id, channelMessage.Session); err != nil { if err := ws.handler.SessionCreated(e.Id, e.Session); err != nil {
ws.logger.Warn().Str("id", channelMessage.Id).Err(err).Msg("session created with and error") ws.logger.Warn().Str("id", e.Id).Err(err).Msg("session created with and error")
} else { } else {
ws.logger.Debug().Str("id", channelMessage.Id).Msg("session created") ws.logger.Debug().Str("id", e.Id).Msg("session created")
} }
case "connected": case types.SESSION_CONNECTED:
if err := ws.handler.SessionConnected(channelMessage.Id, channelMessage.Session); err != nil { if err := ws.handler.SessionConnected(e.Id, e.Session); err != nil {
ws.logger.Warn().Str("id", channelMessage.Id).Err(err).Msg("session connected with and error") ws.logger.Warn().Str("id", e.Id).Err(err).Msg("session connected with and error")
} else { } else {
ws.logger.Debug().Str("id", channelMessage.Id).Msg("session connected") ws.logger.Debug().Str("id", e.Id).Msg("session connected")
} }
// if control protection is enabled and at least one admin // if control protection is enabled and at least one admin
@ -134,7 +134,7 @@ func (ws *WebSocketHandler) Start() {
if err := ws.sessions.Broadcast( if err := ws.sessions.Broadcast(
message.AdminLock{ message.AdminLock{
Event: event.ADMIN_UNLOCK, Event: event.ADMIN_UNLOCK,
ID: channelMessage.Id, ID: e.Id,
Resource: "control", Resource: "control",
}, nil); err != nil { }, nil); err != nil {
ws.logger.Warn().Err(err).Msgf("broadcasting event %s has failed", event.ADMIN_UNLOCK) ws.logger.Warn().Err(err).Msgf("broadcasting event %s has failed", event.ADMIN_UNLOCK)
@ -142,16 +142,16 @@ func (ws *WebSocketHandler) Start() {
} }
// remove outdated stats // remove outdated stats
if channelMessage.Session.Admin() { if e.Session.Admin() {
ws.lastAdminLeftAt = nil ws.lastAdminLeftAt = nil
} else { } else {
ws.lastUserLeftAt = nil ws.lastUserLeftAt = nil
} }
case "destroyed": case types.SESSION_DESTROYED:
if err := ws.handler.SessionDestroyed(channelMessage.Id); err != nil { if err := ws.handler.SessionDestroyed(e.Id); err != nil {
ws.logger.Warn().Str("id", channelMessage.Id).Err(err).Msg("session destroyed with and error") ws.logger.Warn().Str("id", e.Id).Err(err).Msg("session destroyed with and error")
} else { } else {
ws.logger.Debug().Str("id", channelMessage.Id).Msg("session destroyed") ws.logger.Debug().Str("id", e.Id).Msg("session destroyed")
} }
membersCount := len(ws.sessions.Members()) membersCount := len(ws.sessions.Members())
@ -164,12 +164,12 @@ func (ws *WebSocketHandler) Start() {
ws.state.Lock("control", CONTROL_PROTECTION_SESSION) ws.state.Lock("control", CONTROL_PROTECTION_SESSION)
ws.sessions.SetControlLocked(true) // TODO: Handle locks in sessions as flags. ws.sessions.SetControlLocked(true) // TODO: Handle locks in sessions as flags.
ws.logger.Info().Msgf("control locked and released on behalf of control protection") ws.logger.Info().Msgf("control locked and released on behalf of control protection")
ws.handler.AdminRelease(channelMessage.Id, channelMessage.Session) ws.handler.AdminRelease(e.Id, e.Session)
if err := ws.sessions.Broadcast( if err := ws.sessions.Broadcast(
message.AdminLock{ message.AdminLock{
Event: event.ADMIN_LOCK, Event: event.ADMIN_LOCK,
ID: channelMessage.Id, ID: e.Id,
Resource: "control", Resource: "control",
}, nil); err != nil { }, nil); err != nil {
ws.logger.Warn().Err(err).Msgf("broadcasting event %s has failed", event.ADMIN_LOCK) ws.logger.Warn().Err(err).Msgf("broadcasting event %s has failed", event.ADMIN_LOCK)
@ -177,16 +177,20 @@ func (ws *WebSocketHandler) Start() {
} }
// if this was the last admin // if this was the last admin
if channelMessage.Session.Admin() && adminCount == 0 { if e.Session.Admin() && adminCount == 0 {
now := time.Now() now := time.Now()
ws.lastAdminLeftAt = &now ws.lastAdminLeftAt = &now
} }
// if this was the last user // if this was the last user
if !channelMessage.Session.Admin() && membersCount-adminCount == 0 { if !e.Session.Admin() && membersCount-adminCount == 0 {
now := time.Now() now := time.Now()
ws.lastUserLeftAt = &now ws.lastUserLeftAt = &now
} }
case types.SESSION_HOST_SET:
// TODO: Unused.
case types.SESSION_HOST_CLEARED:
// TODO: Unused.
} }
} }
}() }()