| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428 |
- package tos
- import (
- "context"
- "os"
- "path/filepath"
- "strings"
- "github.com/volcengine/ve-tos-golang-sdk/v2/tos/enum"
- )
- // initUploadPartsInfo initialize parts info from file stat,return TosClientError if failed
- func initUploadPartsInfo(uploadFileStat os.FileInfo, partSize int64) ([]uploadPartInfo, error) {
- partCount := uploadFileStat.Size() / partSize
- lastPartSize := uploadFileStat.Size() % partSize
- if lastPartSize != 0 {
- partCount++
- }
- if partCount > 10000 {
- return nil, InvalidFilePartNum
- }
- parts := make([]uploadPartInfo, 0, partCount)
- for i := int64(0); i < partCount; i++ {
- part := uploadPartInfo{
- PartNumber: int(i + 1),
- PartSize: partSize,
- Offset: uint64(i * partSize),
- }
- parts = append(parts, part)
- }
- if lastPartSize != 0 {
- parts[partCount-1].PartSize = lastPartSize
- }
- if uploadFileStat.Size() == 0 {
- parts = append(parts, uploadPartInfo{PartNumber: 1, PartSize: 0, Offset: 0})
- }
- return parts, nil
- }
- // initUploadCheckpoint initialize checkpoint file, return TosClientError if failed
- func initUploadCheckpoint(input *UploadFileInput, stat os.FileInfo) (*uploadCheckpoint, error) {
- parts, err := initUploadPartsInfo(stat, input.PartSize)
- if err != nil {
- return nil, err
- }
- checkPoint := &uploadCheckpoint{
- checkpointPath: input.CheckpointFile,
- PartsInfo: parts,
- Bucket: input.Bucket,
- Key: input.Key,
- PartSize: input.PartSize,
- SSECAlgorithm: input.SSECAlgorithm,
- SSECKeyMD5: input.SSECKeyMD5,
- EncodingType: input.ContentEncoding,
- FilePath: input.FilePath,
- FileInfo: fileInfo{
- Size: stat.Size(),
- LastModified: stat.ModTime().Unix(),
- },
- }
- return checkPoint, nil
- }
- func getUploadCheckpointFilePath(checkpointPath, filePath string, bucket, key string) string {
- fileName := strings.Join([]string{filepath.Base(filePath), checkpointPathMd5(bucket, key, ""), "upload"}, ".")
- if len(checkpointPath) == 0 {
- dirName := filepath.Dir(filePath)
- return filepath.Join(dirName, fileName)
- }
- return withSuffixIfDir(checkpointPath, fileName)
- }
- // validateUploadInput validate upload input, return TosClientError failed
- func validateUploadInput(input *UploadFileInput, stat os.FileInfo, isCustomDomain bool) error {
- if err := isValidNames(input.Bucket, input.Key, isCustomDomain); err != nil {
- return err
- }
- if input.PartSize == 0 {
- input.PartSize = DefaultPartSize
- }
- if input.PartSize < MinPartSize || input.PartSize > MaxPartSize {
- return InvalidPartSize
- }
- if stat.IsDir() {
- return newTosClientError("tos: does not support directory, please specific your file path.", nil)
- }
- if input.EnableCheckpoint {
- // get correct checkpoint path
- input.CheckpointFile = getUploadCheckpointFilePath(input.CheckpointFile, input.FilePath, input.Bucket, input.Key)
- }
- if input.TaskNum < 1 {
- input.TaskNum = 1
- }
- if input.TaskNum > 1000 {
- input.TaskNum = 1000
- }
- return nil
- }
- func (u *uploadPostEvent) postUploadEvent(event *UploadEvent) {
- if u.input.UploadEventListener != nil {
- u.input.UploadEventListener.EventChange(event)
- }
- }
- func loadExistUploadCheckPoint(ctx context.Context, cli *ClientV2, input *UploadFileInput, srcFile os.FileInfo) (*uploadCheckpoint, bool) {
- checkpoint := &uploadCheckpoint{}
- var err error
- loadCheckPoint(input.CheckpointFile, checkpoint)
- if checkpoint.Valid(srcFile, input.Bucket, input.Key, input.FilePath) {
- return checkpoint, true
- } else if checkpoint.Bucket != "" && checkpoint.Key != "" && checkpoint.UploadID != "" {
- // 尝试去 abort
- _, err = cli.AbortMultipartUpload(ctx,
- &AbortMultipartUploadInput{
- Bucket: checkpoint.Bucket,
- Key: checkpoint.Key,
- UploadID: checkpoint.UploadID})
- if err != nil && cli.logger != nil {
- cli.logger.Debug("fail to abort upload task: %s, err:%s", checkpoint.UploadID, err.Error())
- }
- }
- return nil, false
- }
- // getUploadCheckpoint get struct checkpoint from checkpoint file if checkpointPath is valid,
- // or initialize from scratch with function init
- func getUploadCheckpoint(ctx context.Context, cli *ClientV2, input *UploadFileInput, srcFile os.FileInfo, init func() (*uploadCheckpoint, error)) (checkpoint *uploadCheckpoint, err error) {
- if !input.EnableCheckpoint {
- return init()
- }
- checkpoint, exist := loadExistUploadCheckPoint(ctx, cli, input, srcFile)
- if exist {
- return checkpoint, nil
- }
- err = checkAndCreateDir(input.CheckpointFile)
- if err != nil {
- return nil, InvalidCheckpointFilePath.withCause(err)
- }
- file, err := os.Create(input.CheckpointFile)
- if err != nil {
- return nil, newTosClientError("tos: create checkpoint file failed", err)
- }
- _ = file.Close()
- checkpoint, err = init()
- if err != nil {
- return nil, err
- }
- err = checkpoint.WriteToFile()
- if err != nil {
- return nil, err
- }
- return
- }
- func bindCancelHookWithAborter(hook CancelHook, aborter func() error) {
- if hook == nil {
- return
- }
- cancel := hook.(*canceler)
- cancel.aborter = aborter
- }
- func bindCancelHookWithCleaner(hook CancelHook, cleaner func()) {
- if hook == nil {
- return
- }
- cancel := hook.(*canceler)
- cancel.cleaner = cleaner
- }
- func (cli *ClientV2) UploadFile(ctx context.Context, input *UploadFileInput) (output *UploadFileOutput, err error) {
- // avoid modifying on origin pointer
- input = &(*input)
- stat, err := os.Stat(input.FilePath)
- if err != nil {
- return nil, InvalidSrcFilePath
- }
- if err = validateUploadInput(input, stat, cli.isCustomDomain); err != nil {
- return nil, err
- }
- init := func() (*uploadCheckpoint, error) {
- return initUploadCheckpoint(input, stat)
- }
- // if the checkpoint file not exist, here we will create it
- checkpoint, err := getUploadCheckpoint(ctx, cli, input, stat, init)
- if err != nil {
- return nil, err
- }
- event := &uploadPostEvent{
- input: input,
- checkPoint: checkpoint,
- }
- if checkpoint.UploadID == "" {
- // create multipart upload task
- created, err := cli.CreateMultipartUploadV2(ctx, &input.CreateMultipartUploadV2Input)
- if err != nil {
- event.postUploadEvent(&UploadEvent{
- Type: enum.UploadEventCreateMultipartUploadFailed,
- Err: err,
- Bucket: input.Bucket,
- Key: input.Key,
- CheckpointFile: &input.CheckpointFile,
- })
- return nil, err
- }
- event.postUploadEvent(&UploadEvent{
- Type: enum.UploadEventCreateMultipartUploadSucceed,
- Bucket: input.Bucket,
- Key: input.Key,
- UploadID: &created.UploadID,
- CheckpointFile: &input.CheckpointFile,
- })
- checkpoint.UploadID = created.UploadID
- }
- cleaner := func() {
- _ = os.Remove(input.CheckpointFile)
- }
- event.checkPoint = checkpoint
- bindCancelHookWithCleaner(input.CancelHook, cleaner)
- return cli.uploadPart(ctx, checkpoint, input, event)
- }
- func prepareUploadTasks(cli *ClientV2, ctx context.Context, checkpoint *uploadCheckpoint, input *UploadFileInput) []task {
- tasks := make([]task, 0)
- consumed := int64(0)
- subtotal := int64(0)
- for _, part := range checkpoint.PartsInfo {
- if !part.IsCompleted {
- tasks = append(tasks, &uploadTask{
- cli: cli,
- ctx: ctx,
- input: input,
- total: checkpoint.FileInfo.Size,
- UploadID: checkpoint.UploadID,
- PartNumber: part.PartNumber,
- subtotal: &subtotal,
- consumed: &consumed,
- Offset: part.Offset,
- PartSize: part.PartSize,
- })
- } else {
- consumed += part.PartSize
- }
- }
- return tasks
- }
- func (u *uploadPostEvent) newUploadPartSucceedEvent(input *UploadFileInput, part uploadPartInfo) *UploadEvent {
- return &UploadEvent{
- Type: enum.UploadEventUploadPartSucceed,
- Bucket: input.Bucket,
- Key: input.Key,
- UploadID: part.uploadID,
- CheckpointFile: &input.CheckpointFile,
- UploadPartInfo: &UploadPartInfo{
- PartNumber: part.PartNumber,
- PartSize: part.PartSize,
- Offset: int64(part.Offset),
- ETag: &part.ETag,
- HashCrc64ecma: &part.HashCrc64ecma,
- },
- }
- }
- func (u *uploadPostEvent) newUploadPartAbortedEvent(input *UploadFileInput, uploadID string, err error) *UploadEvent {
- return &UploadEvent{
- Type: enum.UploadEventUploadPartAborted,
- Err: err,
- Bucket: input.Bucket,
- Key: input.Key,
- UploadID: &uploadID,
- CheckpointFile: &input.CheckpointFile,
- }
- }
- func (u *uploadPostEvent) newUploadPartFailedEvent(input *UploadFileInput, uploadID string, err error) *UploadEvent {
- return &UploadEvent{
- Type: enum.UploadEventUploadPartFailed,
- Err: err,
- Bucket: input.Bucket,
- Key: input.Key,
- UploadID: &uploadID,
- CheckpointFile: &input.CheckpointFile,
- }
- }
- func (u *uploadPostEvent) newCompleteMultipartUploadFailedEvent(input *UploadFileInput, uploadID string, err error) *UploadEvent {
- return &UploadEvent{
- Type: enum.UploadEventCompleteMultipartUploadFailed,
- Err: err,
- Bucket: input.Bucket,
- Key: input.Key,
- UploadID: &uploadID,
- CheckpointFile: &input.CheckpointFile,
- }
- }
- func newCompleteMultipartUploadSucceedEvent(input *UploadFileInput, uploadID string) *UploadEvent {
- return &UploadEvent{
- Type: enum.UploadEventCompleteMultipartUploadSucceed,
- Bucket: input.Bucket,
- Key: input.Key,
- UploadID: &uploadID,
- CheckpointFile: &input.CheckpointFile,
- }
- }
- func postDataTransferStatus(listener DataTransferListener, status *DataTransferStatus) {
- if listener != nil {
- listener.DataTransferStatusChange(status)
- }
- }
- func getCancelHandle(hook CancelHook) chan struct{} {
- if c, ok := hook.(*canceler); ok {
- return c.cancelHandle
- }
- return make(chan struct{})
- }
- func combineCRCInDownload(parts []downloadPartInfo) uint64 {
- if len(parts) == 0 {
- return 0
- }
- crc := parts[0].HashCrc64ecma
- for i := 1; i < len(parts); i++ {
- crc = CRC64Combine(crc, parts[i].HashCrc64ecma, uint64(parts[i].RangeEnd-parts[i].RangeStart+1))
- }
- return crc
- }
- // combineCRCInParts calculates the total CRC of continuous parts
- func combineCRCInParts(parts []uploadPartInfo) uint64 {
- if parts == nil || len(parts) == 0 {
- return 0
- }
- crc := parts[0].HashCrc64ecma
- for i := 1; i < len(parts); i++ {
- crc = CRC64Combine(crc, parts[i].HashCrc64ecma, uint64(parts[i].PartSize))
- }
- return crc
- }
// uploadPart runs the remaining part-upload tasks concurrently, then completes
// the multipart upload and (when CRC checking is enabled) verifies the
// combined CRC64 of all parts against the server's value. On task failure the
// multipart upload is aborted and the checkpoint file removed.
func (cli *ClientV2) uploadPart(ctx context.Context, checkpoint *uploadCheckpoint, input *UploadFileInput, event *uploadPostEvent) (*UploadFileOutput, error) {
	// prepare tasks
	// if amount of tasks >= 10000, err "tos: part count too many" will be raised.
	tasks := prepareUploadTasks(cli, ctx, checkpoint, input)
	// never spin up more workers than there are tasks
	routinesNum := min(input.TaskNum, len(tasks))
	cancelHandle := getCancelHandle(input.CancelHook)
	tg := newTaskGroup(cancelHandle, routinesNum, checkpoint, event, input.EnableCheckpoint, tasks)
	// abort tears down the server-side multipart upload and removes the
	// local checkpoint file; it is also bound to the user's cancel hook.
	abort := func() error {
		_, err := cli.AbortMultipartUpload(ctx,
			&AbortMultipartUploadInput{
				Bucket:   input.Bucket,
				Key:      input.Key,
				UploadID: checkpoint.UploadID})
		_ = os.Remove(input.CheckpointFile)
		return err
	}
	bindCancelHookWithAborter(input.CancelHook, abort)
	tg.RunWorker()
	// start adding tasks
	postDataTransferStatus(input.DataTransferListener, &DataTransferStatus{
		TotalBytes: checkpoint.FileInfo.Size,
		Type:       enum.DataTransferStarted,
	})
	tg.Scheduler()
	success, taskErr := tg.Wait()
	if taskErr != nil {
		// NOTE(review): when abort itself fails, its error is returned in
		// place of taskErr, hiding the original failure — confirm intended.
		if err := abort(); err != nil {
			return nil, err
		}
		return nil, taskErr
	}
	// handle results
	if success < len(tasks) {
		return nil, newTosClientError("tos: some upload tasks failed.", nil)
	}
	complete, err := cli.CompleteMultipartUploadV2(ctx, &CompleteMultipartUploadV2Input{
		Bucket:   input.Bucket,
		Key:      input.Key,
		UploadID: checkpoint.UploadID,
		Parts:    checkpoint.GetParts(),
	})
	if err != nil {
		event.postUploadEvent(event.newCompleteMultipartUploadFailedEvent(input, checkpoint.UploadID, err))
		return nil, err
	}
	event.postUploadEvent(newCompleteMultipartUploadSucceedEvent(input, checkpoint.UploadID))
	// verify the whole-object CRC; skipped when the server returned no CRC (0)
	if cli.enableCRC && complete.HashCrc64ecma != 0 && combineCRCInParts(checkpoint.PartsInfo) != complete.HashCrc64ecma {
		return nil, newTosClientError("tos: crc of entire file mismatch.", nil)
	}
	// upload finished: the checkpoint file is no longer needed
	_ = os.Remove(input.CheckpointFile)
	return &UploadFileOutput{
		RequestInfo:   complete.RequestInfo,
		Bucket:        complete.Bucket,
		Key:           complete.Key,
		UploadID:      checkpoint.UploadID,
		ETag:          complete.ETag,
		Location:      complete.Location,
		VersionID:     complete.VersionID,
		HashCrc64ecma: complete.HashCrc64ecma,
		SSECAlgorithm: checkpoint.SSECAlgorithm,
		SSECKeyMD5:    checkpoint.SSECKeyMD5,
		EncodingType:  checkpoint.EncodingType,
	}, nil
}
|