copier.go 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804
  1. package s3
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/ks3sdklib/aws-sdk-go/aws"
  7. "io"
  8. "os"
  9. "sort"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "sync/atomic"
  14. "time"
  15. )
// CopyFileInput describes a copy of one object (SourceBucket/SourceKey) to
// another (Bucket/Key). It is consumed by CopyFile / CopyFileWithContext, which
// issue a single CopyObject request for objects no larger than PartSize and a
// multipart copy otherwise, and by the CopyFileAcrossRegion variants, which
// convert it into an UploadFileInput.
type CopyFileInput struct {
	// The name of the destination bucket. Required.
	Bucket *string `location:"uri" locationName:"Bucket" type:"string" required:"true"`
	// Object key of the destination object. Required.
	Key *string `location:"uri" locationName:"Key" type:"string" required:"true"`
	// The name of the source bucket. Required.
	SourceBucket *string `location:"uri" locationName:"SourceBucket" type:"string" required:"true"`
	// Object key of the source object. Required.
	SourceKey *string `location:"uri" locationName:"SourceKey" type:"string" required:"true"`
	// The size of each part. During validation a nil value becomes
	// DefaultPartSize and other values are clamped to [MinPartSize, MaxPartSize].
	// Objects whose size does not exceed this value are copied in one request.
	PartSize *int64 `type:"integer"`
	// The number of concurrent tasks used to copy parts. Non-positive or nil
	// values are replaced by DefaultTaskNum during validation.
	TaskNum *int64 `type:"integer"`
	// Whether to enable checkpoint.
	EnableCheckpoint *bool `type:"boolean"`
	// The directory to store the checkpoint file.
	CheckpointDir *string `type:"string"`
	// The checkpoint file path. When empty and checkpointing is enabled, a path
	// is generated from the request.
	CheckpointFile *string `type:"string"`
	// The canned ACL to apply to the object. When nil, the multipart path
	// derives a canned ACL from the source object's grants.
	ACL *string `location:"header" locationName:"x-amz-acl" type:"string"`
	// Specifies caching behavior along the request/reply chain.
	CacheControl *string `location:"header" locationName:"Cache-Control" type:"string"`
	// Specifies presentational information for the object.
	ContentDisposition *string `location:"header" locationName:"Content-Disposition" type:"string"`
	// Specifies what content encodings have been applied to the object and thus
	// what decoding mechanisms must be applied to obtain the media-type referenced
	// by the Content-Type header field.
	ContentEncoding *string `location:"header" locationName:"Content-Encoding" type:"string"`
	// A standard MIME type describing the format of the object data.
	ContentType *string `location:"header" locationName:"Content-Type" type:"string"`
	// The date and time at which the object is no longer cacheable.
	Expires *time.Time `location:"header" locationName:"Expires" type:"timestamp" timestampFormat:"rfc822"`
	// A map of metadata to store with the object in S3.
	Metadata map[string]*string `location:"headers" locationName:"x-amz-meta-" type:"map"`
	// Specifies whether the metadata is copied from the source object or replaced
	// with metadata provided in the request. Compared case-insensitively against
	// "REPLACE"; any other value is treated as a copy.
	MetadataDirective *string `location:"header" locationName:"x-amz-metadata-directive" type:"string"`
	// The type of storage to use for the object. When nil, the storage class
	// reported for the source object is used.
	StorageClass *string `location:"header" locationName:"x-amz-storage-class" type:"string"`
	// Specifies the object tag of the object. Multiple tags can be set at the same time, such as: TagA=A&TagB=B.
	// Note: Key and Value need to be URL-encoded first. If an item does not have "=", the Value is considered to be an empty string.
	Tagging *string `location:"header" locationName:"x-amz-tagging" type:"string"`
	// Specifies how to set the object tag of the target object.
	// Default value: COPY
	// Valid values:
	// COPY (default value): Copies the object tag of the source object to the target object.
	// REPLACE: Ignores the object tag of the source object and directly uses the object tag specified in the request.
	TaggingDirective *string `location:"header" locationName:"x-amz-tagging-directive" type:"string"`
	// Specifies whether the object is forbidden to overwrite.
	ForbidOverwrite *bool `location:"header" locationName:"x-amz-forbid-overwrite" type:"boolean"`
	// Allows grantee to read the object data and its metadata.
	GrantRead *string `location:"header" locationName:"x-amz-grant-read" type:"string"`
	// Gives the grantee READ, READ_ACP, and WRITE_ACP permissions on the object.
	GrantFullControl *string `location:"header" locationName:"x-amz-grant-full-control" type:"string"`
	// Copies the object if its entity tag (ETag) matches the specified tag.
	CopySourceIfMatch *string `location:"header" locationName:"x-amz-copy-source-if-match" type:"string"`
	// Copies the object if it has been modified since the specified time.
	CopySourceIfModifiedSince *time.Time `location:"header" locationName:"x-amz-copy-source-if-modified-since" type:"timestamp" timestampFormat:"rfc822"`
	// Copies the object if its entity tag (ETag) is different from the specified ETag.
	CopySourceIfNoneMatch *string `location:"header" locationName:"x-amz-copy-source-if-none-match" type:"string"`
	// Copies the object if it hasn't been modified since the specified time.
	CopySourceIfUnmodifiedSince *time.Time `location:"header" locationName:"x-amz-copy-source-if-unmodified-since" type:"timestamp" timestampFormat:"rfc822"`
	// Specifies the decryption algorithm used to decrypt the data source object. Valid value: AES256.
	CopySourceSSECustomerAlgorithm *string `location:"header" locationName:"x-amz-copy-source-server-side-encryption-customer-algorithm" type:"string"`
	// The base64-encoded encryption key used for KS3 decryption specified by the user.
	// Its value must be the same as the key used when the data source object was created.
	CopySourceSSECustomerKey *string `location:"header" locationName:"x-amz-copy-source-server-side-encryption-customer-key" type:"string"`
	// Specifies the 128-bit MD5 digest of the encryption key according to RFC 1321.
	// If the server encrypted with a user-provided encryption key, when decryption is requested,
	// the response will include this header to provide data consistency verification information
	// for the user-provided encryption key.
	CopySourceSSECustomerKeyMD5 *string `location:"header" locationName:"x-amz-copy-source-server-side-encryption-customer-key-MD5" type:"string"`
	// The server-side encryption algorithm used when storing this object in KS3, e.g. AES256.
	ServerSideEncryption *string `location:"header" locationName:"x-amz-server-side-encryption" type:"string"`
	// Specifies the algorithm to use when encrypting the object, e.g. AES256.
	SSECustomerAlgorithm *string `location:"header" locationName:"x-amz-server-side-encryption-customer-algorithm" type:"string"`
	// Specifies the customer-provided encryption key for KS3 to use in encrypting data.
	SSECustomerKey *string `location:"header" locationName:"x-amz-server-side-encryption-customer-key" type:"string"`
	// Specifies the 128-bit MD5 digest of the encryption key according to RFC 1321.
	SSECustomerKeyMD5 *string `location:"header" locationName:"x-amz-server-side-encryption-customer-key-MD5" type:"string"`
	// Progress callback function. During a multipart copy it is invoked with the
	// size of the part just accounted for, the running total, and the source size.
	ProgressFn aws.ProgressFunc `location:"function"`
}
// CopyFileOutput is the result of a successful CopyFile call.
type CopyFileOutput struct {
	// Bucket of the destination object.
	Bucket *string
	// Key of the destination object.
	Key *string
	// ETag returned by the copy (CopyObject result or CompleteMultipartUpload response).
	ETag *string
	// CRC64-ECMA checksum returned by the server for the destination object;
	// it is parsed as a decimal unsigned integer when CRC checking is enabled.
	ChecksumCRC64ECMA *string
}
  106. func (c *S3) CopyFile(request *CopyFileInput) (*CopyFileOutput, error) {
  107. return c.CopyFileWithContext(context.Background(), request)
  108. }
  109. func (c *S3) CopyFileWithContext(ctx context.Context, request *CopyFileInput) (*CopyFileOutput, error) {
  110. return newCopier(c, ctx, request).copyFile()
  111. }
  112. func (c *S3) CopyFileAcrossRegion(request *CopyFileInput, dstClient *S3) (*UploadFileOutput, error) {
  113. return c.CopyFileAcrossRegionWithContext(context.Background(), request, dstClient)
  114. }
  115. func (c *S3) CopyFileAcrossRegionWithContext(ctx context.Context, request *CopyFileInput, dstClient *S3) (*UploadFileOutput, error) {
  116. uploadFileRequest, err := c.buildUploadFileRequest(ctx, request)
  117. if err != nil {
  118. return nil, err
  119. }
  120. return dstClient.UploadFileWithContext(ctx, uploadFileRequest)
  121. }
  122. func (c *S3) buildUploadFileRequest(ctx context.Context, request *CopyFileInput) (*UploadFileInput, error) {
  123. if request == nil {
  124. return nil, errors.New("copyFileRequest is required")
  125. }
  126. if aws.ToString(request.Bucket) == "" {
  127. return nil, errors.New("bucket is required")
  128. }
  129. if aws.ToString(request.Key) == "" {
  130. return nil, errors.New("key is required")
  131. }
  132. if aws.ToString(request.SourceBucket) == "" {
  133. return nil, errors.New("source bucket is required")
  134. }
  135. if aws.ToString(request.SourceKey) == "" {
  136. return nil, errors.New("source key is required")
  137. }
  138. input := &UploadFileInput{
  139. Bucket: request.Bucket,
  140. Key: request.Key,
  141. PartSize: request.PartSize,
  142. TaskNum: request.TaskNum,
  143. EnableCheckpoint: request.EnableCheckpoint,
  144. CheckpointDir: request.CheckpointDir,
  145. CheckpointFile: request.CheckpointFile,
  146. ACL: request.ACL,
  147. CacheControl: request.CacheControl,
  148. ContentDisposition: request.ContentDisposition,
  149. ContentEncoding: request.ContentEncoding,
  150. ContentType: request.ContentType,
  151. Expires: request.Expires,
  152. Metadata: request.Metadata,
  153. StorageClass: request.StorageClass,
  154. Tagging: request.Tagging,
  155. ForbidOverwrite: request.ForbidOverwrite,
  156. GrantRead: request.GrantRead,
  157. GrantFullControl: request.GrantFullControl,
  158. ServerSideEncryption: request.ServerSideEncryption,
  159. SSECustomerAlgorithm: request.SSECustomerAlgorithm,
  160. SSECustomerKey: request.SSECustomerKey,
  161. SSECustomerKeyMD5: request.SSECustomerKeyMD5,
  162. ProgressFn: request.ProgressFn,
  163. }
  164. fetcher := &Fetcher{
  165. client: c,
  166. request: request,
  167. }
  168. var filePartFetcher FilePartFetcher = fetcher
  169. input.FilePartFetcher = &filePartFetcher
  170. resp, err := c.HeadObject(&HeadObjectInput{
  171. Bucket: request.SourceBucket,
  172. Key: request.SourceKey,
  173. IfModifiedSince: request.CopySourceIfModifiedSince,
  174. IfUnmodifiedSince: request.CopySourceIfUnmodifiedSince,
  175. IfMatch: request.CopySourceIfMatch,
  176. IfNoneMatch: request.CopySourceIfNoneMatch,
  177. SSECustomerAlgorithm: request.CopySourceSSECustomerAlgorithm,
  178. SSECustomerKey: request.CopySourceSSECustomerKey,
  179. SSECustomerKeyMD5: request.CopySourceSSECustomerKeyMD5,
  180. })
  181. if err != nil {
  182. return nil, err
  183. }
  184. input.ObjectMeta = resp.Metadata
  185. if !strings.EqualFold(aws.ToString(request.MetadataDirective), "REPLACE") {
  186. input.CacheControl = resp.Metadata[HTTPHeaderCacheControl]
  187. input.ContentDisposition = resp.Metadata[HTTPHeaderContentDisposition]
  188. input.ContentEncoding = resp.Metadata[HTTPHeaderContentEncoding]
  189. input.ContentType = resp.Metadata[HTTPHeaderContentType]
  190. expires, err := time.Parse("Mon, 02 Jan 2006 15:04:05 GMT", aws.ToString(resp.Metadata[HTTPHeaderExpires]))
  191. if err == nil {
  192. input.Expires = aws.Time(expires)
  193. }
  194. metaData := map[string]*string{}
  195. for k, v := range resp.Metadata {
  196. if strings.HasPrefix(strings.ToLower(k), MetaPrefix) {
  197. metaData[k] = v
  198. }
  199. }
  200. input.Metadata = metaData
  201. }
  202. if !strings.EqualFold(aws.ToString(request.TaggingDirective), "REPLACE") {
  203. taggingResp, err := c.GetObjectTaggingWithContext(ctx, &GetObjectTaggingInput{
  204. Bucket: request.SourceBucket,
  205. Key: request.SourceKey,
  206. })
  207. if err != nil {
  208. return nil, err
  209. }
  210. tagStr := taggingResp.Tagging.ToString()
  211. if tagStr != "" {
  212. input.Tagging = aws.String(tagStr)
  213. }
  214. }
  215. if input.ACL == nil {
  216. aclResp, err := c.GetObjectACLWithContext(ctx, &GetObjectACLInput{
  217. Bucket: request.SourceBucket,
  218. Key: request.SourceKey,
  219. })
  220. if err != nil {
  221. return nil, err
  222. }
  223. input.ACL = aws.String(GetCannedACL(aclResp.Grants))
  224. }
  225. if input.StorageClass == nil {
  226. input.StorageClass = resp.Metadata[HTTPHeaderAmzStorageClass]
  227. }
  228. return input, nil
  229. }
