payload.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package parser
  2. import (
  3. "bufio"
  4. "bytes"
  5. "fmt"
  6. "io"
  7. "strconv"
  8. "sync"
  9. )
  10. // payloadEncoder is the encoder to encode packets as payload. It can be used in multi-thread.
  11. type PayloadEncoder struct {
  12. buffers [][]byte
  13. locker sync.Mutex
  14. isString bool
  15. }
  16. // NewStringPayloadEncoder returns the encoder which encode as string.
  17. func NewStringPayloadEncoder() *PayloadEncoder {
  18. return &PayloadEncoder{
  19. isString: true,
  20. }
  21. }
  22. // NewStringPayloadEncoder returns the encoder which encode as binary.
  23. func NewBinaryPayloadEncoder() *PayloadEncoder {
  24. return &PayloadEncoder{
  25. isString: false,
  26. }
  27. }
  28. type encoder struct {
  29. *PacketEncoder
  30. buf *bytes.Buffer
  31. binaryPrefix string
  32. payload *PayloadEncoder
  33. }
  34. func (e encoder) Close() error {
  35. if err := e.PacketEncoder.Close(); err != nil {
  36. return err
  37. }
  38. var buffer []byte
  39. if e.payload.isString {
  40. buffer = []byte(fmt.Sprintf("%d:%s", e.buf.Len(), e.buf.String()))
  41. } else {
  42. buffer = []byte(fmt.Sprintf("%s%d", e.binaryPrefix, e.buf.Len()))
  43. for i, n := 0, len(buffer); i < n; i++ {
  44. buffer[i] = buffer[i] - '0'
  45. }
  46. buffer = append(buffer, 0xff)
  47. buffer = append(buffer, e.buf.Bytes()...)
  48. }
  49. e.payload.locker.Lock()
  50. e.payload.buffers = append(e.payload.buffers, buffer)
  51. e.payload.locker.Unlock()
  52. return nil
  53. }
  54. // NextString returns the encoder with packet type t and encode as string.
  55. func (e *PayloadEncoder) NextString(t PacketType) (io.WriteCloser, error) {
  56. buf := bytes.NewBuffer(nil)
  57. pEncoder, err := NewStringEncoder(buf, t)
  58. if err != nil {
  59. return nil, err
  60. }
  61. return encoder{
  62. PacketEncoder: pEncoder,
  63. buf: buf,
  64. binaryPrefix: "0",
  65. payload: e,
  66. }, nil
  67. }
  68. // NextBinary returns the encoder with packet type t and encode as binary.
  69. func (e *PayloadEncoder) NextBinary(t PacketType) (io.WriteCloser, error) {
  70. buf := bytes.NewBuffer(nil)
  71. var pEncoder *PacketEncoder
  72. var err error
  73. if e.isString {
  74. pEncoder, err = NewB64Encoder(buf, t)
  75. } else {
  76. pEncoder, err = NewBinaryEncoder(buf, t)
  77. }
  78. if err != nil {
  79. return nil, err
  80. }
  81. return encoder{
  82. PacketEncoder: pEncoder,
  83. buf: buf,
  84. binaryPrefix: "1",
  85. payload: e,
  86. }, nil
  87. }
  88. // EncodeTo writes encoded payload to writer w. It will clear the buffer of encoder.
  89. func (e *PayloadEncoder) EncodeTo(w io.Writer) error {
  90. e.locker.Lock()
  91. buffers := e.buffers
  92. e.buffers = nil
  93. e.locker.Unlock()
  94. for _, b := range buffers {
  95. for len(b) > 0 {
  96. n, err := w.Write(b)
  97. if err != nil {
  98. return err
  99. }
  100. b = b[n:]
  101. }
  102. }
  103. return nil
  104. }
  105. //IsString returns true if payload encode to string, otherwise returns false.
  106. func (e *PayloadEncoder) IsString() bool {
  107. return e.isString
  108. }
  109. // payloadDecoder is the decoder to decode payload.
  110. type PayloadDecoder struct {
  111. r *bufio.Reader
  112. }
  113. // NewPaylaodDecoder returns the payload decoder which read from reader r.
  114. func NewPayloadDecoder(r io.Reader) *PayloadDecoder {
  115. br, ok := r.(*bufio.Reader)
  116. if !ok {
  117. br = bufio.NewReader(r)
  118. }
  119. return &PayloadDecoder{
  120. r: br,
  121. }
  122. }
  123. // Next returns the packet decoder. Make sure it will be closed after used.
  124. func (d *PayloadDecoder) Next() (*PacketDecoder, error) {
  125. firstByte, err := d.r.Peek(1)
  126. if err != nil {
  127. return nil, err
  128. }
  129. isBinary := firstByte[0] < '0'
  130. delim := byte(':')
  131. if isBinary {
  132. d.r.ReadByte()
  133. delim = 0xff
  134. }
  135. line, err := d.r.ReadBytes(delim)
  136. if err != nil {
  137. return nil, err
  138. }
  139. l := len(line)
  140. if l < 1 {
  141. return nil, fmt.Errorf("invalid input")
  142. }
  143. lenByte := line[:l-1]
  144. if isBinary {
  145. for i, n := 0, l; i < n; i++ {
  146. line[i] = line[i] + '0'
  147. }
  148. }
  149. packetLen, err := strconv.ParseInt(string(lenByte), 10, 64)
  150. if err != nil {
  151. return nil, fmt.Errorf("invalid input")
  152. }
  153. return NewDecoder(newLimitReader(d.r, int(packetLen)))
  154. }