api-put-object.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. /*
  2. * MinIO Go Library for Amazon S3 Compatible Cloud Storage
  3. * Copyright 2015-2017 MinIO, Inc.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package s3cli
  18. import (
  19. "bytes"
  20. "context"
  21. "fmt"
  22. "io"
  23. "net/http"
  24. "runtime/debug"
  25. "sort"
  26. "golang.org/x/net/http/httpguts"
  27. "github.com/minio/minio-go/v6/pkg/encrypt"
  28. "github.com/minio/minio-go/v6/pkg/s3utils"
  29. )
// PutObjectOptions represents options specified by user for PutObject call.
// The zero value is usable: Header() then emits only a default Content-Type.
type PutObjectOptions struct {
	// UserMetadata holds user defined metadata. Header() sends each entry as
	// a request header, prefixing the key with "X-Amz-Meta-" unless the key
	// is already recognised as an amz, standard or storage class header.
	// validate() rejects entries whose key or value is not a valid HTTP
	// header field, and keys that are standard, SSE or storage class headers.
	UserMetadata map[string]string
	// Progress is handed to newHook together with the reader of each uploaded
	// part. NOTE(review): presumably it is advanced as data is sent, so callers
	// can track upload progress - confirm against newHook.
	Progress io.Reader
	// ContentType is sent as the Content-Type header; when empty Header()
	// falls back to "application/octet-stream".
	ContentType string
	// ContentEncoding is sent as the Content-Encoding header when non-empty.
	ContentEncoding string
	// ContentDisposition is sent as the Content-Disposition header when non-empty.
	ContentDisposition string
	// ContentLanguage is sent as the Content-Language header when non-empty.
	ContentLanguage string
	// CacheControl is sent as the Cache-Control header when non-empty.
	CacheControl string
	// ServerSideEncryption, when non-nil, adds its own headers through
	// Marshal in Header() and is also passed along with every uploaded part.
	ServerSideEncryption encrypt.ServerSide
	// NumThreads is the number of threads used by the multipart put object
	// operation; zero selects totalWorkers (see getNumThreads).
	NumThreads uint
	// StorageClass is sent under the amzStorageClass header when non-empty.
	StorageClass string
	// WebsiteRedirectLocation is sent under the amzWebsiteRedirectLocation
	// header when non-empty.
	WebsiteRedirectLocation string
	// PartSize is the requested multipart part size. Zero makes
	// putObjectCommon fall back to minPartSize; the value is also fed to
	// optimalPartInfo by the multipart uploader.
	PartSize uint64
}
  45. // getNumThreads - gets the number of threads to be used in the multipart
  46. // put object operation
  47. func (opts PutObjectOptions) getNumThreads() (numThreads int) {
  48. if opts.NumThreads > 0 {
  49. numThreads = int(opts.NumThreads)
  50. } else {
  51. numThreads = totalWorkers
  52. }
  53. return
  54. }
  55. // Header - constructs the headers from metadata entered by user in
  56. // PutObjectOptions struct
  57. func (opts PutObjectOptions) Header() (header http.Header) {
  58. header = make(http.Header)
  59. if opts.ContentType != "" {
  60. header["Content-Type"] = []string{opts.ContentType}
  61. } else {
  62. header["Content-Type"] = []string{"application/octet-stream"}
  63. }
  64. if opts.ContentEncoding != "" {
  65. header["Content-Encoding"] = []string{opts.ContentEncoding}
  66. }
  67. if opts.ContentDisposition != "" {
  68. header["Content-Disposition"] = []string{opts.ContentDisposition}
  69. }
  70. if opts.ContentLanguage != "" {
  71. header["Content-Language"] = []string{opts.ContentLanguage}
  72. }
  73. if opts.CacheControl != "" {
  74. header["Cache-Control"] = []string{opts.CacheControl}
  75. }
  76. if opts.ServerSideEncryption != nil {
  77. opts.ServerSideEncryption.Marshal(header)
  78. }
  79. if opts.StorageClass != "" {
  80. header[amzStorageClass] = []string{opts.StorageClass}
  81. }
  82. if opts.WebsiteRedirectLocation != "" {
  83. header[amzWebsiteRedirectLocation] = []string{opts.WebsiteRedirectLocation}
  84. }
  85. for k, v := range opts.UserMetadata {
  86. if !isAmzHeader(k) && !isStandardHeader(k) && !isStorageClassHeader(k) {
  87. header["X-Amz-Meta-"+k] = []string{v}
  88. } else {
  89. header[k] = []string{v}
  90. }
  91. }
  92. return
  93. }
  94. // validate() checks if the UserMetadata map has standard headers or and raises an error if so.
  95. func (opts PutObjectOptions) validate() (err error) {
  96. for k, v := range opts.UserMetadata {
  97. if !httpguts.ValidHeaderFieldName(k) || isStandardHeader(k) || isSSEHeader(k) || isStorageClassHeader(k) {
  98. return ErrInvalidArgument(k + " unsupported user defined metadata name")
  99. }
  100. if !httpguts.ValidHeaderFieldValue(v) {
  101. return ErrInvalidArgument(v + " unsupported user defined metadata value")
  102. }
  103. }
  104. return nil
  105. }
  106. // completedParts is a collection of parts sortable by their part numbers.
  107. // used for sorting the uploaded parts before completing the multipart request.
  108. type completedParts []CompletePart
  109. func (a completedParts) Len() int { return len(a) }
  110. func (a completedParts) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  111. func (a completedParts) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber }
  112. // PutObject creates an object in a bucket.
  113. //
  114. // You must have WRITE permissions on a bucket to create an object.
  115. //
  116. // - For size smaller than 128MiB PutObject automatically does a
  117. // single atomic Put operation.
  118. // - For size larger than 128MiB PutObject automatically does a
  119. // multipart Put operation.
  120. // - For size input as -1 PutObject does a multipart Put operation
  121. // until input stream reaches EOF. Maximum object size that can
  122. // be uploaded through this operation will be 5TiB.
  123. func (c Client) PutObject(bucketName, objectName string, reader io.Reader, objectSize int64,
  124. opts PutObjectOptions) (n int64, err error) {
  125. return c.PutObjectWithContext(context.Background(), bucketName, objectName, reader, objectSize, opts)
  126. }
  127. func (c Client) putObjectCommon(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (n int64, err error) {
  128. // Check for largest object size allowed.
  129. if size > int64(maxMultipartPutObjectSize) {
  130. return 0, ErrEntityTooLarge(size, maxMultipartPutObjectSize, bucketName, objectName)
  131. }
  132. // NOTE: Streaming signature is not supported by GCS.
  133. if s3utils.IsGoogleEndpoint(*c.endpointURL) {
  134. // Do not compute MD5 for Google Cloud Storage.
  135. return c.putObjectNoChecksum(ctx, bucketName, objectName, reader, size, opts)
  136. }
  137. partSize := opts.PartSize
  138. if opts.PartSize == 0 {
  139. partSize = minPartSize
  140. }
  141. if c.overrideSignerType.IsV2() {
  142. if size >= 0 && size < int64(partSize) {
  143. return c.putObjectNoChecksum(ctx, bucketName, objectName, reader, size, opts)
  144. }
  145. return c.putObjectMultipart(ctx, bucketName, objectName, reader, size, opts)
  146. }
  147. if size < 0 {
  148. return c.putObjectMultipartStreamNoLength(ctx, bucketName, objectName, reader, opts)
  149. }
  150. if size < int64(partSize) {
  151. return c.putObjectNoChecksum(ctx, bucketName, objectName, reader, size, opts)
  152. }
  153. // For all sizes greater than 128MiB do multipart.
  154. return c.putObjectMultipartStream(ctx, bucketName, objectName, reader, size, opts)
  155. }
  156. func (c Client) putObjectMultipartStreamNoLength(ctx context.Context, bucketName, objectName string, reader io.Reader, opts PutObjectOptions) (n int64, err error) {
  157. // Input validation.
  158. if err = s3utils.CheckValidBucketName(bucketName); err != nil {
  159. return 0, err
  160. }
  161. if err = s3utils.CheckValidObjectName(objectName); err != nil {
  162. return 0, err
  163. }
  164. // Total data read and written to server. should be equal to
  165. // 'size' at the end of the call.
  166. var totalUploadedSize int64
  167. // Complete multipart upload.
  168. var complMultipartUpload CompleteMultipartUpload
  169. // Calculate the optimal parts info for a given size.
  170. totalPartsCount, partSize, _, err := optimalPartInfo(-1, opts.PartSize)
  171. if err != nil {
  172. return 0, err
  173. }
  174. // Initiate a new multipart upload.
  175. uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
  176. if err != nil {
  177. return 0, err
  178. }
  179. defer func() {
  180. if err != nil {
  181. c.AbortMultipartUpload(ctx, bucketName, objectName, uploadID)
  182. }
  183. }()
  184. // Part number always starts with '1'.
  185. partNumber := 1
  186. // Initialize parts uploaded map.
  187. partsInfo := make(map[int]ObjectPart)
  188. // Create a buffer.
  189. buf := make([]byte, partSize)
  190. defer debug.FreeOSMemory()
  191. for partNumber <= totalPartsCount {
  192. length, rErr := io.ReadFull(reader, buf)
  193. if rErr == io.EOF && partNumber > 1 {
  194. break
  195. }
  196. if rErr != nil && rErr != io.ErrUnexpectedEOF && rErr != io.EOF {
  197. return 0, rErr
  198. }
  199. // Update progress reader appropriately to the latest offset
  200. // as we read from the source.
  201. rd := newHook(bytes.NewReader(buf[:length]), opts.Progress)
  202. // Proceed to upload the part.
  203. var objPart ObjectPart
  204. objPart, err = c.UploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber,
  205. "", "", int64(length), opts.ServerSideEncryption)
  206. if err != nil {
  207. return totalUploadedSize, err
  208. }
  209. // Save successfully uploaded part metadata.
  210. partsInfo[partNumber] = objPart
  211. // Save successfully uploaded size.
  212. totalUploadedSize += int64(length)
  213. // Increment part number.
  214. partNumber++
  215. // For unknown size, Read EOF we break away.
  216. // We do not have to upload till totalPartsCount.
  217. if rErr == io.EOF {
  218. break
  219. }
  220. }
  221. // Loop over total uploaded parts to save them in
  222. // Parts array before completing the multipart request.
  223. for i := 1; i < partNumber; i++ {
  224. part, ok := partsInfo[i]
  225. if !ok {
  226. return 0, ErrInvalidArgument(fmt.Sprintf("Missing part number %d", i))
  227. }
  228. complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
  229. ETag: part.ETag,
  230. PartNumber: part.PartNumber,
  231. })
  232. }
  233. // Sort all completed parts.
  234. sort.Sort(completedParts(complMultipartUpload.Parts))
  235. if _, err = c.CompleteMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload); err != nil {
  236. return totalUploadedSize, err
  237. }
  238. // Return final size.
  239. return totalUploadedSize, nil
  240. }