// Fetcher reads byte ranges of the source object of a copy request. It is
// installed as the FilePartFetcher of the UploadFileInput built for
// cross-region copies.
type Fetcher struct {
	// client is the S3 client used to issue GetObject against the source.
	client *S3
	// request supplies the source bucket/key and the source SSE-C parameters.
	request *CopyFileInput
}
// Body wraps a response body (an io.ReadCloser) so it can be returned where an
// io.ReadSeeker is required; see Seek, which always reports an error.
type Body struct {
	io.ReadCloser
}
  237. func (b *Body) Seek(offset int64, whence int) (int64, error) {
  238. return 0, os.ErrInvalid
  239. }
  240. func (f *Fetcher) Fetch(objectRange []int64) (io.ReadSeeker, error) {
  241. resp, err := f.client.GetObject(&GetObjectInput{
  242. Bucket: f.request.SourceBucket,
  243. Key: f.request.SourceKey,
  244. Range: aws.String(fmt.Sprintf("bytes=%d-%d", objectRange[0], objectRange[1])),
  245. SSECustomerAlgorithm: f.request.CopySourceSSECustomerAlgorithm,
  246. SSECustomerKey: f.request.CopySourceSSECustomerKey,
  247. SSECustomerKeyMD5: f.request.CopySourceSSECustomerKeyMD5,
  248. })
  249. if err != nil {
  250. return nil, err
  251. }
  252. body := &Body{resp.Body}
  253. return body, nil
  254. }
