message.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package eventstream
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "hash/crc32"
  6. )
  7. const preludeLen = 8
  8. const preludeCRCLen = 4
  9. const msgCRCLen = 4
  10. const minMsgLen = preludeLen + preludeCRCLen + msgCRCLen
  11. const maxPayloadLen = 1024 * 1024 * 16 // 16MB
  12. const maxHeadersLen = 1024 * 128 // 128KB
  13. const maxMsgLen = minMsgLen + maxHeadersLen + maxPayloadLen
  14. var crc32IEEETable = crc32.MakeTable(crc32.IEEE)
  15. // A Message provides the eventstream message representation.
  16. type Message struct {
  17. Headers Headers
  18. Payload []byte
  19. }
  20. func (m *Message) rawMessage() (rawMessage, error) {
  21. var raw rawMessage
  22. if len(m.Headers) > 0 {
  23. var headers bytes.Buffer
  24. if err := EncodeHeaders(&headers, m.Headers); err != nil {
  25. return rawMessage{}, err
  26. }
  27. raw.Headers = headers.Bytes()
  28. raw.HeadersLen = uint32(len(raw.Headers))
  29. }
  30. raw.Length = raw.HeadersLen + uint32(len(m.Payload)) + minMsgLen
  31. hash := crc32.New(crc32IEEETable)
  32. binaryWriteFields(hash, binary.BigEndian, raw.Length, raw.HeadersLen)
  33. raw.PreludeCRC = hash.Sum32()
  34. binaryWriteFields(hash, binary.BigEndian, raw.PreludeCRC)
  35. if raw.HeadersLen > 0 {
  36. hash.Write(raw.Headers)
  37. }
  38. // Read payload bytes and update hash for it as well.
  39. if len(m.Payload) > 0 {
  40. raw.Payload = m.Payload
  41. hash.Write(raw.Payload)
  42. }
  43. raw.CRC = hash.Sum32()
  44. return raw, nil
  45. }
  46. // Clone returns a deep copy of the message.
  47. func (m Message) Clone() Message {
  48. var payload []byte
  49. if m.Payload != nil {
  50. payload = make([]byte, len(m.Payload))
  51. copy(payload, m.Payload)
  52. }
  53. return Message{
  54. Headers: m.Headers.Clone(),
  55. Payload: payload,
  56. }
  57. }
  58. type messagePrelude struct {
  59. Length uint32
  60. HeadersLen uint32
  61. PreludeCRC uint32
  62. }
  63. func (p messagePrelude) PayloadLen() uint32 {
  64. return p.Length - p.HeadersLen - minMsgLen
  65. }
  66. func (p messagePrelude) ValidateLens() error {
  67. if p.Length == 0 || p.Length > maxMsgLen {
  68. return LengthError{
  69. Part: "message prelude",
  70. Want: maxMsgLen,
  71. Have: int(p.Length),
  72. }
  73. }
  74. if p.HeadersLen > maxHeadersLen {
  75. return LengthError{
  76. Part: "message headers",
  77. Want: maxHeadersLen,
  78. Have: int(p.HeadersLen),
  79. }
  80. }
  81. if payloadLen := p.PayloadLen(); payloadLen > maxPayloadLen {
  82. return LengthError{
  83. Part: "message payload",
  84. Want: maxPayloadLen,
  85. Have: int(payloadLen),
  86. }
  87. }
  88. return nil
  89. }
  90. type rawMessage struct {
  91. messagePrelude
  92. Headers []byte
  93. Payload []byte
  94. CRC uint32
  95. }