download_file.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. package tos
  2. import (
  3. "context"
  4. "crypto/md5"
  5. "encoding/base64"
  6. "encoding/json"
  7. "fmt"
  8. "io/ioutil"
  9. "os"
  10. "path/filepath"
  11. "strings"
  12. "github.com/volcengine/ve-tos-golang-sdk/v2/tos/enum"
  13. )
  14. func getDownloadCheckpoint(input *DownloadFileInput, init func(input *HeadObjectV2Output) (*downloadCheckpoint, error), output *HeadObjectV2Output) (checkpoint *downloadCheckpoint, err error) {
  15. enabled := input.EnableCheckpoint
  16. checkpointPath := input.CheckpointFile
  17. if !enabled {
  18. return init(output)
  19. }
  20. checkpoint = &downloadCheckpoint{}
  21. loadCheckPoint(checkpointPath, checkpoint)
  22. if checkpoint.Valid(input, output) {
  23. return
  24. }
  25. parentDir := filepath.Dir(checkpointPath)
  26. stat, err := os.Stat(parentDir)
  27. if err != nil {
  28. err = os.MkdirAll(parentDir, os.ModePerm)
  29. if err != nil {
  30. return nil, newTosClientError(err.Error(), err)
  31. }
  32. } else if !stat.IsDir() {
  33. return nil, newTosClientError("Fail to create folder due to a same file exists.", nil)
  34. }
  35. file, err := os.Create(checkpointPath)
  36. if err != nil {
  37. return nil, newTosClientError(err.Error(), err)
  38. }
  39. _ = file.Close()
  40. checkpoint, err = init(output)
  41. if err != nil {
  42. return nil, err
  43. }
  44. err = checkpoint.WriteToFile()
  45. if err != nil {
  46. return nil, err
  47. }
  48. return
  49. }
  50. func (cli *ClientV2) DownloadFile(ctx context.Context, input *DownloadFileInput) (*DownloadFileOutput, error) {
  51. err := validateDownloadInput(input, cli.isCustomDomain)
  52. if err != nil {
  53. return nil, err
  54. }
  55. headOutput, err := cli.HeadObjectV2(ctx, &input.HeadObjectV2Input)
  56. if err != nil {
  57. return nil, err
  58. }
  59. needDownload, err := parseDownloadFilePath(input)
  60. if err != nil {
  61. return nil, err
  62. }
  63. if !needDownload {
  64. return &DownloadFileOutput{*headOutput}, nil
  65. }
  66. event := downloadEvent{input: input}
  67. init := func(output *HeadObjectV2Output) (*downloadCheckpoint, error) {
  68. err := createDownloadTempFile(input, event)
  69. if err != nil {
  70. return nil, err
  71. }
  72. return initDownloadCheckpoint(input, headOutput)
  73. }
  74. checkpoint, err := getDownloadCheckpoint(input, init, headOutput)
  75. if err != nil {
  76. return nil, err
  77. }
  78. cleaner := func() {
  79. _ = os.Remove(input.CheckpointFile)
  80. _ = os.Remove(input.tempFile)
  81. }
  82. bindCancelHookWithCleaner(input.CancelHook, cleaner)
  83. return cli.downloadFile(ctx, headOutput, checkpoint, input, event)
  84. }
  // loadCheckPoint decodes, on a best-effort basis, the JSON checkpoint stored
  // at path into checkpoint. It serves both UploadFile and DownloadFile
  // checkpoints, and checkpoint must be a pointer.
  //
  // A missing, unreadable or empty file leaves checkpoint untouched, and so does
  // syntactically invalid JSON. Decode errors are deliberately ignored; callers
  // validate the result and fall back to a fresh checkpoint.
  func loadCheckPoint(path string, checkpoint interface{}) {
  	data, readErr := ioutil.ReadFile(path)
  	if (readErr != nil && !os.IsNotExist(readErr)) || len(data) == 0 {
  		return
  	}
  	_ = json.Unmarshal(data, checkpoint)
  }
  // isDir reports whether filePath should be treated as a directory.
  // A path that os.Stat can inspect is a directory exactly when Stat says so.
  // A path that cannot be stat'ed (for example, it does not exist yet) counts
  // as a directory only when its final element is empty. That covers a
  // trailing separator and the empty string.
  func isDir(filePath string) bool {
  	if info, statErr := os.Stat(filePath); statErr == nil {
  		return info.IsDir()
  	}
  	_, last := filepath.Split(filePath)
  	return last == ""
  }
  105. // if file is a directory, append suffix to it to make a file name
  106. func withSuffixIfDir(filePath string, suffix string) string {
  107. if isDir(filePath) {
  108. return filepath.Clean(filepath.Join(filePath, suffix))
  109. }
  110. return filePath
  111. }
  112. func getDownloadCheckPointPath(checkpointPath, filePath, bucket, key, versionId string) string {
  113. fileName := strings.Join([]string{filepath.Base(filePath), checkpointPathMd5(bucket, key, versionId), "download"}, ".")
  114. if len(checkpointPath) == 0 {
  115. dirName := filepath.Dir(filePath)
  116. return filepath.Clean(filepath.Join(dirName, fileName))
  117. }
  118. return withSuffixIfDir(checkpointPath, fileName)
  119. }
  // checkpointPathMd5 derives a stable token identifying an object.
  // It is the URL-safe base64 encoding of MD5("bucket.key"), or of
  // MD5("bucket.key.versionId") when versionId is set.
  func checkpointPathMd5(bucket string, key string, versionId string) string {
  	fields := []string{bucket, key}
  	if len(versionId) > 0 {
  		fields = append(fields, versionId)
  	}
  	digest := md5.Sum([]byte(strings.Join(fields, ".")))
  	return base64.URLEncoding.EncodeToString(digest[:])
  }
  130. func parseDownloadFilePath(input *DownloadFileInput) (needDownloadFile bool, err error) {
  131. input.filePath = input.FilePath
  132. inputFile := input.filePath
  133. isDirRes := isDir(input.filePath)
  134. if isDirRes {
  135. input.filePath = filepath.Clean(filepath.Join(input.filePath, input.Key))
  136. }
  137. input.tempFile = input.filePath + TempFileSuffix
  138. if input.EnableCheckpoint {
  139. input.CheckpointFile = getDownloadCheckPointPath(input.CheckpointFile, input.filePath, input.Bucket, input.Key, input.VersionID)
  140. }
  141. if isDirRes && strings.HasSuffix(input.Key, "/") {
  142. err := os.MkdirAll(filepath.Join(inputFile, input.Key), os.ModePerm)
  143. if err != nil {
  144. return false, InvalidFilePath.withCause(err)
  145. }
  146. return false, nil
  147. }
  148. return true, nil
  149. }
  150. func validateDownloadInput(input *DownloadFileInput, isCustomDomain bool) error {
  151. if err := isValidNames(input.Bucket, input.Key, isCustomDomain); err != nil {
  152. return err
  153. }
  154. if input.PartSize == 0 {
  155. input.PartSize = DefaultPartSize
  156. }
  157. if input.PartSize < MinPartSize || input.PartSize > MaxPartSize {
  158. return newTosClientError("The input part size is invalid, please set it range from 5MB to 5GB", nil)
  159. }
  160. if input.TaskNum < 1 {
  161. input.TaskNum = 1
  162. }
  163. if input.TaskNum > 1000 {
  164. input.TaskNum = 1000
  165. }
  166. return nil
  167. }
  168. func initDownloadCheckpoint(input *DownloadFileInput, headOutput *HeadObjectV2Output) (*downloadCheckpoint, error) {
  169. partsNum := headOutput.ContentLength / input.PartSize
  170. remainder := headOutput.ContentLength % input.PartSize
  171. if remainder != 0 {
  172. partsNum++
  173. }
  174. parts := make([]downloadPartInfo, partsNum)
  175. for i := int64(0); i < partsNum; i++ {
  176. parts[i] = downloadPartInfo{
  177. PartNumber: int(i + 1),
  178. RangeStart: i * input.PartSize,
  179. RangeEnd: (i+1)*input.PartSize - 1,
  180. }
  181. }
  182. if remainder != 0 {
  183. parts[partsNum-1].RangeEnd = (partsNum-1)*input.PartSize + remainder - 1
  184. }
  185. if len(parts) > 10000 {
  186. return nil, newTosClientError("tos: part count too many", nil)
  187. }
  188. return &downloadCheckpoint{
  189. checkpointPath: input.CheckpointFile,
  190. Bucket: input.Bucket,
  191. Key: input.Key,
  192. VersionID: input.VersionID,
  193. PartSize: input.PartSize,
  194. IfMatch: input.IfMatch,
  195. IfModifiedSince: input.IfModifiedSince,
  196. IfNoneMatch: input.IfNoneMatch,
  197. IfUnmodifiedSince: input.IfUnmodifiedSince,
  198. SSECAlgorithm: input.SSECAlgorithm,
  199. SSECKeyMD5: input.SSECKey,
  200. ObjectInfo: objectInfo{
  201. Etag: headOutput.ETag,
  202. HashCrc64ecma: headOutput.HashCrc64ecma,
  203. LastModified: headOutput.LastModified,
  204. ObjectSize: headOutput.ContentLength,
  205. },
  206. FileInfo: downloadFileInfo{
  207. FilePath: input.filePath,
  208. TempFilePath: input.tempFile,
  209. },
  210. PartsInfo: parts,
  211. }, nil
  212. }
  // checkAndCreateDir makes sure the parent directory of filePath exists.
  // When the parent cannot be stat'ed, it is created along with any missing
  // ancestors. It fails when the parent path exists but is not a directory.
  func checkAndCreateDir(filePath string) error {
  	parent := filepath.Dir(filePath)
  	info, statErr := os.Stat(parent)
  	if statErr != nil {
  		return os.MkdirAll(parent, os.ModePerm)
  	}
  	if info.IsDir() {
  		return nil
  	}
  	return fmt.Errorf("dir name same as file name. ")
  }
  226. func createDownloadTempFile(input *DownloadFileInput, event downloadEvent) error {
  227. wrapErr := func(err error) error {
  228. event.postDownloadEvent(&DownloadEvent{
  229. Type: enum.DownloadEventCreateTempFileFailed,
  230. Bucket: input.Bucket,
  231. Key: input.Key,
  232. VersionID: input.VersionID,
  233. FilePath: input.filePath,
  234. TempFilePath: &input.tempFile,
  235. CheckpointFile: &input.CheckpointFile,
  236. })
  237. return newTosClientError("tos: create temp file failed.", err)
  238. }
  239. err := checkAndCreateDir(input.tempFile)
  240. if err != nil {
  241. return wrapErr(err)
  242. }
  243. file, err := os.Create(input.tempFile)
  244. if err != nil {
  245. return wrapErr(err)
  246. }
  247. _ = file.Close()
  248. event.postDownloadEvent(&DownloadEvent{
  249. Type: enum.DownloadEventCreateTempFileSucceed,
  250. Bucket: input.Bucket,
  251. Key: input.Key,
  252. VersionID: input.VersionID,
  253. FilePath: input.filePath,
  254. TempFilePath: &input.tempFile,
  255. CheckpointFile: &input.CheckpointFile,
  256. })
  257. return nil
  258. }
  259. func getDownloadTasks(cli *ClientV2, ctx context.Context, headOutput *HeadObjectV2Output,
  260. checkpoint *downloadCheckpoint, input *DownloadFileInput) []task {
  261. tasks := make([]task, 0)
  262. consumed := int64(0)
  263. subtotal := int64(0)
  264. for _, part := range checkpoint.PartsInfo {
  265. if !part.IsCompleted {
  266. tasks = append(tasks, &downloadTask{
  267. cli: cli,
  268. ctx: ctx,
  269. input: input,
  270. partNumber: part.PartNumber,
  271. rangeStart: part.RangeStart,
  272. rangeEnd: part.RangeEnd,
  273. consumed: &consumed,
  274. subtotal: &subtotal,
  275. total: headOutput.ContentLength,
  276. enableCRC64: cli.enableCRC,
  277. })
  278. } else {
  279. consumed += part.RangeEnd - part.RangeStart + 1
  280. }
  281. }
  282. return tasks
  283. }
  284. func (d downloadEvent) newDownloadEvent() *DownloadEvent {
  285. return &DownloadEvent{
  286. Bucket: d.input.Bucket,
  287. Key: d.input.Key,
  288. VersionID: d.input.VersionID,
  289. FilePath: d.input.filePath,
  290. CheckpointFile: &d.input.CheckpointFile,
  291. TempFilePath: &d.input.tempFile,
  292. }
  293. }
  294. func (d downloadEvent) newDownloadPartSucceedEvent(part downloadPartInfo) *DownloadEvent {
  295. event := d.newSucceedEvent(enum.DownloadEventDownloadPartSucceed)
  296. event.DowloadPartInfo = &DownloadPartInfo{
  297. PartNumber: part.PartNumber,
  298. RangeStart: part.RangeStart,
  299. RangeEnd: part.RangeEnd,
  300. }
  301. return event
  302. }
  303. func (d downloadEvent) newSucceedEvent(eventType enum.DownloadEventType) *DownloadEvent {
  304. event := d.newDownloadEvent()
  305. event.Type = eventType
  306. return event
  307. }
  308. func (d downloadEvent) newFailedEvent(err error, eventType enum.DownloadEventType) *DownloadEvent {
  309. event := d.newDownloadEvent()
  310. event.Type = eventType
  311. event.Err = err
  312. return event
  313. }
  314. func (d downloadEvent) postDownloadEvent(event *DownloadEvent) {
  315. if d.input.DownloadEventListener != nil {
  316. d.input.DownloadEventListener.EventChange(event)
  317. }
  318. }
  319. func (cli *ClientV2) downloadFile(ctx context.Context,
  320. headOutput *HeadObjectV2Output, checkpoint *downloadCheckpoint, input *DownloadFileInput, event downloadEvent) (*DownloadFileOutput, error) {
  321. // prepare tasks
  322. tasks := getDownloadTasks(cli, ctx, headOutput, checkpoint, input)
  323. routinesNum := min(input.TaskNum, len(tasks))
  324. tg := newTaskGroup(getCancelHandle(input.CancelHook), routinesNum, checkpoint, event, input.EnableCheckpoint, tasks)
  325. tg.RunWorker()
  326. // start adding tasks
  327. postDataTransferStatus(input.DataTransferListener, &DataTransferStatus{
  328. Type: enum.DataTransferStarted,
  329. })
  330. tg.Scheduler()
  331. success, err := tg.Wait()
  332. if err != nil {
  333. _ = os.Remove(input.tempFile)
  334. }
  335. if success < len(tasks) {
  336. return nil, newTosClientError("tos: some download task failed.", nil)
  337. }
  338. // Check CRC64
  339. if cli.enableCRC && headOutput.HashCrc64ecma != 0 && combineCRCInDownload(checkpoint.PartsInfo) != headOutput.HashCrc64ecma {
  340. return nil, newTosClientError("tos: crc of entire file mismatch.", nil)
  341. }
  342. err = os.Rename(input.tempFile, input.filePath)
  343. if err != nil {
  344. event.postDownloadEvent(event.newFailedEvent(err, enum.DownloadEventRenameTempFileFailed))
  345. return nil, err
  346. }
  347. event.postDownloadEvent(event.newSucceedEvent(enum.DownloadEventRenameTempFileSucceed))
  348. _ = os.Remove(checkpoint.checkpointPath)
  349. return &DownloadFileOutput{*headOutput}, nil
  350. }