checkpoint.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520
  1. package s3
  2. import (
  3. "crypto/md5"
  4. "encoding/hex"
  5. "encoding/json"
  6. "fmt"
  7. "github.com/ks3sdklib/aws-sdk-go/aws"
  8. "github.com/ks3sdklib/aws-sdk-go/internal/protocol/rest"
  9. "os"
  10. "path/filepath"
  11. "strconv"
  12. )
  13. const (
  14. DefaultTaskNum int64 = 3
  15. MaxPartNum int64 = 10000
  16. MaxPartSize int64 = 5 * 1024 * 1024 * 1024
  17. MinPartSize int64 = 100 * 1024
  18. DefaultPartSize int64 = 5 * 1024 * 1024
  19. FilePermMode = os.FileMode(0664)
  20. DirPermMode = os.FileMode(0755)
  21. CheckpointFileSuffixUploader = ".ucp"
  22. CheckpointFileSuffixDownloader = ".dcp"
  23. CheckpointFileSuffixCopier = ".ccp"
  24. TempFileSuffix = ".temp"
  25. CheckpointMagic = "B62CAE41-F268-4EC5-839D-FBE475E3FA02"
  26. )
  27. // ------------------------------------ UploadCheckpoint ------------------------------------
  28. type UploadCheckpoint struct {
  29. Magic string
  30. MD5 string
  31. CpFilePath string // checkpoint file full path
  32. UploadFilePath string // Local file path
  33. UploadFileSize int64 // Local file size
  34. UploadFileLastModified string // Local file last modified time
  35. BucketName string // Bucket name
  36. ObjectKey string // Object key
  37. PartSize int64 // Part size
  38. UploadId string // Upload ID
  39. PartETagList []*CompletedPart // Completed parts
  40. }
  41. func newUploadCheckpoint(u *Uploader) (*UploadCheckpoint, error) {
  42. request := u.uploadFileRequest
  43. fileSize := aws.ToLong(request.FileSize)
  44. partSize := u.getPartSize(fileSize, aws.ToLong(request.PartSize))
  45. cp := &UploadCheckpoint{
  46. Magic: CheckpointMagic,
  47. UploadFileSize: fileSize,
  48. BucketName: aws.ToString(request.Bucket),
  49. ObjectKey: aws.ToString(request.Key),
  50. PartSize: partSize,
  51. PartETagList: make([]*CompletedPart, 0),
  52. }
  53. filePath := aws.ToString(request.UploadFile)
  54. if filePath != "" {
  55. fileInfo, err := os.Stat(filePath)
  56. if err != nil {
  57. return nil, err
  58. }
  59. cp.UploadFilePath = filePath
  60. cp.UploadFileLastModified = fileInfo.ModTime().String()
  61. } else {
  62. if request.ObjectMeta != nil {
  63. cp.UploadFileLastModified = aws.ToString(request.ObjectMeta[HTTPHeaderLastModified])
  64. }
  65. }
  66. return cp, nil
  67. }
  68. func generateUploadCpFilePath(request *UploadFileInput) (string, error) {
  69. name := fmt.Sprintf("%s/%s", *request.Bucket, *request.Key)
  70. md5Hash := md5.New()
  71. md5Hash.Write([]byte("ks3://" + rest.EscapePath(name, false)))
  72. destHash := hex.EncodeToString(md5Hash.Sum(nil))
  73. filePath := aws.ToString(request.UploadFile)
  74. absPath, _ := filepath.Abs(filePath)
  75. md5Hash.Reset()
  76. md5Hash.Write([]byte(absPath))
  77. srcHash := hex.EncodeToString(md5Hash.Sum(nil))
  78. var dir string
  79. baseDir := aws.ToString(request.CheckpointDir)
  80. if baseDir == "" {
  81. dir = os.TempDir()
  82. } else {
  83. dir = filepath.Dir(baseDir)
  84. }
  85. cpFilePath := filepath.Join(dir, fmt.Sprintf("%v-%v%v", srcHash, destHash, CheckpointFileSuffixUploader))
  86. return cpFilePath, nil
  87. }
  88. // load checkpoint from local file
  89. func (cp *UploadCheckpoint) load() error {
  90. if cp.CpFilePath == "" {
  91. return nil
  92. }
  93. if !FileExists(cp.CpFilePath) {
  94. return nil
  95. }
  96. // 读取断点文件
  97. contents, err := os.ReadFile(cp.CpFilePath)
  98. if err != nil {
  99. return err
  100. }
  101. ucp := UploadCheckpoint{}
  102. if err = json.Unmarshal(contents, &ucp); err != nil {
  103. return err
  104. }
  105. // 判断断点文件是否有效
  106. if !cp.isValid(ucp) {
  107. err := cp.remove()
  108. if err != nil {
  109. return err
  110. }
  111. }
  112. // 读取断点文件成功,将断点文件中的信息赋值给当前对象
  113. cp.UploadId = ucp.UploadId
  114. cp.PartETagList = ucp.PartETagList
  115. return nil
  116. }
  117. func (cp *UploadCheckpoint) isValid(ucp UploadCheckpoint) bool {
  118. md5sum := ucp.checksum()
  119. if CheckpointMagic != ucp.Magic || md5sum != ucp.MD5 {
  120. return false
  121. }
  122. if cp.BucketName != ucp.BucketName ||
  123. cp.ObjectKey != ucp.ObjectKey ||
  124. cp.PartSize != ucp.PartSize ||
  125. cp.UploadFilePath != ucp.UploadFilePath ||
  126. cp.UploadFileSize != ucp.UploadFileSize ||
  127. cp.UploadFileLastModified != ucp.UploadFileLastModified {
  128. return false
  129. }
  130. if len(ucp.UploadId) == 0 {
  131. return false
  132. }
  133. return true
  134. }
  135. func (cp *UploadCheckpoint) dump() error {
  136. if cp.CpFilePath == "" {
  137. return nil
  138. }
  139. dir := filepath.Dir(cp.CpFilePath)
  140. if !DirExists(dir) {
  141. err := os.MkdirAll(dir, DirPermMode)
  142. if err != nil {
  143. return err
  144. }
  145. }
  146. cp.MD5 = cp.checksum()
  147. str, err := json.Marshal(cp)
  148. if err != nil {
  149. return err
  150. }
  151. return os.WriteFile(cp.CpFilePath, str, FilePermMode)
  152. }
  153. func (cp *UploadCheckpoint) checksum() string {
  154. str := cp.MD5
  155. cp.MD5 = ""
  156. json, _ := json.Marshal(cp)
  157. sum := md5.Sum(json)
  158. md5sum := hex.EncodeToString(sum[:])
  159. cp.MD5 = str
  160. return md5sum
  161. }
  162. func (cp *UploadCheckpoint) remove() error {
  163. if cp.CpFilePath == "" {
  164. return nil
  165. }
  166. return os.Remove(cp.CpFilePath)
  167. }
  168. // ------------------------------------ DownloadCheckpoint ------------------------------------
  169. type DownloadCheckpoint struct {
  170. Magic string
  171. MD5 string
  172. CpFilePath string // checkpoint file full path
  173. DownloadFilePath string // Local file path
  174. BucketName string // Bucket name
  175. ObjectKey string // Object key
  176. ObjectSize int64 // Object size
  177. ObjectLastModified string // Object last modified
  178. PartSize int64 // Part size
  179. PartETagList []*CompletedPart // Completed parts
  180. }
  181. func newDownloadCheckpoint(d *Downloader) (*DownloadCheckpoint, error) {
  182. request := d.downloadFileRequest
  183. meta := d.downloadFileMeta
  184. objectSize, _ := strconv.ParseInt(aws.ToString(meta[HTTPHeaderContentLength]), 10, 64)
  185. lastModified := aws.ToString(meta[HTTPHeaderLastModified])
  186. cp := &DownloadCheckpoint{
  187. Magic: CheckpointMagic,
  188. BucketName: aws.ToString(request.Bucket),
  189. ObjectKey: aws.ToString(request.Key),
  190. DownloadFilePath: aws.ToString(request.DownloadFile),
  191. ObjectSize: objectSize,
  192. ObjectLastModified: lastModified,
  193. PartSize: aws.ToLong(request.PartSize),
  194. PartETagList: make([]*CompletedPart, 0),
  195. }
  196. return cp, nil
  197. }
  198. func generateDownloadCpFilePath(request *DownloadFileInput) (string, error) {
  199. name := fmt.Sprintf("%v/%v", *request.Bucket, *request.Key)
  200. md5Hash := md5.New()
  201. md5Hash.Write([]byte("ks3://" + rest.EscapePath(name, false)))
  202. destHash := hex.EncodeToString(md5Hash.Sum(nil))
  203. filePath := aws.ToString(request.DownloadFile)
  204. absPath, _ := filepath.Abs(filePath)
  205. md5Hash.Reset()
  206. md5Hash.Write([]byte(absPath))
  207. srcHash := hex.EncodeToString(md5Hash.Sum(nil))
  208. var dir string
  209. baseDir := aws.ToString(request.CheckpointDir)
  210. if baseDir == "" {
  211. dir = os.TempDir()
  212. } else {
  213. dir = filepath.Dir(baseDir)
  214. }
  215. cpFilePath := filepath.Join(dir, fmt.Sprintf("%v-%v%v", srcHash, destHash, CheckpointFileSuffixDownloader))
  216. return cpFilePath, nil
  217. }
  218. // load checkpoint from local file
  219. func (cp *DownloadCheckpoint) load() error {
  220. if cp.CpFilePath == "" {
  221. return nil
  222. }
  223. if !FileExists(cp.CpFilePath) {
  224. return nil
  225. }
  226. // 读取断点文件
  227. contents, err := os.ReadFile(cp.CpFilePath)
  228. if err != nil {
  229. return err
  230. }
  231. dcp := DownloadCheckpoint{}
  232. if err = json.Unmarshal(contents, &dcp); err != nil {
  233. return err
  234. }
  235. // 判断断点文件是否有效
  236. if !cp.isValid(dcp) {
  237. cp.remove()
  238. return nil
  239. }
  240. // 读取断点文件成功,将断点文件中的信息赋值给当前对象
  241. cp.PartETagList = dcp.PartETagList
  242. return nil
  243. }
  244. func (cp *DownloadCheckpoint) isValid(dcp DownloadCheckpoint) bool {
  245. md5sum := dcp.checksum()
  246. if CheckpointMagic != dcp.Magic || md5sum != dcp.MD5 {
  247. return false
  248. }
  249. if cp.BucketName != dcp.BucketName ||
  250. cp.ObjectKey != dcp.ObjectKey ||
  251. cp.PartSize != dcp.PartSize ||
  252. cp.DownloadFilePath != dcp.DownloadFilePath ||
  253. cp.ObjectSize != dcp.ObjectSize ||
  254. cp.ObjectLastModified != dcp.ObjectLastModified {
  255. return false
  256. }
  257. return true
  258. }
  259. func (cp *DownloadCheckpoint) dump() error {
  260. if cp.CpFilePath == "" {
  261. return nil
  262. }
  263. dir := filepath.Dir(cp.CpFilePath)
  264. if !DirExists(dir) {
  265. err := os.MkdirAll(dir, DirPermMode)
  266. if err != nil {
  267. return err
  268. }
  269. }
  270. cp.MD5 = cp.checksum()
  271. str, err := json.Marshal(cp)
  272. if err != nil {
  273. return err
  274. }
  275. return os.WriteFile(cp.CpFilePath, str, FilePermMode)
  276. }
  277. func (cp *DownloadCheckpoint) checksum() string {
  278. str := cp.MD5
  279. cp.MD5 = ""
  280. json, _ := json.Marshal(cp)
  281. sum := md5.Sum(json)
  282. md5sum := hex.EncodeToString(sum[:])
  283. cp.MD5 = str
  284. return md5sum
  285. }
  286. func (cp *DownloadCheckpoint) remove() error {
  287. if cp.CpFilePath == "" {
  288. return nil
  289. }
  290. return os.Remove(cp.CpFilePath)
  291. }
  292. // ------------------------------------ CopyCheckpoint ------------------------------------
  293. type CopyCheckpoint struct {
  294. Magic string
  295. MD5 string
  296. CpFilePath string // checkpoint file full path
  297. BucketName string // Bucket name
  298. ObjectKey string // Object key
  299. SrcBucketName string // Source bucket name
  300. SrcObjectKey string // Source object key
  301. SrcObjectSize int64 // Source object size
  302. SrcObjectLastModified string // Source object last modified time
  303. PartSize int64 // Part size
  304. UploadId string // Upload ID
  305. PartETagList []*CompletedPart // Completed parts
  306. }
  307. func newCopyCheckpoint(c *Copier) (*CopyCheckpoint, error) {
  308. request := c.copyFileRequest
  309. meta := c.copyObjectMeta
  310. objectSize, _ := strconv.ParseInt(aws.ToString(meta[HTTPHeaderContentLength]), 10, 64)
  311. lastModified := aws.ToString(meta[HTTPHeaderLastModified])
  312. partSize := c.getPartSize(objectSize, aws.ToLong(request.PartSize))
  313. cp := &CopyCheckpoint{
  314. Magic: CheckpointMagic,
  315. BucketName: aws.ToString(request.Bucket),
  316. ObjectKey: aws.ToString(request.Key),
  317. SrcBucketName: aws.ToString(request.SourceBucket),
  318. SrcObjectKey: aws.ToString(request.SourceKey),
  319. SrcObjectSize: objectSize,
  320. SrcObjectLastModified: lastModified,
  321. PartSize: partSize,
  322. PartETagList: make([]*CompletedPart, 0),
  323. }
  324. return cp, nil
  325. }
  326. func generateCopyCpFilePath(request *CopyFileInput) (string, error) {
  327. dstName := fmt.Sprintf("%s/%s", *request.Bucket, *request.Key)
  328. md5Hash := md5.New()
  329. md5Hash.Write([]byte("ks3://" + rest.EscapePath(dstName, false)))
  330. destHash := hex.EncodeToString(md5Hash.Sum(nil))
  331. srcName := fmt.Sprintf("%s/%s", *request.SourceBucket, *request.SourceKey)
  332. md5Hash.Reset()
  333. md5Hash.Write([]byte(srcName))
  334. srcHash := hex.EncodeToString(md5Hash.Sum(nil))
  335. var dir string
  336. baseDir := aws.ToString(request.CheckpointDir)
  337. if baseDir == "" {
  338. dir = os.TempDir()
  339. } else {
  340. dir = filepath.Dir(baseDir)
  341. }
  342. cpFilePath := filepath.Join(dir, fmt.Sprintf("%v-%v%v", srcHash, destHash, CheckpointFileSuffixCopier))
  343. return cpFilePath, nil
  344. }
  345. // load checkpoint from local file
  346. func (cp *CopyCheckpoint) load() error {
  347. if cp.CpFilePath == "" {
  348. return nil
  349. }
  350. if !FileExists(cp.CpFilePath) {
  351. return nil
  352. }
  353. // 读取断点文件
  354. contents, err := os.ReadFile(cp.CpFilePath)
  355. if err != nil {
  356. return err
  357. }
  358. ccp := CopyCheckpoint{}
  359. if err = json.Unmarshal(contents, &ccp); err != nil {
  360. return err
  361. }
  362. // 判断断点文件是否有效
  363. if !cp.isValid(ccp) {
  364. err := cp.remove()
  365. if err != nil {
  366. return err
  367. }
  368. }
  369. // 读取断点文件成功,将断点文件中的信息赋值给当前对象
  370. cp.UploadId = ccp.UploadId
  371. cp.PartETagList = ccp.PartETagList
  372. return nil
  373. }
  374. func (cp *CopyCheckpoint) isValid(ccp CopyCheckpoint) bool {
  375. md5sum := ccp.checksum()
  376. if CheckpointMagic != ccp.Magic || md5sum != ccp.MD5 {
  377. return false
  378. }
  379. if cp.BucketName != ccp.BucketName ||
  380. cp.ObjectKey != ccp.ObjectKey ||
  381. cp.SrcBucketName != ccp.SrcBucketName ||
  382. cp.SrcObjectKey != ccp.SrcObjectKey ||
  383. cp.SrcObjectSize != ccp.SrcObjectSize ||
  384. cp.SrcObjectLastModified != ccp.SrcObjectLastModified ||
  385. cp.PartSize != ccp.PartSize {
  386. return false
  387. }
  388. if len(ccp.UploadId) == 0 {
  389. return false
  390. }
  391. return true
  392. }
  393. func (cp *CopyCheckpoint) dump() error {
  394. if cp.CpFilePath == "" {
  395. return nil
  396. }
  397. dir := filepath.Dir(cp.CpFilePath)
  398. if !DirExists(dir) {
  399. err := os.MkdirAll(dir, DirPermMode)
  400. if err != nil {
  401. return err
  402. }
  403. }
  404. cp.MD5 = cp.checksum()
  405. str, err := json.Marshal(cp)
  406. if err != nil {
  407. return err
  408. }
  409. return os.WriteFile(cp.CpFilePath, str, FilePermMode)
  410. }
  411. func (cp *CopyCheckpoint) checksum() string {
  412. str := cp.MD5
  413. cp.MD5 = ""
  414. json, _ := json.Marshal(cp)
  415. sum := md5.Sum(json)
  416. md5sum := hex.EncodeToString(sum[:])
  417. cp.MD5 = str
  418. return md5sum
  419. }
  420. func (cp *CopyCheckpoint) remove() error {
  421. if cp.CpFilePath == "" {
  422. return nil
  423. }
  424. return os.Remove(cp.CpFilePath)
  425. }