upload_file.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428
  1. package tos
  2. import (
  3. "context"
  4. "os"
  5. "path/filepath"
  6. "strings"
  7. "github.com/volcengine/ve-tos-golang-sdk/v2/tos/enum"
  8. )
  9. // initUploadPartsInfo initialize parts info from file stat,return TosClientError if failed
  10. func initUploadPartsInfo(uploadFileStat os.FileInfo, partSize int64) ([]uploadPartInfo, error) {
  11. partCount := uploadFileStat.Size() / partSize
  12. lastPartSize := uploadFileStat.Size() % partSize
  13. if lastPartSize != 0 {
  14. partCount++
  15. }
  16. if partCount > 10000 {
  17. return nil, InvalidFilePartNum
  18. }
  19. parts := make([]uploadPartInfo, 0, partCount)
  20. for i := int64(0); i < partCount; i++ {
  21. part := uploadPartInfo{
  22. PartNumber: int(i + 1),
  23. PartSize: partSize,
  24. Offset: uint64(i * partSize),
  25. }
  26. parts = append(parts, part)
  27. }
  28. if lastPartSize != 0 {
  29. parts[partCount-1].PartSize = lastPartSize
  30. }
  31. if uploadFileStat.Size() == 0 {
  32. parts = append(parts, uploadPartInfo{PartNumber: 1, PartSize: 0, Offset: 0})
  33. }
  34. return parts, nil
  35. }
  36. // initUploadCheckpoint initialize checkpoint file, return TosClientError if failed
  37. func initUploadCheckpoint(input *UploadFileInput, stat os.FileInfo) (*uploadCheckpoint, error) {
  38. parts, err := initUploadPartsInfo(stat, input.PartSize)
  39. if err != nil {
  40. return nil, err
  41. }
  42. checkPoint := &uploadCheckpoint{
  43. checkpointPath: input.CheckpointFile,
  44. PartsInfo: parts,
  45. Bucket: input.Bucket,
  46. Key: input.Key,
  47. PartSize: input.PartSize,
  48. SSECAlgorithm: input.SSECAlgorithm,
  49. SSECKeyMD5: input.SSECKeyMD5,
  50. EncodingType: input.ContentEncoding,
  51. FilePath: input.FilePath,
  52. FileInfo: fileInfo{
  53. Size: stat.Size(),
  54. LastModified: stat.ModTime().Unix(),
  55. },
  56. }
  57. return checkPoint, nil
  58. }
  59. func getUploadCheckpointFilePath(checkpointPath, filePath string, bucket, key string) string {
  60. fileName := strings.Join([]string{filepath.Base(filePath), checkpointPathMd5(bucket, key, ""), "upload"}, ".")
  61. if len(checkpointPath) == 0 {
  62. dirName := filepath.Dir(filePath)
  63. return filepath.Join(dirName, fileName)
  64. }
  65. return withSuffixIfDir(checkpointPath, fileName)
  66. }
  67. // validateUploadInput validate upload input, return TosClientError failed
  68. func validateUploadInput(input *UploadFileInput, stat os.FileInfo, isCustomDomain bool) error {
  69. if err := isValidNames(input.Bucket, input.Key, isCustomDomain); err != nil {
  70. return err
  71. }
  72. if input.PartSize == 0 {
  73. input.PartSize = DefaultPartSize
  74. }
  75. if input.PartSize < MinPartSize || input.PartSize > MaxPartSize {
  76. return InvalidPartSize
  77. }
  78. if stat.IsDir() {
  79. return newTosClientError("tos: does not support directory, please specific your file path.", nil)
  80. }
  81. if input.EnableCheckpoint {
  82. // get correct checkpoint path
  83. input.CheckpointFile = getUploadCheckpointFilePath(input.CheckpointFile, input.FilePath, input.Bucket, input.Key)
  84. }
  85. if input.TaskNum < 1 {
  86. input.TaskNum = 1
  87. }
  88. if input.TaskNum > 1000 {
  89. input.TaskNum = 1000
  90. }
  91. return nil
  92. }
  93. func (u *uploadPostEvent) postUploadEvent(event *UploadEvent) {
  94. if u.input.UploadEventListener != nil {
  95. u.input.UploadEventListener.EventChange(event)
  96. }
  97. }
  98. func loadExistUploadCheckPoint(ctx context.Context, cli *ClientV2, input *UploadFileInput, srcFile os.FileInfo) (*uploadCheckpoint, bool) {
  99. checkpoint := &uploadCheckpoint{}
  100. var err error
  101. loadCheckPoint(input.CheckpointFile, checkpoint)
  102. if checkpoint.Valid(srcFile, input.Bucket, input.Key, input.FilePath) {
  103. return checkpoint, true
  104. } else if checkpoint.Bucket != "" && checkpoint.Key != "" && checkpoint.UploadID != "" {
  105. // 尝试去 abort
  106. _, err = cli.AbortMultipartUpload(ctx,
  107. &AbortMultipartUploadInput{
  108. Bucket: checkpoint.Bucket,
  109. Key: checkpoint.Key,
  110. UploadID: checkpoint.UploadID})
  111. if err != nil && cli.logger != nil {
  112. cli.logger.Debug("fail to abort upload task: %s, err:%s", checkpoint.UploadID, err.Error())
  113. }
  114. }
  115. return nil, false
  116. }
  117. // getUploadCheckpoint get struct checkpoint from checkpoint file if checkpointPath is valid,
  118. // or initialize from scratch with function init
  119. func getUploadCheckpoint(ctx context.Context, cli *ClientV2, input *UploadFileInput, srcFile os.FileInfo, init func() (*uploadCheckpoint, error)) (checkpoint *uploadCheckpoint, err error) {
  120. if !input.EnableCheckpoint {
  121. return init()
  122. }
  123. checkpoint, exist := loadExistUploadCheckPoint(ctx, cli, input, srcFile)
  124. if exist {
  125. return checkpoint, nil
  126. }
  127. err = checkAndCreateDir(input.CheckpointFile)
  128. if err != nil {
  129. return nil, InvalidCheckpointFilePath.withCause(err)
  130. }
  131. file, err := os.Create(input.CheckpointFile)
  132. if err != nil {
  133. return nil, newTosClientError("tos: create checkpoint file failed", err)
  134. }
  135. _ = file.Close()
  136. checkpoint, err = init()
  137. if err != nil {
  138. return nil, err
  139. }
  140. err = checkpoint.WriteToFile()
  141. if err != nil {
  142. return nil, err
  143. }
  144. return
  145. }
  146. func bindCancelHookWithAborter(hook CancelHook, aborter func() error) {
  147. if hook == nil {
  148. return
  149. }
  150. cancel := hook.(*canceler)
  151. cancel.aborter = aborter
  152. }
  153. func bindCancelHookWithCleaner(hook CancelHook, cleaner func()) {
  154. if hook == nil {
  155. return
  156. }
  157. cancel := hook.(*canceler)
  158. cancel.cleaner = cleaner
  159. }
  160. func (cli *ClientV2) UploadFile(ctx context.Context, input *UploadFileInput) (output *UploadFileOutput, err error) {
  161. // avoid modifying on origin pointer
  162. input = &(*input)
  163. stat, err := os.Stat(input.FilePath)
  164. if err != nil {
  165. return nil, InvalidSrcFilePath
  166. }
  167. if err = validateUploadInput(input, stat, cli.isCustomDomain); err != nil {
  168. return nil, err
  169. }
  170. init := func() (*uploadCheckpoint, error) {
  171. return initUploadCheckpoint(input, stat)
  172. }
  173. // if the checkpoint file not exist, here we will create it
  174. checkpoint, err := getUploadCheckpoint(ctx, cli, input, stat, init)
  175. if err != nil {
  176. return nil, err
  177. }
  178. event := &uploadPostEvent{
  179. input: input,
  180. checkPoint: checkpoint,
  181. }
  182. if checkpoint.UploadID == "" {
  183. // create multipart upload task
  184. created, err := cli.CreateMultipartUploadV2(ctx, &input.CreateMultipartUploadV2Input)
  185. if err != nil {
  186. event.postUploadEvent(&UploadEvent{
  187. Type: enum.UploadEventCreateMultipartUploadFailed,
  188. Err: err,
  189. Bucket: input.Bucket,
  190. Key: input.Key,
  191. CheckpointFile: &input.CheckpointFile,
  192. })
  193. return nil, err
  194. }
  195. event.postUploadEvent(&UploadEvent{
  196. Type: enum.UploadEventCreateMultipartUploadSucceed,
  197. Bucket: input.Bucket,
  198. Key: input.Key,
  199. UploadID: &created.UploadID,
  200. CheckpointFile: &input.CheckpointFile,
  201. })
  202. checkpoint.UploadID = created.UploadID
  203. }
  204. cleaner := func() {
  205. _ = os.Remove(input.CheckpointFile)
  206. }
  207. event.checkPoint = checkpoint
  208. bindCancelHookWithCleaner(input.CancelHook, cleaner)
  209. return cli.uploadPart(ctx, checkpoint, input, event)
  210. }
  211. func prepareUploadTasks(cli *ClientV2, ctx context.Context, checkpoint *uploadCheckpoint, input *UploadFileInput) []task {
  212. tasks := make([]task, 0)
  213. consumed := int64(0)
  214. subtotal := int64(0)
  215. for _, part := range checkpoint.PartsInfo {
  216. if !part.IsCompleted {
  217. tasks = append(tasks, &uploadTask{
  218. cli: cli,
  219. ctx: ctx,
  220. input: input,
  221. total: checkpoint.FileInfo.Size,
  222. UploadID: checkpoint.UploadID,
  223. PartNumber: part.PartNumber,
  224. subtotal: &subtotal,
  225. consumed: &consumed,
  226. Offset: part.Offset,
  227. PartSize: part.PartSize,
  228. })
  229. } else {
  230. consumed += part.PartSize
  231. }
  232. }
  233. return tasks
  234. }
  235. func (u *uploadPostEvent) newUploadPartSucceedEvent(input *UploadFileInput, part uploadPartInfo) *UploadEvent {
  236. return &UploadEvent{
  237. Type: enum.UploadEventUploadPartSucceed,
  238. Bucket: input.Bucket,
  239. Key: input.Key,
  240. UploadID: part.uploadID,
  241. CheckpointFile: &input.CheckpointFile,
  242. UploadPartInfo: &UploadPartInfo{
  243. PartNumber: part.PartNumber,
  244. PartSize: part.PartSize,
  245. Offset: int64(part.Offset),
  246. ETag: &part.ETag,
  247. HashCrc64ecma: &part.HashCrc64ecma,
  248. },
  249. }
  250. }
  251. func (u *uploadPostEvent) newUploadPartAbortedEvent(input *UploadFileInput, uploadID string, err error) *UploadEvent {
  252. return &UploadEvent{
  253. Type: enum.UploadEventUploadPartAborted,
  254. Err: err,
  255. Bucket: input.Bucket,
  256. Key: input.Key,
  257. UploadID: &uploadID,
  258. CheckpointFile: &input.CheckpointFile,
  259. }
  260. }
  261. func (u *uploadPostEvent) newUploadPartFailedEvent(input *UploadFileInput, uploadID string, err error) *UploadEvent {
  262. return &UploadEvent{
  263. Type: enum.UploadEventUploadPartFailed,
  264. Err: err,
  265. Bucket: input.Bucket,
  266. Key: input.Key,
  267. UploadID: &uploadID,
  268. CheckpointFile: &input.CheckpointFile,
  269. }
  270. }
  271. func (u *uploadPostEvent) newCompleteMultipartUploadFailedEvent(input *UploadFileInput, uploadID string, err error) *UploadEvent {
  272. return &UploadEvent{
  273. Type: enum.UploadEventCompleteMultipartUploadFailed,
  274. Err: err,
  275. Bucket: input.Bucket,
  276. Key: input.Key,
  277. UploadID: &uploadID,
  278. CheckpointFile: &input.CheckpointFile,
  279. }
  280. }
  281. func newCompleteMultipartUploadSucceedEvent(input *UploadFileInput, uploadID string) *UploadEvent {
  282. return &UploadEvent{
  283. Type: enum.UploadEventCompleteMultipartUploadSucceed,
  284. Bucket: input.Bucket,
  285. Key: input.Key,
  286. UploadID: &uploadID,
  287. CheckpointFile: &input.CheckpointFile,
  288. }
  289. }
  290. func postDataTransferStatus(listener DataTransferListener, status *DataTransferStatus) {
  291. if listener != nil {
  292. listener.DataTransferStatusChange(status)
  293. }
  294. }
  295. func getCancelHandle(hook CancelHook) chan struct{} {
  296. if c, ok := hook.(*canceler); ok {
  297. return c.cancelHandle
  298. }
  299. return make(chan struct{})
  300. }
  301. func combineCRCInDownload(parts []downloadPartInfo) uint64 {
  302. if len(parts) == 0 {
  303. return 0
  304. }
  305. crc := parts[0].HashCrc64ecma
  306. for i := 1; i < len(parts); i++ {
  307. crc = CRC64Combine(crc, parts[i].HashCrc64ecma, uint64(parts[i].RangeEnd-parts[i].RangeStart+1))
  308. }
  309. return crc
  310. }
  311. // combineCRCInParts calculates the total CRC of continuous parts
  312. func combineCRCInParts(parts []uploadPartInfo) uint64 {
  313. if parts == nil || len(parts) == 0 {
  314. return 0
  315. }
  316. crc := parts[0].HashCrc64ecma
  317. for i := 1; i < len(parts); i++ {
  318. crc = CRC64Combine(crc, parts[i].HashCrc64ecma, uint64(parts[i].PartSize))
  319. }
  320. return crc
  321. }
