separate disconnect from destroy.

This commit is contained in:
Miroslav Šedivý 2020-11-01 20:53:25 +01:00
parent 311ed987d8
commit e51541fe01
6 changed files with 41 additions and 38 deletions

View File

@ -57,10 +57,7 @@ func (manager *SessionManagerCtx) Destroy(id string) error {
session, ok := manager.members[id] session, ok := manager.members[id]
if ok { if ok {
delete(manager.members, id) delete(manager.members, id)
err := session.destroy() return session.destroy()
manager.emmiter.Emit("destroy", id)
return err
} }
return nil return nil
@ -151,17 +148,6 @@ func (manager *SessionManagerCtx) OnHostCleared(listener func(session types.Sess
}) })
} }
func (manager *SessionManagerCtx) OnDestroy(listener func(id string)) {
manager.emmiter.On("destroy", func(payload ...interface{}) {
// Stop streaming, if everyone left
if manager.capture.Streaming() && len(manager.members) == 0 {
manager.capture.StopStream()
}
listener(payload[0].(string))
})
}
func (manager *SessionManagerCtx) OnCreated(listener func(session types.Session)) { func (manager *SessionManagerCtx) OnCreated(listener func(session types.Session)) {
manager.emmiter.On("created", func(payload ...interface{}) { manager.emmiter.On("created", func(payload ...interface{}) {
// Start streaming, when first joins // Start streaming, when first joins
@ -178,3 +164,14 @@ func (manager *SessionManagerCtx) OnConnected(listener func(session types.Sessio
listener(payload[0].(*SessionCtx)) listener(payload[0].(*SessionCtx))
}) })
} }
func (manager *SessionManagerCtx) OnDisconnected(listener func(session types.Session)) {
manager.emmiter.On("disconnected", func(payload ...interface{}) {
// Stop streaming, if everyone left
if manager.capture.Streaming() && len(manager.members) == 0 {
manager.capture.StopStream()
}
listener(payload[0].(*SessionCtx))
})
}

View File

@ -74,6 +74,15 @@ func (session *SessionCtx) SetConnected() {
session.manager.emmiter.Emit("connected", session) session.manager.emmiter.Emit("connected", session)
} }
func (session *SessionCtx) SetDisconnected() {
session.connected = false
session.manager.emmiter.Emit("disconnected", session)
session.socket = nil
// TODO: Refactor.
session.manager.Destroy(session.id)
}
func (session *SessionCtx) Disconnect(reason string) error { func (session *SessionCtx) Disconnect(reason string) error {
if session.socket == nil { if session.socket == nil {
return nil return nil
@ -87,7 +96,8 @@ func (session *SessionCtx) Disconnect(reason string) error {
return err return err
} }
return session.manager.Destroy(session.id) session.SetDisconnected()
return nil
} }
func (session *SessionCtx) Send(v interface{}) error { func (session *SessionCtx) Send(v interface{}) error {

View File

@ -15,6 +15,7 @@ type Session interface {
SetSocket(socket WebSocket) SetSocket(socket WebSocket)
SetPeer(peer Peer) SetPeer(peer Peer)
SetConnected() SetConnected()
SetDisconnected()
Disconnect(reason string) error Disconnect(reason string) error
Send(v interface{}) error Send(v interface{}) error
SignalAnswer(sdp string) error SignalAnswer(sdp string) error
@ -37,9 +38,9 @@ type SessionManager interface {
OnHost(listener func(session Session)) OnHost(listener func(session Session))
OnHostCleared(listener func(session Session)) OnHostCleared(listener func(session Session))
OnDestroy(listener func(id string))
OnCreated(listener func(session Session)) OnCreated(listener func(session Session))
OnConnected(listener func(session Session)) OnConnected(listener func(session Session))
OnDisconnected(listener func(session Session))
// auth // auth
Authenticate(r *http.Request) (Session, error) Authenticate(r *http.Request) (Session, error)

View File

@ -156,9 +156,7 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session) (string, bool
case webrtc.PeerConnectionStateDisconnected: case webrtc.PeerConnectionStateDisconnected:
case webrtc.PeerConnectionStateFailed: case webrtc.PeerConnectionStateFailed:
manager.logger.Info().Str("id", session.ID()).Msg("peer disconnected") manager.logger.Info().Str("id", session.ID()).Msg("peer disconnected")
if err:= session.Disconnect("peer connection state failed"); err != nil { session.SetDisconnected()
manager.logger.Warn().Err(err).Msg("error while disconnecting session")
}
case webrtc.PeerConnectionStateConnected: case webrtc.PeerConnectionStateConnected:
manager.logger.Info().Str("id", session.ID()).Msg("peer connected") manager.logger.Info().Str("id", session.ID()).Msg("peer connected")
session.SetConnected() session.SetConnected()

View File

@ -81,14 +81,13 @@ func (h *MessageHandlerCtx) SessionConnected(session types.Session) error {
return nil return nil
} }
func (h *MessageHandlerCtx) SessionDestroyed(id string) error { func (h *MessageHandlerCtx) SessionDisconnected(session types.Session) error {
// clear host if exists // clear host if exists
host := h.sessions.GetHost() if session.IsHost() {
if host != nil && host.ID() == id {
h.sessions.ClearHost() h.sessions.ClearHost()
if err := h.sessions.Broadcast(message.Control{ if err := h.sessions.Broadcast(message.Control{
Event: event.CONTROL_RELEASE, Event: event.CONTROL_RELEASE,
ID: id, ID: session.ID(),
}, nil); err != nil { }, nil); err != nil {
h.logger.Warn().Err(err).Msgf("broadcasting event %s has failed", event.CONTROL_RELEASE) h.logger.Warn().Err(err).Msgf("broadcasting event %s has failed", event.CONTROL_RELEASE)
} }
@ -98,7 +97,7 @@ func (h *MessageHandlerCtx) SessionDestroyed(id string) error {
if err := h.sessions.Broadcast( if err := h.sessions.Broadcast(
message.MemberDisconnected{ message.MemberDisconnected{
Event: event.MEMBER_DISCONNECTED, Event: event.MEMBER_DISCONNECTED,
ID: id, ID: session.ID(),
}, nil); err != nil { }, nil); err != nil {
h.logger.Warn().Err(err).Msgf("broadcasting event %s has failed", event.MEMBER_DISCONNECTED) h.logger.Warn().Err(err).Msgf("broadcasting event %s has failed", event.MEMBER_DISCONNECTED)
return err return err

View File

@ -51,7 +51,7 @@ type WebSocketManagerCtx struct {
func (ws *WebSocketManagerCtx) Start() { func (ws *WebSocketManagerCtx) Start() {
ws.sessions.OnCreated(func(session types.Session) { ws.sessions.OnCreated(func(session types.Session) {
if err := ws.handler.SessionCreated(session); err != nil { if err := ws.handler.SessionCreated(session); err != nil {
ws.logger.Warn().Str("id", session.ID()).Err(err).Msg("session created with and error") ws.logger.Warn().Str("id", session.ID()).Err(err).Msg("session created with an error")
} else { } else {
ws.logger.Debug().Str("id", session.ID()).Msg("session created") ws.logger.Debug().Str("id", session.ID()).Msg("session created")
} }
@ -59,17 +59,17 @@ func (ws *WebSocketManagerCtx) Start() {
ws.sessions.OnConnected(func(session types.Session) { ws.sessions.OnConnected(func(session types.Session) {
if err := ws.handler.SessionConnected(session); err != nil { if err := ws.handler.SessionConnected(session); err != nil {
ws.logger.Warn().Str("id", session.ID()).Err(err).Msg("session connected with and error") ws.logger.Warn().Str("id", session.ID()).Err(err).Msg("session connected with an error")
} else { } else {
ws.logger.Debug().Str("id", session.ID()).Msg("session connected") ws.logger.Debug().Str("id", session.ID()).Msg("session connected")
} }
}) })
ws.sessions.OnDestroy(func(id string) { ws.sessions.OnDisconnected(func(session types.Session) {
if err := ws.handler.SessionDestroyed(id); err != nil { if err := ws.handler.SessionDisconnected(session); err != nil {
ws.logger.Warn().Str("id", id).Err(err).Msg("session destroyed with and error") ws.logger.Warn().Str("id", session.ID()).Err(err).Msg("session disconnected with an error")
} else { } else {
ws.logger.Debug().Str("id", id).Msg("session destroyed") ws.logger.Debug().Str("id", session.ID()).Msg("session disconnected")
} }
}) })
@ -184,11 +184,11 @@ func (ws *WebSocketManagerCtx) Upgrade(w http.ResponseWriter, r *http.Request) e
Msg("session ended") Msg("session ended")
}() }()
ws.handle(connection, session.ID()) ws.handle(connection, session)
return nil return nil
} }
func (ws *WebSocketManagerCtx) handle(connection *websocket.Conn, id string) { func (ws *WebSocketManagerCtx) handle(connection *websocket.Conn, session types.Session) {
bytes := make(chan []byte) bytes := make(chan []byte)
cancel := make(chan struct{}) cancel := make(chan struct{})
ticker := time.NewTicker(pingPeriod) ticker := time.NewTicker(pingPeriod)
@ -197,9 +197,7 @@ func (ws *WebSocketManagerCtx) handle(connection *websocket.Conn, id string) {
defer func() { defer func() {
ticker.Stop() ticker.Stop()
ws.logger.Debug().Str("address", connection.RemoteAddr().String()).Msg("handle socket ending") ws.logger.Debug().Str("address", connection.RemoteAddr().String()).Msg("handle socket ending")
if err := ws.handler.Disconnected(id); err != nil { session.SetDisconnected()
ws.logger.Warn().Err(err).Msg("socket disconnected with error")
}
}() }()
for { for {
@ -223,12 +221,12 @@ func (ws *WebSocketManagerCtx) handle(connection *websocket.Conn, id string) {
select { select {
case raw := <-bytes: case raw := <-bytes:
ws.logger.Debug(). ws.logger.Debug().
Str("session", id). Str("session", session.ID()).
Str("address", connection.RemoteAddr().String()). Str("address", connection.RemoteAddr().String()).
Str("raw", string(raw)). Str("raw", string(raw)).
Msg("received message from client") Msg("received message from client")
if err := ws.handler.Message(id, raw); err != nil { if err := ws.handler.Message(session.ID(), raw); err != nil {
ws.logger.Error().Err(err).Msg("message handler has failed") ws.logger.Error().Err(err).Msg("message handler has failed")
} }
case <-cancel: case <-cancel: