resumable_copy.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. package tos
  2. import (
  3. "context"
  4. "fmt"
  5. "os"
  6. "path/filepath"
  7. "github.com/volcengine/ve-tos-golang-sdk/v2/tos/enum"
  8. )
  9. func parseResumableCopyObjectPath(input *ResumableCopyObjectInput) {
  10. isDirRes := isDir(input.CheckpointFile)
  11. if isDirRes || input.CheckpointFile == "" {
  12. input.CheckpointFile = filepath.Clean(filepath.Join(input.CheckpointFile, fmt.Sprintf("%s.%s.%s.%s.%s", input.SrcBucket, input.SrcKey, input.SrcVersionID, input.Bucket, input.Key)))
  13. }
  14. }
  15. func loadExistCopyCheckPoint(ctx context.Context, cli *ClientV2, input *ResumableCopyObjectInput, headOutput *HeadObjectV2Output) (*copyObjectCheckpoint, bool) {
  16. checkpoint := &copyObjectCheckpoint{}
  17. var err error
  18. loadCheckPoint(input.CheckpointFile, checkpoint)
  19. if checkpoint.Valid(input, headOutput) {
  20. return checkpoint, true
  21. } else if checkpoint.Bucket != "" && checkpoint.Key != "" && checkpoint.UploadID != "" {
  22. // 尝试去 abort
  23. _, err = cli.AbortMultipartUpload(ctx,
  24. &AbortMultipartUploadInput{
  25. Bucket: checkpoint.Bucket,
  26. Key: checkpoint.Key,
  27. UploadID: checkpoint.UploadID})
  28. if err != nil && cli.logger != nil {
  29. cli.logger.Debug("fail to abort upload task: %s, err:%s", checkpoint.UploadID, err.Error())
  30. }
  31. }
  32. return nil, false
  33. }
  34. func getResumableCopyObjectCheckpoint(ctx context.Context, cli *ClientV2, input *ResumableCopyObjectInput, headOutput *HeadObjectV2Output, init func() (*copyObjectCheckpoint, error)) (checkpoint *copyObjectCheckpoint, err error) {
  35. if !input.EnableCheckpoint {
  36. return init()
  37. }
  38. parseResumableCopyObjectPath(input)
  39. checkpoint, exist := loadExistCopyCheckPoint(ctx, cli, input, headOutput)
  40. if exist {
  41. return checkpoint, nil
  42. }
  43. err = checkAndCreateDir(input.CheckpointFile)
  44. if err != nil {
  45. return nil, InvalidCheckpointFilePath.withCause(err)
  46. }
  47. file, err := os.Create(input.CheckpointFile)
  48. if err != nil {
  49. return nil, newTosClientError("tos: create checkpoint file failed", err)
  50. }
  51. _ = file.Close()
  52. checkpoint, err = init()
  53. if err != nil {
  54. return nil, err
  55. }
  56. err = checkpoint.WriteToFile()
  57. if err != nil {
  58. return nil, err
  59. }
  60. return
  61. }
  62. func initCopyPartsInfo(headOutput *HeadObjectV2Output, partSize int64) ([]copyPartInfo, error) {
  63. if headOutput.ContentLength == 0 {
  64. return []copyPartInfo{{
  65. PartNumber: 1,
  66. IsZeroSize: true,
  67. }}, nil
  68. }
  69. partCount := headOutput.ContentLength / partSize
  70. remainder := headOutput.ContentLength % partSize
  71. if remainder != 0 {
  72. partCount++
  73. }
  74. if partCount > 10000 {
  75. return nil, InvalidFilePartNum
  76. }
  77. parts := make([]copyPartInfo, 0, partCount)
  78. for i := int64(0); i < partCount; i++ {
  79. part := copyPartInfo{
  80. PartNumber: i + 1,
  81. CopySourceRangeStart: i * partSize,
  82. CopySourceRangeEnd: (i+1)*partSize - 1,
  83. CopySourceRange: fmt.Sprintf("bytes=%d-%d", i*partSize, (i+1)*partSize-1),
  84. }
  85. parts = append(parts, part)
  86. }
  87. if remainder != 0 {
  88. parts[partCount-1].CopySourceRangeEnd = (partCount-1)*partSize + remainder - 1
  89. parts[partCount-1].CopySourceRange = fmt.Sprintf("bytes=%d-%d", (partCount-1)*partSize, (partCount-1)*partSize+remainder-1)
  90. }
  91. return parts, nil
  92. }
  93. func initCopyCheckpoint(input *ResumableCopyObjectInput, headOutput *HeadObjectV2Output) (*copyObjectCheckpoint, error) {
  94. parts, err := initCopyPartsInfo(headOutput, input.PartSize)
  95. if err != nil {
  96. return nil, err
  97. }
  98. cp := &copyObjectCheckpoint{
  99. Bucket: input.Bucket,
  100. Key: input.Key,
  101. SrcBucket: input.SrcBucket,
  102. SrcVersionID: input.SrcVersionID,
  103. PartSize: input.PartSize,
  104. UploadID: "",
  105. CopySourceIfMatch: input.CopySourceIfMatch,
  106. CopySourceIfModifiedSince: input.CopySourceIfModifiedSince,
  107. CopySourceIfNoneMatch: input.CopySourceIfNoneMatch,
  108. CopySourceIfUnmodifiedSince: input.CopySourceIfUnmodifiedSince,
  109. CopySourceSSECAlgorithm: input.CopySourceSSECAlgorithm,
  110. CopySourceSSECKeyMD5: input.CopySourceSSECKeyMD5,
  111. SSECAlgorithm: input.SSECAlgorithm,
  112. SSECKeyMD5: input.SSECKeyMD5,
  113. EncodingType: input.EncodingType,
  114. CopySourceObjectInfo: objectInfo{
  115. Etag: headOutput.ETag,
  116. HashCrc64ecma: headOutput.HashCrc64ecma,
  117. LastModified: headOutput.LastModified,
  118. ObjectSize: headOutput.ContentLength,
  119. },
  120. PartsInfo: parts,
  121. CheckpointPath: input.CheckpointFile,
  122. }
  123. return cp, nil
  124. }
  125. func prepareCopyTasks(cli *ClientV2, ctx context.Context, checkpoint *copyObjectCheckpoint, input *ResumableCopyObjectInput) []task {
  126. tasks := make([]task, 0)
  127. for _, part := range checkpoint.PartsInfo {
  128. if !part.IsCompleted {
  129. tasks = append(tasks, &copyTask{
  130. cli: cli,
  131. ctx: ctx,
  132. input: input,
  133. UploadID: checkpoint.UploadID,
  134. PartNumber: part.PartNumber,
  135. PartInfo: part,
  136. })
  137. }
  138. }
  139. return tasks
  140. }
  141. func (cli *ClientV2) copyPart(ctx context.Context, cp *copyObjectCheckpoint, input *ResumableCopyObjectInput, event *copyEvent) (*ResumableCopyObjectOutput, error) {
  142. tasks := prepareCopyTasks(cli, ctx, cp, input)
  143. routinesNum := min(input.TaskNum, len(tasks))
  144. cancelHandle := getCancelHandle(input.CancelHook)
  145. tg := newTaskGroup(cancelHandle, routinesNum, cp, event, input.EnableCheckpoint, tasks)
  146. abort := func() error {
  147. _, err := cli.AbortMultipartUpload(ctx,
  148. &AbortMultipartUploadInput{
  149. Bucket: input.Bucket,
  150. Key: input.Key,
  151. UploadID: cp.UploadID})
  152. return err
  153. }
  154. bindCancelHookWithAborter(input.CancelHook, abort)
  155. tg.RunWorker()
  156. tg.Scheduler()
  157. success, taskErr := tg.Wait()
  158. if taskErr != nil {
  159. if err := abort(); err != nil {
  160. return nil, err
  161. }
  162. return nil, taskErr
  163. }
  164. // handle results
  165. if success < len(tasks) {
  166. return nil, newTosClientError("tos: some tasks copy failed.", nil)
  167. }
  168. complete, err := cli.CompleteMultipartUploadV2(ctx, &CompleteMultipartUploadV2Input{
  169. Bucket: input.Bucket,
  170. Key: input.Key,
  171. UploadID: cp.UploadID,
  172. Parts: cp.GetParts(),
  173. })
  174. if err != nil {
  175. event.postCopyEvent(&CopyEvent{
  176. Type: enum.CopyEventCompleteMultipartUploadFailed,
  177. Err: err,
  178. Bucket: input.Bucket,
  179. Key: input.Key,
  180. UploadID: &cp.UploadID,
  181. SrcBucket: input.SrcBucket,
  182. SrcKey: input.SrcKey,
  183. SrcVersionID: input.SrcVersionID,
  184. CheckpointFile: &input.CheckpointFile,
  185. })
  186. return nil, err
  187. }
  188. event.postCopyEvent(&CopyEvent{
  189. Type: enum.CopyEventCompleteMultipartUploadSucceed,
  190. Err: err,
  191. Bucket: input.Bucket,
  192. Key: input.Key,
  193. UploadID: &cp.UploadID,
  194. SrcBucket: input.SrcBucket,
  195. SrcKey: input.SrcKey,
  196. SrcVersionID: input.SrcVersionID,
  197. CheckpointFile: &input.CheckpointFile,
  198. })
  199. _ = os.Remove(input.CheckpointFile)
  200. return &ResumableCopyObjectOutput{
  201. RequestInfo: complete.RequestInfo,
  202. Bucket: complete.Bucket,
  203. Key: complete.Key,
  204. UploadID: cp.UploadID,
  205. Etag: complete.ETag,
  206. Location: complete.Location,
  207. VersionID: complete.VersionID,
  208. HashCrc64ecma: complete.HashCrc64ecma,
  209. SSECAlgorithm: cp.SSECAlgorithm,
  210. SSECKeyMD5: cp.SSECKeyMD5,
  211. EncodingType: cp.EncodingType,
  212. }, nil
  213. }
  214. func (cli *ClientV2) ResumableCopyObject(ctx context.Context, input *ResumableCopyObjectInput) (*ResumableCopyObjectOutput, error) {
  215. rawInput := *input
  216. copyInput := &rawInput
  217. headOutput, err := cli.HeadObjectV2(ctx, &HeadObjectV2Input{
  218. Bucket: copyInput.SrcBucket,
  219. Key: copyInput.SrcKey,
  220. VersionID: copyInput.SrcVersionID,
  221. SSECAlgorithm: copyInput.CopySourceSSECAlgorithm,
  222. SSECKey: copyInput.CopySourceSSECKey,
  223. SSECKeyMD5: copyInput.CopySourceSSECKeyMD5,
  224. IfModifiedSince: copyInput.CopySourceIfModifiedSince,
  225. IfNoneMatch: copyInput.CopySourceIfNoneMatch,
  226. IfUnmodifiedSince: copyInput.CopySourceIfUnmodifiedSince,
  227. IfMatch: copyInput.CopySourceIfMatch,
  228. })
  229. if err != nil {
  230. return nil, err
  231. }
  232. event := &copyEvent{input: copyInput}
  233. init := func() (*copyObjectCheckpoint, error) {
  234. return initCopyCheckpoint(copyInput, headOutput)
  235. }
  236. cp, err := getResumableCopyObjectCheckpoint(ctx, cli, copyInput, headOutput, init)
  237. if err != nil {
  238. return nil, err
  239. }
  240. if cp.UploadID == "" {
  241. created, err := cli.CreateMultipartUploadV2(ctx, &copyInput.CreateMultipartUploadV2Input)
  242. if err != nil {
  243. event.postCopyEvent(&CopyEvent{
  244. Type: enum.CopyEventCreateMultipartUploadFailed,
  245. Err: err,
  246. Bucket: copyInput.Bucket,
  247. Key: copyInput.Key,
  248. SrcBucket: copyInput.SrcBucket,
  249. SrcKey: copyInput.SrcKey,
  250. SrcVersionID: copyInput.SrcVersionID,
  251. CheckpointFile: &copyInput.CheckpointFile,
  252. })
  253. return nil, err
  254. }
  255. event.uploadID = created.UploadID
  256. event.postCopyEvent(&CopyEvent{
  257. Type: enum.CopyEventCreateMultipartUploadSucceed,
  258. Bucket: copyInput.Bucket,
  259. Key: copyInput.Key,
  260. SrcBucket: copyInput.SrcBucket,
  261. SrcKey: copyInput.SrcKey,
  262. SrcVersionID: copyInput.SrcVersionID,
  263. CheckpointFile: &copyInput.CheckpointFile,
  264. })
  265. cp.UploadID = created.UploadID
  266. }
  267. cleaner := func() {
  268. _ = os.Remove(copyInput.CheckpointFile)
  269. }
  270. bindCancelHookWithCleaner(copyInput.CancelHook, cleaner)
  271. return cli.copyPart(ctx, cp, copyInput, event)
  272. }