decode.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. package eventstream
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "encoding/hex"
  6. "encoding/json"
  7. "fmt"
  8. "hash"
  9. "hash/crc32"
  10. "io"
  11. "github.com/aws/aws-sdk-go/aws"
  12. )
  13. // Decoder provides decoding of an Event Stream messages.
  14. type Decoder struct {
  15. r io.Reader
  16. logger aws.Logger
  17. }
  18. // NewDecoder initializes and returns a Decoder for decoding event
  19. // stream messages from the reader provided.
  20. func NewDecoder(r io.Reader, opts ...func(*Decoder)) *Decoder {
  21. d := &Decoder{
  22. r: r,
  23. }
  24. for _, opt := range opts {
  25. opt(d)
  26. }
  27. return d
  28. }
  29. // DecodeWithLogger adds a logger to be used by the decoder when decoding
  30. // stream events.
  31. func DecodeWithLogger(logger aws.Logger) func(*Decoder) {
  32. return func(d *Decoder) {
  33. d.logger = logger
  34. }
  35. }
  36. // Decode attempts to decode a single message from the event stream reader.
  37. // Will return the event stream message, or error if Decode fails to read
  38. // the message from the stream.
  39. func (d *Decoder) Decode(payloadBuf []byte) (m Message, err error) {
  40. reader := d.r
  41. if d.logger != nil {
  42. debugMsgBuf := bytes.NewBuffer(nil)
  43. reader = io.TeeReader(reader, debugMsgBuf)
  44. defer func() {
  45. logMessageDecode(d.logger, debugMsgBuf, m, err)
  46. }()
  47. }
  48. m, err = Decode(reader, payloadBuf)
  49. return m, err
  50. }
  51. // Decode attempts to decode a single message from the event stream reader.
  52. // Will return the event stream message, or error if Decode fails to read
  53. // the message from the reader.
  54. func Decode(reader io.Reader, payloadBuf []byte) (m Message, err error) {
  55. crc := crc32.New(crc32IEEETable)
  56. hashReader := io.TeeReader(reader, crc)
  57. prelude, err := decodePrelude(hashReader, crc)
  58. if err != nil {
  59. return Message{}, err
  60. }
  61. if prelude.HeadersLen > 0 {
  62. lr := io.LimitReader(hashReader, int64(prelude.HeadersLen))
  63. m.Headers, err = decodeHeaders(lr)
  64. if err != nil {
  65. return Message{}, err
  66. }
  67. }
  68. if payloadLen := prelude.PayloadLen(); payloadLen > 0 {
  69. buf, err := decodePayload(payloadBuf, io.LimitReader(hashReader, int64(payloadLen)))
  70. if err != nil {
  71. return Message{}, err
  72. }
  73. m.Payload = buf
  74. }
  75. msgCRC := crc.Sum32()
  76. if err := validateCRC(reader, msgCRC); err != nil {
  77. return Message{}, err
  78. }
  79. return m, nil
  80. }
  81. func logMessageDecode(logger aws.Logger, msgBuf *bytes.Buffer, msg Message, decodeErr error) {
  82. w := bytes.NewBuffer(nil)
  83. defer func() { logger.Log(w.String()) }()
  84. fmt.Fprintf(w, "Raw message:\n%s\n",
  85. hex.Dump(msgBuf.Bytes()))
  86. if decodeErr != nil {
  87. fmt.Fprintf(w, "Decode error: %v\n", decodeErr)
  88. return
  89. }
  90. rawMsg, err := msg.rawMessage()
  91. if err != nil {
  92. fmt.Fprintf(w, "failed to create raw message, %v\n", err)
  93. return
  94. }
  95. decodedMsg := decodedMessage{
  96. rawMessage: rawMsg,
  97. Headers: decodedHeaders(msg.Headers),
  98. }
  99. fmt.Fprintf(w, "Decoded message:\n")
  100. encoder := json.NewEncoder(w)
  101. if err := encoder.Encode(decodedMsg); err != nil {
  102. fmt.Fprintf(w, "failed to generate decoded message, %v\n", err)
  103. }
  104. }
  105. func decodePrelude(r io.Reader, crc hash.Hash32) (messagePrelude, error) {
  106. var p messagePrelude
  107. var err error
  108. p.Length, err = decodeUint32(r)
  109. if err != nil {
  110. return messagePrelude{}, err
  111. }
  112. p.HeadersLen, err = decodeUint32(r)
  113. if err != nil {
  114. return messagePrelude{}, err
  115. }
  116. if err := p.ValidateLens(); err != nil {
  117. return messagePrelude{}, err
  118. }
  119. preludeCRC := crc.Sum32()
  120. if err := validateCRC(r, preludeCRC); err != nil {
  121. return messagePrelude{}, err
  122. }
  123. p.PreludeCRC = preludeCRC
  124. return p, nil
  125. }
  126. func decodePayload(buf []byte, r io.Reader) ([]byte, error) {
  127. w := bytes.NewBuffer(buf[0:0])
  128. _, err := io.Copy(w, r)
  129. return w.Bytes(), err
  130. }
  131. func decodeUint8(r io.Reader) (uint8, error) {
  132. type byteReader interface {
  133. ReadByte() (byte, error)
  134. }
  135. if br, ok := r.(byteReader); ok {
  136. v, err := br.ReadByte()
  137. return uint8(v), err
  138. }
  139. var b [1]byte
  140. _, err := io.ReadFull(r, b[:])
  141. return uint8(b[0]), err
  142. }
  143. func decodeUint16(r io.Reader) (uint16, error) {
  144. var b [2]byte
  145. bs := b[:]
  146. _, err := io.ReadFull(r, bs)
  147. if err != nil {
  148. return 0, err
  149. }
  150. return binary.BigEndian.Uint16(bs), nil
  151. }
  152. func decodeUint32(r io.Reader) (uint32, error) {
  153. var b [4]byte
  154. bs := b[:]
  155. _, err := io.ReadFull(r, bs)
  156. if err != nil {
  157. return 0, err
  158. }
  159. return binary.BigEndian.Uint32(bs), nil
  160. }
  161. func decodeUint64(r io.Reader) (uint64, error) {
  162. var b [8]byte
  163. bs := b[:]
  164. _, err := io.ReadFull(r, bs)
  165. if err != nil {
  166. return 0, err
  167. }
  168. return binary.BigEndian.Uint64(bs), nil
  169. }
  170. func validateCRC(r io.Reader, expect uint32) error {
  171. msgCRC, err := decodeUint32(r)
  172. if err != nil {
  173. return err
  174. }
  175. if msgCRC != expect {
  176. return ChecksumError{}
  177. }
  178. return nil
  179. }