// uploadPart drives the part-upload phase of UploadFile: it schedules one
// task per incomplete part, runs them through a bounded task group, completes
// the multipart upload, verifies the combined CRC against the server's value,
// and removes the checkpoint file on success.
func (cli *ClientV2) uploadPart(ctx context.Context, checkpoint *uploadCheckpoint, input *UploadFileInput, event *uploadPostEvent) (*UploadFileOutput, error) {
	// prepare tasks
	// if amount of tasks >= 10000, err "tos: part count too many" will be raised.
	tasks := prepareUploadTasks(cli, ctx, checkpoint, input)
	// Never spin up more workers than there are pending parts.
	routinesNum := min(input.TaskNum, len(tasks))
	cancelHandle := getCancelHandle(input.CancelHook)
	tg := newTaskGroup(cancelHandle, routinesNum, checkpoint, event, input.EnableCheckpoint, tasks)
	// abort tears down the server-side multipart upload and removes the local
	// checkpoint file; it is bound to the cancel hook and also invoked when a
	// task returns a fatal error.
	abort := func() error {
		_, err := cli.AbortMultipartUpload(ctx,
			&AbortMultipartUploadInput{
				Bucket:   input.Bucket,
				Key:      input.Key,
				UploadID: checkpoint.UploadID})
		// Best-effort removal: the abort result matters more than local cleanup.
		_ = os.Remove(input.CheckpointFile)
		return err
	}
	bindCancelHookWithAborter(input.CancelHook, abort)
	tg.RunWorker()
	// start adding tasks
	postDataTransferStatus(input.DataTransferListener, &DataTransferStatus{
		TotalBytes: checkpoint.FileInfo.Size,
		Type:       enum.DataTransferStarted,
	})
	tg.Scheduler()
	success, taskErr := tg.Wait()
	if taskErr != nil {
		// A fatal task error aborts the whole upload; if abort itself fails,
		// that error is returned in place of the task error.
		if err := abort(); err != nil {
			return nil, err
		}
		return nil, taskErr
	}
	// handle results
	if success < len(tasks) {
		return nil, newTosClientError("tos: some upload tasks failed.", nil)
	}
	complete, err := cli.CompleteMultipartUploadV2(ctx, &CompleteMultipartUploadV2Input{
		Bucket:   input.Bucket,
		Key:      input.Key,
		UploadID: checkpoint.UploadID,
		Parts:    checkpoint.GetParts(),
	})
	if err != nil {
		event.postUploadEvent(event.newCompleteMultipartUploadFailedEvent(input, checkpoint.UploadID, err))
		return nil, err
	}
	event.postUploadEvent(newCompleteMultipartUploadSucceedEvent(input, checkpoint.UploadID))
	// Cross-check the client-side combined CRC against the server's value;
	// the check is skipped when the server reports a zero CRC.
	if cli.enableCRC && complete.HashCrc64ecma != 0 && combineCRCInParts(checkpoint.PartsInfo) != complete.HashCrc64ecma {
		return nil, newTosClientError("tos: crc of entire file mismatch.", nil)
	}
	// Upload finished: the checkpoint file is no longer needed (best effort).
	_ = os.Remove(input.CheckpointFile)
	return &UploadFileOutput{
		RequestInfo:   complete.RequestInfo,
		Bucket:        complete.Bucket,
		Key:           complete.Key,
		UploadID:      checkpoint.UploadID,
		ETag:          complete.ETag,
		Location:      complete.Location,
		VersionID:     complete.VersionID,
		HashCrc64ecma: complete.HashCrc64ecma,
		SSECAlgorithm: checkpoint.SSECAlgorithm,
		SSECKeyMD5:    checkpoint.SSECKeyMD5,
		EncodingType:  checkpoint.EncodingType,
	}, nil
}