api-compose-object.go 17 KB

  1. /*
  2. * MinIO Go Library for Amazon S3 Compatible Cloud Storage
  3. * Copyright 2017, 2018 MinIO, Inc.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package s3cli
  18. import (
  19. "context"
  20. "fmt"
  21. "io"
  22. "io/ioutil"
  23. "net/http"
  24. "net/url"
  25. "strconv"
  26. "strings"
  27. "time"
  28. "github.com/minio/minio-go/v6/pkg/encrypt"
  29. "github.com/minio/minio-go/v6/pkg/s3utils"
  30. )
// DestinationInfo - type with information about the object to be
// created via server-side copy requests, using the Compose API.
type DestinationInfo struct {
	// Destination bucket and object names (validated by NewDestinationInfo).
	bucket, object string

	// Server-side encryption to apply to the destination; nil means
	// no encryption.
	encryption encrypt.ServerSide

	// if no user-metadata is provided, it is copied from source
	// (when there is only one source object in the compose
	// request)
	userMetadata map[string]string
}
  41. // NewDestinationInfo - creates a compose-object/copy-source
  42. // destination info object.
  43. //
  44. // `encSSEC` is the key info for server-side-encryption with customer
  45. // provided key. If it is nil, no encryption is performed.
  46. //
  47. // `userMeta` is the user-metadata key-value pairs to be set on the
  48. // destination. The keys are automatically prefixed with `x-amz-meta-`
  49. // if needed. If nil is passed, and if only a single source (of any
  50. // size) is provided in the ComposeObject call, then metadata from the
  51. // source is copied to the destination.
  52. func NewDestinationInfo(bucket, object string, sse encrypt.ServerSide, userMeta map[string]string) (d DestinationInfo, err error) {
  53. // Input validation.
  54. if err = s3utils.CheckValidBucketName(bucket); err != nil {
  55. return d, err
  56. }
  57. if err = s3utils.CheckValidObjectName(object); err != nil {
  58. return d, err
  59. }
  60. // Process custom-metadata to remove a `x-amz-meta-` prefix if
  61. // present and validate that keys are distinct (after this
  62. // prefix removal).
  63. m := make(map[string]string)
  64. for k, v := range userMeta {
  65. if strings.HasPrefix(strings.ToLower(k), "x-amz-meta-") {
  66. k = k[len("x-amz-meta-"):]
  67. }
  68. if _, ok := m[k]; ok {
  69. return d, ErrInvalidArgument(fmt.Sprintf("Cannot add both %s and x-amz-meta-%s keys as custom metadata", k, k))
  70. }
  71. m[k] = v
  72. }
  73. return DestinationInfo{
  74. bucket: bucket,
  75. object: object,
  76. encryption: sse,
  77. userMetadata: m,
  78. }, nil
  79. }
  80. // getUserMetaHeadersMap - construct appropriate key-value pairs to send
  81. // as headers from metadata map to pass into copy-object request. For
  82. // single part copy-object (i.e. non-multipart object), enable the
  83. // withCopyDirectiveHeader to set the `x-amz-metadata-directive` to
  84. // `REPLACE`, so that metadata headers from the source are not copied
  85. // over.
  86. func (d *DestinationInfo) getUserMetaHeadersMap(withCopyDirectiveHeader bool) map[string]string {
  87. if len(d.userMetadata) == 0 {
  88. return nil
  89. }
  90. r := make(map[string]string)
  91. if withCopyDirectiveHeader {
  92. r["x-amz-metadata-directive"] = "REPLACE"
  93. }
  94. for k, v := range d.userMetadata {
  95. if isAmzHeader(k) || isStandardHeader(k) || isStorageClassHeader(k) {
  96. r[k] = v
  97. } else {
  98. r["x-amz-meta-"+k] = v
  99. }
  100. }
  101. return r
  102. }
// SourceInfo - represents a source object to be copied, using
// server-side copying APIs.
type SourceInfo struct {
	// Source bucket and object names.
	bucket, object string

	// Byte range [start, end] to copy; start == -1 means the whole
	// object (see SetRange).
	start, end int64

	// Decryption configuration for the source; nil if the source is
	// not encrypted.
	encryption encrypt.ServerSide

	// Headers to send with the upload-part-copy request involving
	// this source object.
	Headers http.Header
}
  113. // NewSourceInfo - create a compose-object/copy-object source info
  114. // object.
  115. //
  116. // `decryptSSEC` is the decryption key using server-side-encryption
  117. // with customer provided key. It may be nil if the source is not
  118. // encrypted.
  119. func NewSourceInfo(bucket, object string, sse encrypt.ServerSide) SourceInfo {
  120. r := SourceInfo{
  121. bucket: bucket,
  122. object: object,
  123. start: -1, // range is unspecified by default
  124. encryption: sse,
  125. Headers: make(http.Header),
  126. }
  127. // Set the source header
  128. r.Headers.Set("x-amz-copy-source", s3utils.EncodePath(bucket+"/"+object))
  129. return r
  130. }
  131. // SetRange - Set the start and end offset of the source object to be
  132. // copied. If this method is not called, the whole source object is
  133. // copied.
  134. func (s *SourceInfo) SetRange(start, end int64) error {
  135. if start > end || start < 0 {
  136. return ErrInvalidArgument("start must be non-negative, and start must be at most end.")
  137. }
  138. // Note that 0 <= start <= end
  139. s.start, s.end = start, end
  140. return nil
  141. }
  142. // SetMatchETagCond - Set ETag match condition. The object is copied
  143. // only if the etag of the source matches the value given here.
  144. func (s *SourceInfo) SetMatchETagCond(etag string) error {
  145. if etag == "" {
  146. return ErrInvalidArgument("ETag cannot be empty.")
  147. }
  148. s.Headers.Set("x-amz-copy-source-if-match", etag)
  149. return nil
  150. }
  151. // SetMatchETagExceptCond - Set the ETag match exception
  152. // condition. The object is copied only if the etag of the source is
  153. // not the value given here.
  154. func (s *SourceInfo) SetMatchETagExceptCond(etag string) error {
  155. if etag == "" {
  156. return ErrInvalidArgument("ETag cannot be empty.")
  157. }
  158. s.Headers.Set("x-amz-copy-source-if-none-match", etag)
  159. return nil
  160. }
  161. // SetModifiedSinceCond - Set the modified since condition.
  162. func (s *SourceInfo) SetModifiedSinceCond(modTime time.Time) error {
  163. if modTime.IsZero() {
  164. return ErrInvalidArgument("Input time cannot be 0.")
  165. }
  166. s.Headers.Set("x-amz-copy-source-if-modified-since", modTime.Format(http.TimeFormat))
  167. return nil
  168. }
  169. // SetUnmodifiedSinceCond - Set the unmodified since condition.
  170. func (s *SourceInfo) SetUnmodifiedSinceCond(modTime time.Time) error {
  171. if modTime.IsZero() {
  172. return ErrInvalidArgument("Input time cannot be 0.")
  173. }
  174. s.Headers.Set("x-amz-copy-source-if-unmodified-since", modTime.Format(http.TimeFormat))
  175. return nil
  176. }
  177. // Helper to fetch size and etag of an object using a StatObject call.
  178. func (s *SourceInfo) getProps(c Client) (size int64, etag string, userMeta map[string]string, err error) {
  179. // Get object info - need size and etag here. Also, decryption
  180. // headers are added to the stat request if given.
  181. var objInfo ObjectInfo
  182. opts := StatObjectOptions{GetObjectOptions{ServerSideEncryption: encrypt.SSE(s.encryption)}}
  183. objInfo, err = c.statObject(context.Background(), s.bucket, s.object, opts)
  184. if err != nil {
  185. err = ErrInvalidArgument(fmt.Sprintf("Could not stat object - %s/%s: %v", s.bucket, s.object, err))
  186. } else {
  187. size = objInfo.Size
  188. etag = objInfo.ETag
  189. userMeta = make(map[string]string)
  190. for k, v := range objInfo.Metadata {
  191. if strings.HasPrefix(k, "x-amz-meta-") {
  192. if len(v) > 0 {
  193. userMeta[k] = v[0]
  194. }
  195. }
  196. }
  197. }
  198. return
  199. }
  200. // Low level implementation of CopyObject API, supports only upto 5GiB worth of copy.
  201. func (c Client) copyObjectDo(ctx context.Context, srcBucket, srcObject, destBucket, destObject string,
  202. metadata map[string]string) (ObjectInfo, error) {
  203. // Build headers.
  204. headers := make(http.Header)
  205. // Set all the metadata headers.
  206. for k, v := range metadata {
  207. headers.Set(k, v)
  208. }
  209. // Set the source header
  210. headers.Set("x-amz-copy-source", s3utils.EncodePath(srcBucket+"/"+srcObject))
  211. // Send upload-part-copy request
  212. resp, err := c.executeMethod(ctx, "PUT", requestMetadata{
  213. bucketName: destBucket,
  214. objectName: destObject,
  215. customHeader: headers,
  216. })
  217. defer closeResponse(resp)
  218. if err != nil {
  219. return ObjectInfo{}, err
  220. }
  221. // Check if we got an error response.
  222. if resp.StatusCode != http.StatusOK {
  223. return ObjectInfo{}, httpRespToErrorResponse(resp, srcBucket, srcObject)
  224. }
  225. cpObjRes := CopyObjectResult{}
  226. err = xmlDecoder(resp.Body, &cpObjRes)
  227. if err != nil {
  228. return ObjectInfo{}, err
  229. }
  230. objInfo := ObjectInfo{
  231. Key: destObject,
  232. ETag: strings.Trim(cpObjRes.ETag, "\""),
  233. LastModified: cpObjRes.LastModified,
  234. }
  235. return objInfo, nil
  236. }
  237. func (c Client) CopyObjectPartDo(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, uploadID string,
  238. partID int, startOffset int64, length int64, metadata map[string]string) (p CompletePart, err error) {
  239. headers := make(http.Header)
  240. // Set source
  241. headers.Set("x-amz-copy-source", s3utils.EncodePath(srcBucket+"/"+srcObject))
  242. if startOffset < 0 {
  243. return p, ErrInvalidArgument("startOffset must be non-negative")
  244. }
  245. if length >= 0 {
  246. headers.Set("x-amz-copy-source-range", fmt.Sprintf("bytes=%d-%d", startOffset, startOffset+length-1))
  247. }
  248. for k, v := range metadata {
  249. headers.Set(k, v)
  250. }
  251. queryValues := make(url.Values)
  252. queryValues.Set("partNumber", strconv.Itoa(partID))
  253. queryValues.Set("uploadId", uploadID)
  254. resp, err := c.executeMethod(ctx, "PUT", requestMetadata{
  255. bucketName: destBucket,
  256. objectName: destObject,
  257. customHeader: headers,
  258. queryValues: queryValues,
  259. })
  260. defer closeResponse(resp)
  261. if err != nil {
  262. return
  263. }
  264. // Check if we got an error response.
  265. if resp.StatusCode != http.StatusOK {
  266. return p, httpRespToErrorResponse(resp, destBucket, destObject)
  267. }
  268. // Decode copy-part response on success.
  269. cpObjRes := CopyObjectResult{}
  270. err = xmlDecoder(resp.Body, &cpObjRes)
  271. if err != nil {
  272. return p, err
  273. }
  274. p.PartNumber, p.ETag = partID, cpObjRes.ETag
  275. return p, nil
  276. }
  277. // uploadPartCopy - helper function to create a part in a multipart
  278. // upload via an upload-part-copy request
  279. // https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html
  280. func (c Client) uploadPartCopy(ctx context.Context, bucket, object, uploadID string, partNumber int,
  281. headers http.Header) (p CompletePart, err error) {
  282. // Build query parameters
  283. urlValues := make(url.Values)
  284. urlValues.Set("partNumber", strconv.Itoa(partNumber))
  285. urlValues.Set("uploadId", uploadID)
  286. // Send upload-part-copy request
  287. resp, err := c.executeMethod(ctx, "PUT", requestMetadata{
  288. bucketName: bucket,
  289. objectName: object,
  290. customHeader: headers,
  291. queryValues: urlValues,
  292. })
  293. defer closeResponse(resp)
  294. if err != nil {
  295. return p, err
  296. }
  297. // Check if we got an error response.
  298. if resp.StatusCode != http.StatusOK {
  299. return p, httpRespToErrorResponse(resp, bucket, object)
  300. }
  301. // Decode copy-part response on success.
  302. cpObjRes := CopyObjectResult{}
  303. err = xmlDecoder(resp.Body, &cpObjRes)
  304. if err != nil {
  305. return p, err
  306. }
  307. p.PartNumber, p.ETag = partNumber, cpObjRes.ETag
  308. return p, nil
  309. }
// ComposeObjectWithProgress - creates an object using server-side copying of
// existing objects. It takes a list of source objects (with optional
// offsets) and concatenates them into a new object using only
// server-side copying operations. Optionally takes progress reader hook
// for applications to look at current progress.
func (c Client) ComposeObjectWithProgress(dst DestinationInfo, srcs []SourceInfo, progress io.Reader) error {
	if len(srcs) < 1 || len(srcs) > maxPartsCount {
		return ErrInvalidArgument("There must be as least one and up to 10000 source objects.")
	}
	ctx := context.Background()
	srcSizes := make([]int64, len(srcs))
	var totalSize, size, totalParts int64
	var srcUserMeta map[string]string
	etags := make([]string, len(srcs))
	var err error
	// Stat every source to learn its size, etag and user metadata,
	// validating each segment and the running totals as we go.
	for i, src := range srcs {
		size, etags[i], srcUserMeta, err = src.getProps(c)
		if err != nil {
			return err
		}
		// Error out if client side encryption is used in this source object when
		// more than one source objects are given.
		if len(srcs) > 1 && src.Headers.Get("x-amz-meta-x-amz-key") != "" {
			return ErrInvalidArgument(
				fmt.Sprintf("Client side encryption is used in source object %s/%s", src.bucket, src.object))
		}
		// Check if a segment is specified, and if so, is the
		// segment within object bounds?
		if src.start != -1 {
			// Since range is specified,
			//    0 <= src.start <= src.end
			// so only invalid case to check is:
			if src.end >= size {
				return ErrInvalidArgument(
					fmt.Sprintf("SourceInfo %d has invalid segment-to-copy [%d, %d] (size is %d)",
						i, src.start, src.end, size))
			}
			size = src.end - src.start + 1
		}
		// Only the last source may be less than `absMinPartSize`
		if size < absMinPartSize && i < len(srcs)-1 {
			return ErrInvalidArgument(
				fmt.Sprintf("SourceInfo %d is too small (%d) and it is not the last part", i, size))
		}
		// Is data to copy too large?
		totalSize += size
		if totalSize > maxMultipartPutObjectSize {
			return ErrInvalidArgument(fmt.Sprintf("Cannot compose an object of size %d (> 5TiB)", totalSize))
		}
		// record source size
		srcSizes[i] = size
		// calculate parts needed for current source
		totalParts += partsRequired(size)
		// Do we need more parts than we are allowed?
		if totalParts > maxPartsCount {
			return ErrInvalidArgument(fmt.Sprintf(
				"Your proposed compose object requires more than %d parts", maxPartsCount))
		}
	}
	// Single source object case (i.e. when only one source is
	// involved, it is being copied wholly and at most 5GiB in
	// size, emptyfiles are also supported).
	if (totalParts == 1 && srcs[0].start == -1 && totalSize <= maxPartSize) || (totalSize == 0) {
		return c.CopyObjectWithProgress(dst, srcs[0], progress)
	}
	// Now, handle multipart-copy cases.
	//
	// 1. Ensure that the object has not been changed while
	// we are copying data: pin each source to the etag observed
	// during the stat loop unless the caller already set a match
	// condition.
	for i, src := range srcs {
		if src.Headers.Get("x-amz-copy-source-if-match") == "" {
			// NOTE(review): src is a loop copy, but Headers is a map
			// shared with srcs[i], so this mutation is visible in the
			// part-copy loop below. The error return is ignored —
			// presumably etags from a successful stat are non-empty;
			// confirm.
			src.SetMatchETagCond(etags[i])
		}
	}
	// 2. Initiate a new multipart upload.
	// Set user-metadata on the destination object. If no
	// user-metadata is specified, and there is only one source,
	// (only) then metadata from source is copied.
	// NOTE(review): srcUserMeta holds the metadata of the LAST
	// stat'ed source, which is the only source in the len(srcs) == 1
	// case used here.
	userMeta := dst.getUserMetaHeadersMap(false)
	metaMap := userMeta
	if len(userMeta) == 0 && len(srcs) == 1 {
		metaMap = srcUserMeta
	}
	metaHeaders := make(map[string]string)
	for k, v := range metaMap {
		metaHeaders[k] = v
	}
	uploadID, err := c.newUploadID(ctx, dst.bucket, dst.object, PutObjectOptions{ServerSideEncryption: dst.encryption, UserMetadata: metaHeaders})
	if err != nil {
		return err
	}
	// 3. Perform copy part uploads
	objParts := []CompletePart{}
	partIndex := 1
	for i, src := range srcs {
		h := src.Headers
		// Add encryption headers for reading this source, if any.
		if src.encryption != nil {
			encrypt.SSECopy(src.encryption).Marshal(h)
		}
		// Add destination encryption headers
		if dst.encryption != nil {
			dst.encryption.Marshal(h)
		}
		// calculate start/end indices of parts after
		// splitting.
		startIdx, endIdx := calculateEvenSplits(srcSizes[i], src)
		for j, start := range startIdx {
			end := endIdx[j]
			// Add (or reset) source range header for
			// upload part copy request.
			h.Set("x-amz-copy-source-range",
				fmt.Sprintf("bytes=%d-%d", start, end))
			// make upload-part-copy request
			complPart, err := c.uploadPartCopy(ctx, dst.bucket,
				dst.object, uploadID, partIndex, h)
			if err != nil {
				return err
			}
			if progress != nil {
				// Best-effort progress notification; the CopyN error
				// is deliberately ignored.
				io.CopyN(ioutil.Discard, progress, end-start+1)
			}
			objParts = append(objParts, complPart)
			partIndex++
		}
	}
	// 4. Make final complete-multipart request.
	_, err = c.CompleteMultipartUpload(ctx, dst.bucket, dst.object, uploadID,
		CompleteMultipartUpload{Parts: objParts})
	if err != nil {
		return err
	}
	return nil
}
// ComposeObject - creates an object using server-side copying of
// existing objects. It takes a list of source objects (with optional
// offsets) and concatenates them into a new object using only
// server-side copying operations. Use ComposeObjectWithProgress to
// additionally observe copy progress.
func (c Client) ComposeObject(dst DestinationInfo, srcs []SourceInfo) error {
	return c.ComposeObjectWithProgress(dst, srcs, nil)
}
  449. // partsRequired is maximum parts possible with
  450. // max part size of ceiling(maxMultipartPutObjectSize / (maxPartsCount - 1))
  451. func partsRequired(size int64) int64 {
  452. maxPartSize := maxMultipartPutObjectSize / (maxPartsCount - 1)
  453. r := size / int64(maxPartSize)
  454. if size%int64(maxPartSize) > 0 {
  455. r++
  456. }
  457. return r
  458. }
  459. // calculateEvenSplits - computes splits for a source and returns
  460. // start and end index slices. Splits happen evenly to be sure that no
  461. // part is less than 5MiB, as that could fail the multipart request if
  462. // it is not the last part.
  463. func calculateEvenSplits(size int64, src SourceInfo) (startIndex, endIndex []int64) {
  464. if size == 0 {
  465. return
  466. }
  467. reqParts := partsRequired(size)
  468. startIndex = make([]int64, reqParts)
  469. endIndex = make([]int64, reqParts)
  470. // Compute number of required parts `k`, as:
  471. //
  472. // k = ceiling(size / copyPartSize)
  473. //
  474. // Now, distribute the `size` bytes in the source into
  475. // k parts as evenly as possible:
  476. //
  477. // r parts sized (q+1) bytes, and
  478. // (k - r) parts sized q bytes, where
  479. //
  480. // size = q * k + r (by simple division of size by k,
  481. // so that 0 <= r < k)
  482. //
  483. start := src.start
  484. if start == -1 {
  485. start = 0
  486. }
  487. quot, rem := size/reqParts, size%reqParts
  488. nextStart := start
  489. for j := int64(0); j < reqParts; j++ {
  490. curPartSize := quot
  491. if j < rem {
  492. curPartSize++
  493. }
  494. cStart := nextStart
  495. cEnd := cStart + curPartSize - 1
  496. nextStart = cEnd + 1
  497. startIndex[j], endIndex[j] = cStart, cEnd
  498. }
  499. return
  500. }