Reader.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. /*
  2. * Copyright (c) 2014 by Farsight Security, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package framestream
  17. import (
  18. "bufio"
  19. "encoding/binary"
  20. "io"
  21. "io/ioutil"
  22. "time"
  23. )
  24. type ReaderOptions struct {
  25. // The ContentTypes accepted by the Reader. May be left unset for no
  26. // content negotiation. If the corresponding Writer offers a disjoint
  27. // set of ContentTypes, NewReader() will return ErrContentTypeMismatch.
  28. ContentTypes [][]byte
  29. // If Bidirectional is true, the underlying io.Reader must be an
  30. // io.ReadWriter, and the Reader will engage in a bidirectional
  31. // handshake with its peer to establish content type and communicate
  32. // shutdown.
  33. Bidirectional bool
  34. // Timeout gives the timeout for reading the initial handshake messages
  35. // from the peer and writing response messages if Bidirectional. It is
  36. // only effective for underlying Readers satisfying net.Conn.
  37. Timeout time.Duration
  38. }
  39. // Reader reads data frames from an underlying io.Reader using the Frame
  40. // Streams framing protocol.
  41. type Reader struct {
  42. contentType []byte
  43. bidirectional bool
  44. r *bufio.Reader
  45. w *bufio.Writer
  46. stopped bool
  47. }
  48. // NewReader creates a Frame Streams Reader reading from the given io.Reader
  49. // with the given ReaderOptions.
  50. func NewReader(r io.Reader, opt *ReaderOptions) (*Reader, error) {
  51. if opt == nil {
  52. opt = &ReaderOptions{}
  53. }
  54. tr := timeoutReader(r, opt)
  55. reader := &Reader{
  56. bidirectional: opt.Bidirectional,
  57. r: bufio.NewReader(tr),
  58. w: nil,
  59. }
  60. if len(opt.ContentTypes) > 0 {
  61. reader.contentType = opt.ContentTypes[0]
  62. }
  63. var cf ControlFrame
  64. if opt.Bidirectional {
  65. w, ok := tr.(io.Writer)
  66. if !ok {
  67. return nil, ErrType
  68. }
  69. reader.w = bufio.NewWriter(w)
  70. // Read the ready control frame.
  71. err := cf.DecodeTypeEscape(reader.r, CONTROL_READY)
  72. if err != nil {
  73. return nil, err
  74. }
  75. // Check content type.
  76. if t, ok := cf.ChooseContentType(opt.ContentTypes); ok {
  77. reader.contentType = t
  78. } else {
  79. return nil, ErrContentTypeMismatch
  80. }
  81. // Send the accept control frame.
  82. accept := ControlAccept
  83. accept.SetContentType(reader.contentType)
  84. err = accept.EncodeFlush(reader.w)
  85. if err != nil {
  86. return nil, err
  87. }
  88. }
  89. // Read the start control frame.
  90. err := cf.DecodeTypeEscape(reader.r, CONTROL_START)
  91. if err != nil {
  92. return nil, err
  93. }
  94. // Disable the read timeout to prevent killing idle connections.
  95. disableReadTimeout(tr)
  96. // Check content type.
  97. if !cf.MatchContentType(reader.contentType) {
  98. return nil, ErrContentTypeMismatch
  99. }
  100. return reader, nil
  101. }
  102. // ReadFrame reads a data frame into the supplied buffer, returning its length.
  103. // If the frame is longer than the supplied buffer, Read returns
  104. // ErrDataFrameTooLarge and discards the frame. Subsequent calls to Read()
  105. // after this error may succeed.
  106. func (r *Reader) ReadFrame(b []byte) (length int, err error) {
  107. if r.stopped {
  108. return 0, EOF
  109. }
  110. for length == 0 {
  111. length, err = r.readFrame(b)
  112. if err != nil {
  113. return
  114. }
  115. }
  116. return
  117. }
  118. // ContentType returns the content type negotiated with the Writer.
  119. func (r *Reader) ContentType() []byte {
  120. return r.contentType
  121. }
  122. func (r *Reader) readFrame(b []byte) (int, error) {
  123. // Read the frame length.
  124. var frameLen uint32
  125. err := binary.Read(r.r, binary.BigEndian, &frameLen)
  126. if err != nil {
  127. return 0, err
  128. }
  129. if frameLen > uint32(len(b)) {
  130. io.CopyN(ioutil.Discard, r.r, int64(frameLen))
  131. return 0, ErrDataFrameTooLarge
  132. }
  133. if frameLen == 0 {
  134. // This is a control frame.
  135. var cf ControlFrame
  136. err = cf.Decode(r.r)
  137. if err != nil {
  138. return 0, err
  139. }
  140. if cf.ControlType == CONTROL_STOP {
  141. r.stopped = true
  142. if r.bidirectional {
  143. ff := &ControlFrame{ControlType: CONTROL_FINISH}
  144. err = ff.EncodeFlush(r.w)
  145. if err != nil {
  146. return 0, err
  147. }
  148. }
  149. return 0, EOF
  150. }
  151. return 0, err
  152. }
  153. return io.ReadFull(r.r, b[0:frameLen])
  154. }