eventstream.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. // Code generated by smithy-go-codegen DO NOT EDIT.
  2. package s3
  3. import (
  4. "context"
  5. "fmt"
  6. "github.com/aws/aws-sdk-go-v2/aws"
  7. "github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream"
  8. "github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream/eventstreamapi"
  9. "github.com/aws/aws-sdk-go-v2/service/s3/types"
  10. smithy "github.com/aws/smithy-go"
  11. "github.com/aws/smithy-go/middleware"
  12. smithysync "github.com/aws/smithy-go/sync"
  13. smithyhttp "github.com/aws/smithy-go/transport/http"
  14. "io"
  15. "io/ioutil"
  16. "sync"
  17. )
  18. // SelectObjectContentEventStreamReader provides the interface for reading events
  19. // from a stream.
  20. //
  21. // The writer's Close method must allow multiple concurrent calls.
  22. type SelectObjectContentEventStreamReader interface {
  23. Events() <-chan types.SelectObjectContentEventStream
  24. Close() error
  25. Err() error
  26. }
  27. type selectObjectContentEventStreamReader struct {
  28. stream chan types.SelectObjectContentEventStream
  29. decoder *eventstream.Decoder
  30. eventStream io.ReadCloser
  31. err *smithysync.OnceErr
  32. payloadBuf []byte
  33. done chan struct{}
  34. closeOnce sync.Once
  35. }
  36. func newSelectObjectContentEventStreamReader(readCloser io.ReadCloser, decoder *eventstream.Decoder) *selectObjectContentEventStreamReader {
  37. w := &selectObjectContentEventStreamReader{
  38. stream: make(chan types.SelectObjectContentEventStream),
  39. decoder: decoder,
  40. eventStream: readCloser,
  41. err: smithysync.NewOnceErr(),
  42. done: make(chan struct{}),
  43. payloadBuf: make([]byte, 10*1024),
  44. }
  45. go w.readEventStream()
  46. return w
  47. }
  48. func (r *selectObjectContentEventStreamReader) Events() <-chan types.SelectObjectContentEventStream {
  49. return r.stream
  50. }
  51. func (r *selectObjectContentEventStreamReader) readEventStream() {
  52. defer r.Close()
  53. defer close(r.stream)
  54. for {
  55. r.payloadBuf = r.payloadBuf[0:0]
  56. decodedMessage, err := r.decoder.Decode(r.eventStream, r.payloadBuf)
  57. if err != nil {
  58. if err == io.EOF {
  59. return
  60. }
  61. select {
  62. case <-r.done:
  63. return
  64. default:
  65. r.err.SetError(err)
  66. return
  67. }
  68. }
  69. event, err := r.deserializeEventMessage(&decodedMessage)
  70. if err != nil {
  71. r.err.SetError(err)
  72. return
  73. }
  74. select {
  75. case r.stream <- event:
  76. case <-r.done:
  77. return
  78. }
  79. }
  80. }
  81. func (r *selectObjectContentEventStreamReader) deserializeEventMessage(msg *eventstream.Message) (types.SelectObjectContentEventStream, error) {
  82. messageType := msg.Headers.Get(eventstreamapi.MessageTypeHeader)
  83. if messageType == nil {
  84. return nil, fmt.Errorf("%s event header not present", eventstreamapi.MessageTypeHeader)
  85. }
  86. switch messageType.String() {
  87. case eventstreamapi.EventMessageType:
  88. var v types.SelectObjectContentEventStream
  89. if err := awsRestxml_deserializeEventStreamSelectObjectContentEventStream(&v, msg); err != nil {
  90. return nil, err
  91. }
  92. return v, nil
  93. case eventstreamapi.ExceptionMessageType:
  94. return nil, awsRestxml_deserializeEventStreamExceptionSelectObjectContentEventStream(msg)
  95. case eventstreamapi.ErrorMessageType:
  96. errorCode := "UnknownError"
  97. errorMessage := errorCode
  98. if header := msg.Headers.Get(eventstreamapi.ErrorCodeHeader); header != nil {
  99. errorCode = header.String()
  100. }
  101. if header := msg.Headers.Get(eventstreamapi.ErrorMessageHeader); header != nil {
  102. errorMessage = header.String()
  103. }
  104. return nil, &smithy.GenericAPIError{
  105. Code: errorCode,
  106. Message: errorMessage,
  107. }
  108. default:
  109. mc := msg.Clone()
  110. return nil, &UnknownEventMessageError{
  111. Type: messageType.String(),
  112. Message: &mc,
  113. }
  114. }
  115. }
  116. func (r *selectObjectContentEventStreamReader) ErrorSet() <-chan struct{} {
  117. return r.err.ErrorSet()
  118. }
  119. func (r *selectObjectContentEventStreamReader) Close() error {
  120. r.closeOnce.Do(r.safeClose)
  121. return r.Err()
  122. }
  123. func (r *selectObjectContentEventStreamReader) safeClose() {
  124. close(r.done)
  125. r.eventStream.Close()
  126. }
  127. func (r *selectObjectContentEventStreamReader) Err() error {
  128. return r.err.Err()
  129. }
  130. func (r *selectObjectContentEventStreamReader) Closed() <-chan struct{} {
  131. return r.done
  132. }
  133. type awsRestxml_deserializeOpEventStreamSelectObjectContent struct {
  134. LogEventStreamWrites bool
  135. LogEventStreamReads bool
  136. }
  137. func (*awsRestxml_deserializeOpEventStreamSelectObjectContent) ID() string {
  138. return "OperationEventStreamDeserializer"
  139. }
  140. func (m *awsRestxml_deserializeOpEventStreamSelectObjectContent) HandleDeserialize(ctx context.Context, in middleware.DeserializeInput, next middleware.DeserializeHandler) (
  141. out middleware.DeserializeOutput, metadata middleware.Metadata, err error,
  142. ) {
  143. defer func() {
  144. if err == nil {
  145. return
  146. }
  147. m.closeResponseBody(out)
  148. }()
  149. logger := middleware.GetLogger(ctx)
  150. request, ok := in.Request.(*smithyhttp.Request)
  151. if !ok {
  152. return out, metadata, fmt.Errorf("unknown transport type: %T", in.Request)
  153. }
  154. _ = request
  155. out, metadata, err = next.HandleDeserialize(ctx, in)
  156. if err != nil {
  157. return out, metadata, err
  158. }
  159. deserializeOutput, ok := out.RawResponse.(*smithyhttp.Response)
  160. if !ok {
  161. return out, metadata, fmt.Errorf("unknown transport type: %T", out.RawResponse)
  162. }
  163. _ = deserializeOutput
  164. output, ok := out.Result.(*SelectObjectContentOutput)
  165. if out.Result != nil && !ok {
  166. return out, metadata, fmt.Errorf("unexpected output result type: %T", out.Result)
  167. } else if out.Result == nil {
  168. output = &SelectObjectContentOutput{}
  169. out.Result = output
  170. }
  171. eventReader := newSelectObjectContentEventStreamReader(
  172. deserializeOutput.Body,
  173. eventstream.NewDecoder(func(options *eventstream.DecoderOptions) {
  174. options.Logger = logger
  175. options.LogMessages = m.LogEventStreamReads
  176. }),
  177. )
  178. defer func() {
  179. if err == nil {
  180. return
  181. }
  182. _ = eventReader.Close()
  183. }()
  184. output.eventStream = NewSelectObjectContentEventStream(func(stream *SelectObjectContentEventStream) {
  185. stream.Reader = eventReader
  186. })
  187. go output.eventStream.waitStreamClose()
  188. return out, metadata, nil
  189. }
  190. func (*awsRestxml_deserializeOpEventStreamSelectObjectContent) closeResponseBody(out middleware.DeserializeOutput) {
  191. if resp, ok := out.RawResponse.(*smithyhttp.Response); ok && resp != nil && resp.Body != nil {
  192. _, _ = io.Copy(ioutil.Discard, resp.Body)
  193. _ = resp.Body.Close()
  194. }
  195. }
  196. func addEventStreamSelectObjectContentMiddleware(stack *middleware.Stack, options Options) error {
  197. if err := stack.Deserialize.Insert(&awsRestxml_deserializeOpEventStreamSelectObjectContent{
  198. LogEventStreamWrites: options.ClientLogMode.IsRequestEventMessage(),
  199. LogEventStreamReads: options.ClientLogMode.IsResponseEventMessage(),
  200. }, "OperationDeserializer", middleware.Before); err != nil {
  201. return err
  202. }
  203. return nil
  204. }
  205. // UnknownEventMessageError provides an error when a message is received from the stream,
  206. // but the reader is unable to determine what kind of message it is.
  207. type UnknownEventMessageError struct {
  208. Type string
  209. Message *eventstream.Message
  210. }
  211. // Error retruns the error message string.
  212. func (e *UnknownEventMessageError) Error() string {
  213. return "unknown event stream message type, " + e.Type
  214. }
  215. func setSafeEventStreamClientLogMode(o *Options, operation string) {
  216. switch operation {
  217. case "SelectObjectContent":
  218. toggleEventStreamClientLogMode(o, false, true)
  219. return
  220. default:
  221. return
  222. }
  223. }
  224. func toggleEventStreamClientLogMode(o *Options, request, response bool) {
  225. mode := o.ClientLogMode
  226. if request && mode.IsRequestWithBody() {
  227. mode.ClearRequestWithBody()
  228. mode |= aws.LogRequest
  229. }
  230. if response && mode.IsResponseWithBody() {
  231. mode.ClearResponseWithBody()
  232. mode |= aws.LogResponse
  233. }
  234. o.ClientLogMode = mode
  235. }