| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388 |
- package tos
- import (
- "context"
- "crypto/md5"
- "encoding/base64"
- "encoding/json"
- "fmt"
- "io/ioutil"
- "os"
- "path/filepath"
- "strings"
- "github.com/volcengine/ve-tos-golang-sdk/v2/tos/enum"
- )
- func getDownloadCheckpoint(input *DownloadFileInput, init func(input *HeadObjectV2Output) (*downloadCheckpoint, error), output *HeadObjectV2Output) (checkpoint *downloadCheckpoint, err error) {
- enabled := input.EnableCheckpoint
- checkpointPath := input.CheckpointFile
- if !enabled {
- return init(output)
- }
- checkpoint = &downloadCheckpoint{}
- loadCheckPoint(checkpointPath, checkpoint)
- if checkpoint.Valid(input, output) {
- return
- }
- parentDir := filepath.Dir(checkpointPath)
- stat, err := os.Stat(parentDir)
- if err != nil {
- err = os.MkdirAll(parentDir, os.ModePerm)
- if err != nil {
- return nil, newTosClientError(err.Error(), err)
- }
- } else if !stat.IsDir() {
- return nil, newTosClientError("Fail to create folder due to a same file exists.", nil)
- }
- file, err := os.Create(checkpointPath)
- if err != nil {
- return nil, newTosClientError(err.Error(), err)
- }
- _ = file.Close()
- checkpoint, err = init(output)
- if err != nil {
- return nil, err
- }
- err = checkpoint.WriteToFile()
- if err != nil {
- return nil, err
- }
- return
- }
- func (cli *ClientV2) DownloadFile(ctx context.Context, input *DownloadFileInput) (*DownloadFileOutput, error) {
- err := validateDownloadInput(input, cli.isCustomDomain)
- if err != nil {
- return nil, err
- }
- headOutput, err := cli.HeadObjectV2(ctx, &input.HeadObjectV2Input)
- if err != nil {
- return nil, err
- }
- needDownload, err := parseDownloadFilePath(input)
- if err != nil {
- return nil, err
- }
- if !needDownload {
- return &DownloadFileOutput{*headOutput}, nil
- }
- event := downloadEvent{input: input}
- init := func(output *HeadObjectV2Output) (*downloadCheckpoint, error) {
- err := createDownloadTempFile(input, event)
- if err != nil {
- return nil, err
- }
- return initDownloadCheckpoint(input, headOutput)
- }
- checkpoint, err := getDownloadCheckpoint(input, init, headOutput)
- if err != nil {
- return nil, err
- }
- cleaner := func() {
- _ = os.Remove(input.CheckpointFile)
- _ = os.Remove(input.tempFile)
- }
- bindCancelHookWithCleaner(input.CancelHook, cleaner)
- return cli.downloadFile(ctx, headOutput, checkpoint, input, event)
- }
// loadCheckPoint loads an UploadFile or DownloadFile checkpoint stored as JSON
// at path into checkpoint, which must be a non-nil pointer to the checkpoint
// struct.
//
// Loading is best-effort: a missing, unreadable, or empty file leaves
// checkpoint untouched, and a JSON decoding error is ignored. Callers must
// validate the result (e.g. downloadCheckpoint.Valid) before trusting it; a
// type-mismatch error part-way through decoding may leave checkpoint only
// partially filled.
func loadCheckPoint(path string, checkpoint interface{}) {
	contents, err := ioutil.ReadFile(path)
	if err != nil || len(contents) == 0 {
		return
	}
	// Decode straight into the caller's pointer (not a pointer to the interface
	// value). The error is intentionally dropped: an unusable checkpoint is
	// detected by the caller's validity check.
	_ = json.Unmarshal(contents, checkpoint)
}
// isDir reports whether filePath should be treated as a directory. An existing
// path counts as a directory only if it is one. A path that cannot be stat'ed
// counts as a directory when it ends with a path separator, i.e. when
// filepath.Split leaves an empty file name.
func isDir(filePath string) bool {
	if info, statErr := os.Stat(filePath); statErr == nil {
		return info.IsDir()
	}
	_, name := filepath.Split(filePath)
	return len(name) == 0
}
- // if file is a directory, append suffix to it to make a file name
- func withSuffixIfDir(filePath string, suffix string) string {
- if isDir(filePath) {
- return filepath.Clean(filepath.Join(filePath, suffix))
- }
- return filePath
- }
- func getDownloadCheckPointPath(checkpointPath, filePath, bucket, key, versionId string) string {
- fileName := strings.Join([]string{filepath.Base(filePath), checkpointPathMd5(bucket, key, versionId), "download"}, ".")
- if len(checkpointPath) == 0 {
- dirName := filepath.Dir(filePath)
- return filepath.Clean(filepath.Join(dirName, fileName))
- }
- return withSuffixIfDir(checkpointPath, fileName)
- }
// checkpointPathMd5 derives a file-name-safe identifier for an object: the
// URL-safe base64 encoding of the MD5 of "bucket.key", or "bucket.key.versionId"
// when a version ID is given.
func checkpointPathMd5(bucket string, key string, versionId string) string {
	fields := []string{bucket, key}
	if versionId != "" {
		fields = append(fields, versionId)
	}
	digest := md5.Sum([]byte(strings.Join(fields, ".")))
	return base64.URLEncoding.EncodeToString(digest[:])
}
- func parseDownloadFilePath(input *DownloadFileInput) (needDownloadFile bool, err error) {
- input.filePath = input.FilePath
- inputFile := input.filePath
- isDirRes := isDir(input.filePath)
- if isDirRes {
- input.filePath = filepath.Clean(filepath.Join(input.filePath, input.Key))
- }
- input.tempFile = input.filePath + TempFileSuffix
- if input.EnableCheckpoint {
- input.CheckpointFile = getDownloadCheckPointPath(input.CheckpointFile, input.filePath, input.Bucket, input.Key, input.VersionID)
- }
- if isDirRes && strings.HasSuffix(input.Key, "/") {
- err := os.MkdirAll(filepath.Join(inputFile, input.Key), os.ModePerm)
- if err != nil {
- return false, InvalidFilePath.withCause(err)
- }
- return false, nil
- }
- return true, nil
- }
- func validateDownloadInput(input *DownloadFileInput, isCustomDomain bool) error {
- if err := isValidNames(input.Bucket, input.Key, isCustomDomain); err != nil {
- return err
- }
- if input.PartSize == 0 {
- input.PartSize = DefaultPartSize
- }
- if input.PartSize < MinPartSize || input.PartSize > MaxPartSize {
- return newTosClientError("The input part size is invalid, please set it range from 5MB to 5GB", nil)
- }
- if input.TaskNum < 1 {
- input.TaskNum = 1
- }
- if input.TaskNum > 1000 {
- input.TaskNum = 1000
- }
- return nil
- }
- func initDownloadCheckpoint(input *DownloadFileInput, headOutput *HeadObjectV2Output) (*downloadCheckpoint, error) {
- partsNum := headOutput.ContentLength / input.PartSize
- remainder := headOutput.ContentLength % input.PartSize
- if remainder != 0 {
- partsNum++
- }
- parts := make([]downloadPartInfo, partsNum)
- for i := int64(0); i < partsNum; i++ {
- parts[i] = downloadPartInfo{
- PartNumber: int(i + 1),
- RangeStart: i * input.PartSize,
- RangeEnd: (i+1)*input.PartSize - 1,
- }
- }
- if remainder != 0 {
- parts[partsNum-1].RangeEnd = (partsNum-1)*input.PartSize + remainder - 1
- }
- if len(parts) > 10000 {
- return nil, newTosClientError("tos: part count too many", nil)
- }
- return &downloadCheckpoint{
- checkpointPath: input.CheckpointFile,
- Bucket: input.Bucket,
- Key: input.Key,
- VersionID: input.VersionID,
- PartSize: input.PartSize,
- IfMatch: input.IfMatch,
- IfModifiedSince: input.IfModifiedSince,
- IfNoneMatch: input.IfNoneMatch,
- IfUnmodifiedSince: input.IfUnmodifiedSince,
- SSECAlgorithm: input.SSECAlgorithm,
- SSECKeyMD5: input.SSECKey,
- ObjectInfo: objectInfo{
- Etag: headOutput.ETag,
- HashCrc64ecma: headOutput.HashCrc64ecma,
- LastModified: headOutput.LastModified,
- ObjectSize: headOutput.ContentLength,
- },
- FileInfo: downloadFileInfo{
- FilePath: input.filePath,
- TempFilePath: input.tempFile,
- },
- PartsInfo: parts,
- }, nil
- }
// checkAndCreateDir ensures the parent directory of filePath exists. When the
// parent cannot be stat'ed, it is created along with any missing ancestors.
// When the parent exists but is not a directory, an error is returned.
func checkAndCreateDir(filePath string) error {
	parent := filepath.Dir(filePath)
	info, statErr := os.Stat(parent)
	switch {
	case statErr != nil:
		return os.MkdirAll(parent, os.ModePerm)
	case !info.IsDir():
		return fmt.Errorf("dir name same as file name. ")
	default:
		return nil
	}
}
- func createDownloadTempFile(input *DownloadFileInput, event downloadEvent) error {
- wrapErr := func(err error) error {
- event.postDownloadEvent(&DownloadEvent{
- Type: enum.DownloadEventCreateTempFileFailed,
- Bucket: input.Bucket,
- Key: input.Key,
- VersionID: input.VersionID,
- FilePath: input.filePath,
- TempFilePath: &input.tempFile,
- CheckpointFile: &input.CheckpointFile,
- })
- return newTosClientError("tos: create temp file failed.", err)
- }
- err := checkAndCreateDir(input.tempFile)
- if err != nil {
- return wrapErr(err)
- }
- file, err := os.Create(input.tempFile)
- if err != nil {
- return wrapErr(err)
- }
- _ = file.Close()
- event.postDownloadEvent(&DownloadEvent{
- Type: enum.DownloadEventCreateTempFileSucceed,
- Bucket: input.Bucket,
- Key: input.Key,
- VersionID: input.VersionID,
- FilePath: input.filePath,
- TempFilePath: &input.tempFile,
- CheckpointFile: &input.CheckpointFile,
- })
- return nil
- }
- func getDownloadTasks(cli *ClientV2, ctx context.Context, headOutput *HeadObjectV2Output,
- checkpoint *downloadCheckpoint, input *DownloadFileInput) []task {
- tasks := make([]task, 0)
- consumed := int64(0)
- subtotal := int64(0)
- for _, part := range checkpoint.PartsInfo {
- if !part.IsCompleted {
- tasks = append(tasks, &downloadTask{
- cli: cli,
- ctx: ctx,
- input: input,
- partNumber: part.PartNumber,
- rangeStart: part.RangeStart,
- rangeEnd: part.RangeEnd,
- consumed: &consumed,
- subtotal: &subtotal,
- total: headOutput.ContentLength,
- enableCRC64: cli.enableCRC,
- })
- } else {
- consumed += part.RangeEnd - part.RangeStart + 1
- }
- }
- return tasks
- }
- func (d downloadEvent) newDownloadEvent() *DownloadEvent {
- return &DownloadEvent{
- Bucket: d.input.Bucket,
- Key: d.input.Key,
- VersionID: d.input.VersionID,
- FilePath: d.input.filePath,
- CheckpointFile: &d.input.CheckpointFile,
- TempFilePath: &d.input.tempFile,
- }
- }
- func (d downloadEvent) newDownloadPartSucceedEvent(part downloadPartInfo) *DownloadEvent {
- event := d.newSucceedEvent(enum.DownloadEventDownloadPartSucceed)
- event.DowloadPartInfo = &DownloadPartInfo{
- PartNumber: part.PartNumber,
- RangeStart: part.RangeStart,
- RangeEnd: part.RangeEnd,
- }
- return event
- }
- func (d downloadEvent) newSucceedEvent(eventType enum.DownloadEventType) *DownloadEvent {
- event := d.newDownloadEvent()
- event.Type = eventType
- return event
- }
- func (d downloadEvent) newFailedEvent(err error, eventType enum.DownloadEventType) *DownloadEvent {
- event := d.newDownloadEvent()
- event.Type = eventType
- event.Err = err
- return event
- }
- func (d downloadEvent) postDownloadEvent(event *DownloadEvent) {
- if d.input.DownloadEventListener != nil {
- d.input.DownloadEventListener.EventChange(event)
- }
- }
- func (cli *ClientV2) downloadFile(ctx context.Context,
- headOutput *HeadObjectV2Output, checkpoint *downloadCheckpoint, input *DownloadFileInput, event downloadEvent) (*DownloadFileOutput, error) {
- // prepare tasks
- tasks := getDownloadTasks(cli, ctx, headOutput, checkpoint, input)
- routinesNum := min(input.TaskNum, len(tasks))
- tg := newTaskGroup(getCancelHandle(input.CancelHook), routinesNum, checkpoint, event, input.EnableCheckpoint, tasks)
- tg.RunWorker()
- // start adding tasks
- postDataTransferStatus(input.DataTransferListener, &DataTransferStatus{
- Type: enum.DataTransferStarted,
- })
- tg.Scheduler()
- success, err := tg.Wait()
- if err != nil {
- _ = os.Remove(input.tempFile)
- }
- if success < len(tasks) {
- return nil, newTosClientError("tos: some download task failed.", nil)
- }
- // Check CRC64
- if cli.enableCRC && headOutput.HashCrc64ecma != 0 && combineCRCInDownload(checkpoint.PartsInfo) != headOutput.HashCrc64ecma {
- return nil, newTosClientError("tos: crc of entire file mismatch.", nil)
- }
- err = os.Rename(input.tempFile, input.filePath)
- if err != nil {
- event.postDownloadEvent(event.newFailedEvent(err, enum.DownloadEventRenameTempFileFailed))
- return nil, err
- }
- event.postDownloadEvent(event.newSucceedEvent(enum.DownloadEventRenameTempFileSucceed))
- _ = os.Remove(checkpoint.checkpointPath)
- return &DownloadFileOutput{*headOutput}, nil
- }
|