uploader.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603
  1. package s3
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/ks3sdklib/aws-sdk-go/aws"
  7. "github.com/ks3sdklib/aws-sdk-go/internal/crc"
  8. "io"
  9. "os"
  10. "path/filepath"
  11. "sort"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "sync/atomic"
  16. "time"
  17. )
// UploadFileInput describes a single UploadFile call: where the data comes
// from (a local file or a FilePartFetcher), how it is split and parallelised,
// whether progress is checkpointed, and which object attributes are sent to
// the service.
type UploadFileInput struct {
	// The name of the bucket. Required.
	Bucket *string `location:"uri" locationName:"Bucket" type:"string" required:"true"`
	// Object key of the object. Required.
	Key *string `location:"uri" locationName:"Key" type:"string" required:"true"`
	// The path of the local file to be uploaded. Validation rewrites it to a
	// cleaned absolute path. May be empty when FilePartFetcher is set.
	UploadFile *string `type:"string" required:"true"`
	// The size in bytes of the data to be uploaded. When UploadFile is set this
	// is overwritten with the size reported by os.Stat.
	FileSize *int64 `type:"integer"`
	// The file part fetcher, used to obtain each part's data when UploadFile
	// is empty.
	FilePartFetcher *FilePartFetcher `type:"structure"`
	// The object metadata. When UploadFile is empty, validation reads the
	// HTTPHeaderContentLength entry of this map to determine FileSize.
	ObjectMeta map[string]*string `type:"structure"`
	// The size of each part. Defaults to DefaultPartSize when nil and is
	// clamped to the range [MinPartSize, MaxPartSize].
	PartSize *int64 `type:"integer"`
	// The number of concurrent tasks used to upload parts. Values <= 0 are
	// replaced by DefaultTaskNum.
	TaskNum *int64 `type:"integer"`
	// Whether to enable checkpoint.
	EnableCheckpoint *bool `type:"boolean"`
	// The directory to store the checkpoint file.
	CheckpointDir *string `type:"string"`
	// The checkpoint file path. When empty and checkpointing is enabled, a
	// path is generated by generateUploadCpFilePath.
	CheckpointFile *string `type:"string"`
	// The canned ACL to apply to the object.
	ACL *string `location:"header" locationName:"x-amz-acl" type:"string"`
	// Specifies caching behavior along the request/reply chain.
	CacheControl *string `location:"header" locationName:"Cache-Control" type:"string"`
	// Specifies presentational information for the object.
	ContentDisposition *string `location:"header" locationName:"Content-Disposition" type:"string"`
	// Specifies what content encodings have been applied to the object and thus
	// what decoding mechanisms must be applied to obtain the media-type referenced
	// by the Content-Type header field.
	ContentEncoding *string `location:"header" locationName:"Content-Encoding" type:"string"`
	// A standard MIME type describing the format of the object data.
	ContentType *string `location:"header" locationName:"Content-Type" type:"string"`
	// The date and time at which the object is no longer cacheable.
	Expires *time.Time `location:"header" locationName:"Expires" type:"timestamp" timestampFormat:"rfc822"`
	// A map of metadata to store with the object in S3.
	Metadata map[string]*string `location:"headers" locationName:"x-amz-meta-" type:"map"`
	// The type of storage to use for the object. Defaults to 'STANDARD'.
	StorageClass *string `location:"header" locationName:"x-amz-storage-class" type:"string"`
	// Specifies the object tag of the object. Multiple tags can be set at the same time, such as: TagA=A&TagB=B.
	// Note: Key and Value need to be URL-encoded first. If an item does not have "=", the Value is considered to be an empty string.
	Tagging *string `location:"header" locationName:"x-amz-tagging" type:"string"`
	// Specifies whether the object is forbidden to overwrite.
	ForbidOverwrite *bool `location:"header" locationName:"x-amz-forbid-overwrite" type:"boolean"`
	// Allows grantee to read the object data and its metadata.
	GrantRead *string `location:"header" locationName:"x-amz-grant-read" type:"string"`
	// Gives the grantee READ, READ_ACP, and WRITE_ACP permissions on the object.
	GrantFullControl *string `location:"header" locationName:"x-amz-grant-full-control" type:"string"`
	// The server-side encryption algorithm used when storing this object in KS3, e.g. AES256.
	ServerSideEncryption *string `location:"header" locationName:"x-amz-server-side-encryption" type:"string"`
	// Specifies the algorithm to use when encrypting the object, e.g. AES256.
	SSECustomerAlgorithm *string `location:"header" locationName:"x-amz-server-side-encryption-customer-algorithm" type:"string"`
	// Specifies the customer-provided encryption key for KS3 to use in encrypting data.
	SSECustomerKey *string `location:"header" locationName:"x-amz-server-side-encryption-customer-key" type:"string"`
	// Specifies the 128-bit MD5 digest of the encryption key according to RFC 1321.
	SSECustomerKeyMD5 *string `location:"header" locationName:"x-amz-server-side-encryption-customer-key-MD5" type:"string"`
	// Progress callback function. The uploader invokes it with the size of the
	// part just accounted for, the cumulative completed size, and the total size.
	ProgressFn aws.ProgressFunc `location:"function"`
}
// UploadFileOutput is the result of a successful UploadFile call.
type UploadFileOutput struct {
	// Bucket the object was written to.
	Bucket *string
	// Key of the uploaded object.
	Key *string
	// ETag returned by the service for the stored object.
	ETag *string
	// ChecksumCRC64ECMA is the CRC64 checksum value returned by the service,
	// as a string.
	ChecksumCRC64ECMA *string
}
// FilePartFetcher supplies the data of one part when the upload source is not
// a local file.
type FilePartFetcher interface {
	// Fetch returns a reader over the requested byte range. The uploader calls
	// it with a two-element slice {first, last}, where both offsets are
	// inclusive (last = first + partSize - 1).
	Fetch(objectRange []int64) (io.ReadSeeker, error)
}
  88. func (c *S3) UploadFile(request *UploadFileInput) (*UploadFileOutput, error) {
  89. return c.UploadFileWithContext(context.Background(), request)
  90. }
  91. func (c *S3) UploadFileWithContext(ctx context.Context, request *UploadFileInput) (*UploadFileOutput, error) {
  92. return newUploader(c, ctx, request).uploadFile()
  93. }
  94. type Uploader struct {
  95. client *S3
  96. context context.Context
  97. uploadFileRequest *UploadFileInput
  98. uploadCheckpoint *UploadCheckpoint
  99. CompletedSize int64
  100. mu sync.Mutex
  101. error error
  102. }
  103. func newUploader(s3 *S3, ctx context.Context, request *UploadFileInput) *Uploader {
  104. return &Uploader{
  105. client: s3,
  106. context: ctx,
  107. uploadFileRequest: request,
  108. }
  109. }
  110. func (u *Uploader) uploadFile() (*UploadFileOutput, error) {
  111. err := u.validate()
  112. if err != nil {
  113. return nil, err
  114. }
  115. if aws.ToString(u.uploadFileRequest.UploadFile) != "" && aws.ToLong(u.uploadFileRequest.FileSize) <= aws.ToLong(u.uploadFileRequest.PartSize) {
  116. return u.putObject()
  117. }
  118. return u.multipartUpload()
  119. }
  120. func (u *Uploader) validate() error {
  121. request := u.uploadFileRequest
  122. if request == nil {
  123. return errors.New("upload file request is required")
  124. }
  125. if aws.ToString(request.Bucket) == "" {
  126. return errors.New("bucket is required")
  127. }
  128. if aws.ToString(request.Key) == "" {
  129. return errors.New("key is required")
  130. }
  131. err := u.normalizeUploadPath()
  132. if err != nil {
  133. return err
  134. }
  135. filePath := aws.ToString(request.UploadFile)
  136. if filePath == "" && request.FilePartFetcher == nil {
  137. return errors.New("upload file or file part fetcher is required")
  138. }
  139. if filePath != "" {
  140. fileInfo, err := os.Stat(filePath)
  141. if err != nil {
  142. return err
  143. }
  144. if fileInfo.IsDir() {
  145. return errors.New("upload file not a file")
  146. }
  147. request.FileSize = aws.Long(fileInfo.Size())
  148. } else {
  149. if request.ObjectMeta != nil {
  150. fileSize, _ := strconv.ParseInt(aws.ToString(request.ObjectMeta[HTTPHeaderContentLength]), 10, 64)
  151. request.FileSize = aws.Long(fileSize)
  152. }
  153. }
  154. if request.FilePartFetcher != nil && request.FileSize == nil {
  155. return errors.New("file size is required")
  156. }
  157. if request.PartSize == nil {
  158. request.PartSize = aws.Long(DefaultPartSize)
  159. } else if aws.ToLong(request.PartSize) < MinPartSize {
  160. request.PartSize = aws.Long(MinPartSize)
  161. } else if aws.ToLong(request.PartSize) > MaxPartSize {
  162. request.PartSize = aws.Long(MaxPartSize)
  163. }
  164. if aws.ToLong(request.TaskNum) <= 0 {
  165. request.TaskNum = aws.Long(DefaultTaskNum)
  166. }
  167. return nil
  168. }
  169. func (u *Uploader) putObject() (*UploadFileOutput, error) {
  170. request := u.uploadFileRequest
  171. fd, err := os.Open(aws.ToString(request.UploadFile))
  172. if err != nil {
  173. return nil, err
  174. }
  175. defer fd.Close()
  176. resp, err := u.client.PutObjectWithContext(u.context, &PutObjectInput{
  177. Bucket: request.Bucket,
  178. Key: request.Key,
  179. Body: fd,
  180. ACL: request.ACL,
  181. CacheControl: request.CacheControl,
  182. ContentDisposition: request.ContentDisposition,
  183. ContentEncoding: request.ContentEncoding,
  184. ContentType: request.ContentType,
  185. Expires: request.Expires,
  186. Metadata: request.Metadata,
  187. StorageClass: request.StorageClass,
  188. Tagging: request.Tagging,
  189. ForbidOverwrite: request.ForbidOverwrite,
  190. GrantRead: request.GrantRead,
  191. GrantFullControl: request.GrantFullControl,
  192. ServerSideEncryption: request.ServerSideEncryption,
  193. SSECustomerAlgorithm: request.SSECustomerAlgorithm,
  194. SSECustomerKey: request.SSECustomerKey,
  195. SSECustomerKeyMD5: request.SSECustomerKeyMD5,
  196. ProgressFn: request.ProgressFn,
  197. })
  198. if err != nil {
  199. return nil, err
  200. }
  201. return &UploadFileOutput{
  202. Bucket: request.Bucket,
  203. Key: request.Key,
  204. ETag: resp.ETag,
  205. ChecksumCRC64ECMA: resp.Metadata[HTTPHeaderAmzChecksumCrc64ecma],
  206. }, nil
  207. }
  208. func (u *Uploader) multipartUpload() (*UploadFileOutput, error) {
  209. ucp, err := newUploadCheckpoint(u)
  210. if err != nil {
  211. return nil, err
  212. }
  213. u.uploadCheckpoint = ucp
  214. if aws.ToBoolean(u.uploadFileRequest.EnableCheckpoint) {
  215. cpFilePath := aws.ToString(u.uploadFileRequest.CheckpointFile)
  216. if cpFilePath == "" {
  217. cpFilePath, err = generateUploadCpFilePath(u.uploadFileRequest)
  218. if err != nil {
  219. return nil, err
  220. }
  221. }
  222. ucp.CpFilePath = cpFilePath
  223. err = ucp.load()
  224. if err != nil {
  225. return nil, err
  226. }
  227. if ucp.UploadId != "" && !u.isUploadIdValid() {
  228. ucp.UploadId = ""
  229. ucp.PartETagList = make([]*CompletedPart, 0)
  230. ucp.remove()
  231. }
  232. }
  233. if ucp.UploadId == "" {
  234. ucp.UploadId, err = u.initUploadId()
  235. if err != nil {
  236. return nil, err
  237. }
  238. ucp.dump()
  239. }
  240. fileSize := ucp.UploadFileSize
  241. partSize := ucp.PartSize
  242. totalPartNum := (fileSize-1)/partSize + 1
  243. tasks := make(chan UploadPartTask, totalPartNum)
  244. var i int64
  245. for i = 0; i < totalPartNum; i++ {
  246. partNum := i + 1
  247. offset := i * partSize
  248. actualPartSize := u.getActualPartSize(fileSize, partSize, partNum)
  249. partETag := u.getPartETag(partNum)
  250. if partETag != nil {
  251. u.publishProgress(actualPartSize)
  252. } else {
  253. uploadPartTask := UploadPartTask{
  254. partNumber: partNum,
  255. offset: offset,
  256. actualPartSize: actualPartSize,
  257. }
  258. tasks <- uploadPartTask
  259. }
  260. }
  261. close(tasks)
  262. var wg sync.WaitGroup
  263. for i = 0; i < aws.ToLong(u.uploadFileRequest.TaskNum); i++ {
  264. wg.Add(1)
  265. go u.runTask(tasks, &wg)
  266. }
  267. wg.Wait()
  268. if u.error != nil {
  269. return nil, u.error
  270. }
  271. completedMultipartUpload := u.getMultipartUploadParts()
  272. resp, err := u.completeMultipartUpload(completedMultipartUpload)
  273. if err != nil {
  274. return nil, err
  275. }
  276. if u.client.Config.CrcCheckEnabled {
  277. clientCrc64 := u.getCrc64Ecma(completedMultipartUpload.Parts)
  278. serverCrc64, _ := strconv.ParseUint(aws.ToString(resp.ChecksumCRC64ECMA), 10, 64)
  279. u.client.Config.LogDebug("check file crc64, client crc64:%d, server crc64:%d", clientCrc64, serverCrc64)
  280. if serverCrc64 != 0 && clientCrc64 != serverCrc64 {
  281. return nil, errors.New(fmt.Sprintf("crc64 check failed, client crc64:%d, server crc64:%d", clientCrc64, serverCrc64))
  282. }
  283. }
  284. return u.getUploadFileOutput(resp), nil
  285. }
  286. func (u *Uploader) getUploadFileOutput(resp *CompleteMultipartUploadOutput) *UploadFileOutput {
  287. return &UploadFileOutput{
  288. Bucket: resp.Bucket,
  289. Key: resp.Key,
  290. ETag: resp.ETag,
  291. ChecksumCRC64ECMA: resp.ChecksumCRC64ECMA,
  292. }
  293. }
  294. func (u *Uploader) getPartSize(fileSize int64, originPartSize int64) int64 {
  295. partSize := originPartSize
  296. totalPartNum := (fileSize-1)/partSize + 1
  297. for totalPartNum > MaxPartNum {
  298. partSize += originPartSize
  299. totalPartNum = (fileSize-1)/partSize + 1
  300. }
  301. return partSize
  302. }
  303. func (u *Uploader) getActualPartSize(fileSize int64, partSize int64, partNum int64) int64 {
  304. offset := (partNum - 1) * partSize
  305. actualPartSize := partSize
  306. if offset+partSize >= fileSize {
  307. actualPartSize = fileSize - offset
  308. }
  309. return actualPartSize
  310. }
  311. func (u *Uploader) getPartETag(partNumber int64) *CompletedPart {
  312. for _, partETag := range u.uploadCheckpoint.PartETagList {
  313. if *partETag.PartNumber == partNumber {
  314. return partETag
  315. }
  316. }
  317. return nil
  318. }