// Copier holds the state of a single CopyFile operation, covering both the
// single-request path and the concurrent multipart path.
type Copier struct {
	// client issues every request made by the copy.
	client *S3
	// context is passed to the *WithContext client calls.
	context context.Context
	// copyFileRequest is the caller's request; validate may rewrite its
	// PartSize and TaskNum fields in place.
	copyFileRequest *CopyFileInput
	// copyCheckpoint tracks multipart progress (upload ID, completed parts);
	// it is set only on the multipart path.
	copyCheckpoint *CopyCheckpoint
	// CompletedSize is the number of bytes reported to ProgressFn so far;
	// it is updated with sync/atomic.
	CompletedSize int64
	// copyObjectMeta is the metadata map returned by HEAD on the source object.
	copyObjectMeta map[string]*string
	// mu is held while recording the first error and while appending
	// completed parts to the checkpoint.
	mu sync.Mutex
	// error is the first error recorded by a part-copy worker, if any.
	error error
}
  265. func newCopier(s3 *S3, ctx context.Context, request *CopyFileInput) *Copier {
  266. return &Copier{
  267. client: s3,
  268. context: ctx,
  269. copyFileRequest: request,
  270. }
  271. }
  272. func (c *Copier) copyFile() (*CopyFileOutput, error) {
  273. err := c.validate()
  274. if err != nil {
  275. return nil, err
  276. }
  277. c.copyObjectMeta, err = c.headObject()
  278. if err != nil {
  279. return nil, err
  280. }
  281. fileSize, _ := strconv.ParseInt(aws.ToString(c.copyObjectMeta[HTTPHeaderContentLength]), 10, 64)
  282. var resp *CopyFileOutput
  283. if fileSize <= aws.ToLong(c.copyFileRequest.PartSize) {
  284. resp, err = c.copyObject()
  285. } else {
  286. resp, err = c.multipartCopy()
  287. }
  288. if err != nil {
  289. return nil, err
  290. }
  291. if c.client.Config.CrcCheckEnabled {
  292. clientCrc64, _ := strconv.ParseUint(aws.ToString(c.copyObjectMeta[HTTPHeaderAmzChecksumCrc64ecma]), 10, 64)
  293. serverCrc64, _ := strconv.ParseUint(aws.ToString(resp.ChecksumCRC64ECMA), 10, 64)
  294. c.client.Config.LogDebug("check file crc64, client crc64:%d, server crc64:%d", clientCrc64, serverCrc64)
  295. if serverCrc64 != 0 && clientCrc64 != serverCrc64 {
  296. return nil, errors.New(fmt.Sprintf("crc64 check failed, client crc64:%d, server crc64:%d", clientCrc64, serverCrc64))
  297. }
  298. }
  299. return resp, err
  300. }
  301. func (c *Copier) validate() error {
  302. request := c.copyFileRequest
  303. if request == nil {
  304. return errors.New("copyFileRequest is required")
  305. }
  306. if aws.ToString(request.Bucket) == "" {
  307. return errors.New("bucket is required")
  308. }
  309. if aws.ToString(request.Key) == "" {
  310. return errors.New("key is required")
  311. }
  312. if aws.ToString(request.SourceBucket) == "" {
  313. return errors.New("source bucket is required")
  314. }
  315. if aws.ToString(request.SourceKey) == "" {
  316. return errors.New("source key is required")
  317. }
  318. if request.PartSize == nil {
  319. request.PartSize = aws.Long(DefaultPartSize)
  320. } else if aws.ToLong(request.PartSize) < MinPartSize {
  321. request.PartSize = aws.Long(MinPartSize)
  322. } else if aws.ToLong(request.PartSize) > MaxPartSize {
  323. request.PartSize = aws.Long(MaxPartSize)
  324. }
  325. if aws.ToLong(request.TaskNum) <= 0 {
  326. request.TaskNum = aws.Long(DefaultTaskNum)
  327. }
  328. return nil
  329. }
  330. func (c *Copier) copyObject() (*CopyFileOutput, error) {
  331. request := c.copyFileRequest
  332. input := &CopyObjectInput{
  333. Bucket: request.Bucket,
  334. Key: request.Key,
  335. SourceBucket: request.SourceBucket,
  336. SourceKey: request.SourceKey,
  337. ACL: request.ACL,
  338. CacheControl: request.CacheControl,
  339. ContentDisposition: request.ContentDisposition,
  340. ContentEncoding: request.ContentEncoding,
  341. ContentType: request.ContentType,
  342. Expires: request.Expires,
  343. Metadata: request.Metadata,
  344. MetadataDirective: request.MetadataDirective,
  345. StorageClass: request.StorageClass,
  346. Tagging: request.Tagging,
  347. TaggingDirective: request.TaggingDirective,
  348. ForbidOverwrite: request.ForbidOverwrite,
  349. GrantRead: request.GrantRead,
  350. GrantFullControl: request.GrantFullControl,
  351. CopySourceIfMatch: request.CopySourceIfMatch,
  352. CopySourceIfNoneMatch: request.CopySourceIfNoneMatch,
  353. CopySourceIfModifiedSince: request.CopySourceIfModifiedSince,
  354. CopySourceIfUnmodifiedSince: request.CopySourceIfUnmodifiedSince,
  355. CopySourceSSECustomerAlgorithm: request.CopySourceSSECustomerAlgorithm,
  356. CopySourceSSECustomerKey: request.CopySourceSSECustomerKey,
  357. CopySourceSSECustomerKeyMD5: request.CopySourceSSECustomerKeyMD5,
  358. ServerSideEncryption: request.ServerSideEncryption,
  359. SSECustomerAlgorithm: request.SSECustomerAlgorithm,
  360. SSECustomerKey: request.SSECustomerKey,
  361. SSECustomerKeyMD5: request.SSECustomerKeyMD5,
  362. }
  363. if c.copyFileRequest.StorageClass == nil {
  364. input.StorageClass = c.copyObjectMeta[HTTPHeaderAmzStorageClass]
  365. }
  366. resp, err := c.client.CopyObjectWithContext(c.context, input)
  367. if err != nil {
  368. return nil, err
  369. }
  370. return &CopyFileOutput{
  371. Bucket: request.Bucket,
  372. Key: request.Key,
  373. ETag: resp.CopyObjectResult.ETag,
  374. ChecksumCRC64ECMA: resp.CopyObjectResult.ChecksumCRC64ECMA,
  375. }, nil
  376. }
  377. func (c *Copier) multipartCopy() (*CopyFileOutput, error) {
  378. ccp, err := newCopyCheckpoint(c)
  379. if err != nil {
  380. return nil, err
  381. }
  382. c.copyCheckpoint = ccp
  383. if aws.ToBoolean(c.copyFileRequest.EnableCheckpoint) {
  384. cpFilePath := aws.ToString(c.copyFileRequest.CheckpointFile)
  385. if cpFilePath == "" {
  386. cpFilePath, err = generateCopyCpFilePath(c.copyFileRequest)
  387. if err != nil {
  388. return nil, err
  389. }
  390. }
  391. ccp.CpFilePath = cpFilePath
  392. err = c.copyCheckpoint.load()
  393. if err != nil {
  394. return nil, err
  395. }
  396. if ccp.UploadId != "" && !c.isUploadIdValid() {
  397. ccp.UploadId = ""
  398. ccp.PartETagList = make([]*CompletedPart, 0)
  399. ccp.remove()
  400. }
  401. }
  402. if ccp.UploadId == "" {
  403. ccp.UploadId, err = c.initUploadId()
  404. if err != nil {
  405. return nil, err
  406. }
  407. ccp.dump()
  408. }
  409. fileSize := ccp.SrcObjectSize
  410. partSize := ccp.PartSize
  411. totalPartNum := (fileSize-1)/partSize + 1
  412. tasks := make(chan CopyPartTask, totalPartNum)
  413. var i int64
  414. for i = 0; i < totalPartNum; i++ {
  415. partNum := i + 1
  416. offset := (partNum - 1) * partSize
  417. actualPartSize := c.getActualPartSize(fileSize, partSize, partNum)
  418. partETag := c.getPartETag(partNum)
  419. if partETag != nil {
  420. c.publishProgress(actualPartSize)
  421. } else {
  422. uploadPartTask := CopyPartTask{
  423. partNumber: partNum,
  424. offset: offset,
  425. actualPartSize: actualPartSize,
  426. }
  427. tasks <- uploadPartTask
  428. }
  429. }
  430. close(tasks)
  431. var wg sync.WaitGroup
  432. for i = 0; i < aws.ToLong(c.copyFileRequest.TaskNum); i++ {
  433. wg.Add(1)
  434. go c.runTask(tasks, &wg)
  435. }
  436. wg.Wait()
  437. if c.error != nil {
  438. return nil, c.error
  439. }
  440. completedMultipartUpload := c.getMultipartUploadParts()
  441. resp, err := c.completeMultipartUpload(completedMultipartUpload)
  442. if err != nil {
  443. return nil, err
  444. }
  445. return c.getCopyFileOutput(resp), nil
  446. }
  447. func (c *Copier) getCopyFileOutput(resp *CompleteMultipartUploadOutput) *CopyFileOutput {
  448. return &CopyFileOutput{
  449. Bucket: resp.Bucket,
  450. Key: resp.Key,
  451. ETag: resp.ETag,
  452. ChecksumCRC64ECMA: resp.ChecksumCRC64ECMA,
  453. }
  454. }
  455. func (c *Copier) getPartSize(fileSize int64, originPartSize int64) int64 {
  456. partSize := originPartSize
  457. totalPartNum := (fileSize-1)/partSize + 1
  458. for totalPartNum > MaxPartNum {
  459. partSize += originPartSize
  460. totalPartNum = (fileSize-1)/partSize + 1
  461. }
  462. return partSize
  463. }
  464. func (c *Copier) getActualPartSize(fileSize int64, partSize int64, partNum int64) int64 {
  465. offset := (partNum - 1) * partSize
  466. actualPartSize := partSize
  467. if offset+partSize >= fileSize {
  468. actualPartSize = fileSize - offset
  469. }
  470. return actualPartSize
  471. }
  472. func (c *Copier) getPartETag(partNumber int64) *CompletedPart {
  473. for _, partETag := range c.copyCheckpoint.PartETagList {
  474. if *partETag.PartNumber == partNumber {
  475. return partETag
  476. }
  477. }
  478. return nil
  479. }
