parser.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. package socketio
  2. import (
  3. "bufio"
  4. "bytes"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "io/ioutil"
  9. "strconv"
  10. "github.com/googollee/go-engine.io"
  11. )
  12. const Protocol = 4
  13. type packetType int
  14. const (
  15. _CONNECT packetType = iota
  16. _DISCONNECT
  17. _EVENT
  18. _ACK
  19. _ERROR
  20. _BINARY_EVENT
  21. _BINARY_ACK
  22. )
  23. func (t packetType) String() string {
  24. switch t {
  25. case _CONNECT:
  26. return "connect"
  27. case _DISCONNECT:
  28. return "disconnect"
  29. case _EVENT:
  30. return "event"
  31. case _ACK:
  32. return "ack"
  33. case _ERROR:
  34. return "error"
  35. case _BINARY_EVENT:
  36. return "binary_event"
  37. case _BINARY_ACK:
  38. return "binary_ack"
  39. }
  40. return fmt.Sprintf("unknown(%d)", t)
  41. }
  42. type frameReader interface {
  43. NextReader() (engineio.MessageType, io.ReadCloser, error)
  44. }
  45. type frameWriter interface {
  46. NextWriter(engineio.MessageType) (io.WriteCloser, error)
  47. }
  48. type packet struct {
  49. Type packetType
  50. NSP string
  51. Id int
  52. Data interface{}
  53. attachNumber int
  54. }
  55. type encoder struct {
  56. w frameWriter
  57. err error
  58. }
  59. func newEncoder(w frameWriter) *encoder {
  60. return &encoder{
  61. w: w,
  62. }
  63. }
  64. func (e *encoder) Encode(v packet) error {
  65. attachments := encodeAttachments(v.Data)
  66. v.attachNumber = len(attachments)
  67. if v.attachNumber > 0 {
  68. v.Type += _BINARY_EVENT - _EVENT
  69. }
  70. if err := e.encodePacket(v); err != nil {
  71. return err
  72. }
  73. for _, a := range attachments {
  74. if err := e.writeBinary(a); err != nil {
  75. return err
  76. }
  77. }
  78. return nil
  79. }
  80. func (e *encoder) encodePacket(v packet) error {
  81. writer, err := e.w.NextWriter(engineio.MessageText)
  82. if err != nil {
  83. return err
  84. }
  85. defer writer.Close()
  86. w := newTrimWriter(writer, "\n")
  87. wh := newWriterHelper(w)
  88. wh.Write([]byte{byte(v.Type) + '0'})
  89. if v.Type == _BINARY_EVENT || v.Type == _BINARY_ACK {
  90. wh.Write([]byte(fmt.Sprintf("%d-", v.attachNumber)))
  91. }
  92. needEnd := false
  93. if v.NSP != "" {
  94. wh.Write([]byte(v.NSP))
  95. needEnd = true
  96. }
  97. if v.Id >= 0 {
  98. f := "%d"
  99. if needEnd {
  100. f = ",%d"
  101. needEnd = false
  102. }
  103. wh.Write([]byte(fmt.Sprintf(f, v.Id)))
  104. }
  105. if v.Data != nil {
  106. if needEnd {
  107. wh.Write([]byte{','})
  108. needEnd = false
  109. }
  110. if wh.Error() != nil {
  111. return wh.Error()
  112. }
  113. encoder := json.NewEncoder(w)
  114. return encoder.Encode(v.Data)
  115. }
  116. return wh.Error()
  117. }
  118. func (e *encoder) writeBinary(r io.Reader) error {
  119. writer, err := e.w.NextWriter(engineio.MessageBinary)
  120. if err != nil {
  121. return err
  122. }
  123. defer writer.Close()
  124. if _, err := io.Copy(writer, r); err != nil {
  125. return err
  126. }
  127. return nil
  128. }
  129. type decoder struct {
  130. reader frameReader
  131. message string
  132. current io.Reader
  133. currentCloser io.Closer
  134. }
  135. func newDecoder(r frameReader) *decoder {
  136. return &decoder{
  137. reader: r,
  138. }
  139. }
  140. func (d *decoder) Close() {
  141. if d != nil && d.currentCloser != nil {
  142. d.currentCloser.Close()
  143. d.current = nil
  144. d.currentCloser = nil
  145. }
  146. }
  147. func (d *decoder) Decode(v *packet) error {
  148. ty, r, err := d.reader.NextReader()
  149. if err != nil {
  150. return err
  151. }
  152. if d.current != nil {
  153. d.Close()
  154. }
  155. defer func() {
  156. if d.current == nil {
  157. r.Close()
  158. }
  159. }()
  160. if ty != engineio.MessageText {
  161. return fmt.Errorf("need text package")
  162. }
  163. reader := bufio.NewReader(r)
  164. v.Id = -1
  165. t, err := reader.ReadByte()
  166. if err != nil {
  167. return err
  168. }
  169. v.Type = packetType(t - '0')
  170. if v.Type == _BINARY_EVENT || v.Type == _BINARY_ACK {
  171. num, err := reader.ReadBytes('-')
  172. if err != nil {
  173. return err
  174. }
  175. numLen := len(num)
  176. if numLen == 0 {
  177. return fmt.Errorf("invalid packet")
  178. }
  179. n, err := strconv.ParseInt(string(num[:numLen-1]), 10, 64)
  180. if err != nil {
  181. return fmt.Errorf("invalid packet")
  182. }
  183. v.attachNumber = int(n)
  184. }
  185. next, err := reader.Peek(1)
  186. if err == io.EOF {
  187. return nil
  188. }
  189. if err != nil {
  190. return err
  191. }
  192. if len(next) == 0 {
  193. return fmt.Errorf("invalid packet")
  194. }
  195. if next[0] == '/' {
  196. path, err := reader.ReadBytes(',')
  197. if err != nil && err != io.EOF {
  198. return err
  199. }
  200. pathLen := len(path)
  201. if pathLen == 0 {
  202. return fmt.Errorf("invalid packet")
  203. }
  204. if err == nil {
  205. path = path[:pathLen-1]
  206. }
  207. v.NSP = string(path)
  208. if err == io.EOF {
  209. return nil
  210. }
  211. }
  212. id := bytes.NewBuffer(nil)
  213. finish := false
  214. for {
  215. next, err := reader.Peek(1)
  216. if err == io.EOF {
  217. finish = true
  218. break
  219. }
  220. if err != nil {
  221. return err
  222. }
  223. if '0' <= next[0] && next[0] <= '9' {
  224. if err := id.WriteByte(next[0]); err != nil {
  225. return err
  226. }
  227. } else {
  228. break
  229. }
  230. reader.ReadByte()
  231. }
  232. if id.Len() > 0 {
  233. id, err := strconv.ParseInt(id.String(), 10, 64)
  234. if err != nil {
  235. return err
  236. }
  237. v.Id = int(id)
  238. }
  239. if finish {
  240. return nil
  241. }
  242. switch v.Type {
  243. case _EVENT:
  244. fallthrough
  245. case _BINARY_EVENT:
  246. msgReader, err := newMessageReader(reader)
  247. if err != nil {
  248. return err
  249. }
  250. d.message = msgReader.Message()
  251. d.current = msgReader
  252. d.currentCloser = r
  253. case _ACK:
  254. fallthrough
  255. case _BINARY_ACK:
  256. d.current = reader
  257. d.currentCloser = r
  258. }
  259. return nil
  260. }
  261. func (d *decoder) Message() string {
  262. return d.message
  263. }
  264. func (d *decoder) DecodeData(v *packet) error {
  265. if d.current == nil {
  266. return nil
  267. }
  268. decoder := json.NewDecoder(d.current)
  269. if err := decoder.Decode(v.Data); err != nil {
  270. return err
  271. }
  272. if v.Type == _BINARY_EVENT || v.Type == _BINARY_ACK {
  273. binary, err := d.decodeBinary(v.attachNumber)
  274. if err != nil {
  275. return err
  276. }
  277. if err := decodeAttachments(v.Data, binary); err != nil {
  278. return err
  279. }
  280. v.Type -= _BINARY_EVENT - _EVENT
  281. }
  282. return nil
  283. }
  284. func (d *decoder) decodeBinary(num int) ([][]byte, error) {
  285. ret := make([][]byte, num)
  286. for i := 0; i < num; i++ {
  287. d.currentCloser.Close()
  288. t, r, err := d.reader.NextReader()
  289. if err != nil {
  290. return nil, err
  291. }
  292. d.currentCloser = r
  293. if t == engineio.MessageText {
  294. return nil, fmt.Errorf("need binary")
  295. }
  296. b, err := ioutil.ReadAll(r)
  297. if err != nil {
  298. return nil, err
  299. }
  300. ret[i] = b
  301. }
  302. return ret, nil
  303. }