// UploadPartTask describes one part that still has to be uploaded.
type UploadPartTask struct {
	// partNumber is the 1-based number of the part.
	partNumber int64
	// offset is the byte offset of the part within the source.
	offset int64
	// actualPartSize is the number of bytes in this part.
	actualPartSize int64
}
  324. func (u *Uploader) runTask(tasks <-chan UploadPartTask, wg *sync.WaitGroup) {
  325. defer wg.Done()
  326. for task := range tasks {
  327. if u.error != nil {
  328. return
  329. }
  330. partETag, err := u.uploadPart(task)
  331. if err != nil {
  332. u.setError(err)
  333. return
  334. }
  335. u.updatePart(partETag)
  336. }
  337. }
  338. func (u *Uploader) uploadPart(task UploadPartTask) (CompletedPart, error) {
  339. request := u.uploadFileRequest
  340. ucp := u.uploadCheckpoint
  341. offset := task.offset
  342. actualPartSize := task.actualPartSize
  343. var partETag CompletedPart
  344. var reader io.ReadSeeker
  345. if ucp.UploadFilePath != "" {
  346. fd, err := os.Open(ucp.UploadFilePath)
  347. if err != nil {
  348. return partETag, err
  349. }
  350. defer fd.Close()
  351. reader = io.NewSectionReader(fd, offset, actualPartSize)
  352. } else {
  353. var err error
  354. reader, err = (*u.uploadFileRequest.FilePartFetcher).Fetch([]int64{offset, offset + actualPartSize - 1})
  355. if err != nil {
  356. return partETag, err
  357. }
  358. }
  359. resp, err := u.client.UploadPartWithContext(u.context, &UploadPartInput{
  360. Bucket: aws.String(ucp.BucketName),
  361. Key: aws.String(ucp.ObjectKey),
  362. UploadID: aws.String(ucp.UploadId),
  363. PartNumber: aws.Long(task.partNumber),
  364. Body: reader,
  365. ContentLength: aws.Long(actualPartSize),
  366. SSECustomerAlgorithm: request.SSECustomerAlgorithm,
  367. SSECustomerKey: request.SSECustomerKey,
  368. SSECustomerKeyMD5: request.SSECustomerKeyMD5,
  369. })
  370. if err != nil {
  371. return partETag, err
  372. }
  373. partETag.PartNumber = aws.Long(task.partNumber)
  374. partETag.ETag = resp.ETag
  375. partETag.ChecksumCRC64ECMA = resp.ChecksumCRC64ECMA
  376. u.publishProgress(actualPartSize)
  377. return partETag, nil
  378. }
  379. func (u *Uploader) updatePart(partETag CompletedPart) {
  380. u.mu.Lock()
  381. defer u.mu.Unlock()
  382. u.uploadCheckpoint.PartETagList = append(u.uploadCheckpoint.PartETagList, &partETag)
  383. u.uploadCheckpoint.dump()
  384. }
  385. func (u *Uploader) setError(err error) {
  386. u.mu.Lock()
  387. defer u.mu.Unlock()
  388. if u.error == nil {
  389. u.error = err
  390. }
  391. }
  392. type CompletedParts []*CompletedPart
  393. func (cp CompletedParts) Len() int {
  394. return len(cp)
  395. }
  396. func (cp CompletedParts) Less(i, j int) bool {
  397. return *cp[i].PartNumber < *cp[j].PartNumber
  398. }
  399. func (cp CompletedParts) Swap(i, j int) {
  400. cp[i], cp[j] = cp[j], cp[i]
  401. }
  402. func (u *Uploader) getMultipartUploadParts() *CompletedMultipartUpload {
  403. partETags := u.uploadCheckpoint.PartETagList
  404. // 按照PartNumber排序
  405. sort.Sort(CompletedParts(partETags))
  406. return &CompletedMultipartUpload{
  407. Parts: partETags,
  408. }
  409. }
  410. func (u *Uploader) completeMultipartUpload(completedMultipartUpload *CompletedMultipartUpload) (*CompleteMultipartUploadOutput, error) {
  411. resp, err := u.client.CompleteMultipartUploadWithContext(u.context, &CompleteMultipartUploadInput{
  412. Bucket: u.uploadFileRequest.Bucket,
  413. Key: u.uploadFileRequest.Key,
  414. UploadID: aws.String(u.uploadCheckpoint.UploadId),
  415. MultipartUpload: completedMultipartUpload,
  416. ForbidOverwrite: u.uploadFileRequest.ForbidOverwrite,
  417. })
  418. if err != nil {
  419. return nil, err
  420. }
  421. u.uploadCheckpoint.remove()
  422. return resp, err
  423. }
  424. func (u *Uploader) publishProgress(actualPartSize int64) {
  425. if u.uploadFileRequest.ProgressFn != nil {
  426. atomic.AddInt64(&u.CompletedSize, actualPartSize)
  427. u.uploadFileRequest.ProgressFn(actualPartSize, u.CompletedSize, aws.ToLong(u.uploadFileRequest.FileSize))
  428. }
  429. }
  430. func (u *Uploader) getCrc64Ecma(parts []*CompletedPart) uint64 {
  431. if parts == nil || len(parts) == 0 {
  432. return 0
  433. }
  434. fileSize := u.uploadCheckpoint.UploadFileSize
  435. partSize := u.uploadCheckpoint.PartSize
  436. crcTemp, _ := strconv.ParseUint(*parts[0].ChecksumCRC64ECMA, 10, 64)
  437. for i := 1; i < len(parts); i++ {
  438. crc2, _ := strconv.ParseUint(*parts[i].ChecksumCRC64ECMA, 10, 64)
  439. actualPartSize := u.getActualPartSize(fileSize, partSize, *parts[i].PartNumber)
  440. crcTemp = crc.CRC64Combine(crcTemp, crc2, (uint64)(actualPartSize))
  441. }
  442. return crcTemp
  443. }
  444. func (u *Uploader) initUploadId() (string, error) {
  445. request := u.uploadFileRequest
  446. resp, err := u.client.CreateMultipartUploadWithContext(u.context, &CreateMultipartUploadInput{
  447. Bucket: request.Bucket,
  448. Key: request.Key,
  449. ACL: request.ACL,
  450. CacheControl: request.CacheControl,
  451. ContentDisposition: request.ContentDisposition,
  452. ContentEncoding: request.ContentEncoding,
  453. ContentType: request.ContentType,
  454. Expires: request.Expires,
  455. Metadata: request.Metadata,
  456. StorageClass: request.StorageClass,
  457. Tagging: request.Tagging,
  458. ForbidOverwrite: request.ForbidOverwrite,
  459. GrantRead: request.GrantRead,
  460. GrantFullControl: request.GrantFullControl,
  461. ServerSideEncryption: request.ServerSideEncryption,
  462. SSECustomerAlgorithm: request.SSECustomerAlgorithm,
  463. SSECustomerKey: request.SSECustomerKey,
  464. SSECustomerKeyMD5: request.SSECustomerKeyMD5,
  465. })
  466. if err != nil {
  467. return "", err
  468. }
  469. return aws.ToString(resp.UploadID), nil
  470. }
  471. func (u *Uploader) isUploadIdValid() bool {
  472. _, err := u.client.ListPartsWithContext(u.context, &ListPartsInput{
  473. Bucket: u.uploadFileRequest.Bucket,
  474. Key: u.uploadFileRequest.Key,
  475. UploadID: aws.String(u.uploadCheckpoint.UploadId),
  476. })
  477. if err != nil && strings.Contains(err.Error(), "NoSuchUpload") {
  478. return false
  479. }
  480. return true
  481. }
  482. func (u *Uploader) normalizeUploadPath() error {
  483. uploadPath := aws.ToString(u.uploadFileRequest.UploadFile)
  484. if uploadPath == "" {
  485. return nil
  486. }
  487. // 规范化路径
  488. normalizedPath := filepath.Clean(uploadPath)
  489. // 获取绝对路径
  490. absPath, err := filepath.Abs(normalizedPath)
  491. if err != nil {
  492. return err
  493. }
  494. u.uploadFileRequest.UploadFile = aws.String(absPath)
  495. return nil
  496. }