objectstore.go

// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cloudprovider

import (
    "context"
    "fmt"
    "io"
    "net/http"
    "regexp"
    "sort"
    "strconv"
    "strings"
    "sync"
    "time"

    "yunion.io/x/jsonutils"
    "yunion.io/x/log"
    "yunion.io/x/pkg/errors"
    "yunion.io/x/pkg/util/streamutils"
    "yunion.io/x/s3cli"
)
type TBucketACLType string

const (
    // 50 MB
    MAX_PUT_OBJECT_SIZEBYTES = int64(1024 * 1024 * 50)

    // ACLDefault = TBucketACLType("default")

    ACLPrivate         = TBucketACLType(s3cli.CANNED_ACL_PRIVATE)
    ACLAuthRead        = TBucketACLType(s3cli.CANNED_ACL_AUTH_READ)
    ACLPublicRead      = TBucketACLType(s3cli.CANNED_ACL_PUBLIC_READ)
    ACLPublicReadWrite = TBucketACLType(s3cli.CANNED_ACL_PUBLIC_READ_WRITE)
    ACLUnknown         = TBucketACLType("")

    META_HEADER_CACHE_CONTROL       = "Cache-Control"
    META_HEADER_CONTENT_TYPE        = "Content-Type"
    META_HEADER_CONTENT_DISPOSITION = "Content-Disposition"
    META_HEADER_CONTENT_ENCODING    = "Content-Encoding"
    META_HEADER_CONTENT_LANGUAGE    = "Content-Language"
    META_HEADER_CONTENT_MD5         = "Content-MD5"

    META_HEADER_PREFIX = "X-Yunion-Meta-"
)
type SBucketStats struct {
    SizeBytes   int64
    ObjectCount int
}

func (s SBucketStats) Equals(s2 SBucketStats) bool {
    if s.SizeBytes == s2.SizeBytes && s.ObjectCount == s2.ObjectCount {
        return true
    } else {
        return false
    }
}
type SBucketAccessUrl struct {
    Url         string
    Description string
    Primary     bool
}

type SBucketWebsiteRoutingRule struct {
    ConditionErrorCode string
    ConditionPrefix    string

    RedirectProtocol         string
    RedirectReplaceKey       string
    RedirectReplaceKeyPrefix string
}

type SBucketWebsiteConf struct {
    // index document
    Index string
    // document returned on error
    ErrorDocument string
    // http or https
    Protocol string
    Rules    []SBucketWebsiteRoutingRule
    // website access URL, usually composed of the bucket id, region, etc.
    Url string
}

type SBucketCORSRule struct {
    AllowedMethods []string
    // allowed origins; may be set to *
    AllowedOrigins []string
    AllowedHeaders []string
    MaxAgeSeconds  int
    ExposeHeaders  []string
    // identifier used to distinguish rules
    Id string
}

type SBucketRefererConf struct {
    // referer domain list
    DomainList []string
    // type of the list
    // enum: Black-List, White-List
    RefererType string
    // whether to allow access with an empty referer
    AllowEmptyRefer bool
    Enabled         bool
}
type SBucketPolicyStatement struct {
    // principals granted access
    Principal map[string][]string `json:"Principal,omitempty"`
    // granted actions
    Action []string `json:"Action,omitempty"`
    // Allow|Deny
    Effect string `json:"Effect,omitempty"`
    // resources covered by the grant
    Resource []string `json:"Resource,omitempty"`
    // conditions under which the statement applies
    Condition map[string]map[string]interface{} `json:"Condition,omitempty"`

    // parsed field, in the form "main account id:sub account id"
    PrincipalId []string
    // map["main account id:sub account id"] -> sub account name
    PrincipalNames map[string]string
    // Read|ReadWrite|FullControl
    CannedAction string
    // resource paths
    ResourcePath []string
    // generated from the statement index
    Id string
}

type SBucketPolicyStatementInput struct {
    // "main account id:sub account id"
    PrincipalId []string
    // Read|ReadWrite|FullControl
    CannedAction string
    // Allow|Deny
    Effect string
    // resource paths being granted, e.g. /*
    ResourcePath []string
    // IP conditions
    IpEquals    []string
    IpNotEquals []string
    // conditions under which the statement applies
    Condition map[string]map[string]interface{}
}

type SBucketMultipartUploads struct {
    // object name
    ObjectName string
    UploadID   string
    // initiator
    Initiator string
    // time the upload was initiated
    Initiated time.Time
}
type SBaseCloudObject struct {
    Key          string
    SizeBytes    int64
    StorageClass string
    ETag         string
    LastModified time.Time
    Meta         http.Header
}

type SListObjectResult struct {
    Objects        []ICloudObject
    NextMarker     string
    CommonPrefixes []ICloudObject
    IsTruncated    bool
}

// range starts from 0
type SGetObjectRange struct {
    Start int64
    End   int64
}

func (r SGetObjectRange) SizeBytes() int64 {
    return r.End - r.Start + 1
}
var (
    rangeExp = regexp.MustCompile(`(bytes=)?(\d*)-(\d*)`)
)

func ParseRange(rangeStr string) SGetObjectRange {
    objRange := SGetObjectRange{}
    if len(rangeStr) > 0 {
        find := rangeExp.FindAllStringSubmatch(rangeStr, -1)
        if len(find) > 0 && len(find[0]) > 3 {
            objRange.Start, _ = strconv.ParseInt(find[0][2], 10, 64)
            objRange.End, _ = strconv.ParseInt(find[0][3], 10, 64)
        }
    }
    return objRange
}
func (r SGetObjectRange) String() string {
    if r.Start > 0 && r.End > 0 {
        return fmt.Sprintf("bytes=%d-%d", r.Start, r.End)
    } else if r.Start > 0 && r.End <= 0 {
        return fmt.Sprintf("bytes=%d-", r.Start)
    } else if r.Start <= 0 && r.End > 0 {
        return fmt.Sprintf("bytes=0-%d", r.End)
    } else {
        return ""
    }
}
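// For example, ParseRange("bytes=1024-4095") yields SGetObjectRange{Start: 1024, End: 4095},
// whose SizeBytes() is 3072 and whose String() renders back to "bytes=1024-4095".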
type ICloudBucket interface {
    IVirtualResource

    MaxPartCount() int
    MaxPartSizeBytes() int64

    //GetGlobalId() string
    //GetName() string
    GetAcl() TBucketACLType
    GetLocation() string
    GetIRegion() ICloudRegion
    GetStorageClass() string
    GetAccessUrls() []SBucketAccessUrl
    GetStats() SBucketStats
    GetLimit() SBucketStats
    SetLimit(limit SBucketStats) error
    LimitSupport() SBucketStats
    SetAcl(acl TBucketACLType) error

    ListObjects(prefix string, marker string, delimiter string, maxCount int) (SListObjectResult, error)

    CopyObject(ctx context.Context, destKey string, srcBucket, srcKey string, cannedAcl TBucketACLType, storageClassStr string, meta http.Header) error
    GetObject(ctx context.Context, key string, rangeOpt *SGetObjectRange) (io.ReadCloser, error)
    DeleteObject(ctx context.Context, keys string) error
    GetTempUrl(method string, key string, expire time.Duration) (string, error)

    PutObject(ctx context.Context, key string, input io.Reader, sizeBytes int64, cannedAcl TBucketACLType, storageClassStr string, meta http.Header) error

    NewMultipartUpload(ctx context.Context, key string, cannedAcl TBucketACLType, storageClassStr string, meta http.Header) (string, error)
    UploadPart(ctx context.Context, key string, uploadId string, partIndex int, input io.Reader, partSize int64, offset, totalSize int64) (string, error)
    CopyPart(ctx context.Context, key string, uploadId string, partIndex int, srcBucketName string, srcKey string, srcOffset int64, srcLength int64) (string, error)
    CompleteMultipartUpload(ctx context.Context, key string, uploadId string, partEtags []string) error
    AbortMultipartUpload(ctx context.Context, key string, uploadId string) error

    SetWebsite(conf SBucketWebsiteConf) error
    GetWebsiteConf() (SBucketWebsiteConf, error)
    DeleteWebSiteConf() error

    SetCORS(rules []SBucketCORSRule) error
    GetCORSRules() ([]SBucketCORSRule, error)
    DeleteCORS() error

    SetReferer(conf SBucketRefererConf) error
    GetReferer() (SBucketRefererConf, error)

    GetCdnDomains() ([]SCdnDomain, error)

    GetPolicy() ([]SBucketPolicyStatement, error)
    SetPolicy(policy SBucketPolicyStatementInput) error
    DeletePolicy(id []string) ([]SBucketPolicyStatement, error)

    ListMultipartUploads() ([]SBucketMultipartUploads, error)
}
type ICloudObject interface {
    GetIBucket() ICloudBucket

    GetKey() string
    GetSizeBytes() int64
    GetLastModified() time.Time
    GetStorageClass() string
    GetETag() string

    GetMeta() http.Header
    SetMeta(ctx context.Context, meta http.Header) error

    GetAcl() TBucketACLType
    SetAcl(acl TBucketACLType) error
}

type SCloudObject struct {
    Key          string
    SizeBytes    int64
    StorageClass string
    ETag         string
    LastModified time.Time
    Meta         http.Header
    Acl          string
}
func ICloudObject2Struct(obj ICloudObject) SCloudObject {
    return SCloudObject{
        Key:          obj.GetKey(),
        SizeBytes:    obj.GetSizeBytes(),
        StorageClass: obj.GetStorageClass(),
        ETag:         obj.GetETag(),
        LastModified: obj.GetLastModified(),
        Meta:         obj.GetMeta(),
        Acl:          string(obj.GetAcl()),
    }
}

func ICloudObject2JSONObject(obj ICloudObject) jsonutils.JSONObject {
    return jsonutils.Marshal(ICloudObject2Struct(obj))
}

func (o *SBaseCloudObject) GetKey() string {
    return o.Key
}

func (o *SBaseCloudObject) GetSizeBytes() int64 {
    return o.SizeBytes
}

func (o *SBaseCloudObject) GetLastModified() time.Time {
    return o.LastModified
}

func (o *SBaseCloudObject) GetStorageClass() string {
    return o.StorageClass
}

func (o *SBaseCloudObject) GetETag() string {
    return o.ETag
}

func (o *SBaseCloudObject) GetMeta() http.Header {
    return o.Meta
}

//func (o *SBaseCloudObject) SetMeta(meta http.Header) error {
//	return nil
//}
func GetIBucketById(region ICloudRegion, name string) (ICloudBucket, error) {
    buckets, err := region.GetIBuckets()
    if err != nil {
        return nil, errors.Wrap(err, "region.GetIBuckets")
    }
    for i := range buckets {
        if buckets[i].GetGlobalId() == name {
            return buckets[i], nil
        }
    }
    return nil, ErrNotFound
}

func GetIBucketByName(region ICloudRegion, name string) (ICloudBucket, error) {
    buckets, err := region.GetIBuckets()
    if err != nil {
        return nil, errors.Wrap(err, "region.GetIBuckets")
    }
    for i := range buckets {
        if buckets[i].GetName() == name {
            return buckets[i], nil
        }
    }
    return nil, ErrNotFound
}
func GetIBucketStats(bucket ICloudBucket) (SBucketStats, error) {
    stats := SBucketStats{
        ObjectCount: -1,
        SizeBytes:   -1,
    }
    objs, err := bucket.ListObjects("", "", "", 1000)
    if err != nil {
        return stats, errors.Wrap(err, "GetIObjects")
    }
    if objs.IsTruncated {
        return stats, errors.Wrap(ErrTooLarge, "too many objects")
    }
    stats.ObjectCount, stats.SizeBytes = 0, 0
    for _, obj := range objs.Objects {
        stats.SizeBytes += obj.GetSizeBytes()
        stats.ObjectCount += 1
    }
    return stats, nil
}
type cloudObjectList []ICloudObject

func (a cloudObjectList) Len() int           { return len(a) }
func (a cloudObjectList) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a cloudObjectList) Less(i, j int) bool { return a[i].GetKey() < a[j].GetKey() }
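// GetPagedObjects lists at most maxCount (capped at 1000) objects under objectPrefix starting
// from marker. When isRecursive is false the listing is delimited by "/", so common prefixes
// (sub-directories) are returned as pseudo objects. It returns the objects sorted by key and
// the marker to pass to the next call, or an empty marker when the listing is complete.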
func GetPagedObjects(bucket ICloudBucket, objectPrefix string, isRecursive bool, marker string, maxCount int) ([]ICloudObject, string, error) {
    delimiter := "/"
    if isRecursive {
        delimiter = ""
    }
    if maxCount > 1000 || maxCount <= 0 {
        maxCount = 1000
    }
    ret := make([]ICloudObject, 0)
    result, err := bucket.ListObjects(objectPrefix, marker, delimiter, maxCount)
    if err != nil {
        return nil, "", errors.Wrap(err, "bucket.ListObjects")
    }
    // Collect all objects
    for i := range result.Objects {
        // in a delimited listing, skip the object whose key equals the prefix ending with the delimiter
        if !isRecursive && result.Objects[i].GetKey() == objectPrefix && strings.HasSuffix(objectPrefix, delimiter) {
            continue
        }
        ret = append(ret, result.Objects[i])
        marker = result.Objects[i].GetKey()
    }
    // Collect all common prefixes if any.
    // NOTE: prefixes are only present if the request is delimited.
    if len(result.CommonPrefixes) > 0 {
        ret = append(ret, result.CommonPrefixes...)
    }
    // sort objects by key in ascending order
    sort.Sort(cloudObjectList(ret))
    // If a next marker is present, save it for the next request.
    if result.NextMarker != "" {
        marker = result.NextMarker
    }
    // If not truncated, there are no more objects
    if !result.IsTruncated {
        marker = ""
    }
    return ret, marker, nil
}
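// GetAllObjects repeatedly calls GetPagedObjects until the listing is exhausted and returns
// every object under objectPrefix.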
func GetAllObjects(bucket ICloudBucket, objectPrefix string, isRecursive bool) ([]ICloudObject, error) {
    ret := make([]ICloudObject, 0)
    // Save the marker for the next request.
    var marker string
    for {
        // Get a list of objects, at most 1000 per request.
        // Assign to the outer marker instead of redeclaring it with :=, otherwise the
        // next iteration would request the same page again.
        result, nextMarker, err := GetPagedObjects(bucket, objectPrefix, isRecursive, marker, 1000)
        if err != nil {
            return nil, errors.Wrap(err, "GetPagedObjects")
        }
        ret = append(ret, result...)
        if nextMarker == "" {
            break
        }
        marker = nextMarker
    }
    return ret, nil
}
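// GetIObject fetches the object whose key exactly equals objectPrefix by listing with that
// prefix. If the prefix ends with "/", the listing is retried without the trailing slash.
// Returns ErrNotFound if no matching object is found.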
func GetIObject(bucket ICloudBucket, objectPrefix string) (ICloudObject, error) {
    tryPrefix := []string{objectPrefix}
    if strings.HasSuffix(objectPrefix, "/") {
        tryPrefix = append(tryPrefix, objectPrefix[:len(objectPrefix)-1])
    }
    for _, pref := range tryPrefix {
        result, err := bucket.ListObjects(pref, "", "", 1)
        if err != nil {
            return nil, errors.Wrap(err, "bucket.ListObjects")
        }
        objects := result.Objects
        if len(objects) > 0 && objects[0].GetKey() == objectPrefix {
            return objects[0], nil
        }
    }
    return nil, ErrNotFound
}
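// Makedir creates a directory placeholder: it normalizes key into a "seg1/seg2/.../" path and
// puts a zero-length object with that key using the bucket's current ACL.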
func Makedir(ctx context.Context, bucket ICloudBucket, key string) error {
    segs := make([]string, 0)
    for _, seg := range strings.Split(key, "/") {
        if len(seg) > 0 {
            segs = append(segs, seg)
        }
    }
    path := strings.Join(segs, "/") + "/"
    err := bucket.PutObject(ctx, path, strings.NewReader(""), 0, bucket.GetAcl(), "", nil)
    if err != nil {
        return errors.Wrap(err, "PutObject")
    }
    return nil
}
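// UploadObject uploads the content of input to key with a single worker; it is a convenience
// wrapper around UploadObjectParallel with parallel set to 1.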
func UploadObject(ctx context.Context, bucket ICloudBucket, key string, blocksz int64, input io.Reader, sizeBytes int64, cannedAcl TBucketACLType, storageClass string, meta http.Header, debug bool) error {
    return UploadObjectParallel(ctx, bucket, key, blocksz, newReaderAt(input), sizeBytes, cannedAcl, storageClass, meta, debug, 1)
}
type sSeqReader struct {
    reader io.Reader
    offset int64
}

func newReaderAt(input io.Reader) io.ReaderAt {
    return &sSeqReader{
        reader: input,
        offset: 0,
    }
}
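// ReadAt ignores the requested offset and simply reads the next chunk from the underlying
// sequential reader, so an sSeqReader is only valid when parts are read in order by a single
// worker (parallel == 1), as in UploadObject.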
func (sr *sSeqReader) ReadAt(p []byte, offset int64) (int, error) {
    return sr.reader.Read(p)
}

type sOffsetReader struct {
    readerAt io.ReaderAt
    offset   int64
}

func newReader(input io.ReaderAt, inputOffset int64) io.Reader {
    return &sOffsetReader{
        readerAt: input,
        offset:   inputOffset,
    }
}

func (or *sOffsetReader) Read(p []byte) (int, error) {
    n, err := or.readerAt.ReadAt(p, or.offset)
    or.offset += int64(n)
    return n, err
}
type uploadPartOfMultipartJob struct {
    ctx       context.Context
    bucket    ICloudBucket
    key       string
    input     io.Reader
    sizeBytes int64
    uploadId  string
    partIndex int
    partSize  int64
    offset    int64
    debug     bool
    etags     []string
    errs      *[]error
}

func uploadPartOfMultipartWorker(wg *sync.WaitGroup, queue chan uploadPartOfMultipartJob) {
    defer wg.Done()
    for job := range queue {
        tag, err := uploadPartOfMultipart(job.ctx, job.bucket, job.key, job.input, job.sizeBytes, job.uploadId, job.partIndex, job.partSize, job.offset, job.debug)
        if err != nil {
            *job.errs = append(*job.errs, err)
        } else {
            job.etags[job.partIndex] = tag
        }
    }
}
func uploadPartOfMultipart(ctx context.Context, bucket ICloudBucket, key string, input io.Reader, sizeBytes int64, uploadId string, partIndex int, partSize int64, offset int64, debug bool) (string, error) {
    var startAt time.Time
    if debug {
        startAt = time.Now()
        log.Debugf("UploadPart %d %d", partIndex+1, partSize)
    }
    etag, err := bucket.UploadPart(ctx, key, uploadId, partIndex+1, io.LimitReader(input, partSize), partSize, offset, sizeBytes)
    if err != nil {
        return "", errors.Wrapf(err, "bucket.UploadPart %d", partIndex)
    }
    if debug {
        duration := time.Since(startAt)
        rateMbps := calculateRateMbps(partSize, duration)
        log.Debugf("End of uploadPart %d %d takes %f seconds at %fMbps", partIndex+1, partSize, float64(duration)/float64(time.Second), rateMbps)
    }
    return etag, nil
}
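// UploadObjectParallel uploads sizeBytes bytes from input to key. Objects smaller than blocksz
// (default MAX_PUT_OBJECT_SIZEBYTES) are put in a single request; larger objects are uploaded
// as a multipart upload split into blocksz-sized parts (grown if the part count would exceed
// bucket.MaxPartCount()), with up to parallel concurrent part uploads. On any part failure the
// multipart upload is aborted.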
func UploadObjectParallel(ctx context.Context, bucket ICloudBucket, key string, blocksz int64, input io.ReaderAt, sizeBytes int64, cannedAcl TBucketACLType, storageClass string, meta http.Header, debug bool, parallel int) error {
    if blocksz <= 0 {
        blocksz = MAX_PUT_OBJECT_SIZEBYTES
    }
    if sizeBytes < blocksz {
        if debug {
            log.Debugf("too small, put object in one shot")
        }
        return bucket.PutObject(ctx, key, newReader(input, 0), sizeBytes, cannedAcl, storageClass, meta)
    }
    partSize := blocksz
    partCount := sizeBytes / partSize
    if partCount*partSize < sizeBytes {
        partCount += 1
    }
    if partCount > int64(bucket.MaxPartCount()) {
        partCount = int64(bucket.MaxPartCount())
        partSize = sizeBytes / partCount
        if partSize*partCount < sizeBytes {
            partSize += 1
        }
        if partSize > bucket.MaxPartSizeBytes() {
            return errors.Error("too large object")
        }
    }
    if debug {
        log.Debugf("multipart upload part count %d part size %d", partCount, partSize)
    }
    uploadId, err := bucket.NewMultipartUpload(ctx, key, cannedAcl, storageClass, meta)
    if err != nil {
        return errors.Wrap(err, "bucket.NewMultipartUpload")
    }
    etags := make([]string, partCount)
    var errs []error
    {
        if parallel < 1 {
            parallel = 1
        }
        queue := make(chan uploadPartOfMultipartJob, parallel)
        wg := &sync.WaitGroup{}
        for i := 0; i < parallel; i++ {
            wg.Add(1)
            go uploadPartOfMultipartWorker(wg, queue)
        }
        for i := 0; i < int(partCount); i += 1 {
            offset := int64(i) * partSize
            blockSize := partSize
            if i == int(partCount)-1 {
                blockSize = sizeBytes - partSize*(partCount-1)
            }
            partIndex := i
            job := uploadPartOfMultipartJob{
                ctx:       ctx,
                bucket:    bucket,
                key:       key,
                input:     newReader(input, offset),
                sizeBytes: sizeBytes,
                uploadId:  uploadId,
                partIndex: partIndex,
                partSize:  blockSize,
                offset:    offset,
                debug:     debug,
                etags:     etags,
                errs:      &errs,
            }
            queue <- job
        }
        close(queue)
        wg.Wait()
    }
    if len(errs) > 0 {
        // a part upload failed
        err2 := bucket.AbortMultipartUpload(ctx, key, uploadId)
        if err2 != nil {
            log.Errorf("bucket.AbortMultipartUpload error %s", err2)
            errs = append(errs, err2)
        }
        return errors.Wrap(errors.NewAggregate(errs), "uploadPartOfMultipart")
    }
    err = bucket.CompleteMultipartUpload(ctx, key, uploadId, etags)
    if err != nil {
        err2 := bucket.AbortMultipartUpload(ctx, key, uploadId)
        if err2 != nil {
            log.Errorf("bucket.AbortMultipartUpload error %s", err2)
        }
        return errors.Wrap(err, "CompleteMultipartUpload")
    }
    return nil
}
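// DeletePrefix deletes every object under prefix, including objects in sub-directories.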
func DeletePrefix(ctx context.Context, bucket ICloudBucket, prefix string) error {
    objs, err := GetAllObjects(bucket, prefix, true)
    if err != nil {
        return errors.Wrap(err, "GetAllObjects")
    }
    for i := range objs {
        err := bucket.DeleteObject(ctx, objs[i].GetKey())
        if err != nil {
            return errors.Wrap(err, "bucket.DeleteObject")
        }
    }
    return nil
}
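// MergeMeta combines two sets of metadata headers; values from dst are added after values from
// src, and a nil argument yields the other argument unchanged.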
func MergeMeta(src http.Header, dst http.Header) http.Header {
    if src != nil && dst != nil {
        ret := http.Header{}
        for k, vs := range src {
            for _, v := range vs {
                ret.Add(k, v)
            }
        }
        for k, vs := range dst {
            for _, v := range vs {
                ret.Add(k, v)
            }
        }
        return ret
    } else if src != nil && dst == nil {
        return src
    } else if src == nil && dst != nil {
        return dst
    } else {
        return nil
    }
}
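// CopyObject copies srcKey in srcBucket to dstKey in dstBucket with a single worker; it is a
// convenience wrapper around CopyObjectParallel with parallel set to 1.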
func CopyObject(ctx context.Context, blocksz int64, dstBucket ICloudBucket, dstKey string, srcBucket ICloudBucket, srcKey string, dstMeta http.Header, debug bool) error {
    return CopyObjectParallel(ctx, blocksz, dstBucket, dstKey, srcBucket, srcKey, dstMeta, debug, 1)
}

type copyPartOfMultipartJob struct {
    ctx       context.Context
    dstBucket ICloudBucket
    dstKey    string
    srcBucket ICloudBucket
    srcKey    string
    rangeOpt  *SGetObjectRange
    sizeBytes int64
    uploadId  string
    partIndex int
    debug     bool
    etags     []string
    errs      *[]error
}

func copyPartOfMultipartWorker(wg *sync.WaitGroup, queue chan copyPartOfMultipartJob) {
    defer wg.Done()
    for job := range queue {
        tag, err := copyPartOfMultipart(job.ctx, job.dstBucket, job.dstKey, job.srcBucket, job.srcKey, job.rangeOpt, job.sizeBytes, job.uploadId, job.partIndex, job.debug)
        if err != nil {
            *job.errs = append(*job.errs, err)
        } else {
            job.etags[job.partIndex] = tag
        }
    }
}
func copyPartOfMultipart(ctx context.Context, dstBucket ICloudBucket, dstKey string, srcBucket ICloudBucket, srcKey string, rangeOpt *SGetObjectRange, sizeBytes int64, uploadId string, partIndex int, debug bool) (string, error) {
    partSize := rangeOpt.SizeBytes()
    var startAt time.Time
    if debug {
        startAt = time.Now()
        log.Debugf("CopyPart %d %d range: %s (%d)", partIndex+1, partSize, rangeOpt.String(), rangeOpt.SizeBytes())
    }
    srcStream, err := srcBucket.GetObject(ctx, srcKey, rangeOpt)
    if err != nil {
        return "", errors.Wrapf(err, "srcBucket.GetObject %d", partIndex)
    }
    defer srcStream.Close()
    etag, err := dstBucket.UploadPart(ctx, dstKey, uploadId, partIndex+1, io.LimitReader(srcStream, partSize), partSize, rangeOpt.Start, sizeBytes)
    if err != nil {
        return "", errors.Wrapf(err, "dstBucket.UploadPart %d", partIndex)
    }
    if debug {
        duration := time.Since(startAt)
        rateMbps := calculateRateMbps(partSize, duration)
        log.Debugf("End of copyPart %d %d range: %s (%d) takes %d seconds at %fMbps", partIndex+1, partSize, rangeOpt.String(), rangeOpt.SizeBytes(), duration/time.Second, rateMbps)
    }
    return etag, nil
}
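// CopyObjectParallel copies srcKey in srcBucket to dstKey in dstBucket by streaming the data
// through this process. Small objects (below blocksz) are copied with a single GetObject and
// PutObject pair; larger objects use a multipart upload whose parts are fetched with range
// reads and uploaded by up to parallel workers. The destination object keeps the source ACL
// and storage class, and its metadata is the source metadata merged with dstMeta.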
func CopyObjectParallel(ctx context.Context, blocksz int64, dstBucket ICloudBucket, dstKey string, srcBucket ICloudBucket, srcKey string, dstMeta http.Header, debug bool, parallel int) error {
    srcObj, err := GetIObject(srcBucket, srcKey)
    if err != nil {
        return errors.Wrap(err, "GetIObject")
    }
    if blocksz <= 0 {
        blocksz = MAX_PUT_OBJECT_SIZEBYTES
    }
    sizeBytes := srcObj.GetSizeBytes()
    if sizeBytes < blocksz {
        if debug {
            log.Debugf("too small, copy object in one shot")
        }
        srcStream, err := srcBucket.GetObject(ctx, srcKey, nil)
        if err != nil {
            return errors.Wrap(err, "srcBucket.GetObject")
        }
        defer srcStream.Close()
        err = dstBucket.PutObject(ctx, dstKey, srcStream, sizeBytes, srcObj.GetAcl(), srcObj.GetStorageClass(), MergeMeta(srcObj.GetMeta(), dstMeta))
        if err != nil {
            return errors.Wrap(err, "dstBucket.PutObject")
        }
        return nil
    }
    partSize := blocksz
    partCount := sizeBytes / partSize
    if partCount*partSize < sizeBytes {
        partCount += 1
    }
    if partCount > int64(dstBucket.MaxPartCount()) {
        partCount = int64(dstBucket.MaxPartCount())
        partSize = sizeBytes / partCount
        if partSize*partCount < sizeBytes {
            partSize += 1
        }
        if partSize > dstBucket.MaxPartSizeBytes() {
            return errors.Error("too large object")
        }
    }
    if debug {
        log.Debugf("multipart upload part count %d part size %d", partCount, partSize)
    }
    uploadId, err := dstBucket.NewMultipartUpload(ctx, dstKey, srcObj.GetAcl(), srcObj.GetStorageClass(), MergeMeta(srcObj.GetMeta(), dstMeta))
    if err != nil {
        return errors.Wrap(err, "bucket.NewMultipartUpload")
    }
    etags := make([]string, partCount)
    var errs []error
    {
        if parallel < 1 {
            parallel = 1
        }
        queue := make(chan copyPartOfMultipartJob, parallel)
        var wg sync.WaitGroup
        for i := 0; i < parallel; i++ {
            wg.Add(1)
            go copyPartOfMultipartWorker(&wg, queue)
        }
        for i := 0; i < int(partCount); i += 1 {
            start := int64(i) * partSize
            blockSize := partSize
            if i == int(partCount)-1 {
                blockSize = sizeBytes - partSize*(partCount-1)
            }
            end := start + blockSize - 1
            rangeOpt := SGetObjectRange{
                Start: start,
                End:   end,
            }
            partIndex := i
            job := copyPartOfMultipartJob{
                ctx:       ctx,
                dstBucket: dstBucket,
                dstKey:    dstKey,
                srcBucket: srcBucket,
                srcKey:    srcKey,
                rangeOpt:  &rangeOpt,
                sizeBytes: sizeBytes,
                uploadId:  uploadId,
                partIndex: partIndex,
                debug:     debug,
                etags:     etags,
                errs:      &errs,
            }
            queue <- job
        }
        close(queue)
        wg.Wait()
    }
    if len(errs) > 0 {
        // a part copy failed
        err2 := dstBucket.AbortMultipartUpload(ctx, dstKey, uploadId)
        if err2 != nil {
            log.Errorf("bucket.AbortMultipartUpload error %s", err2)
            errs = append(errs, err2)
        }
        return errors.Wrap(errors.NewAggregate(errs), "copyPartOfMultipart")
    }
    err = dstBucket.CompleteMultipartUpload(ctx, dstKey, uploadId, etags)
    if err != nil {
        err2 := dstBucket.AbortMultipartUpload(ctx, dstKey, uploadId)
        if err2 != nil {
            log.Errorf("bucket.AbortMultipartUpload error %s", err2)
        }
        return errors.Wrap(err, "CompleteMultipartUpload")
    }
    return nil
}
func CopyPart(ctx context.Context,
    iDstBucket ICloudBucket, dstKey string, uploadId string, partNumber int,
    iSrcBucket ICloudBucket, srcKey string, rangeOpt *SGetObjectRange,
) (string, error) {
    return copyPartOfMultipart(ctx, iDstBucket, dstKey, iSrcBucket, srcKey, rangeOpt, 0, uploadId, partNumber, false)
}

func ObjectSetMeta(ctx context.Context,
    bucket ICloudBucket, obj ICloudObject,
    meta http.Header,
) error {
    return bucket.CopyObject(ctx, obj.GetKey(), bucket.GetName(), obj.GetKey(), obj.GetAcl(), obj.GetStorageClass(), meta)
}
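// MetaToHttpHeader converts object metadata to request headers: well-known headers
// (Content-Type, Cache-Control, etc.) are kept as-is, while any other key is prefixed with
// metaPrefix (e.g. META_HEADER_PREFIX). Only the first value of each key is used.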
func MetaToHttpHeader(metaPrefix string, meta http.Header) http.Header {
    hdr := http.Header{}
    for k, v := range meta {
        if len(v) == 0 || len(v[0]) == 0 {
            continue
        }
        k = http.CanonicalHeaderKey(k)
        switch k {
        case META_HEADER_CACHE_CONTROL,
            META_HEADER_CONTENT_TYPE,
            META_HEADER_CONTENT_DISPOSITION,
            META_HEADER_CONTENT_ENCODING,
            META_HEADER_CONTENT_LANGUAGE,
            META_HEADER_CONTENT_MD5:
            hdr.Set(k, v[0])
        default:
            hdr.Set(fmt.Sprintf("%s%s", metaPrefix, k), v[0])
        }
    }
    return hdr
}
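// FetchMetaFromHttpHeader is the inverse of MetaToHttpHeader: it extracts object metadata from
// response headers by stripping metaPrefix from prefixed keys and copying the well-known
// content headers verbatim.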
func FetchMetaFromHttpHeader(metaPrefix string, headers http.Header) http.Header {
    metaPrefix = http.CanonicalHeaderKey(metaPrefix)
    meta := http.Header{}
    for hdr, vals := range headers {
        hdr = http.CanonicalHeaderKey(hdr)
        if strings.HasPrefix(hdr, metaPrefix) {
            for _, val := range vals {
                meta.Add(hdr[len(metaPrefix):], val)
            }
        }
    }
    for _, hdr := range []string{
        META_HEADER_CONTENT_TYPE,
        META_HEADER_CONTENT_ENCODING,
        META_HEADER_CONTENT_DISPOSITION,
        META_HEADER_CONTENT_LANGUAGE,
        META_HEADER_CACHE_CONTROL,
    } {
        val := headers.Get(hdr)
        if len(val) > 0 {
            meta.Set(hdr, val)
        }
    }
    return meta
}
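// SetBucketCORS merges rules into the bucket's existing CORS configuration: a rule whose Id
// parses to the index of an existing rule replaces that rule, any other rule is appended.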
func SetBucketCORS(ibucket ICloudBucket, rules []SBucketCORSRule) error {
    if len(rules) == 0 {
        return nil
    }
    oldRules, err := ibucket.GetCORSRules()
    if err != nil {
        return errors.Wrap(err, "ibucket.GetCORSRules()")
    }
    newSet := []SBucketCORSRule{}
    updateSet := map[int]SBucketCORSRule{}
    for i := range rules {
        index, err := strconv.Atoi(rules[i].Id)
        if err == nil && index < len(oldRules) {
            updateSet[index] = rules[i]
        } else {
            newSet = append(newSet, rules[i])
        }
    }
    updatedRules := []SBucketCORSRule{}
    for i := range oldRules {
        if _, ok := updateSet[i]; !ok {
            updatedRules = append(updatedRules, oldRules[i])
        } else {
            updatedRules = append(updatedRules, updateSet[i])
        }
    }
    updatedRules = append(updatedRules, newSet...)
    err = ibucket.SetCORS(updatedRules)
    if err != nil {
        return errors.Wrap(err, "ibucket.SetCORS(updatedRules)")
    }
    return nil
}
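// DeleteBucketCORS removes the CORS rules whose indexes are listed in id and returns the rules
// that were removed. When no rules remain, the whole CORS configuration is deleted.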
func DeleteBucketCORS(ibucket ICloudBucket, id []string) ([]SBucketCORSRule, error) {
    if len(id) == 0 {
        return nil, nil
    }
    deletedRules := []SBucketCORSRule{}
    oldRules, err := ibucket.GetCORSRules()
    if err != nil {
        return nil, errors.Wrap(err, "ibucket.GetCORSRules()")
    }
    excludeMap := map[int]bool{}
    for i := range id {
        index, err := strconv.Atoi(id[i])
        if err == nil && index < len(oldRules) {
            excludeMap[index] = true
        }
    }
    if len(excludeMap) == 0 {
        return nil, nil
    }
    newRules := []SBucketCORSRule{}
    for i := range oldRules {
        if _, ok := excludeMap[i]; !ok {
            newRules = append(newRules, oldRules[i])
        } else {
            deletedRules = append(deletedRules, oldRules[i])
        }
    }
    if len(newRules) == 0 {
        err = ibucket.DeleteCORS()
        if err != nil {
            return nil, errors.Wrapf(err, "ibucket.DeleteCORS()")
        }
    } else {
        err = ibucket.SetCORS(newRules)
        if err != nil {
            return nil, errors.Wrapf(err, "ibucket.SetCORS(newRules)")
        }
    }
    return deletedRules, nil
}
func SetBucketTags(ctx context.Context, iBucket ICloudBucket, mangerId string, tags map[string]string) (TagsUpdateInfo, error) {
    ret := TagsUpdateInfo{}
    old, err := iBucket.GetTags()
    if err != nil {
        if errors.Cause(err) == ErrNotImplemented || errors.Cause(err) == ErrNotSupported {
            return ret, nil
        }
        return ret, errors.Wrapf(err, "iBucket.GetTags")
    }
    ret.OldTags, ret.NewTags = old, tags
    if !ret.IsChanged() {
        return ret, nil
    }
    return ret, SetTags(ctx, iBucket, mangerId, tags, true)
}

type sOffsetWriter struct {
    writerAt io.WriterAt
    offset   int64
}

func newWriter(output io.WriterAt, outputOffset int64) io.Writer {
    return &sOffsetWriter{
        writerAt: output,
        offset:   outputOffset,
    }
}

func (ow *sOffsetWriter) Write(p []byte) (int, error) {
    n, err := ow.writerAt.WriteAt(p, ow.offset)
    ow.offset += int64(n)
    return n, err
}
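// calculateRateMbps returns the transfer rate in megabits per second for sizeBytes transferred
// over duration.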
func calculateRateMbps(sizeBytes int64, duration time.Duration) float64 {
    return float64(sizeBytes*8/1000/1000) / (float64(duration) / float64(time.Second))
}
type downloadPartOfMultipartJob struct {
    ctx       context.Context
    bucket    ICloudBucket
    key       string
    rangeOpt  *SGetObjectRange
    output    io.Writer
    partIndex int
    debug     bool
    segSizes  []int64
    errs      *[]error
    callback  func(saved int64, written int64)
}

func downloadPartOfMultipartWorker(wg *sync.WaitGroup, queue chan downloadPartOfMultipartJob) {
    defer wg.Done()
    for job := range queue {
        sz, err := downloadPartOfMultipart(job.ctx, job.bucket, job.key, job.rangeOpt, job.output, job.partIndex, job.debug, job.callback)
        if err != nil {
            *job.errs = append(*job.errs, err)
        } else {
            job.segSizes[job.partIndex] = sz
        }
    }
}
func downloadPartOfMultipart(ctx context.Context, bucket ICloudBucket, key string, rangeOpt *SGetObjectRange, output io.Writer, partIndex int, debug bool, callback func(saved int64, written int64)) (int64, error) {
    partSize := rangeOpt.SizeBytes()
    var startAt time.Time
    if debug {
        startAt = time.Now()
        log.Debugf("downloadPart %d %d range: %s (%d)", partIndex+1, partSize, rangeOpt.String(), rangeOpt.SizeBytes())
    }
    stream, err := bucket.GetObject(ctx, key, rangeOpt)
    if err != nil {
        return 0, errors.Wrap(err, "bucket.GetObject")
    }
    defer stream.Close()
    prop, err := streamutils.StreamPipe2(stream, output, false, callback)
    if err != nil {
        return 0, errors.Wrap(err, "StreamPipe")
    }
    if debug {
        duration := time.Since(startAt)
        rateMbps := calculateRateMbps(partSize, duration)
        log.Debugf("End of downloadPart %d %d range: %s (%d) takes %f seconds at %fMbps", partIndex+1, partSize, rangeOpt.String(), rangeOpt.SizeBytes(), float64(duration)/float64(time.Second), rateMbps)
    }
    return prop.Size, nil
}
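// DownloadObjectParallel downloads key (or the byte range rangeOpt of it) into output starting
// at outputOffset, without progress reporting; see DownloadObjectParallelWithProgress.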
func DownloadObjectParallel(ctx context.Context, bucket ICloudBucket, key string, rangeOpt *SGetObjectRange, output io.WriterAt, outputOffset int64, blocksz int64, debug bool, parallel int) (int64, error) {
    return DownloadObjectParallelWithProgress(ctx, bucket, key, rangeOpt, output, outputOffset, blocksz, debug, parallel, nil)
}
type sDownloadProgresser struct {
    totalSize  int64
    progress   int64
    startTime  time.Time
    reportTime time.Time
    callback   func(progress float64, progressMbps float64, totalSizeMb int64)
}

func newDownloadProgresser(totalSize int64, callback func(progress float64, progressMbps float64, totalSizeMb int64)) *sDownloadProgresser {
    return &sDownloadProgresser{
        totalSize:  totalSize,
        startTime:  time.Now(),
        reportTime: time.Now(),
        callback:   callback,
    }
}

func (p *sDownloadProgresser) Progress(_ int64, written int64) {
    p.progress += written
    duration := time.Since(p.startTime)
    progress := float64(p.progress*100) / float64(p.totalSize)
    progressMbps := calculateRateMbps(p.progress, duration)
    if p.callback != nil {
        p.callback(progress, progressMbps, p.totalSize/1000/1000)
    }
    if time.Since(p.reportTime) > time.Second*5 {
        p.reportTime = time.Now()
        log.Infof("Download progress: %d/%d, %f%%, %fMbps", p.progress, p.totalSize, progress, progressMbps)
    }
}

func (p *sDownloadProgresser) Summary() {
    duration := time.Since(p.startTime)
    rateMbps := calculateRateMbps(p.progress, duration)
    log.Infof("End of download %d: downloaded %d takes %f seconds at %fMbps", p.totalSize, p.progress, float64(duration)/float64(time.Second), rateMbps)
}
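// DownloadObjectParallelWithProgress downloads key (or the byte range rangeOpt of it) from
// bucket and writes it to output starting at outputOffset. Ranges smaller than blocksz
// (default MAX_PUT_OBJECT_SIZEBYTES) are fetched in one request; larger ranges are split into
// blocksz-sized parts downloaded by up to parallel workers, each writing at its own offset.
// The callback, if non-nil, receives the overall progress percentage, the average rate in
// Mbps and the total size in MB. It returns the number of bytes downloaded.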
func DownloadObjectParallelWithProgress(ctx context.Context, bucket ICloudBucket, key string, rangeOpt *SGetObjectRange, output io.WriterAt, outputOffset int64, blocksz int64, debug bool, parallel int, callback func(progress float64, progressMbps float64, totalSizeMb int64)) (int64, error) {
    if debug {
        log.Debugf("DownloadObjectParallelWithProgress bucket: %s key: %s offset: %d blocksz: %d parallel: %d", bucket.GetName(), key, outputOffset, blocksz, parallel)
    }
    obj, err := GetIObject(bucket, key)
    if err != nil {
        return 0, errors.Wrap(err, "GetIObject")
    }
    if blocksz <= 0 {
        blocksz = MAX_PUT_OBJECT_SIZEBYTES
    }
    sizeBytes := obj.GetSizeBytes()
    if sizeBytes < 0 {
        return 0, errors.Wrapf(errors.ErrServer, "object size is negative (%d)", sizeBytes)
    } else if sizeBytes == 0 {
        return 0, nil
    }
    if rangeOpt == nil {
        rangeOpt = &SGetObjectRange{
            Start: 0,
            End:   sizeBytes - 1,
        }
    } else {
        if rangeOpt.End < rangeOpt.Start {
            tmp := rangeOpt.Start
            rangeOpt.Start = rangeOpt.End
            rangeOpt.End = tmp
        }
        if rangeOpt.End >= sizeBytes {
            rangeOpt.End = sizeBytes - 1
        }
        if rangeOpt.Start < 0 {
            rangeOpt.Start = 0
        }
        sizeBytes = rangeOpt.SizeBytes()
    }
    progresser := newDownloadProgresser(sizeBytes, callback)
    defer progresser.Summary()
    progressCallback := progresser.Progress
    if sizeBytes < blocksz {
        if debug {
            log.Debugf("too small, download object in one shot")
        }
        size, err := downloadPartOfMultipart(ctx, bucket, key, rangeOpt, newWriter(output, outputOffset), 0, true, progressCallback)
        if err != nil {
            return 0, errors.Wrap(err, "downloadPartOfMultipart")
        }
        return size, nil
    }
    partSize := blocksz
    partCount := sizeBytes / partSize
    if partCount*partSize < sizeBytes {
        partCount += 1
    }
    if debug {
        log.Debugf("multipart download part count %d part size %d", partCount, partSize)
    }
    var errs []error
    segSizes := make([]int64, partCount)
    {
        if parallel < 1 {
            parallel = 1
        }
        queue := make(chan downloadPartOfMultipartJob, parallel)
        wg := &sync.WaitGroup{}
        for i := 0; i < parallel; i++ {
            wg.Add(1)
            go downloadPartOfMultipartWorker(wg, queue)
        }
        for i := 0; i < int(partCount); i += 1 {
            dstOffset := outputOffset + int64(i)*partSize
            start := rangeOpt.Start + int64(i)*partSize
            if i == int(partCount)-1 {
                partSize = sizeBytes - partSize*(partCount-1)
            }
            end := start + partSize - 1
            srcRangeOpt := SGetObjectRange{
                Start: start,
                End:   end,
            }
            partIndex := i
            job := downloadPartOfMultipartJob{
                ctx:       ctx,
                bucket:    bucket,
                key:       key,
                rangeOpt:  &srcRangeOpt,
                output:    newWriter(output, dstOffset),
                partIndex: partIndex,
                debug:     debug,
                segSizes:  segSizes,
                errs:      &errs,
                callback:  progressCallback,
            }
            queue <- job
        }
        close(queue)
        wg.Wait()
    }
    if len(errs) > 0 {
        return 0, errors.Wrap(errors.NewAggregate(errs), "downloadPartOfMultipart")
    }
    totalSize := int64(0)
    for i := range segSizes {
        totalSize += segSizes[i]
    }
    return totalSize, nil
}