api-select.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532
  1. /*
  2. * MinIO Go Library for Amazon S3 Compatible Cloud Storage
  3. * (C) 2018 MinIO, Inc.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package s3cli
  18. import (
  19. "bytes"
  20. "context"
  21. "encoding/binary"
  22. "encoding/xml"
  23. "errors"
  24. "fmt"
  25. "hash"
  26. "hash/crc32"
  27. "io"
  28. "net/http"
  29. "net/url"
  30. "strings"
  31. "github.com/minio/minio-go/v6/pkg/encrypt"
  32. "github.com/minio/minio-go/v6/pkg/s3utils"
  33. )
  // CSVFileHeaderInfo - is the parameter for whether to utilize headers.
type CSVFileHeaderInfo string

// Constants for file header info.
//
// Each constant spells out its type: inside a const group, a line with its
// own "= value" does not inherit the type of the previous line, so omitting
// the type would leave it an untyped string constant.
const (
	CSVFileHeaderInfoNone   CSVFileHeaderInfo = "NONE"
	CSVFileHeaderInfoIgnore CSVFileHeaderInfo = "IGNORE"
	CSVFileHeaderInfoUse    CSVFileHeaderInfo = "USE"
)
  // SelectCompressionType - is the parameter for what type of compression is
// present
type SelectCompressionType string

// Constants for compression types under select API.
//
// The type is repeated on every line so that all three constants are
// SelectCompressionType values rather than untyped strings.
const (
	SelectCompressionNONE SelectCompressionType = "NONE"
	SelectCompressionGZIP SelectCompressionType = "GZIP"
	SelectCompressionBZIP SelectCompressionType = "BZIP2"
)
  // CSVQuoteFields - is the parameter for how CSV fields are quoted.
type CSVQuoteFields string

// Constants for csv quote styles.
//
// Both constants carry the CSVQuoteFields type explicitly; a line with its
// own initializer would otherwise be an untyped string constant.
const (
	CSVQuoteFieldsAlways   CSVQuoteFields = "Always"
	CSVQuoteFieldsAsNeeded CSVQuoteFields = "AsNeeded"
)
  // QueryExpressionType names the syntax a select expression is written in.
// SQL is the only value this file defines.
type QueryExpressionType string

// QueryExpressionTypeSQL marks the expression as SQL.
const QueryExpressionTypeSQL QueryExpressionType = "SQL"
  // JSONType determines json input serialization type.
type JSONType string

// Constants for JSONTypes.
//
// The type is written on each line so that JSONLinesType is a JSONType
// value like JSONDocumentType, not an untyped string constant.
const (
	JSONDocumentType JSONType = "DOCUMENT"
	JSONLinesType    JSONType = "LINES"
)
  // ParquetInputOptions parquet input specific options.
//
// The struct has no fields. SelectObjectInputSerialization holds it as a
// pointer with `omitempty`, so the Parquet element is emitted only when the
// pointer is non-nil.
type ParquetInputOptions struct{}
  // CSVInputOptions csv input specific options.
//
// The fields are marshalled to XML in declaration order as part of the
// select request. FileHeaderInfo and RecordDelimiter are always emitted;
// the remaining fields are dropped from the XML when they are empty.
type CSVInputOptions struct {
	// FileHeaderInfo takes one of the CSVFileHeaderInfo constants.
	FileHeaderInfo CSVFileHeaderInfo
	// RecordDelimiter has no omitempty tag, so it is emitted even when empty.
	RecordDelimiter      string
	FieldDelimiter       string `xml:",omitempty"`
	QuoteCharacter       string `xml:",omitempty"`
	QuoteEscapeCharacter string `xml:",omitempty"`
	Comments             string `xml:",omitempty"`
}
  // CSVOutputOptions csv output specific options.
//
// The fields are marshalled to XML in declaration order. Every field except
// RecordDelimiter is dropped from the XML when it is empty.
type CSVOutputOptions struct {
	// QuoteFields takes one of the CSVQuoteFields constants.
	QuoteFields CSVQuoteFields `xml:",omitempty"`
	// RecordDelimiter has no omitempty tag, so it is emitted even when empty.
	RecordDelimiter      string
	FieldDelimiter       string `xml:",omitempty"`
	QuoteCharacter       string `xml:",omitempty"`
	QuoteEscapeCharacter string `xml:",omitempty"`
}
  // JSONInputOptions json input specific options.
type JSONInputOptions struct {
	// Type takes one of the JSONType constants (DOCUMENT or LINES).
	Type JSONType
}
  // JSONOutputOptions - json output specific options.
type JSONOutputOptions struct {
	// RecordDelimiter has no omitempty tag, so it is emitted even when empty.
	RecordDelimiter string
}
  // SelectObjectInputSerialization - input serialization parameters.
//
// CompressionType is always emitted. Parquet, CSV and JSON are pointers
// tagged omitempty, so only the non-nil ones appear in the request XML.
type SelectObjectInputSerialization struct {
	CompressionType SelectCompressionType
	Parquet         *ParquetInputOptions `xml:"Parquet,omitempty"`
	CSV             *CSVInputOptions     `xml:"CSV,omitempty"`
	JSON            *JSONInputOptions    `xml:"JSON,omitempty"`
}
  // SelectObjectOutputSerialization - output serialization parameters.
//
// Both fields are pointers tagged omitempty, so only the non-nil ones
// appear in the request XML.
type SelectObjectOutputSerialization struct {
	CSV  *CSVOutputOptions  `xml:"CSV,omitempty"`
	JSON *JSONOutputOptions `xml:"JSON,omitempty"`
}
  // SelectObjectOptions - represents the input select body.
//
// The value is marshalled with encoding/xml under the root element
// SelectObjectContentRequest and sent as the body of the select request.
type SelectObjectOptions struct {
	XMLName xml.Name `xml:"SelectObjectContentRequest" json:"-"`
	// ServerSideEncryption is excluded from the XML body (`xml:"-"`); Header
	// turns it into request headers when it is an SSE-C configuration.
	ServerSideEncryption encrypt.ServerSide `xml:"-"`
	Expression           string
	ExpressionType       QueryExpressionType
	InputSerialization   SelectObjectInputSerialization
	OutputSerialization  SelectObjectOutputSerialization
	RequestProgress      struct {
		Enabled bool
	}
}
  123. // Header returns the http.Header representation of the SelectObject options.
  124. func (o SelectObjectOptions) Header() http.Header {
  125. headers := make(http.Header)
  126. if o.ServerSideEncryption != nil && o.ServerSideEncryption.Type() == encrypt.SSEC {
  127. o.ServerSideEncryption.Marshal(headers)
  128. }
  129. return headers
  130. }
  // SelectObjectType - is the parameter which defines what type of object the
// operation is being performed on.
type SelectObjectType string

// Constants for input data types.
//
// Each constant names its type explicitly; a line with its own initializer
// and no type would otherwise be an untyped string constant.
const (
	SelectObjectTypeCSV     SelectObjectType = "CSV"
	SelectObjectTypeJSON    SelectObjectType = "JSON"
	SelectObjectTypeParquet SelectObjectType = "Parquet"
)
  // preludeInfo is used for keeping track of necessary information from the
// prelude.
type preludeInfo struct {
	// totalLen is the first 4-byte big-endian integer of the prelude: the
	// total length of the message.
	totalLen uint32
	// headerLen is the second 4-byte big-endian integer of the prelude: the
	// length of the message's header section.
	headerLen uint32
}
  // SelectResults is used for the streaming responses from the server.
type SelectResults struct {
	// pipeReader is the read side of the pipe that the decoding goroutine
	// writes record payloads into; Read and Close delegate to it.
	pipeReader *io.PipeReader
	// resp is the HTTP response whose body carries the event stream.
	resp *http.Response
	// stats is filled in from the payload of a Stats event.
	stats *StatsMessage
	// progress is filled in from the payload of a Progress event.
	progress *ProgressMessage
}
  // ProgressMessage is a struct for progress xml message.
//
// It is decoded from an XML document rooted at <Progress> and embeds
// StatsMessage for the byte counters.
type ProgressMessage struct {
	XMLName xml.Name `xml:"Progress" json:"-"`
	StatsMessage
}
  // StatsMessage is a struct for stat xml message.
//
// It is decoded from an XML document rooted at <Stats>; each counter maps
// to the child element of the same name.
type StatsMessage struct {
	XMLName        xml.Name `xml:"Stats" json:"-"`
	BytesScanned   int64
	BytesProcessed int64
	BytesReturned  int64
}
  // messageType represents the type of message, as carried in the
// "message-type" header of an event-stream message.
type messageType string

// Known message types. Both constants are typed messageType so they are
// declared consistently and cannot be mixed up with other string kinds.
const (
	errorMsg  messageType = "error"
	commonMsg messageType = "event"
)
  // eventType represents the type of event, as carried in the "event-type"
// header of an event message.
type eventType string

// list of event-types returned by Select API. Every constant is typed
// eventType; the values are unchanged.
const (
	endEvent      eventType = "End"
	recordsEvent  eventType = "Records"
	progressEvent eventType = "Progress"
	statsEvent    eventType = "Stats"
)
  // contentType is the value of an event's "content-type" header.
type contentType string

// xmlContent is the content type required for Progress and Stats payloads.
const xmlContent contentType = "text/xml"
  185. // SelectObjectContent is a implementation of http://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectSELECTContent.html AWS S3 API.
  186. func (c Client) SelectObjectContent(ctx context.Context, bucketName, objectName string, opts SelectObjectOptions) (*SelectResults, error) {
  187. // Input validation.
  188. if err := s3utils.CheckValidBucketName(bucketName); err != nil {
  189. return nil, err
  190. }
  191. if err := s3utils.CheckValidObjectName(objectName); err != nil {
  192. return nil, err
  193. }
  194. selectReqBytes, err := xml.Marshal(opts)
  195. if err != nil {
  196. return nil, err
  197. }
  198. urlValues := make(url.Values)
  199. urlValues.Set("select", "")
  200. urlValues.Set("select-type", "2")
  201. // Execute POST on bucket/object.
  202. resp, err := c.executeMethod(ctx, "POST", requestMetadata{
  203. bucketName: bucketName,
  204. objectName: objectName,
  205. queryValues: urlValues,
  206. customHeader: opts.Header(),
  207. contentMD5Base64: sumMD5Base64(selectReqBytes),
  208. contentSHA256Hex: sum256Hex(selectReqBytes),
  209. contentBody: bytes.NewReader(selectReqBytes),
  210. contentLength: int64(len(selectReqBytes)),
  211. })
  212. if err != nil {
  213. return nil, err
  214. }
  215. if resp.StatusCode != http.StatusOK {
  216. return nil, httpRespToErrorResponse(resp, bucketName, "")
  217. }
  218. pipeReader, pipeWriter := io.Pipe()
  219. streamer := &SelectResults{
  220. resp: resp,
  221. stats: &StatsMessage{},
  222. progress: &ProgressMessage{},
  223. pipeReader: pipeReader,
  224. }
  225. streamer.start(pipeWriter)
  226. return streamer, nil
  227. }
  228. // Close - closes the underlying response body and the stream reader.
  229. func (s *SelectResults) Close() error {
  230. defer closeResponse(s.resp)
  231. return s.pipeReader.Close()
  232. }
  233. // Read - is a reader compatible implementation for SelectObjectContent records.
  234. func (s *SelectResults) Read(b []byte) (n int, err error) {
  235. return s.pipeReader.Read(b)
  236. }
  237. // Stats - information about a request's stats when processing is complete.
  238. func (s *SelectResults) Stats() *StatsMessage {
  239. return s.stats
  240. }
  241. // Progress - information about the progress of a request.
  242. func (s *SelectResults) Progress() *ProgressMessage {
  243. return s.progress
  244. }
  245. // start is the main function that decodes the large byte array into
  246. // several events that are sent through the eventstream.
  247. func (s *SelectResults) start(pipeWriter *io.PipeWriter) {
  248. go func() {
  249. for {
  250. var prelude preludeInfo
  251. var headers = make(http.Header)
  252. var err error
  253. // Create CRC code
  254. crc := crc32.New(crc32.IEEETable)
  255. crcReader := io.TeeReader(s.resp.Body, crc)
  256. // Extract the prelude(12 bytes) into a struct to extract relevant information.
  257. prelude, err = processPrelude(crcReader, crc)
  258. if err != nil {
  259. pipeWriter.CloseWithError(err)
  260. closeResponse(s.resp)
  261. return
  262. }
  263. // Extract the headers(variable bytes) into a struct to extract relevant information
  264. if prelude.headerLen > 0 {
  265. if err = extractHeader(io.LimitReader(crcReader, int64(prelude.headerLen)), headers); err != nil {
  266. pipeWriter.CloseWithError(err)
  267. closeResponse(s.resp)
  268. return
  269. }
  270. }
  271. // Get the actual payload length so that the appropriate amount of
  272. // bytes can be read or parsed.
  273. payloadLen := prelude.PayloadLen()
  274. m := messageType(headers.Get("message-type"))
  275. switch m {
  276. case errorMsg:
  277. pipeWriter.CloseWithError(errors.New(headers.Get("error-code") + ":\"" + headers.Get("error-message") + "\""))
  278. closeResponse(s.resp)
  279. return
  280. case commonMsg:
  281. // Get content-type of the payload.
  282. c := contentType(headers.Get("content-type"))
  283. // Get event type of the payload.
  284. e := eventType(headers.Get("event-type"))
  285. // Handle all supported events.
  286. switch e {
  287. case endEvent:
  288. pipeWriter.Close()
  289. closeResponse(s.resp)
  290. return
  291. case recordsEvent:
  292. if _, err = io.Copy(pipeWriter, io.LimitReader(crcReader, payloadLen)); err != nil {
  293. pipeWriter.CloseWithError(err)
  294. closeResponse(s.resp)
  295. return
  296. }
  297. case progressEvent:
  298. switch c {
  299. case xmlContent:
  300. if err = xmlDecoder(io.LimitReader(crcReader, payloadLen), s.progress); err != nil {
  301. pipeWriter.CloseWithError(err)
  302. closeResponse(s.resp)
  303. return
  304. }
  305. default:
  306. pipeWriter.CloseWithError(fmt.Errorf("Unexpected content-type %s sent for event-type %s", c, progressEvent))
  307. closeResponse(s.resp)
  308. return
  309. }
  310. case statsEvent:
  311. switch c {
  312. case xmlContent:
  313. if err = xmlDecoder(io.LimitReader(crcReader, payloadLen), s.stats); err != nil {
  314. pipeWriter.CloseWithError(err)
  315. closeResponse(s.resp)
  316. return
  317. }
  318. default:
  319. pipeWriter.CloseWithError(fmt.Errorf("Unexpected content-type %s sent for event-type %s", c, statsEvent))
  320. closeResponse(s.resp)
  321. return
  322. }
  323. }
  324. }
  325. // Ensures that the full message's CRC is correct and
  326. // that the message is not corrupted
  327. if err := checkCRC(s.resp.Body, crc.Sum32()); err != nil {
  328. pipeWriter.CloseWithError(err)
  329. closeResponse(s.resp)
  330. return
  331. }
  332. }
  333. }()
  334. }
  335. // PayloadLen is a function that calculates the length of the payload.
  336. func (p preludeInfo) PayloadLen() int64 {
  337. return int64(p.totalLen - p.headerLen - 16)
  338. }
  339. // processPrelude is the function that reads the 12 bytes of the prelude and
  340. // ensures the CRC is correct while also extracting relevant information into
  341. // the struct,
  342. func processPrelude(prelude io.Reader, crc hash.Hash32) (preludeInfo, error) {
  343. var err error
  344. var pInfo = preludeInfo{}
  345. // reads total length of the message (first 4 bytes)
  346. pInfo.totalLen, err = extractUint32(prelude)
  347. if err != nil {
  348. return pInfo, err
  349. }
  350. // reads total header length of the message (2nd 4 bytes)
  351. pInfo.headerLen, err = extractUint32(prelude)
  352. if err != nil {
  353. return pInfo, err
  354. }
  355. // checks that the CRC is correct (3rd 4 bytes)
  356. preCRC := crc.Sum32()
  357. if err := checkCRC(prelude, preCRC); err != nil {
  358. return pInfo, err
  359. }
  360. return pInfo, nil
  361. }
  362. // extracts the relevant information from the Headers.
  363. func extractHeader(body io.Reader, myHeaders http.Header) error {
  364. for {
  365. // extracts the first part of the header,
  366. headerTypeName, err := extractHeaderType(body)
  367. if err != nil {
  368. // Since end of file, we have read all of our headers
  369. if err == io.EOF {
  370. break
  371. }
  372. return err
  373. }
  374. // reads the 7 present in the header and ignores it.
  375. extractUint8(body)
  376. headerValueName, err := extractHeaderValue(body)
  377. if err != nil {
  378. return err
  379. }
  380. myHeaders.Set(headerTypeName, headerValueName)
  381. }
  382. return nil
  383. }
  384. // extractHeaderType extracts the first half of the header message, the header type.
  385. func extractHeaderType(body io.Reader) (string, error) {
  386. // extracts 2 bit integer
  387. headerNameLen, err := extractUint8(body)
  388. if err != nil {
  389. return "", err
  390. }
  391. // extracts the string with the appropriate number of bytes
  392. headerName, err := extractString(body, int(headerNameLen))
  393. if err != nil {
  394. return "", err
  395. }
  396. return strings.TrimPrefix(headerName, ":"), nil
  397. }
  398. // extractsHeaderValue extracts the second half of the header message, the
  399. // header value
  400. func extractHeaderValue(body io.Reader) (string, error) {
  401. bodyLen, err := extractUint16(body)
  402. if err != nil {
  403. return "", err
  404. }
  405. bodyName, err := extractString(body, int(bodyLen))
  406. if err != nil {
  407. return "", err
  408. }
  409. return bodyName, nil
  410. }
  // extractString reads exactly lenBytes bytes from source and returns them
// as a string.
//
// io.ReadFull is used because a single Read call may legally return fewer
// bytes than requested, which would leave the rest of the buffer as zero
// bytes. A request for zero bytes returns "" without touching source. If
// source ends early the error is io.EOF (nothing read) or
// io.ErrUnexpectedEOF (partially read), and the returned string is empty.
func extractString(source io.Reader, lenBytes int) (string, error) {
	myVal := make([]byte, lenBytes)
	if _, err := io.ReadFull(source, myVal); err != nil {
		return "", err
	}
	return string(myVal), nil
}
  // extractUint32 reads exactly four bytes from r and decodes them as a
// big-endian unsigned integer. If r cannot supply four bytes, the error
// from io.ReadFull is returned with a zero value.
func extractUint32(r io.Reader) (uint32, error) {
	var word [4]byte
	if _, err := io.ReadFull(r, word[:]); err != nil {
		return 0, err
	}
	return binary.BigEndian.Uint32(word[:]), nil
}
  // extractUint16 reads exactly two bytes from r and decodes them as a
// big-endian unsigned integer. If r cannot supply two bytes, the error from
// io.ReadFull is returned with a zero value.
func extractUint16(r io.Reader) (uint16, error) {
	var half [2]byte
	if _, err := io.ReadFull(r, half[:]); err != nil {
		return 0, err
	}
	return binary.BigEndian.Uint16(half[:]), nil
}
  // extractUint8 reads exactly one byte from r and returns it. If r has no
// byte to give, the error from io.ReadFull is returned with a zero value.
func extractUint8(r io.Reader) (uint8, error) {
	var one [1]byte
	if _, err := io.ReadFull(r, one[:]); err != nil {
		return 0, err
	}
	return one[0], nil
}
  447. // checkCRC ensures that the CRC matches with the one from the reader.
  448. func checkCRC(r io.Reader, expect uint32) error {
  449. msgCRC, err := extractUint32(r)
  450. if err != nil {
  451. return err
  452. }
  453. if msgCRC != expect {
  454. return fmt.Errorf("Checksum Mismatch, MessageCRC of 0x%X does not equal expected CRC of 0x%X", msgCRC, expect)
  455. }
  456. return nil
  457. }