| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- package tos
- import (
- "context"
- "fmt"
- "os"
- "path/filepath"
- "github.com/volcengine/ve-tos-golang-sdk/v2/tos/enum"
- )
- func parseResumableCopyObjectPath(input *ResumableCopyObjectInput) {
- isDirRes := isDir(input.CheckpointFile)
- if isDirRes || input.CheckpointFile == "" {
- input.CheckpointFile = filepath.Clean(filepath.Join(input.CheckpointFile, fmt.Sprintf("%s.%s.%s.%s.%s", input.SrcBucket, input.SrcKey, input.SrcVersionID, input.Bucket, input.Key)))
- }
- }
- func loadExistCopyCheckPoint(ctx context.Context, cli *ClientV2, input *ResumableCopyObjectInput, headOutput *HeadObjectV2Output) (*copyObjectCheckpoint, bool) {
- checkpoint := ©ObjectCheckpoint{}
- var err error
- loadCheckPoint(input.CheckpointFile, checkpoint)
- if checkpoint.Valid(input, headOutput) {
- return checkpoint, true
- } else if checkpoint.Bucket != "" && checkpoint.Key != "" && checkpoint.UploadID != "" {
- // 尝试去 abort
- _, err = cli.AbortMultipartUpload(ctx,
- &AbortMultipartUploadInput{
- Bucket: checkpoint.Bucket,
- Key: checkpoint.Key,
- UploadID: checkpoint.UploadID})
- if err != nil && cli.logger != nil {
- cli.logger.Debug("fail to abort upload task: %s, err:%s", checkpoint.UploadID, err.Error())
- }
- }
- return nil, false
- }
- func getResumableCopyObjectCheckpoint(ctx context.Context, cli *ClientV2, input *ResumableCopyObjectInput, headOutput *HeadObjectV2Output, init func() (*copyObjectCheckpoint, error)) (checkpoint *copyObjectCheckpoint, err error) {
- if !input.EnableCheckpoint {
- return init()
- }
- parseResumableCopyObjectPath(input)
- checkpoint, exist := loadExistCopyCheckPoint(ctx, cli, input, headOutput)
- if exist {
- return checkpoint, nil
- }
- err = checkAndCreateDir(input.CheckpointFile)
- if err != nil {
- return nil, InvalidCheckpointFilePath.withCause(err)
- }
- file, err := os.Create(input.CheckpointFile)
- if err != nil {
- return nil, newTosClientError("tos: create checkpoint file failed", err)
- }
- _ = file.Close()
- checkpoint, err = init()
- if err != nil {
- return nil, err
- }
- err = checkpoint.WriteToFile()
- if err != nil {
- return nil, err
- }
- return
- }
- func initCopyPartsInfo(headOutput *HeadObjectV2Output, partSize int64) ([]copyPartInfo, error) {
- if headOutput.ContentLength == 0 {
- return []copyPartInfo{{
- PartNumber: 1,
- IsZeroSize: true,
- }}, nil
- }
- partCount := headOutput.ContentLength / partSize
- remainder := headOutput.ContentLength % partSize
- if remainder != 0 {
- partCount++
- }
- if partCount > 10000 {
- return nil, InvalidFilePartNum
- }
- parts := make([]copyPartInfo, 0, partCount)
- for i := int64(0); i < partCount; i++ {
- part := copyPartInfo{
- PartNumber: i + 1,
- CopySourceRangeStart: i * partSize,
- CopySourceRangeEnd: (i+1)*partSize - 1,
- CopySourceRange: fmt.Sprintf("bytes=%d-%d", i*partSize, (i+1)*partSize-1),
- }
- parts = append(parts, part)
- }
- if remainder != 0 {
- parts[partCount-1].CopySourceRangeEnd = (partCount-1)*partSize + remainder - 1
- parts[partCount-1].CopySourceRange = fmt.Sprintf("bytes=%d-%d", (partCount-1)*partSize, (partCount-1)*partSize+remainder-1)
- }
- return parts, nil
- }
- func initCopyCheckpoint(input *ResumableCopyObjectInput, headOutput *HeadObjectV2Output) (*copyObjectCheckpoint, error) {
- parts, err := initCopyPartsInfo(headOutput, input.PartSize)
- if err != nil {
- return nil, err
- }
- cp := ©ObjectCheckpoint{
- Bucket: input.Bucket,
- Key: input.Key,
- SrcBucket: input.SrcBucket,
- SrcVersionID: input.SrcVersionID,
- PartSize: input.PartSize,
- UploadID: "",
- CopySourceIfMatch: input.CopySourceIfMatch,
- CopySourceIfModifiedSince: input.CopySourceIfModifiedSince,
- CopySourceIfNoneMatch: input.CopySourceIfNoneMatch,
- CopySourceIfUnmodifiedSince: input.CopySourceIfUnmodifiedSince,
- CopySourceSSECAlgorithm: input.CopySourceSSECAlgorithm,
- CopySourceSSECKeyMD5: input.CopySourceSSECKeyMD5,
- SSECAlgorithm: input.SSECAlgorithm,
- SSECKeyMD5: input.SSECKeyMD5,
- EncodingType: input.EncodingType,
- CopySourceObjectInfo: objectInfo{
- Etag: headOutput.ETag,
- HashCrc64ecma: headOutput.HashCrc64ecma,
- LastModified: headOutput.LastModified,
- ObjectSize: headOutput.ContentLength,
- },
- PartsInfo: parts,
- CheckpointPath: input.CheckpointFile,
- }
- return cp, nil
- }
- func prepareCopyTasks(cli *ClientV2, ctx context.Context, checkpoint *copyObjectCheckpoint, input *ResumableCopyObjectInput) []task {
- tasks := make([]task, 0)
- for _, part := range checkpoint.PartsInfo {
- if !part.IsCompleted {
- tasks = append(tasks, ©Task{
- cli: cli,
- ctx: ctx,
- input: input,
- UploadID: checkpoint.UploadID,
- PartNumber: part.PartNumber,
- PartInfo: part,
- })
- }
- }
- return tasks
- }
- func (cli *ClientV2) copyPart(ctx context.Context, cp *copyObjectCheckpoint, input *ResumableCopyObjectInput, event *copyEvent) (*ResumableCopyObjectOutput, error) {
- tasks := prepareCopyTasks(cli, ctx, cp, input)
- routinesNum := min(input.TaskNum, len(tasks))
- cancelHandle := getCancelHandle(input.CancelHook)
- tg := newTaskGroup(cancelHandle, routinesNum, cp, event, input.EnableCheckpoint, tasks)
- abort := func() error {
- _, err := cli.AbortMultipartUpload(ctx,
- &AbortMultipartUploadInput{
- Bucket: input.Bucket,
- Key: input.Key,
- UploadID: cp.UploadID})
- return err
- }
- bindCancelHookWithAborter(input.CancelHook, abort)
- tg.RunWorker()
- tg.Scheduler()
- success, taskErr := tg.Wait()
- if taskErr != nil {
- if err := abort(); err != nil {
- return nil, err
- }
- return nil, taskErr
- }
- // handle results
- if success < len(tasks) {
- return nil, newTosClientError("tos: some tasks copy failed.", nil)
- }
- complete, err := cli.CompleteMultipartUploadV2(ctx, &CompleteMultipartUploadV2Input{
- Bucket: input.Bucket,
- Key: input.Key,
- UploadID: cp.UploadID,
- Parts: cp.GetParts(),
- })
- if err != nil {
- event.postCopyEvent(&CopyEvent{
- Type: enum.CopyEventCompleteMultipartUploadFailed,
- Err: err,
- Bucket: input.Bucket,
- Key: input.Key,
- UploadID: &cp.UploadID,
- SrcBucket: input.SrcBucket,
- SrcKey: input.SrcKey,
- SrcVersionID: input.SrcVersionID,
- CheckpointFile: &input.CheckpointFile,
- })
- return nil, err
- }
- event.postCopyEvent(&CopyEvent{
- Type: enum.CopyEventCompleteMultipartUploadSucceed,
- Err: err,
- Bucket: input.Bucket,
- Key: input.Key,
- UploadID: &cp.UploadID,
- SrcBucket: input.SrcBucket,
- SrcKey: input.SrcKey,
- SrcVersionID: input.SrcVersionID,
- CheckpointFile: &input.CheckpointFile,
- })
- _ = os.Remove(input.CheckpointFile)
- return &ResumableCopyObjectOutput{
- RequestInfo: complete.RequestInfo,
- Bucket: complete.Bucket,
- Key: complete.Key,
- UploadID: cp.UploadID,
- Etag: complete.ETag,
- Location: complete.Location,
- VersionID: complete.VersionID,
- HashCrc64ecma: complete.HashCrc64ecma,
- SSECAlgorithm: cp.SSECAlgorithm,
- SSECKeyMD5: cp.SSECKeyMD5,
- EncodingType: cp.EncodingType,
- }, nil
- }
- func (cli *ClientV2) ResumableCopyObject(ctx context.Context, input *ResumableCopyObjectInput) (*ResumableCopyObjectOutput, error) {
- rawInput := *input
- copyInput := &rawInput
- headOutput, err := cli.HeadObjectV2(ctx, &HeadObjectV2Input{
- Bucket: copyInput.SrcBucket,
- Key: copyInput.SrcKey,
- VersionID: copyInput.SrcVersionID,
- SSECAlgorithm: copyInput.CopySourceSSECAlgorithm,
- SSECKey: copyInput.CopySourceSSECKey,
- SSECKeyMD5: copyInput.CopySourceSSECKeyMD5,
- IfModifiedSince: copyInput.CopySourceIfModifiedSince,
- IfNoneMatch: copyInput.CopySourceIfNoneMatch,
- IfUnmodifiedSince: copyInput.CopySourceIfUnmodifiedSince,
- IfMatch: copyInput.CopySourceIfMatch,
- })
- if err != nil {
- return nil, err
- }
- event := ©Event{input: copyInput}
- init := func() (*copyObjectCheckpoint, error) {
- return initCopyCheckpoint(copyInput, headOutput)
- }
- cp, err := getResumableCopyObjectCheckpoint(ctx, cli, copyInput, headOutput, init)
- if err != nil {
- return nil, err
- }
- if cp.UploadID == "" {
- created, err := cli.CreateMultipartUploadV2(ctx, ©Input.CreateMultipartUploadV2Input)
- if err != nil {
- event.postCopyEvent(&CopyEvent{
- Type: enum.CopyEventCreateMultipartUploadFailed,
- Err: err,
- Bucket: copyInput.Bucket,
- Key: copyInput.Key,
- SrcBucket: copyInput.SrcBucket,
- SrcKey: copyInput.SrcKey,
- SrcVersionID: copyInput.SrcVersionID,
- CheckpointFile: ©Input.CheckpointFile,
- })
- return nil, err
- }
- event.uploadID = created.UploadID
- event.postCopyEvent(&CopyEvent{
- Type: enum.CopyEventCreateMultipartUploadSucceed,
- Bucket: copyInput.Bucket,
- Key: copyInput.Key,
- SrcBucket: copyInput.SrcBucket,
- SrcKey: copyInput.SrcKey,
- SrcVersionID: copyInput.SrcVersionID,
- CheckpointFile: ©Input.CheckpointFile,
- })
- cp.UploadID = created.UploadID
- }
- cleaner := func() {
- _ = os.Remove(copyInput.CheckpointFile)
- }
- bindCancelHookWithCleaner(copyInput.CancelHook, cleaner)
- return cli.copyPart(ctx, cp, copyInput, event)
- }
|