| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219 |
- package socketio
- import (
- "fmt"
- "reflect"
- "sync"
- )
- type baseHandler struct {
- events map[string]*caller
- name string
- broadcast BroadcastAdaptor
- evMu sync.Mutex
- }
- func newBaseHandler(name string, broadcast BroadcastAdaptor) *baseHandler {
- return &baseHandler{
- events: make(map[string]*caller),
- name: name,
- broadcast: broadcast,
- evMu: sync.Mutex{},
- }
- }
- // On registers the function f to handle an event.
- func (h *baseHandler) On(event string, f interface{}) error {
- c, err := newCaller(f)
- if err != nil {
- return err
- }
- h.evMu.Lock()
- h.events[event] = c
- h.evMu.Unlock()
- return nil
- }
- type socketHandler struct {
- *baseHandler
- acksmu sync.Mutex
- acks map[int]*caller
- socket *socket
- rooms map[string]struct{}
- }
- func newSocketHandler(s *socket, base *baseHandler) *socketHandler {
- events := make(map[string]*caller)
- base.evMu.Lock()
- for k, v := range base.events {
- events[k] = v
- }
- base.evMu.Unlock()
- return &socketHandler{
- baseHandler: &baseHandler{
- events: events,
- broadcast: base.broadcast,
- evMu: base.evMu,
- },
- acks: make(map[int]*caller),
- socket: s,
- rooms: make(map[string]struct{}),
- }
- }
- func (h *socketHandler) Emit(event string, args ...interface{}) error {
- var c *caller
- if l := len(args); l > 0 {
- fv := reflect.ValueOf(args[l-1])
- if fv.Kind() == reflect.Func {
- var err error
- c, err = newCaller(args[l-1])
- if err != nil {
- return err
- }
- args = args[:l-1]
- }
- }
- args = append([]interface{}{event}, args...)
- if c != nil {
- id, err := h.socket.sendId(args)
- if err != nil {
- return err
- }
- h.acksmu.Lock()
- h.acks[id] = c
- h.acksmu.Unlock()
- return nil
- }
- return h.socket.send(args)
- }
- func (h *socketHandler) Rooms() []string {
- ret := make([]string, len(h.rooms))
- i := 0
- for room := range h.rooms {
- ret[i] = room
- i++
- }
- return ret
- }
- func (h *socketHandler) Join(room string) error {
- if err := h.baseHandler.broadcast.Join(h.broadcastName(room), h.socket); err != nil {
- return err
- }
- h.rooms[room] = struct{}{}
- return nil
- }
- func (h *socketHandler) Leave(room string) error {
- if err := h.baseHandler.broadcast.Leave(h.broadcastName(room), h.socket); err != nil {
- return err
- }
- delete(h.rooms, room)
- return nil
- }
- func (h *socketHandler) LeaveAll() error {
- for room := range h.rooms {
- if err := h.baseHandler.broadcast.Leave(h.broadcastName(room), h.socket); err != nil {
- return err
- }
- }
- return nil
- }
- func (h *baseHandler) BroadcastTo(room, event string, args ...interface{}) error {
- return h.broadcast.Send(nil, h.broadcastName(room), event, args...)
- }
- func (h *socketHandler) BroadcastTo(room, event string, args ...interface{}) error {
- return h.baseHandler.broadcast.Send(h.socket, h.broadcastName(room), event, args...)
- }
- func (h *baseHandler) broadcastName(room string) string {
- return fmt.Sprintf("%s:%s", h.name, room)
- }
- func (h *socketHandler) onPacket(decoder *decoder, packet *packet) ([]interface{}, error) {
- defer func() {
- if decoder != nil {
- decoder.Close()
- }
- }()
- var message string
- switch packet.Type {
- case _CONNECT:
- message = "connection"
- case _DISCONNECT:
- message = "disconnection"
- case _ERROR:
- message = "error"
- case _ACK:
- fallthrough
- case _BINARY_ACK:
- return nil, h.onAck(packet.Id, decoder, packet)
- default:
- if decoder != nil {
- message = decoder.Message()
- }
- }
- h.evMu.Lock()
- c, ok := h.events[message]
- h.evMu.Unlock()
- if !ok {
- // If the message is not recognized by the server, the decoder.currentCloser
- // needs to be closed otherwise the server will be stuck until the e
- if decoder != nil {
- decoder.Close()
- }
- return nil, nil
- }
- args := c.GetArgs()
- olen := len(args)
- if olen > 0 && decoder != nil {
- packet.Data = &args
- if err := decoder.DecodeData(packet); err != nil {
- return nil, err
- }
- }
- for i := len(args); i < olen; i++ {
- args = append(args, nil)
- }
- retV := c.Call(h.socket, args)
- if len(retV) == 0 {
- return nil, nil
- }
- var err error
- if last, ok := retV[len(retV)-1].Interface().(error); ok {
- err = last
- retV = retV[0 : len(retV)-1]
- }
- ret := make([]interface{}, len(retV))
- for i, v := range retV {
- ret[i] = v.Interface()
- }
- return ret, err
- }
- func (h *socketHandler) onAck(id int, decoder *decoder, packet *packet) error {
- h.acksmu.Lock()
- c, ok := h.acks[id]
- if !ok {
- h.acksmu.Unlock()
- return nil
- }
- delete(h.acks, id)
- h.acksmu.Unlock()
- args := c.GetArgs()
- packet.Data = &args
- if err := decoder.DecodeData(packet); err != nil {
- return err
- }
- c.Call(h.socket, args)
- return nil
- }
|