// CopyPartTask describes one part of a multipart copy to be performed by a worker.
type CopyPartTask struct {
	// partNumber is the 1-based number of the part within the upload.
	partNumber int64
	// offset is the byte offset in the source object where the part starts.
	offset int64
	// actualPartSize is the part's length in bytes; the final part may be
	// shorter than the configured part size.
	actualPartSize int64
}
  485. func (c *Copier) runTask(tasks <-chan CopyPartTask, wg *sync.WaitGroup) {
  486. defer wg.Done()
  487. for task := range tasks {
  488. if c.error != nil {
  489. return
  490. }
  491. partETag, err := c.copyPart(task)
  492. if err != nil {
  493. c.setError(err)
  494. return
  495. }
  496. c.updatePart(partETag)
  497. }
  498. }
  499. func (c *Copier) copyPart(task CopyPartTask) (CompletedPart, error) {
  500. request := c.copyFileRequest
  501. ccp := c.copyCheckpoint
  502. var partETag CompletedPart
  503. start := task.offset
  504. end := task.offset + task.actualPartSize - 1
  505. resp, err := c.client.UploadPartCopyWithContext(c.context, &UploadPartCopyInput{
  506. Bucket: aws.String(ccp.BucketName),
  507. Key: aws.String(ccp.ObjectKey),
  508. SourceBucket: aws.String(ccp.SrcBucketName),
  509. SourceKey: aws.String(ccp.SrcObjectKey),
  510. UploadID: aws.String(ccp.UploadId),
  511. PartNumber: aws.Long(task.partNumber),
  512. CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", start, end)),
  513. CopySourceIfMatch: request.CopySourceIfMatch,
  514. CopySourceIfNoneMatch: request.CopySourceIfNoneMatch,
  515. CopySourceIfModifiedSince: request.CopySourceIfModifiedSince,
  516. CopySourceIfUnmodifiedSince: request.CopySourceIfUnmodifiedSince,
  517. CopySourceSSECustomerAlgorithm: request.CopySourceSSECustomerAlgorithm,
  518. CopySourceSSECustomerKey: request.CopySourceSSECustomerKey,
  519. CopySourceSSECustomerKeyMD5: request.CopySourceSSECustomerKeyMD5,
  520. SSECustomerAlgorithm: request.SSECustomerAlgorithm,
  521. SSECustomerKey: request.SSECustomerKey,
  522. SSECustomerKeyMD5: request.SSECustomerKeyMD5,
  523. })
  524. if err != nil {
  525. return partETag, err
  526. }
  527. partETag.PartNumber = aws.Long(task.partNumber)
  528. partETag.ETag = resp.CopyPartResult.ETag
  529. partETag.ChecksumCRC64ECMA = resp.CopyPartResult.ChecksumCRC64ECMA
  530. c.publishProgress(task.actualPartSize)
  531. return partETag, nil
  532. }
  533. func (c *Copier) updatePart(partETag CompletedPart) {
  534. c.mu.Lock()
  535. defer c.mu.Unlock()
  536. c.copyCheckpoint.PartETagList = append(c.copyCheckpoint.PartETagList, &partETag)
  537. c.copyCheckpoint.dump()
  538. }
  539. func (c *Copier) setError(err error) {
  540. c.mu.Lock()
  541. defer c.mu.Unlock()
  542. if c.error == nil {
  543. c.error = err
  544. }
  545. }
  546. func (c *Copier) getMultipartUploadParts() *CompletedMultipartUpload {
  547. partETags := c.copyCheckpoint.PartETagList
  548. // 按照PartNumber排序
  549. sort.Sort(CompletedParts(partETags))
  550. return &CompletedMultipartUpload{
  551. Parts: partETags,
  552. }
  553. }
  554. func (c *Copier) completeMultipartUpload(completedMultipartUpload *CompletedMultipartUpload) (*CompleteMultipartUploadOutput, error) {
  555. resp, err := c.client.CompleteMultipartUploadWithContext(c.context, &CompleteMultipartUploadInput{
  556. Bucket: c.copyFileRequest.Bucket,
  557. Key: c.copyFileRequest.Key,
  558. UploadID: aws.String(c.copyCheckpoint.UploadId),
  559. MultipartUpload: completedMultipartUpload,
  560. ForbidOverwrite: c.copyFileRequest.ForbidOverwrite,
  561. })
  562. if err != nil {
  563. return nil, err
  564. }
  565. c.copyCheckpoint.remove()
  566. return resp, err
  567. }
  568. func (c *Copier) publishProgress(actualPartSize int64) {
  569. if c.copyFileRequest.ProgressFn != nil {
  570. atomic.AddInt64(&c.CompletedSize, actualPartSize)
  571. c.copyFileRequest.ProgressFn(actualPartSize, c.CompletedSize, c.copyCheckpoint.SrcObjectSize)
  572. }
  573. }
  574. func (c *Copier) headObject() (map[string]*string, error) {
  575. request := c.copyFileRequest
  576. resp, err := c.client.HeadObjectWithContext(c.context, &HeadObjectInput{
  577. Bucket: request.SourceBucket,
  578. Key: request.SourceKey,
  579. IfModifiedSince: request.CopySourceIfModifiedSince,
  580. IfUnmodifiedSince: request.CopySourceIfUnmodifiedSince,
  581. IfMatch: request.CopySourceIfMatch,
  582. IfNoneMatch: request.CopySourceIfNoneMatch,
  583. SSECustomerAlgorithm: request.CopySourceSSECustomerAlgorithm,
  584. SSECustomerKey: request.CopySourceSSECustomerKey,
  585. SSECustomerKeyMD5: request.CopySourceSSECustomerKeyMD5,
  586. })
  587. if err != nil {
  588. return nil, err
  589. }
  590. return resp.Metadata, err
  591. }
  592. func (c *Copier) initUploadId() (string, error) {
  593. request := c.copyFileRequest
  594. input := &CreateMultipartUploadInput{
  595. Bucket: request.Bucket,
  596. Key: request.Key,
  597. ACL: request.ACL,
  598. CacheControl: request.CacheControl,
  599. ContentDisposition: request.ContentDisposition,
  600. ContentEncoding: request.ContentEncoding,
  601. ContentType: request.ContentType,
  602. Expires: request.Expires,
  603. Metadata: request.Metadata,
  604. StorageClass: request.StorageClass,
  605. Tagging: request.Tagging,
  606. ForbidOverwrite: request.ForbidOverwrite,
  607. GrantRead: request.GrantRead,
  608. GrantFullControl: request.GrantFullControl,
  609. ServerSideEncryption: request.ServerSideEncryption,
  610. SSECustomerAlgorithm: request.SSECustomerAlgorithm,
  611. SSECustomerKey: request.SSECustomerKey,
  612. SSECustomerKeyMD5: request.SSECustomerKeyMD5,
  613. }
  614. if !strings.EqualFold(aws.ToString(request.MetadataDirective), "REPLACE") {
  615. input.CacheControl = c.copyObjectMeta[HTTPHeaderCacheControl]
  616. input.ContentDisposition = c.copyObjectMeta[HTTPHeaderContentDisposition]
  617. input.ContentEncoding = c.copyObjectMeta[HTTPHeaderContentEncoding]
  618. input.ContentType = c.copyObjectMeta[HTTPHeaderContentType]
  619. expires, err := time.Parse("Mon, 02 Jan 2006 15:04:05 GMT", aws.ToString(c.copyObjectMeta[HTTPHeaderExpires]))
  620. if err == nil {
  621. input.Expires = aws.Time(expires)
  622. }
  623. metaData := map[string]*string{}
  624. for k, v := range c.copyObjectMeta {
  625. if strings.HasPrefix(strings.ToLower(k), MetaPrefix) {
  626. metaData[k] = v
  627. }
  628. }
  629. input.Metadata = metaData
  630. }
  631. if !strings.EqualFold(aws.ToString(request.TaggingDirective), "REPLACE") {
  632. taggingResp, err := c.client.GetObjectTaggingWithContext(c.context, &GetObjectTaggingInput{
  633. Bucket: request.SourceBucket,
  634. Key: request.SourceKey,
  635. })
  636. if err != nil {
  637. return "", err
  638. }
  639. tagStr := taggingResp.Tagging.ToString()
  640. if tagStr != "" {
  641. input.Tagging = aws.String(tagStr)
  642. }
  643. }
  644. if c.copyFileRequest.ACL == nil {
  645. aclResp, err := c.client.GetObjectACLWithContext(c.context, &GetObjectACLInput{
  646. Bucket: request.SourceBucket,
  647. Key: request.SourceKey,
  648. })
  649. if err != nil {
  650. return "", err
  651. }
  652. input.ACL = aws.String(GetCannedACL(aclResp.Grants))
  653. }
  654. if c.copyFileRequest.StorageClass == nil {
  655. input.StorageClass = c.copyObjectMeta[HTTPHeaderAmzStorageClass]
  656. }
  657. resp, err := c.client.CreateMultipartUploadWithContext(c.context, input)
  658. if err != nil {
  659. return "", err
  660. }
  661. return aws.ToString(resp.UploadID), nil
  662. }
  663. func (c *Copier) isUploadIdValid() bool {
  664. _, err := c.client.ListPartsWithContext(c.context, &ListPartsInput{
  665. Bucket: c.copyFileRequest.Bucket,
  666. Key: c.copyFileRequest.Key,
  667. UploadID: aws.String(c.copyCheckpoint.UploadId),
  668. })
  669. if err != nil && strings.Contains(err.Error(), "NoSuchUpload") {
  670. return false
  671. }
  672. return true
  673. }