diff --git a/client/src/client/events.ts b/client/src/client/events.ts index fad0de51..deafb54a 100644 --- a/client/src/client/events.ts +++ b/client/src/client/events.ts @@ -1,10 +1,14 @@ export const EVENT = { + // Internal Events CONNECTING: 'CONNECTING', CONNECTED: 'CONNECTED', DISCONNECTED: 'DISCONNECTED', TRACK: 'TRACK', MESSAGE: 'MESSAGE', DATA: 'DATA', + + // Websocket Events + DISCONNECT: 'disconnect', SIGNAL: { ANSWER: 'signal/answer', PROVIDE: 'signal/provide', @@ -25,8 +29,7 @@ export const EVENT = { REQUESTING: 'control/requesting', }, CHAT: { - SEND: 'chat/send', - RECEIVE: 'chat/receive', + MESSAGE: 'chat/message', EMOJI: 'chat/emoji', }, ADMIN: { @@ -43,9 +46,10 @@ export const EVENT = { } as const export type Events = typeof EVENT -export type WebSocketEvents = ControlEvents | IdentityEvents | MemberEvents | SignalEvents | ChatEvents +export type WebSocketEvents = SystemEvents | ControlEvents | IdentityEvents | MemberEvents | SignalEvents | ChatEvents +export type SystemEvents = typeof EVENT.DISCONNECT export type ControlEvents = typeof EVENT.CONTROL.LOCKED | typeof EVENT.CONTROL.RELEASE | typeof EVENT.CONTROL.REQUEST export type IdentityEvents = typeof EVENT.IDENTITY.PROVIDE | typeof EVENT.IDENTITY.DETAILS export type MemberEvents = typeof EVENT.MEMBER.LIST | typeof EVENT.MEMBER.CONNECTED | typeof EVENT.MEMBER.DISCONNECTED export type SignalEvents = typeof EVENT.SIGNAL.ANSWER | typeof EVENT.SIGNAL.PROVIDE -export type ChatEvents = typeof EVENT.CHAT.SEND | typeof EVENT.CHAT.RECEIVE +export type ChatEvents = typeof EVENT.CHAT.MESSAGE | typeof EVENT.CHAT.EMOJI diff --git a/server/internal/event/events.go b/server/internal/event/events.go index c97a2708..7d4650a7 100644 --- a/server/internal/event/events.go +++ b/server/internal/event/events.go @@ -1,5 +1,7 @@ package event +const SYSTEM_DISCONNECT = "system/disconnect" + const SIGNAL_ANSWER = "signal/answer" const SIGNAL_PROVIDE = "signal/provide" @@ -15,11 +17,14 @@ const CONTROL_RELEASE = "control/release" const CONTROL_REQUEST = "control/request" const CONTROL_REQUESTING = "control/requesting" -// TODO +const CHAT_MESSAGE = "chat/message" +const CHAT_EMOJI = "chat/emoji" + const ADMIN_BAN = "admin/ban" const ADMIN_KICK = "admin/kick" const ADMIN_LOCK = "admin/lock" const ADMIN_MUTE = "admin/mute" +const ADMIN_UNLOCK = "admin/unlock" const ADMIN_UNMUTE = "admin/unmute" const ADMIN_FORCE_CONTROL = "admin/force/control" const ADMIN_FORCE_RELEASE = "admin/force/release" diff --git a/server/internal/message/messages.go b/server/internal/message/messages.go index 3b02229b..0c9d2ab8 100644 --- a/server/internal/message/messages.go +++ b/server/internal/message/messages.go @@ -6,42 +6,74 @@ type Message struct { Event string `json:"event"` } +type Disconnect struct { + Event string `json:"event"` + Message string `json:"message"` +} + type Identity struct { - Message - ID string `json:"id"` + Event string `json:"event"` + ID string `json:"id"` } type IdentityDetails struct { - Message + Event string `json:"event"` Username string `json:"username"` } type Signal struct { - Message - SDP string `json:"sdp"` + Event string `json:"event"` + SDP string `json:"sdp"` } -type Members struct { - Message +type MembersList struct { + Event string `json:"event"` Memebers []*session.Session `json:"members"` } type Member struct { - Message + Event string `json:"event"` *session.Session } type MemberDisconnected struct { - Message - ID string `json:"id"` + Event string `json:"event"` + ID string `json:"id"` } type Control struct { - Message - ID string `json:"id"` + Event string `json:"event"` + ID string `json:"id"` } -type Chat struct { - Message +type ChatRecieve struct { + Event string `json:"event"` + Content string `json:"content"` +} + +type ChatSend struct { + Event string `json:"event"` ID string `json:"id"` Content string `json:"content"` } + +type EmojiRecieve struct { + Event string `json:"event"` + Emoji string `json:"emoji"` +} + +type EmojiSend struct { + Event string `json:"event"` + ID string `json:"id"` + Emoji string `json:"emoji"` +} + +type Admin struct { + Event string `json:"event"` + ID string `json:"id"` +} + +type AdminSubject struct { + Event string `json:"event"` + Subject string `json:"subject"` + ID string `json:"id"` +} diff --git a/server/internal/session/manager.go b/server/internal/session/manager.go index 7b981620..a4daac8f 100644 --- a/server/internal/session/manager.go +++ b/server/internal/session/manager.go @@ -135,6 +135,30 @@ func (m *SessionManager) SetName(id string, name string) (bool, error) { return false, fmt.Errorf("invalid session id %s", id) } +func (m *SessionManager) Mute(id string) error { + session, ok := m.members[id] + if ok { + session.Muted = true + } + return nil +} + +func (m *SessionManager) Unmute(id string) error { + session, ok := m.members[id] + if ok { + session.Muted = false + } + return nil +} + +func (m *SessionManager) Kick(id string, v interface{}) error { + session, ok := m.members[id] + if ok { + return session.Kick(v) + } + return nil +} + func (m *SessionManager) Clear() error { return nil } diff --git a/server/internal/session/session.go b/server/internal/session/session.go index abe6d789..9456ea08 100644 --- a/server/internal/session/session.go +++ b/server/internal/session/session.go @@ -11,12 +11,21 @@ type Session struct { ID string `json:"id"` Name string `json:"username"` Admin bool `json:"admin"` + Muted bool `json:"-"` connected bool socket *websocket.Conn peer *webrtc.PeerConnection mu sync.Mutex } +func (session *Session) RemoteAddr() *string { + if session.socket != nil { + address := session.socket.RemoteAddr().String() + return &address + } + return nil +} + // TODO: write to peer data channel func (session *Session) Write(v interface{}) error { session.mu.Lock() @@ -24,6 +33,14 @@ func (session *Session) Write(v interface{}) error { return nil } +func (session *Session) Kick(v interface{}) error { + if err := session.Send(v); err != nil { + return err + } + + return session.destroy() +} + func (session *Session) Send(v interface{}) error { session.mu.Lock() defer session.mu.Unlock() diff --git a/server/internal/webrtc/manager.go b/server/internal/webrtc/manager.go index ead02457..f462343c 100644 --- a/server/internal/webrtc/manager.go +++ b/server/internal/webrtc/manager.go @@ -193,8 +193,8 @@ func (m *WebRTCManager) CreatePeer(id string, sdp string) error { } if err := session.Send(message.Signal{ - Message: message.Message{Event: event.SIGNAL_ANSWER}, - SDP: answer.SDP, + Event: event.SIGNAL_ANSWER, + SDP: answer.SDP, }); err != nil { return err } diff --git a/server/internal/websocket/admin.go b/server/internal/websocket/admin.go new file mode 100644 index 00000000..b1d321e8 --- /dev/null +++ b/server/internal/websocket/admin.go @@ -0,0 +1,189 @@ +package websocket + +import ( + "n.eko.moe/neko/internal/event" + "n.eko.moe/neko/internal/message" + "n.eko.moe/neko/internal/session" +) + +func (h *MessageHandler) adminLock(id string, session *session.Session) error { + if !session.Admin || !h.locked { + return nil + } + + h.locked = true + + if err := h.sessions.Brodcast( + message.Admin{ + Event: event.ADMIN_LOCK, + ID: id, + }, nil); err != nil { + h.logger.Warn().Err(err).Msgf("brodcasting event %s has failed", event.ADMIN_LOCK) + return err + } + + return nil +} + +func (h *MessageHandler) adminUnlock(id string, session *session.Session) error { + if !session.Admin || !h.locked { + return nil + } + + h.locked = false + + if err := h.sessions.Brodcast( + message.Admin{ + Event: event.ADMIN_UNLOCK, + ID: id, + }, nil); err != nil { + h.logger.Warn().Err(err).Msgf("brodcasting event %s has failed", event.ADMIN_UNLOCK) + return err + } + + return nil +} + +func (h *MessageHandler) adminControl(id string, session *session.Session) error { + if !session.Admin { + return nil + } + + h.sessions.SetHost(id) + + if err := h.sessions.Brodcast( + message.Admin{ + Event: event.ADMIN_FORCE_CONTROL, + ID: id, + }, nil); err != nil { + h.logger.Warn().Err(err).Msgf("brodcasting event %s has failed", event.ADMIN_FORCE_CONTROL) + return err + } + + return nil +} + +func (h *MessageHandler) adminRelease(id string, session *session.Session) error { + if !session.Admin { + return nil + } + + h.sessions.ClearHost() + + if err := h.sessions.Brodcast( + message.Admin{ + Event: event.ADMIN_FORCE_RELEASE, + ID: id, + }, nil); err != nil { + h.logger.Warn().Err(err).Msgf("brodcasting event %s has failed", event.ADMIN_FORCE_RELEASE) + return err + } + + return nil +} + +func (h *MessageHandler) adminBan(id string, session *session.Session, payload *message.Admin) error { + if !session.Admin { + return nil + } + + session, ok := h.sessions.Get(id) + if !ok { + return nil + } + + address := session.RemoteAddr() + if address == nil { + return nil + } + + h.banned[*address] = true + + if err := session.Kick(message.Disconnect{ + Event: event.SYSTEM_DISCONNECT, + Message: "You have been banned", + }); err != nil { + return err + } + + if err := h.sessions.Brodcast( + message.AdminSubject{ + Event: event.ADMIN_BAN, + Subject: payload.ID, + ID: id, + }, nil); err != nil { + h.logger.Warn().Err(err).Msgf("brodcasting event %s has failed", event.ADMIN_BAN) + return err + } + + return nil +} + +func (h *MessageHandler) adminKick(id string, session *session.Session, payload *message.Admin) error { + if !session.Admin { + return nil + } + + if err := h.sessions.Kick(payload.ID, message.Disconnect{ + Event: event.SYSTEM_DISCONNECT, + Message: "You have been banned", + }); err != nil { + return err + } + + if err := h.sessions.Brodcast( + message.AdminSubject{ + Event: event.ADMIN_KICK, + Subject: payload.ID, + ID: id, + }, nil); err != nil { + h.logger.Warn().Err(err).Msgf("brodcasting event %s has failed", event.ADMIN_KICK) + return err + } + + return nil +} + +func (h *MessageHandler) adminMute(id string, session *session.Session, payload *message.Admin) error { + if !session.Admin { + return nil + } + + if err := h.sessions.Mute(payload.ID); err != nil { + return err + } + + if err := h.sessions.Brodcast( + message.AdminSubject{ + Event: event.ADMIN_MUTE, + Subject: payload.ID, + ID: id, + }, nil); err != nil { + h.logger.Warn().Err(err).Msgf("brodcasting event %s has failed", event.ADMIN_UNMUTE) + return err + } + + return nil +} + +func (h *MessageHandler) adminUnmute(id string, session *session.Session, payload *message.Admin) error { + if !session.Admin { + return nil + } + + if err := h.sessions.Unmute(payload.ID); err != nil { + return err + } + + if err := h.sessions.Brodcast( + message.AdminSubject{ + Event: event.ADMIN_UNMUTE, + Subject: payload.ID, + ID: id, + }, nil); err != nil { + h.logger.Warn().Err(err).Msgf("brodcasting event %s has failed", event.ADMIN_UNMUTE) + return err + } + + return nil +} diff --git a/server/internal/websocket/chat.go b/server/internal/websocket/chat.go new file mode 100644 index 00000000..448c08ce --- /dev/null +++ b/server/internal/websocket/chat.go @@ -0,0 +1,33 @@ +package websocket + +import ( + "n.eko.moe/neko/internal/event" + "n.eko.moe/neko/internal/message" + "n.eko.moe/neko/internal/session" +) + +func (h *MessageHandler) chat(id string, session *session.Session, payload *message.ChatRecieve) error { + if err := h.sessions.Brodcast( + message.ChatSend{ + Event: event.CHAT_MESSAGE, + Content: payload.Content, + ID: id, + }, nil); err != nil { + h.logger.Warn().Err(err).Msgf("brodcasting event %s has failed", event.CONTROL_RELEASE) + return err + } + return nil +} + +func (h *MessageHandler) chatEmoji(id string, session *session.Session, payload *message.EmojiRecieve) error { + if err := h.sessions.Brodcast( + message.EmojiSend{ + Event: event.CHAT_MESSAGE, + Emoji: payload.Emoji, + ID: id, + }, nil); err != nil { + h.logger.Warn().Err(err).Msgf("brodcasting event %s has failed", event.CONTROL_RELEASE) + return err + } + return nil +} diff --git a/server/internal/websocket/control.go b/server/internal/websocket/control.go index d3186f93..9afcecba 100644 --- a/server/internal/websocket/control.go +++ b/server/internal/websocket/control.go @@ -20,8 +20,8 @@ func (h *MessageHandler) controlRelease(id string, session *session.Session) err // tell everyone if err := h.sessions.Brodcast( message.Control{ - Message: message.Message{Event: event.CONTROL_RELEASE}, - ID: id, + Event: event.CONTROL_RELEASE, + ID: id, }, nil); err != nil { h.logger.Warn().Err(err).Msgf("brodcasting event %s has failed", event.CONTROL_RELEASE) return err @@ -41,8 +41,8 @@ func (h *MessageHandler) controlRequest(id string, session *session.Session) err // let everyone know if err := h.sessions.Brodcast( message.Control{ - Message: message.Message{Event: event.CONTROL_LOCKED}, - ID: id, + Event: event.CONTROL_LOCKED, + ID: id, }, nil); err != nil { h.logger.Warn().Err(err).Msgf("brodcasting event %s has failed", event.CONTROL_LOCKED) return err @@ -57,8 +57,8 @@ func (h *MessageHandler) controlRequest(id string, session *session.Session) err // tell session there is a host if err := session.Send(message.Control{ - Message: message.Message{Event: event.CONTROL_REQUEST}, - ID: host.ID, + Event: event.CONTROL_REQUEST, + ID: host.ID, }); err != nil { h.logger.Warn().Err(err).Str("id", id).Msgf("sending event %s has failed", event.CONTROL_REQUEST) return err @@ -66,8 +66,8 @@ func (h *MessageHandler) controlRequest(id string, session *session.Session) err // tell host session wants to be host if err := host.Send(message.Control{ - Message: message.Message{Event: event.CONTROL_REQUESTING}, - ID: id, + Event: event.CONTROL_REQUESTING, + ID: id, }); err != nil { h.logger.Warn().Err(err).Str("id", host.ID).Msgf("sending event %s has failed", event.CONTROL_REQUESTING) return err diff --git a/server/internal/websocket/handler.go b/server/internal/websocket/handler.go index 6b140a23..7f42eaef 100644 --- a/server/internal/websocket/handler.go +++ b/server/internal/websocket/handler.go @@ -10,6 +10,8 @@ import ( "github.com/rs/zerolog/log" "n.eko.moe/neko/internal/config" + "n.eko.moe/neko/internal/event" + "n.eko.moe/neko/internal/message" "n.eko.moe/neko/internal/session" "n.eko.moe/neko/internal/utils" "n.eko.moe/neko/internal/webrtc" @@ -31,6 +33,8 @@ func New(sessions *session.SessionManager, webrtc *webrtc.WebRTCManager, conf *c logger: logger.With().Str("subsystem", "handler").Logger(), sessions: sessions, webrtc: webrtc, + banned: make(map[string]bool), + locked: false, }, } } @@ -106,12 +110,41 @@ func (ws *WebSocketHandler) Upgrade(w http.ResponseWriter, r *http.Request) erro id, admin, err := ws.authenticate(r) if err != nil { ws.logger.Warn().Err(err).Msg("authenticatetion failed") + + if err = socket.WriteJSON(message.Disconnect{ + Event: event.SYSTEM_DISCONNECT, + Message: "invalid password", + }); err != nil { + ws.logger.Error().Err(err).Msg("failed to send disconnect") + } + if err = socket.Close(); err != nil { return err } return nil } + ok, reason, err := ws.handler.SocketConnected(id, socket) + if err != nil { + ws.logger.Error().Err(err).Msg("connection failed") + return err + } + + if !ok { + if err = socket.WriteJSON(message.Disconnect{ + Event: event.SYSTEM_DISCONNECT, + Message: reason, + }); err != nil { + ws.logger.Error().Err(err).Msg("failed to send disconnect") + } + + if err = socket.Close(); err != nil { + return err + } + + return nil + } + ws.sessions.New(id, admin, socket) ws.logger. @@ -128,14 +161,6 @@ func (ws *WebSocketHandler) Upgrade(w http.ResponseWriter, r *http.Request) erro Msg("session ended") }() - if err = ws.handler.SocketConnected(id, socket); err != nil { - ws.logger.Error().Err(err).Msg("connection failed") - if err = socket.Close(); err != nil { - return err - } - return nil - } - ws.handle(socket, id) return nil } diff --git a/server/internal/websocket/identity.go b/server/internal/websocket/identity.go index 6b1f8323..8d7114fd 100644 --- a/server/internal/websocket/identity.go +++ b/server/internal/websocket/identity.go @@ -9,6 +9,5 @@ func (h *MessageHandler) identityDetails(id string, session *session.Session, pa if _, err := h.sessions.SetName(id, payload.Username); err != nil { return err } - return nil } diff --git a/server/internal/websocket/messages.go b/server/internal/websocket/messages.go index fc00092a..0edafc48 100644 --- a/server/internal/websocket/messages.go +++ b/server/internal/websocket/messages.go @@ -18,10 +18,20 @@ type MessageHandler struct { logger zerolog.Logger sessions *session.SessionManager webrtc *webrtc.WebRTCManager + banned map[string]bool + locked bool } -func (h *MessageHandler) SocketConnected(id string, socket *websocket.Conn) error { - return nil +func (h *MessageHandler) SocketConnected(id string, socket *websocket.Conn) (bool, string, error) { + ok, banned := h.banned[socket.RemoteAddr().String()] + if ok && banned { + return false, "you are banned", nil + } + + if h.locked { + return false, "stream is currently locked", nil + } + return true, "", nil } func (h *MessageHandler) SocketDisconnected(id string) error { @@ -40,22 +50,69 @@ func (h *MessageHandler) Message(id string, raw []byte) error { } switch header.Event { + // Signal Events case event.SIGNAL_PROVIDE: payload := message.Signal{} return errors.Wrapf( utils.Unmarshal(&payload, raw, func() error { return h.webrtc.CreatePeer(id, payload.SDP) }), "%s failed", header.Event) + // Identity Events case event.IDENTITY_DETAILS: payload := &message.IdentityDetails{} return errors.Wrapf( utils.Unmarshal(payload, raw, func() error { return h.identityDetails(id, session, payload) }), "%s failed", header.Event) + // Control Events case event.CONTROL_RELEASE: return errors.Wrapf(h.controlRelease(id, session), "%s failed", header.Event) case event.CONTROL_REQUEST: return errors.Wrapf(h.controlRequest(id, session), "%s failed", header.Event) + // Chat Events + case event.CHAT_MESSAGE: + payload := &message.ChatRecieve{} + return errors.Wrapf( + utils.Unmarshal(payload, raw, func() error { + return h.chat(id, session, payload) + }), "%s failed", header.Event) + case event.CHAT_EMOJI: + payload := &message.EmojiRecieve{} + return errors.Wrapf( + utils.Unmarshal(payload, raw, func() error { + return h.chatEmoji(id, session, payload) + }), "%s failed", header.Event) + // Admin Events + case event.ADMIN_LOCK: + return errors.Wrapf(h.adminLock(id, session), "%s failed", header.Event) + case event.ADMIN_FORCE_CONTROL: + return errors.Wrapf(h.adminControl(id, session), "%s failed", header.Event) + case event.ADMIN_FORCE_RELEASE: + return errors.Wrapf(h.adminRelease(id, session), "%s failed", header.Event) + case event.ADMIN_BAN: + payload := &message.Admin{} + return errors.Wrapf( + utils.Unmarshal(payload, raw, func() error { + return h.adminBan(id, session, payload) + }), "%s failed", header.Event) + case event.ADMIN_KICK: + payload := &message.Admin{} + return errors.Wrapf( + utils.Unmarshal(payload, raw, func() error { + return h.adminKick(id, session, payload) + }), "%s failed", header.Event) + case event.ADMIN_MUTE: + payload := &message.Admin{} + return errors.Wrapf( + utils.Unmarshal(payload, raw, func() error { + return h.adminMute(id, session, payload) + }), "%s failed", header.Event) + case event.ADMIN_UNMUTE: + payload := &message.Admin{} + return errors.Wrapf( + utils.Unmarshal(payload, raw, func() error { + return h.adminUnmute(id, session, payload) + }), "%s failed", header.Event) default: return errors.Errorf("unknown message event %s", header.Event) } diff --git a/server/internal/websocket/session.go b/server/internal/websocket/session.go index 1eba39b0..ae2adfc9 100644 --- a/server/internal/websocket/session.go +++ b/server/internal/websocket/session.go @@ -8,8 +8,8 @@ import ( func (h *MessageHandler) SessionCreated(id string, session *session.Session) error { if err := session.Send(message.Identity{ - Message: message.Message{Event: event.IDENTITY_PROVIDE}, - ID: id, + Event: event.IDENTITY_PROVIDE, + ID: id, }); err != nil { return err } @@ -19,8 +19,8 @@ func (h *MessageHandler) SessionCreated(id string, session *session.Session) err func (h *MessageHandler) SessionConnected(id string, session *session.Session) error { // send list of members to session - if err := session.Send(message.Members{ - Message: message.Message{Event: event.MEMBER_LIST}, + if err := session.Send(message.MembersList{ + Event: event.MEMBER_LIST, Memebers: h.sessions.GetConnected(), }); err != nil { h.logger.Warn().Str("id", id).Err(err).Msgf("sending event %s has failed", event.MEMBER_LIST) @@ -31,8 +31,8 @@ func (h *MessageHandler) SessionConnected(id string, session *session.Session) e host, ok := h.sessions.GetHost() if ok { if err := session.Send(message.Control{ - Message: message.Message{Event: event.CONTROL_LOCKED}, - ID: host.ID, + Event: event.CONTROL_LOCKED, + ID: host.ID, }); err != nil { h.logger.Warn().Str("id", id).Err(err).Msgf("sending event %s has failed", event.CONTROL_LOCKED) return err @@ -42,7 +42,7 @@ func (h *MessageHandler) SessionConnected(id string, session *session.Session) e // let everyone know there is a new session if err := h.sessions.Brodcast( message.Member{ - Message: message.Message{Event: event.MEMBER_CONNECTED}, + Event: event.MEMBER_CONNECTED, Session: session, }, nil); err != nil { h.logger.Warn().Err(err).Msgf("brodcasting event %s has failed", event.CONTROL_RELEASE) @@ -57,8 +57,8 @@ func (h *MessageHandler) SessionDestroyed(id string) error { if h.sessions.IsHost(id) { h.sessions.ClearHost() if err := h.sessions.Brodcast(message.Control{ - Message: message.Message{Event: event.CONTROL_RELEASE}, - ID: id, + Event: event.CONTROL_RELEASE, + ID: id, }, nil); err != nil { h.logger.Warn().Err(err).Msgf("brodcasting event %s has failed", event.CONTROL_RELEASE) } @@ -67,8 +67,8 @@ func (h *MessageHandler) SessionDestroyed(id string) error { // let everyone know session disconnected if err := h.sessions.Brodcast( message.MemberDisconnected{ - Message: message.Message{Event: event.MEMBER_DISCONNECTED}, - ID: id, + Event: event.MEMBER_DISCONNECTED, + ID: id, }, nil); err != nil { h.logger.Warn().Err(err).Msgf("brodcasting event %s has failed", event.MEMBER_DISCONNECTED) return err