api-get-object.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661
  1. /*
  2. * MinIO Go Library for Amazon S3 Compatible Cloud Storage
  3. * Copyright 2015-2017 MinIO, Inc.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package s3cli
  18. import (
  19. "context"
  20. "errors"
  21. "fmt"
  22. "io"
  23. "net/http"
  24. "strings"
  25. "sync"
  26. "time"
  27. "github.com/minio/minio-go/v6/pkg/s3utils"
  28. )
  29. // GetObject - returns an seekable, readable object.
  30. func (c Client) GetObject(bucketName, objectName string, opts GetObjectOptions) (*Object, error) {
  31. return c.getObjectWithContext(context.Background(), bucketName, objectName, opts)
  32. }
  33. // GetObject wrapper function that accepts a request context
  34. func (c Client) getObjectWithContext(ctx context.Context, bucketName, objectName string, opts GetObjectOptions) (*Object, error) {
  35. // Input validation.
  36. if err := s3utils.CheckValidBucketName(bucketName); err != nil {
  37. return nil, err
  38. }
  39. if err := s3utils.CheckValidObjectName(objectName); err != nil {
  40. return nil, err
  41. }
  42. var httpReader io.ReadCloser
  43. var objectInfo ObjectInfo
  44. var err error
  45. // Create request channel.
  46. reqCh := make(chan getRequest)
  47. // Create response channel.
  48. resCh := make(chan getResponse)
  49. // Create done channel.
  50. doneCh := make(chan struct{})
  51. // This routine feeds partial object data as and when the caller reads.
  52. go func() {
  53. defer close(reqCh)
  54. defer close(resCh)
  55. // Used to verify if etag of object has changed since last read.
  56. var etag string
  57. // Loop through the incoming control messages and read data.
  58. for {
  59. select {
  60. // When the done channel is closed exit our routine.
  61. case <-doneCh:
  62. // Close the http response body before returning.
  63. // This ends the connection with the server.
  64. if httpReader != nil {
  65. httpReader.Close()
  66. }
  67. return
  68. // Gather incoming request.
  69. case req := <-reqCh:
  70. // If this is the first request we may not need to do a getObject request yet.
  71. if req.isFirstReq {
  72. // First request is a Read/ReadAt.
  73. if req.isReadOp {
  74. // Differentiate between wanting the whole object and just a range.
  75. if req.isReadAt {
  76. // If this is a ReadAt request only get the specified range.
  77. // Range is set with respect to the offset and length of the buffer requested.
  78. // Do not set objectInfo from the first readAt request because it will not get
  79. // the whole object.
  80. opts.SetRange(req.Offset, req.Offset+int64(len(req.Buffer))-1)
  81. } else if req.Offset > 0 {
  82. opts.SetRange(req.Offset, 0)
  83. }
  84. httpReader, objectInfo, err = c.getObject(ctx, bucketName, objectName, opts)
  85. if err != nil {
  86. resCh <- getResponse{Error: err}
  87. return
  88. }
  89. etag = objectInfo.ETag
  90. // Read at least firstReq.Buffer bytes, if not we have
  91. // reached our EOF.
  92. size, err := io.ReadFull(httpReader, req.Buffer)
  93. if size > 0 && err == io.ErrUnexpectedEOF {
  94. // If an EOF happens after reading some but not
  95. // all the bytes ReadFull returns ErrUnexpectedEOF
  96. err = io.EOF
  97. }
  98. // Send back the first response.
  99. resCh <- getResponse{
  100. objectInfo: objectInfo,
  101. Size: int(size),
  102. Error: err,
  103. didRead: true,
  104. }
  105. } else {
  106. // First request is a Stat or Seek call.
  107. // Only need to run a StatObject until an actual Read or ReadAt request comes through.
  108. // Remove range header if already set, for stat Operations to get original file size.
  109. delete(opts.headers, "Range")
  110. objectInfo, err = c.statObject(ctx, bucketName, objectName, StatObjectOptions{opts})
  111. if err != nil {
  112. resCh <- getResponse{
  113. Error: err,
  114. }
  115. // Exit the go-routine.
  116. return
  117. }
  118. etag = objectInfo.ETag
  119. // Send back the first response.
  120. resCh <- getResponse{
  121. objectInfo: objectInfo,
  122. }
  123. }
  124. } else if req.settingObjectInfo { // Request is just to get objectInfo.
  125. // Remove range header if already set, for stat Operations to get original file size.
  126. delete(opts.headers, "Range")
  127. if etag != "" {
  128. opts.SetMatchETag(etag)
  129. }
  130. objectInfo, err := c.statObject(ctx, bucketName, objectName, StatObjectOptions{opts})
  131. if err != nil {
  132. resCh <- getResponse{
  133. Error: err,
  134. }
  135. // Exit the goroutine.
  136. return
  137. }
  138. // Send back the objectInfo.
  139. resCh <- getResponse{
  140. objectInfo: objectInfo,
  141. }
  142. } else {
  143. // Offset changes fetch the new object at an Offset.
  144. // Because the httpReader may not be set by the first
  145. // request if it was a stat or seek it must be checked
  146. // if the object has been read or not to only initialize
  147. // new ones when they haven't been already.
  148. // All readAt requests are new requests.
  149. if req.DidOffsetChange || !req.beenRead {
  150. if etag != "" {
  151. opts.SetMatchETag(etag)
  152. }
  153. if httpReader != nil {
  154. // Close previously opened http reader.
  155. httpReader.Close()
  156. }
  157. // If this request is a readAt only get the specified range.
  158. if req.isReadAt {
  159. // Range is set with respect to the offset and length of the buffer requested.
  160. opts.SetRange(req.Offset, req.Offset+int64(len(req.Buffer))-1)
  161. } else if req.Offset > 0 { // Range is set with respect to the offset.
  162. opts.SetRange(req.Offset, 0)
  163. }
  164. httpReader, objectInfo, err = c.getObject(ctx, bucketName, objectName, opts)
  165. if err != nil {
  166. resCh <- getResponse{
  167. Error: err,
  168. }
  169. return
  170. }
  171. }
  172. // Read at least req.Buffer bytes, if not we have
  173. // reached our EOF.
  174. size, err := io.ReadFull(httpReader, req.Buffer)
  175. if err == io.ErrUnexpectedEOF {
  176. // If an EOF happens after reading some but not
  177. // all the bytes ReadFull returns ErrUnexpectedEOF
  178. err = io.EOF
  179. }
  180. // Reply back how much was read.
  181. resCh <- getResponse{
  182. Size: int(size),
  183. Error: err,
  184. didRead: true,
  185. objectInfo: objectInfo,
  186. }
  187. }
  188. }
  189. }
  190. }()
  191. // Create a newObject through the information sent back by reqCh.
  192. return newObject(reqCh, resCh, doneCh), nil
  193. }
  // getRequest is the message an Object sends to its internal goroutine to
  // describe the operation it wants performed.
  type getRequest struct {
  	// Destination buffer for Read/ReadAt data.
  	Buffer []byte
  	// Byte offset the read should start from.
  	Offset int64
  	// Set when the offset changed since the last read (after a Seek, and
  	// for every ReadAt).
  	DidOffsetChange bool
  	// Whether the object has already been read from before this request.
  	beenRead bool
  	// Whether this request targets a specific range (ReadAt).
  	isReadAt bool
  	// Whether this request is a Read or ReadAt.
  	isReadOp bool
  	// Whether this is the first time the object is being accessed.
  	isFirstReq bool
  	// Whether this request only asks for the object's ObjectInfo.
  	settingObjectInfo bool
  }
  206. // get response message container to reply back for the request.
  207. type getResponse struct {
  208. Size int
  209. Error error
  210. didRead bool // Lets subsequent calls know whether or not httpReader has been initiated.
  211. objectInfo ObjectInfo // Used for the first request.
  212. }
  213. // Object represents an open object. It implements
  214. // Reader, ReaderAt, Seeker, Closer for a HTTP stream.
  215. type Object struct {
  216. // Mutex.
  217. mutex *sync.Mutex
  218. // User allocated and defined.
  219. reqCh chan<- getRequest
  220. resCh <-chan getResponse
  221. doneCh chan<- struct{}
  222. currOffset int64
  223. objectInfo ObjectInfo
  224. // Ask lower level to initiate data fetching based on currOffset
  225. seekData bool
  226. // Keeps track of closed call.
  227. isClosed bool
  228. // Keeps track of if this is the first call.
  229. isStarted bool
  230. // Previous error saved for future calls.
  231. prevErr error
  232. // Keeps track of if this object has been read yet.
  233. beenRead bool
  234. // Keeps track of if objectInfo has been set yet.
  235. objectInfoSet bool
  236. }
  237. // doGetRequest - sends and blocks on the firstReqCh and reqCh of an object.
  238. // Returns back the size of the buffer read, if anything was read, as well
  239. // as any error encountered. For all first requests sent on the object
  240. // it is also responsible for sending back the objectInfo.
  241. func (o *Object) doGetRequest(request getRequest) (getResponse, error) {
  242. o.reqCh <- request
  243. response := <-o.resCh
  244. // Return any error to the top level.
  245. if response.Error != nil {
  246. return response, response.Error
  247. }
  248. // This was the first request.
  249. if !o.isStarted {
  250. // The object has been operated on.
  251. o.isStarted = true
  252. }
  253. // Set the objectInfo if the request was not readAt
  254. // and it hasn't been set before.
  255. if !o.objectInfoSet && !request.isReadAt {
  256. o.objectInfo = response.objectInfo
  257. o.objectInfoSet = true
  258. }
  259. // Set beenRead only if it has not been set before.
  260. if !o.beenRead {
  261. o.beenRead = response.didRead
  262. }
  263. // Data are ready on the wire, no need to reinitiate connection in lower level
  264. o.seekData = false
  265. return response, nil
  266. }
  267. // setOffset - handles the setting of offsets for
  268. // Read/ReadAt/Seek requests.
  269. func (o *Object) setOffset(bytesRead int64) error {
  270. // Update the currentOffset.
  271. o.currOffset += bytesRead
  272. if o.objectInfo.Size > -1 && o.currOffset >= o.objectInfo.Size {
  273. return io.EOF
  274. }
  275. return nil
  276. }
  277. // Read reads up to len(b) bytes into b. It returns the number of
  278. // bytes read (0 <= n <= len(b)) and any error encountered. Returns
  279. // io.EOF upon end of file.
  280. func (o *Object) Read(b []byte) (n int, err error) {
  281. if o == nil {
  282. return 0, ErrInvalidArgument("Object is nil")
  283. }
  284. // Locking.
  285. o.mutex.Lock()
  286. defer o.mutex.Unlock()
  287. // prevErr is previous error saved from previous operation.
  288. if o.prevErr != nil || o.isClosed {
  289. return 0, o.prevErr
  290. }
  291. // Create a new request.
  292. readReq := getRequest{
  293. isReadOp: true,
  294. beenRead: o.beenRead,
  295. Buffer: b,
  296. }
  297. // Alert that this is the first request.
  298. if !o.isStarted {
  299. readReq.isFirstReq = true
  300. }
  301. // Ask to establish a new data fetch routine based on seekData flag
  302. readReq.DidOffsetChange = o.seekData
  303. readReq.Offset = o.currOffset
  304. // Send and receive from the first request.
  305. response, err := o.doGetRequest(readReq)
  306. if err != nil && err != io.EOF {
  307. // Save the error for future calls.
  308. o.prevErr = err
  309. return response.Size, err
  310. }
  311. // Bytes read.
  312. bytesRead := int64(response.Size)
  313. // Set the new offset.
  314. oerr := o.setOffset(bytesRead)
  315. if oerr != nil {
  316. // Save the error for future calls.
  317. o.prevErr = oerr
  318. return response.Size, oerr
  319. }
  320. // Return the response.
  321. return response.Size, err
  322. }
  323. // Stat returns the ObjectInfo structure describing Object.
  324. func (o *Object) Stat() (ObjectInfo, error) {
  325. if o == nil {
  326. return ObjectInfo{}, ErrInvalidArgument("Object is nil")
  327. }
  328. // Locking.
  329. o.mutex.Lock()
  330. defer o.mutex.Unlock()
  331. if o.prevErr != nil && o.prevErr != io.EOF || o.isClosed {
  332. return ObjectInfo{}, o.prevErr
  333. }
  334. // This is the first request.
  335. if !o.isStarted || !o.objectInfoSet {
  336. // Send the request and get the response.
  337. _, err := o.doGetRequest(getRequest{
  338. isFirstReq: !o.isStarted,
  339. settingObjectInfo: !o.objectInfoSet,
  340. })
  341. if err != nil {
  342. o.prevErr = err
  343. return ObjectInfo{}, err
  344. }
  345. }
  346. return o.objectInfo, nil
  347. }
  348. // ReadAt reads len(b) bytes from the File starting at byte offset
  349. // off. It returns the number of bytes read and the error, if any.
  350. // ReadAt always returns a non-nil error when n < len(b). At end of
  351. // file, that error is io.EOF.
  352. func (o *Object) ReadAt(b []byte, offset int64) (n int, err error) {
  353. if o == nil {
  354. return 0, ErrInvalidArgument("Object is nil")
  355. }
  356. // Locking.
  357. o.mutex.Lock()
  358. defer o.mutex.Unlock()
  359. // prevErr is error which was saved in previous operation.
  360. if o.prevErr != nil && o.prevErr != io.EOF || o.isClosed {
  361. return 0, o.prevErr
  362. }
  363. // Set the current offset to ReadAt offset, because the current offset will be shifted at the end of this method.
  364. o.currOffset = offset
  365. // Can only compare offsets to size when size has been set.
  366. if o.objectInfoSet {
  367. // If offset is negative than we return io.EOF.
  368. // If offset is greater than or equal to object size we return io.EOF.
  369. if (o.objectInfo.Size > -1 && offset >= o.objectInfo.Size) || offset < 0 {
  370. return 0, io.EOF
  371. }
  372. }
  373. // Create the new readAt request.
  374. readAtReq := getRequest{
  375. isReadOp: true,
  376. isReadAt: true,
  377. DidOffsetChange: true, // Offset always changes.
  378. beenRead: o.beenRead, // Set if this is the first request to try and read.
  379. Offset: offset, // Set the offset.
  380. Buffer: b,
  381. }
  382. // Alert that this is the first request.
  383. if !o.isStarted {
  384. readAtReq.isFirstReq = true
  385. }
  386. // Send and receive from the first request.
  387. response, err := o.doGetRequest(readAtReq)
  388. if err != nil && err != io.EOF {
  389. // Save the error.
  390. o.prevErr = err
  391. return response.Size, err
  392. }
  393. // Bytes read.
  394. bytesRead := int64(response.Size)
  395. // There is no valid objectInfo yet
  396. // to compare against for EOF.
  397. if !o.objectInfoSet {
  398. // Update the currentOffset.
  399. o.currOffset += bytesRead
  400. } else {
  401. // If this was not the first request update
  402. // the offsets and compare against objectInfo
  403. // for EOF.
  404. oerr := o.setOffset(bytesRead)
  405. if oerr != nil {
  406. o.prevErr = oerr
  407. return response.Size, oerr
  408. }
  409. }
  410. return response.Size, err
  411. }
  412. // Seek sets the offset for the next Read or Write to offset,
  413. // interpreted according to whence: 0 means relative to the
  414. // origin of the file, 1 means relative to the current offset,
  415. // and 2 means relative to the end.
  416. // Seek returns the new offset and an error, if any.
  417. //
  418. // Seeking to a negative offset is an error. Seeking to any positive
  419. // offset is legal, subsequent io operations succeed until the
  420. // underlying object is not closed.
  421. func (o *Object) Seek(offset int64, whence int) (n int64, err error) {
  422. if o == nil {
  423. return 0, ErrInvalidArgument("Object is nil")
  424. }
  425. // Locking.
  426. o.mutex.Lock()
  427. defer o.mutex.Unlock()
  428. // At EOF seeking is legal allow only io.EOF, for any other errors we return.
  429. if o.prevErr != nil && o.prevErr != io.EOF {
  430. return 0, o.prevErr
  431. }
  432. // Negative offset is valid for whence of '2'.
  433. if offset < 0 && whence != 2 {
  434. return 0, ErrInvalidArgument(fmt.Sprintf("Negative position not allowed for %d", whence))
  435. }
  436. // This is the first request. So before anything else
  437. // get the ObjectInfo.
  438. if !o.isStarted || !o.objectInfoSet {
  439. // Create the new Seek request.
  440. seekReq := getRequest{
  441. isReadOp: false,
  442. Offset: offset,
  443. isFirstReq: true,
  444. }
  445. // Send and receive from the seek request.
  446. _, err := o.doGetRequest(seekReq)
  447. if err != nil {
  448. // Save the error.
  449. o.prevErr = err
  450. return 0, err
  451. }
  452. }
  453. // Switch through whence.
  454. switch whence {
  455. default:
  456. return 0, ErrInvalidArgument(fmt.Sprintf("Invalid whence %d", whence))
  457. case 0:
  458. if o.objectInfo.Size > -1 && offset > o.objectInfo.Size {
  459. return 0, io.EOF
  460. }
  461. o.currOffset = offset
  462. case 1:
  463. if o.objectInfo.Size > -1 && o.currOffset+offset > o.objectInfo.Size {
  464. return 0, io.EOF
  465. }
  466. o.currOffset += offset
  467. case 2:
  468. // If we don't know the object size return an error for io.SeekEnd
  469. if o.objectInfo.Size < 0 {
  470. return 0, ErrInvalidArgument("Whence END is not supported when the object size is unknown")
  471. }
  472. // Seeking to positive offset is valid for whence '2', but
  473. // since we are backing a Reader we have reached 'EOF' if
  474. // offset is positive.
  475. if offset > 0 {
  476. return 0, io.EOF
  477. }
  478. // Seeking to negative position not allowed for whence.
  479. if o.objectInfo.Size+offset < 0 {
  480. return 0, ErrInvalidArgument(fmt.Sprintf("Seeking at negative offset not allowed for %d", whence))
  481. }
  482. o.currOffset = o.objectInfo.Size + offset
  483. }
  484. // Reset the saved error since we successfully seeked, let the Read
  485. // and ReadAt decide.
  486. if o.prevErr == io.EOF {
  487. o.prevErr = nil
  488. }
  489. // Ask lower level to fetch again from source
  490. o.seekData = true
  491. // Return the effective offset.
  492. return o.currOffset, nil
  493. }
  494. // Close - The behavior of Close after the first call returns error
  495. // for subsequent Close() calls.
  496. func (o *Object) Close() (err error) {
  497. if o == nil {
  498. return ErrInvalidArgument("Object is nil")
  499. }
  500. // Locking.
  501. o.mutex.Lock()
  502. defer o.mutex.Unlock()
  503. // if already closed return an error.
  504. if o.isClosed {
  505. return o.prevErr
  506. }
  507. // Close successfully.
  508. close(o.doneCh)
  509. // Save for future operations.
  510. errMsg := "Object is already closed. Bad file descriptor."
  511. o.prevErr = errors.New(errMsg)
  512. // Save here that we closed done channel successfully.
  513. o.isClosed = true
  514. return nil
  515. }
  516. // newObject instantiates a new *minio.Object*
  517. // ObjectInfo will be set by setObjectInfo
  518. func newObject(reqCh chan<- getRequest, resCh <-chan getResponse, doneCh chan<- struct{}) *Object {
  519. return &Object{
  520. mutex: &sync.Mutex{},
  521. reqCh: reqCh,
  522. resCh: resCh,
  523. doneCh: doneCh,
  524. }
  525. }
  526. // getObject - retrieve object from Object Storage.
  527. //
  528. // Additionally this function also takes range arguments to download the specified
  529. // range bytes of an object. Setting offset and length = 0 will download the full object.
  530. //
  531. // For more information about the HTTP Range header.
  532. // go to http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35.
  533. func (c Client) getObject(ctx context.Context, bucketName, objectName string, opts GetObjectOptions) (io.ReadCloser, ObjectInfo, error) {
  534. // Validate input arguments.
  535. if err := s3utils.CheckValidBucketName(bucketName); err != nil {
  536. return nil, ObjectInfo{}, err
  537. }
  538. if err := s3utils.CheckValidObjectName(objectName); err != nil {
  539. return nil, ObjectInfo{}, err
  540. }
  541. // Execute GET on objectName.
  542. resp, err := c.executeMethod(ctx, "GET", requestMetadata{
  543. bucketName: bucketName,
  544. objectName: objectName,
  545. customHeader: opts.Header(),
  546. contentSHA256Hex: emptySHA256Hex,
  547. })
  548. if err != nil {
  549. return nil, ObjectInfo{}, err
  550. }
  551. if resp != nil {
  552. if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
  553. return nil, ObjectInfo{}, httpRespToErrorResponse(resp, bucketName, objectName)
  554. }
  555. }
  556. // Trim off the odd double quotes from ETag in the beginning and end.
  557. md5sum := strings.TrimPrefix(resp.Header.Get("ETag"), "\"")
  558. md5sum = strings.TrimSuffix(md5sum, "\"")
  559. // Parse the date.
  560. date, err := time.Parse(http.TimeFormat, resp.Header.Get("Last-Modified"))
  561. if err != nil {
  562. msg := "Last-Modified time format not recognized. " + reportIssue
  563. return nil, ObjectInfo{}, ErrorResponse{
  564. Code: "InternalError",
  565. Message: msg,
  566. RequestID: resp.Header.Get("x-amz-request-id"),
  567. HostID: resp.Header.Get("x-amz-id-2"),
  568. Region: resp.Header.Get("x-amz-bucket-region"),
  569. }
  570. }
  571. // Get content-type.
  572. contentType := strings.TrimSpace(resp.Header.Get("Content-Type"))
  573. if contentType == "" {
  574. contentType = "application/octet-stream"
  575. }
  576. objectStat := ObjectInfo{
  577. ETag: md5sum,
  578. Key: objectName,
  579. Size: resp.ContentLength,
  580. LastModified: date,
  581. ContentType: contentType,
  582. // Extract only the relevant header keys describing the object.
  583. // following function filters out a list of standard set of keys
  584. // which are not part of object metadata.
  585. Metadata: extractObjMetadata(resp.Header),
  586. }
  587. // do not close body here, caller will close
  588. return resp.Body, objectStat, nil
  589. }