handler.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package socketio
  2. import (
  3. "fmt"
  4. "reflect"
  5. "sync"
  6. )
  7. type baseHandler struct {
  8. events map[string]*caller
  9. name string
  10. broadcast BroadcastAdaptor
  11. evMu sync.Mutex
  12. }
  13. func newBaseHandler(name string, broadcast BroadcastAdaptor) *baseHandler {
  14. return &baseHandler{
  15. events: make(map[string]*caller),
  16. name: name,
  17. broadcast: broadcast,
  18. evMu: sync.Mutex{},
  19. }
  20. }
  21. // On registers the function f to handle an event.
  22. func (h *baseHandler) On(event string, f interface{}) error {
  23. c, err := newCaller(f)
  24. if err != nil {
  25. return err
  26. }
  27. h.evMu.Lock()
  28. h.events[event] = c
  29. h.evMu.Unlock()
  30. return nil
  31. }
  32. type socketHandler struct {
  33. *baseHandler
  34. acksmu sync.Mutex
  35. acks map[int]*caller
  36. socket *socket
  37. rooms map[string]struct{}
  38. }
  39. func newSocketHandler(s *socket, base *baseHandler) *socketHandler {
  40. events := make(map[string]*caller)
  41. base.evMu.Lock()
  42. for k, v := range base.events {
  43. events[k] = v
  44. }
  45. base.evMu.Unlock()
  46. return &socketHandler{
  47. baseHandler: &baseHandler{
  48. events: events,
  49. broadcast: base.broadcast,
  50. evMu: base.evMu,
  51. },
  52. acks: make(map[int]*caller),
  53. socket: s,
  54. rooms: make(map[string]struct{}),
  55. }
  56. }
  57. func (h *socketHandler) Emit(event string, args ...interface{}) error {
  58. var c *caller
  59. if l := len(args); l > 0 {
  60. fv := reflect.ValueOf(args[l-1])
  61. if fv.Kind() == reflect.Func {
  62. var err error
  63. c, err = newCaller(args[l-1])
  64. if err != nil {
  65. return err
  66. }
  67. args = args[:l-1]
  68. }
  69. }
  70. args = append([]interface{}{event}, args...)
  71. if c != nil {
  72. id, err := h.socket.sendId(args)
  73. if err != nil {
  74. return err
  75. }
  76. h.acksmu.Lock()
  77. h.acks[id] = c
  78. h.acksmu.Unlock()
  79. return nil
  80. }
  81. return h.socket.send(args)
  82. }
  83. func (h *socketHandler) Rooms() []string {
  84. ret := make([]string, len(h.rooms))
  85. i := 0
  86. for room := range h.rooms {
  87. ret[i] = room
  88. i++
  89. }
  90. return ret
  91. }
  92. func (h *socketHandler) Join(room string) error {
  93. if err := h.baseHandler.broadcast.Join(h.broadcastName(room), h.socket); err != nil {
  94. return err
  95. }
  96. h.rooms[room] = struct{}{}
  97. return nil
  98. }
  99. func (h *socketHandler) Leave(room string) error {
  100. if err := h.baseHandler.broadcast.Leave(h.broadcastName(room), h.socket); err != nil {
  101. return err
  102. }
  103. delete(h.rooms, room)
  104. return nil
  105. }
  106. func (h *socketHandler) LeaveAll() error {
  107. for room := range h.rooms {
  108. if err := h.baseHandler.broadcast.Leave(h.broadcastName(room), h.socket); err != nil {
  109. return err
  110. }
  111. }
  112. return nil
  113. }
  114. func (h *baseHandler) BroadcastTo(room, event string, args ...interface{}) error {
  115. return h.broadcast.Send(nil, h.broadcastName(room), event, args...)
  116. }
  117. func (h *socketHandler) BroadcastTo(room, event string, args ...interface{}) error {
  118. return h.baseHandler.broadcast.Send(h.socket, h.broadcastName(room), event, args...)
  119. }
  120. func (h *baseHandler) broadcastName(room string) string {
  121. return fmt.Sprintf("%s:%s", h.name, room)
  122. }
  123. func (h *socketHandler) onPacket(decoder *decoder, packet *packet) ([]interface{}, error) {
  124. defer func() {
  125. if decoder != nil {
  126. decoder.Close()
  127. }
  128. }()
  129. var message string
  130. switch packet.Type {
  131. case _CONNECT:
  132. message = "connection"
  133. case _DISCONNECT:
  134. message = "disconnection"
  135. case _ERROR:
  136. message = "error"
  137. case _ACK:
  138. fallthrough
  139. case _BINARY_ACK:
  140. return nil, h.onAck(packet.Id, decoder, packet)
  141. default:
  142. if decoder != nil {
  143. message = decoder.Message()
  144. }
  145. }
  146. h.evMu.Lock()
  147. c, ok := h.events[message]
  148. h.evMu.Unlock()
  149. if !ok {
  150. // If the message is not recognized by the server, the decoder.currentCloser
  151. // needs to be closed otherwise the server will be stuck until the e
  152. if decoder != nil {
  153. decoder.Close()
  154. }
  155. return nil, nil
  156. }
  157. args := c.GetArgs()
  158. olen := len(args)
  159. if olen > 0 && decoder != nil {
  160. packet.Data = &args
  161. if err := decoder.DecodeData(packet); err != nil {
  162. return nil, err
  163. }
  164. }
  165. for i := len(args); i < olen; i++ {
  166. args = append(args, nil)
  167. }
  168. retV := c.Call(h.socket, args)
  169. if len(retV) == 0 {
  170. return nil, nil
  171. }
  172. var err error
  173. if last, ok := retV[len(retV)-1].Interface().(error); ok {
  174. err = last
  175. retV = retV[0 : len(retV)-1]
  176. }
  177. ret := make([]interface{}, len(retV))
  178. for i, v := range retV {
  179. ret[i] = v.Interface()
  180. }
  181. return ret, err
  182. }
  183. func (h *socketHandler) onAck(id int, decoder *decoder, packet *packet) error {
  184. h.acksmu.Lock()
  185. c, ok := h.acks[id]
  186. if !ok {
  187. h.acksmu.Unlock()
  188. return nil
  189. }
  190. delete(h.acks, id)
  191. h.acksmu.Unlock()
  192. args := c.GetArgs()
  193. packet.Data = &args
  194. if err := decoder.DecodeData(packet); err != nil {
  195. return err
  196. }
  197. c.Call(h.socket, args)
  198. return nil
  199. }