| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package cloudprovider
- import (
- "context"
- "fmt"
- "io"
- "net/http"
- "regexp"
- "sort"
- "strconv"
- "strings"
- "sync"
- "time"
- "yunion.io/x/jsonutils"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/util/streamutils"
- "yunion.io/x/s3cli"
- )
// TBucketACLType is a canned ACL name applied to buckets and objects,
// mirroring the S3 canned ACL constants.
type TBucketACLType string

const (
	// MAX_PUT_OBJECT_SIZEBYTES is the threshold (50 MB) above which uploads
	// switch from a single PutObject to multipart upload.
	MAX_PUT_OBJECT_SIZEBYTES = int64(1024 * 1024 * 50)

	// ACLDefault = TBucketACLType("default")

	// Canned ACLs, from most restrictive to most permissive.
	ACLPrivate         = TBucketACLType(s3cli.CANNED_ACL_PRIVATE)
	ACLAuthRead        = TBucketACLType(s3cli.CANNED_ACL_AUTH_READ)
	ACLPublicRead      = TBucketACLType(s3cli.CANNED_ACL_PUBLIC_READ)
	ACLPublicReadWrite = TBucketACLType(s3cli.CANNED_ACL_PUBLIC_READ_WRITE)
	// ACLUnknown marks an ACL that could not be determined.
	ACLUnknown = TBucketACLType("")

	// Standard HTTP headers recognized as object metadata.
	META_HEADER_CACHE_CONTROL       = "Cache-Control"
	META_HEADER_CONTENT_TYPE        = "Content-Type"
	META_HEADER_CONTENT_DISPOSITION = "Content-Disposition"
	META_HEADER_CONTENT_ENCODING    = "Content-Encoding"
	META_HEADER_CONTENT_LANGUAGE    = "Content-Language"
	META_HEADER_CONTENT_MD5         = "Content-MD5"

	// META_HEADER_PREFIX marks user-defined metadata headers.
	META_HEADER_PREFIX = "X-Yunion-Meta-"
)
// SBucketStats holds aggregate usage numbers for a bucket.
type SBucketStats struct {
	// SizeBytes is the total size of all objects; -1 when unknown.
	SizeBytes int64
	// ObjectCount is the number of objects; -1 when unknown.
	ObjectCount int
}

// Equals reports whether s and s2 agree on both size and object count.
func (s SBucketStats) Equals(s2 SBucketStats) bool {
	// Idiomatic: return the comparison directly instead of if/else true/false.
	return s.SizeBytes == s2.SizeBytes && s.ObjectCount == s2.ObjectCount
}
// SBucketAccessUrl describes one endpoint through which a bucket can be reached.
type SBucketAccessUrl struct {
	Url         string
	Description string
	// Primary marks the preferred access endpoint.
	Primary bool
}

// SBucketWebsiteRoutingRule is one redirect rule of a static-website configuration.
type SBucketWebsiteRoutingRule struct {
	ConditionErrorCode string
	ConditionPrefix    string

	RedirectProtocol         string
	RedirectReplaceKey       string
	RedirectReplaceKeyPrefix string
}

// SBucketWebsiteConf is the static-website configuration of a bucket.
type SBucketWebsiteConf struct {
	// Index is the home/index document.
	Index string
	// ErrorDocument is the document returned on error.
	ErrorDocument string
	// Protocol is "http" or "https".
	Protocol string

	Rules []SBucketWebsiteRoutingRule
	// Url is the website access URL, generally composed of bucket id, region, etc.
	Url string
}

// SBucketCORSRule is one CORS rule of a bucket.
type SBucketCORSRule struct {
	AllowedMethods []string
	// AllowedOrigins lists allowed origins; may be set to "*".
	AllowedOrigins []string
	AllowedHeaders []string
	MaxAgeSeconds  int
	ExposeHeaders  []string
	// Id distinguishes rules from each other.
	Id string
}

// SBucketRefererConf is the referer (hotlink-protection) configuration of a bucket.
type SBucketRefererConf struct {
	// DomainList is the list of referer domains.
	DomainList []string
	// RefererType is the kind of the domain list.
	// enum: Black-List, White-List
	RefererType string
	// AllowEmptyRefer indicates whether requests without a Referer header are allowed.
	AllowEmptyRefer bool

	Enabled bool
}
// SBucketPolicyStatement is one statement of a bucket access policy, in the
// provider's native JSON form plus parsed helper fields.
type SBucketPolicyStatement struct {
	// Principal is the grantee of the statement.
	Principal map[string][]string `json:"Principal,omitempty"`
	// Action lists the granted operations.
	Action []string `json:"Action,omitempty"`
	// Effect is Allow or Deny.
	Effect string `json:"Effect,omitempty"`
	// Resource lists the resources being granted.
	Resource []string `json:"Resource,omitempty"`
	// Condition holds the conditions that trigger the grant.
	Condition map[string]map[string]interface{} `json:"Condition,omitempty"`

	// PrincipalId holds parsed entries of the form "ownerId:subAccountId".
	PrincipalId []string
	// PrincipalNames maps "ownerId:subAccountId" to the sub-account name.
	PrincipalNames map[string]string

	// CannedAction is Read, ReadWrite or FullControl.
	CannedAction string
	// ResourcePath lists resource paths.
	ResourcePath []string
	// Id is generated from the statement index.
	Id string
}

// SBucketPolicyStatementInput is the request form for setting a policy statement.
type SBucketPolicyStatementInput struct {
	// PrincipalId entries are "ownerId:subAccountId".
	PrincipalId []string
	// CannedAction is Read, ReadWrite or FullControl.
	CannedAction string
	// Effect is Allow or Deny.
	Effect string
	// ResourcePath lists granted resource paths, e.g. "/*".
	ResourcePath []string
	// IpEquals / IpNotEquals are IP-based access conditions.
	IpEquals    []string
	IpNotEquals []string
	// Condition holds raw conditions that trigger the grant.
	Condition map[string]map[string]interface{}
}

// SBucketMultipartUploads describes one in-progress multipart upload.
type SBucketMultipartUploads struct {
	// ObjectName is the key of the object being uploaded.
	ObjectName string
	UploadID   string
	// Initiator identifies who started the upload.
	Initiator string
	// Initiated is when the upload was started.
	Initiated time.Time
}

// SBaseCloudObject is the common embeddable base for cloud object implementations.
type SBaseCloudObject struct {
	Key          string
	SizeBytes    int64
	StorageClass string
	ETag         string
	LastModified time.Time
	Meta         http.Header
}

// SListObjectResult is one page of an object listing.
type SListObjectResult struct {
	Objects []ICloudObject
	// NextMarker is the continuation marker for the next page, if any.
	NextMarker string
	// CommonPrefixes are pseudo-directory entries; present only for delimited listings.
	CommonPrefixes []ICloudObject
	// IsTruncated reports whether more results remain.
	IsTruncated bool
}
// SGetObjectRange is an inclusive byte range of an object; offsets start at 0.
type SGetObjectRange struct {
	Start int64
	End   int64
}

// SizeBytes returns the number of bytes covered by the inclusive range.
func (r SGetObjectRange) SizeBytes() int64 {
	return r.End - r.Start + 1
}

var (
	rangeExp = regexp.MustCompile(`(bytes=)?(\d*)-(\d*)`)
)

// ParseRange parses an HTTP-style range expression such as "bytes=0-99"
// (the "bytes=" prefix is optional). Bounds that are absent or unparsable
// are left at zero.
func ParseRange(rangeStr string) SGetObjectRange {
	var objRange SGetObjectRange
	if rangeStr == "" {
		return objRange
	}
	m := rangeExp.FindStringSubmatch(rangeStr)
	if len(m) > 3 {
		objRange.Start, _ = strconv.ParseInt(m[2], 10, 64)
		objRange.End, _ = strconv.ParseInt(m[3], 10, 64)
	}
	return objRange
}

// String renders the range back into HTTP "bytes=start-end" form; an
// all-zero range renders as the empty string.
func (r SGetObjectRange) String() string {
	switch {
	case r.Start > 0 && r.End > 0:
		return fmt.Sprintf("bytes=%d-%d", r.Start, r.End)
	case r.Start > 0:
		// open-ended tail range
		return fmt.Sprintf("bytes=%d-", r.Start)
	case r.End > 0:
		// range anchored at the beginning
		return fmt.Sprintf("bytes=0-%d", r.End)
	default:
		return ""
	}
}
// ICloudBucket is the provider-neutral interface of an object-storage bucket.
type ICloudBucket interface {
	IVirtualResource

	// MaxPartCount returns the provider's maximum number of multipart parts.
	MaxPartCount() int
	// MaxPartSizeBytes returns the provider's maximum size of a single part.
	MaxPartSizeBytes() int64

	//GetGlobalId() string
	//GetName() string

	GetAcl() TBucketACLType
	GetLocation() string
	GetIRegion() ICloudRegion
	GetStorageClass() string
	GetAccessUrls() []SBucketAccessUrl
	GetStats() SBucketStats
	GetLimit() SBucketStats
	SetLimit(limit SBucketStats) error
	LimitSupport() SBucketStats
	SetAcl(acl TBucketACLType) error

	// Object listing and basic object I/O.
	ListObjects(prefix string, marker string, delimiter string, maxCount int) (SListObjectResult, error)
	CopyObject(ctx context.Context, destKey string, srcBucket, srcKey string, cannedAcl TBucketACLType, storageClassStr string, meta http.Header) error
	GetObject(ctx context.Context, key string, rangeOpt *SGetObjectRange) (io.ReadCloser, error)
	DeleteObject(ctx context.Context, keys string) error
	GetTempUrl(method string, key string, expire time.Duration) (string, error)
	PutObject(ctx context.Context, key string, input io.Reader, sizeBytes int64, cannedAcl TBucketACLType, storageClassStr string, meta http.Header) error

	// Multipart upload lifecycle. partIndex here is 1-based.
	NewMultipartUpload(ctx context.Context, key string, cannedAcl TBucketACLType, storageClassStr string, meta http.Header) (string, error)
	UploadPart(ctx context.Context, key string, uploadId string, partIndex int, input io.Reader, partSize int64, offset, totalSize int64) (string, error)
	CopyPart(ctx context.Context, key string, uploadId string, partIndex int, srcBucketName string, srcKey string, srcOffset int64, srcLength int64) (string, error)
	CompleteMultipartUpload(ctx context.Context, key string, uploadId string, partEtags []string) error
	AbortMultipartUpload(ctx context.Context, key string, uploadId string) error

	// Static-website hosting.
	SetWebsite(conf SBucketWebsiteConf) error
	GetWebsiteConf() (SBucketWebsiteConf, error)
	DeleteWebSiteConf() error

	// CORS configuration.
	SetCORS(rules []SBucketCORSRule) error
	GetCORSRules() ([]SBucketCORSRule, error)
	DeleteCORS() error

	// Referer (hotlink protection) configuration.
	SetReferer(conf SBucketRefererConf) error
	GetReferer() (SBucketRefererConf, error)

	GetCdnDomains() ([]SCdnDomain, error)

	// Bucket access policy.
	GetPolicy() ([]SBucketPolicyStatement, error)
	SetPolicy(policy SBucketPolicyStatementInput) error
	DeletePolicy(id []string) ([]SBucketPolicyStatement, error)

	ListMultipartUploads() ([]SBucketMultipartUploads, error)
}

// ICloudObject is the provider-neutral interface of one stored object.
type ICloudObject interface {
	GetIBucket() ICloudBucket

	GetKey() string
	GetSizeBytes() int64
	GetLastModified() time.Time
	GetStorageClass() string
	GetETag() string

	GetMeta() http.Header
	SetMeta(ctx context.Context, meta http.Header) error

	GetAcl() TBucketACLType
	SetAcl(acl TBucketACLType) error
}

// SCloudObject is a plain, serializable snapshot of an ICloudObject.
type SCloudObject struct {
	Key          string
	SizeBytes    int64
	StorageClass string
	ETag         string
	LastModified time.Time
	Meta         http.Header
	Acl          string
}
- func ICloudObject2Struct(obj ICloudObject) SCloudObject {
- return SCloudObject{
- Key: obj.GetKey(),
- SizeBytes: obj.GetSizeBytes(),
- StorageClass: obj.GetStorageClass(),
- ETag: obj.GetETag(),
- LastModified: obj.GetLastModified(),
- Meta: obj.GetMeta(),
- Acl: string(obj.GetAcl()),
- }
- }
// ICloudObject2JSONObject marshals a snapshot of obj (via ICloudObject2Struct)
// into a JSON object.
func ICloudObject2JSONObject(obj ICloudObject) jsonutils.JSONObject {
	return jsonutils.Marshal(ICloudObject2Struct(obj))
}

// GetKey returns the object's key (its path within the bucket).
func (o *SBaseCloudObject) GetKey() string {
	return o.Key
}

// GetSizeBytes returns the object's size in bytes.
func (o *SBaseCloudObject) GetSizeBytes() int64 {
	return o.SizeBytes
}

// GetLastModified returns the object's last-modification time.
func (o *SBaseCloudObject) GetLastModified() time.Time {
	return o.LastModified
}

// GetStorageClass returns the object's storage class.
func (o *SBaseCloudObject) GetStorageClass() string {
	return o.StorageClass
}

// GetETag returns the object's entity tag.
func (o *SBaseCloudObject) GetETag() string {
	return o.ETag
}

// GetMeta returns the object's metadata headers.
func (o *SBaseCloudObject) GetMeta() http.Header {
	return o.Meta
}
- //func (o *SBaseCloudObject) SetMeta(meta http.Header) error {
- // return nil
- //}
- func GetIBucketById(region ICloudRegion, name string) (ICloudBucket, error) {
- buckets, err := region.GetIBuckets()
- if err != nil {
- return nil, errors.Wrap(err, "region.GetIBuckets")
- }
- for i := range buckets {
- if buckets[i].GetGlobalId() == name {
- return buckets[i], nil
- }
- }
- return nil, ErrNotFound
- }
- func GetIBucketByName(region ICloudRegion, name string) (ICloudBucket, error) {
- buckets, err := region.GetIBuckets()
- if err != nil {
- return nil, errors.Wrap(err, "region.GetIBuckets")
- }
- for i := range buckets {
- if buckets[i].GetName() == name {
- return buckets[i], nil
- }
- }
- return nil, ErrNotFound
- }
- func GetIBucketStats(bucket ICloudBucket) (SBucketStats, error) {
- stats := SBucketStats{
- ObjectCount: -1,
- SizeBytes: -1,
- }
- objs, err := bucket.ListObjects("", "", "", 1000)
- if err != nil {
- return stats, errors.Wrap(err, "GetIObjects")
- }
- if objs.IsTruncated {
- return stats, errors.Wrap(ErrTooLarge, "too many objects")
- }
- stats.ObjectCount, stats.SizeBytes = 0, 0
- for _, obj := range objs.Objects {
- stats.SizeBytes += obj.GetSizeBytes()
- stats.ObjectCount += 1
- }
- return stats, nil
- }
// cloudObjectList implements sort.Interface, ordering objects by key ascending.
type cloudObjectList []ICloudObject

func (a cloudObjectList) Len() int           { return len(a) }
func (a cloudObjectList) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a cloudObjectList) Less(i, j int) bool { return a[i].GetKey() < a[j].GetKey() }
- func GetPagedObjects(bucket ICloudBucket, objectPrefix string, isRecursive bool, marker string, maxCount int) ([]ICloudObject, string, error) {
- delimiter := "/"
- if isRecursive {
- delimiter = ""
- }
- if maxCount > 1000 || maxCount <= 0 {
- maxCount = 1000
- }
- ret := make([]ICloudObject, 0)
- result, err := bucket.ListObjects(objectPrefix, marker, delimiter, maxCount)
- if err != nil {
- return nil, "", errors.Wrap(err, "bucket.ListObjects")
- }
- // Send all objects
- for i := range result.Objects {
- // if delimited, skip the first object ends with delimiter
- if !isRecursive && result.Objects[i].GetKey() == objectPrefix && strings.HasSuffix(objectPrefix, delimiter) {
- continue
- }
- ret = append(ret, result.Objects[i])
- marker = result.Objects[i].GetKey()
- }
- // Send all common prefixes if any.
- // NOTE: prefixes are only present if the request is delimited.
- if len(result.CommonPrefixes) > 0 {
- ret = append(ret, result.CommonPrefixes...)
- }
- // sort prefix by name in ascending order
- sort.Sort(cloudObjectList(ret))
- // If next marker present, save it for next request.
- if result.NextMarker != "" {
- marker = result.NextMarker
- }
- // If not truncated, no more objects
- if !result.IsTruncated {
- marker = ""
- }
- return ret, marker, nil
- }
- func GetAllObjects(bucket ICloudBucket, objectPrefix string, isRecursive bool) ([]ICloudObject, error) {
- ret := make([]ICloudObject, 0)
- // Save marker for next request.
- var marker string
- for {
- // Get list of objects a maximum of 1000 per request.
- result, marker, err := GetPagedObjects(bucket, objectPrefix, isRecursive, marker, 1000)
- if err != nil {
- return nil, errors.Wrap(err, "bucket.ListObjects")
- }
- ret = append(ret, result...)
- if marker == "" {
- break
- }
- }
- return ret, nil
- }
- func GetIObject(bucket ICloudBucket, objectPrefix string) (ICloudObject, error) {
- tryPrefix := []string{objectPrefix}
- if strings.HasSuffix(objectPrefix, "/") {
- tryPrefix = append(tryPrefix, objectPrefix[:len(objectPrefix)-1])
- }
- for _, pref := range tryPrefix {
- result, err := bucket.ListObjects(pref, "", "", 1)
- if err != nil {
- return nil, errors.Wrap(err, "bucket.ListObjects")
- }
- objects := result.Objects
- if len(objects) > 0 && objects[0].GetKey() == objectPrefix {
- return objects[0], nil
- }
- }
- return nil, ErrNotFound
- }
- func Makedir(ctx context.Context, bucket ICloudBucket, key string) error {
- segs := make([]string, 0)
- for _, seg := range strings.Split(key, "/") {
- if len(seg) > 0 {
- segs = append(segs, seg)
- }
- }
- path := strings.Join(segs, "/") + "/"
- err := bucket.PutObject(ctx, path, strings.NewReader(""), 0, bucket.GetAcl(), "", nil)
- if err != nil {
- return errors.Wrap(err, "PutObject")
- }
- return nil
- }
// UploadObject uploads sizeBytes read from input to key, wrapping the
// sequential reader and delegating to UploadObjectParallel with parallelism
// of 1. blocksz selects the multipart block size; <=0 uses
// MAX_PUT_OBJECT_SIZEBYTES.
func UploadObject(ctx context.Context, bucket ICloudBucket, key string, blocksz int64, input io.Reader, sizeBytes int64, cannedAcl TBucketACLType, storageClass string, meta http.Header, debug bool) error {
	return UploadObjectParallel(ctx, bucket, key, blocksz, newReaderAt(input), sizeBytes, cannedAcl, storageClass, meta, debug, 1)
}
// sSeqReader adapts a plain io.Reader to io.ReaderAt for strictly
// sequential, in-order consumption.
type sSeqReader struct {
	reader io.Reader
	// offset is currently unused.
	offset int64
}

// newReaderAt wraps input so it can be passed where an io.ReaderAt is
// expected. The result is NOT a real random-access reader (see ReadAt);
// it is only valid when reads happen contiguously and in order.
func newReaderAt(input io.Reader) io.ReaderAt {
	return &sSeqReader{
		reader: input,
		offset: 0,
	}
}

// ReadAt IGNORES the requested offset and simply reads the next bytes from
// the underlying reader. This is only correct when callers read ascending,
// contiguous ranges from a single goroutine — i.e. parallel == 1, as used by
// UploadObject. NOTE(review): passing this reader to a parallel upload would
// scramble the data; confirm no other caller does so.
func (sr *sSeqReader) ReadAt(p []byte, offset int64) (int, error) {
	return sr.reader.Read(p)
}
// sOffsetReader presents a cursor-style io.Reader view over an io.ReaderAt,
// starting at a fixed offset.
type sOffsetReader struct {
	readerAt io.ReaderAt
	offset   int64
}

// newReader returns an io.Reader that consumes input sequentially beginning
// at inputOffset.
func newReader(input io.ReaderAt, inputOffset int64) io.Reader {
	return &sOffsetReader{readerAt: input, offset: inputOffset}
}

// Read fetches the next chunk at the current cursor and advances the cursor
// by the number of bytes actually read.
func (r *sOffsetReader) Read(p []byte) (int, error) {
	n, err := r.readerAt.ReadAt(p, r.offset)
	r.offset += int64(n)
	return n, err
}
// uploadPartOfMultipartJob carries one part-upload work item to a worker.
type uploadPartOfMultipartJob struct {
	ctx       context.Context
	bucket    ICloudBucket
	key       string
	input     io.Reader
	// sizeBytes is the total size of the whole object.
	sizeBytes int64
	uploadId  string
	// partIndex is 0-based; providers receive partIndex+1.
	partIndex int
	partSize  int64
	// offset is the byte offset of this part within the object.
	offset int64
	debug  bool
	// etags collects results; each worker writes only its own index.
	etags []string
	// errs is shared by all workers.
	// NOTE(review): appends to *errs are unsynchronized — racy when
	// parallel > 1 and several parts fail concurrently; confirm.
	errs *[]error
}
// uploadPartOfMultipartWorker drains queue until it is closed, uploading one
// part per job. Each part's etag is stored at its part index; errors are
// appended to the shared error slice.
// NOTE(review): the append to *job.errs is unsynchronized across workers —
// racy when several workers fail concurrently; confirm and guard if needed.
func uploadPartOfMultipartWorker(wg *sync.WaitGroup, queue chan uploadPartOfMultipartJob) {
	defer wg.Done()
	for job := range queue {
		tag, err := uploadPartOfMultipart(job.ctx, job.bucket, job.key, job.input, job.sizeBytes, job.uploadId, job.partIndex, job.partSize, job.offset, job.debug)
		if err != nil {
			*job.errs = append(*job.errs, err)
		} else {
			job.etags[job.partIndex] = tag
		}
	}
}
- func uploadPartOfMultipart(ctx context.Context, bucket ICloudBucket, key string, input io.Reader, sizeBytes int64, uploadId string, partIndex int, partSize int64, offset int64, debug bool) (string, error) {
- var startAt time.Time
- if debug {
- startAt = time.Now()
- log.Debugf("UploadPart %d %d", partIndex+1, partSize)
- }
- etag, err := bucket.UploadPart(ctx, key, uploadId, partIndex+1, io.LimitReader(input, partSize), partSize, offset, sizeBytes)
- if err != nil {
- return "", errors.Wrapf(err, "bucket.UploadPart %d", partIndex)
- }
- if debug {
- duration := time.Since(startAt)
- rateMbps := calculateRateMbps(partSize, duration)
- log.Debugf("End of uploadPart %d %d takes %f seconds at %fMbps", partIndex+1, partSize, float64(duration)/float64(time.Second), rateMbps)
- }
- return etag, nil
- }
- func UploadObjectParallel(ctx context.Context, bucket ICloudBucket, key string, blocksz int64, input io.ReaderAt, sizeBytes int64, cannedAcl TBucketACLType, storageClass string, meta http.Header, debug bool, parallel int) error {
- if blocksz <= 0 {
- blocksz = MAX_PUT_OBJECT_SIZEBYTES
- }
- if sizeBytes < blocksz {
- if debug {
- log.Debugf("too small, put object in one shot")
- }
- return bucket.PutObject(ctx, key, newReader(input, 0), sizeBytes, cannedAcl, storageClass, meta)
- }
- partSize := blocksz
- partCount := sizeBytes / partSize
- if partCount*partSize < sizeBytes {
- partCount += 1
- }
- if partCount > int64(bucket.MaxPartCount()) {
- partCount = int64(bucket.MaxPartCount())
- partSize = sizeBytes / partCount
- if partSize*partCount < sizeBytes {
- partSize += 1
- }
- if partSize > bucket.MaxPartSizeBytes() {
- return errors.Error("too larget object")
- }
- }
- if debug {
- log.Debugf("multipart upload part count %d part size %d", partCount, partSize)
- }
- uploadId, err := bucket.NewMultipartUpload(ctx, key, cannedAcl, storageClass, meta)
- if err != nil {
- return errors.Wrap(err, "bucket.NewMultipartUpload")
- }
- etags := make([]string, partCount)
- var errs []error
- {
- if parallel < 1 {
- parallel = 1
- }
- queue := make(chan uploadPartOfMultipartJob, parallel)
- wg := &sync.WaitGroup{}
- for i := 0; i < parallel; i++ {
- wg.Add(1)
- go uploadPartOfMultipartWorker(wg, queue)
- }
- for i := 0; i < int(partCount); i += 1 {
- offset := int64(i) * partSize
- blockSize := partSize
- if i == int(partCount)-1 {
- blockSize = sizeBytes - partSize*(partCount-1)
- }
- partIndex := i
- job := uploadPartOfMultipartJob{
- ctx: ctx,
- bucket: bucket,
- key: key,
- input: newReader(input, offset),
- sizeBytes: sizeBytes,
- uploadId: uploadId,
- partIndex: partIndex,
- partSize: blockSize,
- offset: offset,
- debug: debug,
- etags: etags,
- errs: &errs,
- }
- queue <- job
- }
- close(queue)
- wg.Wait()
- }
- if len(errs) > 0 {
- // upload part error
- err2 := bucket.AbortMultipartUpload(ctx, key, uploadId)
- if err2 != nil {
- log.Errorf("bucket.AbortMultipartUpload error %s", err2)
- errs = append(errs, err2)
- }
- return errors.Wrap(errors.NewAggregate(errs), "uploadPartOfMultipart")
- }
- err = bucket.CompleteMultipartUpload(ctx, key, uploadId, etags)
- if err != nil {
- err2 := bucket.AbortMultipartUpload(ctx, key, uploadId)
- if err2 != nil {
- log.Errorf("bucket.AbortMultipartUpload error %s", err2)
- }
- return errors.Wrap(err, "CompleteMultipartUpload")
- }
- return nil
- }
- func DeletePrefix(ctx context.Context, bucket ICloudBucket, prefix string) error {
- objs, err := GetAllObjects(bucket, prefix, true)
- if err != nil {
- return errors.Wrap(err, "bucket.GetIObjects")
- }
- for i := range objs {
- err := bucket.DeleteObject(ctx, objs[i].GetKey())
- if err != nil {
- return errors.Wrap(err, "bucket.DeleteObject")
- }
- }
- return nil
- }
// MergeMeta combines two header sets into one. Values from src are added
// first, then values from dst; duplicate keys accumulate (dst does not
// override src). A nil side yields the other side unchanged; two nils yield
// nil.
func MergeMeta(src http.Header, dst http.Header) http.Header {
	if src == nil {
		return dst
	}
	if dst == nil {
		return src
	}
	ret := http.Header{}
	for _, hdr := range []http.Header{src, dst} {
		for k, vs := range hdr {
			for _, v := range vs {
				ret.Add(k, v)
			}
		}
	}
	return ret
}
// CopyObject copies srcKey of srcBucket to dstKey of dstBucket, delegating to
// CopyObjectParallel with parallelism of 1. blocksz selects the multipart
// block size; <=0 uses MAX_PUT_OBJECT_SIZEBYTES.
func CopyObject(ctx context.Context, blocksz int64, dstBucket ICloudBucket, dstKey string, srcBucket ICloudBucket, srcKey string, dstMeta http.Header, debug bool) error {
	return CopyObjectParallel(ctx, blocksz, dstBucket, dstKey, srcBucket, srcKey, dstMeta, debug, 1)
}
// copyPartOfMultipartJob carries one part-copy work item to a worker.
type copyPartOfMultipartJob struct {
	ctx       context.Context
	dstBucket ICloudBucket
	dstKey    string
	srcBucket ICloudBucket
	srcKey    string
	// rangeOpt is the source byte range of this part.
	rangeOpt *SGetObjectRange
	// sizeBytes is the total size of the whole object.
	sizeBytes int64
	uploadId  string
	// partIndex is 0-based; providers receive partIndex+1.
	partIndex int
	debug     bool
	// etags collects results; each worker writes only its own index.
	etags []string
	// errs is shared by all workers.
	// NOTE(review): appends to *errs are unsynchronized — racy when
	// parallel > 1 and several parts fail concurrently; confirm.
	errs *[]error
}
// copyPartOfMultipartWorker drains queue until it is closed, copying one part
// per job. Each part's etag is stored at its part index; errors are appended
// to the shared error slice.
// NOTE(review): the append to *job.errs is unsynchronized across workers —
// racy when several workers fail concurrently; confirm and guard if needed.
func copyPartOfMultipartWorker(wg *sync.WaitGroup, queue chan copyPartOfMultipartJob) {
	defer wg.Done()
	for job := range queue {
		tag, err := copyPartOfMultipart(job.ctx, job.dstBucket, job.dstKey, job.srcBucket, job.srcKey, job.rangeOpt, job.sizeBytes, job.uploadId, job.partIndex, job.debug)
		if err != nil {
			*job.errs = append(*job.errs, err)
		} else {
			job.etags[job.partIndex] = tag
		}
	}
}
// copyPartOfMultipart copies one part: it GETs the rangeOpt byte range of
// srcKey from srcBucket and uploads it as part partIndex+1 of the multipart
// upload on dstBucket. Returns the part's etag.
func copyPartOfMultipart(ctx context.Context, dstBucket ICloudBucket, dstKey string, srcBucket ICloudBucket, srcKey string, rangeOpt *SGetObjectRange, sizeBytes int64, uploadId string, partIndex int, debug bool) (string, error) {
	partSize := rangeOpt.SizeBytes()
	var startAt time.Time
	if debug {
		startAt = time.Now()
		log.Debugf("CopyPart %d %d range: %s (%d)", partIndex+1, partSize, rangeOpt.String(), rangeOpt.SizeBytes())
	}
	srcStream, err := srcBucket.GetObject(ctx, srcKey, rangeOpt)
	if err != nil {
		return "", errors.Wrapf(err, "srcBucket.GetObject %d", partIndex)
	}
	defer srcStream.Close()
	// LimitReader guarantees no more than partSize bytes are consumed.
	etag, err := dstBucket.UploadPart(ctx, dstKey, uploadId, partIndex+1, io.LimitReader(srcStream, partSize), partSize, rangeOpt.Start, sizeBytes)
	if err != nil {
		return "", errors.Wrapf(err, "dstBucket.UploadPart %d", partIndex)
	}
	if debug {
		duration := time.Since(startAt)
		rateMbps := calculateRateMbps(partSize, duration)
		log.Debugf("End of copyPart %d %d range: %s (%d) takes %d seconds at %fMbps", partIndex+1, partSize, rangeOpt.String(), rangeOpt.SizeBytes(), duration/time.Second, rateMbps)
	}
	return etag, nil
}
- func CopyObjectParallel(ctx context.Context, blocksz int64, dstBucket ICloudBucket, dstKey string, srcBucket ICloudBucket, srcKey string, dstMeta http.Header, debug bool, parallel int) error {
- srcObj, err := GetIObject(srcBucket, srcKey)
- if err != nil {
- return errors.Wrap(err, "GetIObject")
- }
- if blocksz <= 0 {
- blocksz = MAX_PUT_OBJECT_SIZEBYTES
- }
- sizeBytes := srcObj.GetSizeBytes()
- if sizeBytes < blocksz {
- if debug {
- log.Debugf("too small, copy object in one shot")
- }
- srcStream, err := srcBucket.GetObject(ctx, srcKey, nil)
- if err != nil {
- return errors.Wrap(err, "srcBucket.GetObject")
- }
- defer srcStream.Close()
- err = dstBucket.PutObject(ctx, dstKey, srcStream, sizeBytes, srcObj.GetAcl(), srcObj.GetStorageClass(), MergeMeta(srcObj.GetMeta(), dstMeta))
- if err != nil {
- return errors.Wrap(err, "dstBucket.PutObject")
- }
- return nil
- }
- partSize := blocksz
- partCount := sizeBytes / partSize
- if partCount*partSize < sizeBytes {
- partCount += 1
- }
- if partCount > int64(dstBucket.MaxPartCount()) {
- partCount = int64(dstBucket.MaxPartCount())
- partSize = sizeBytes / partCount
- if partSize*partCount < sizeBytes {
- partSize += 1
- }
- if partSize > dstBucket.MaxPartSizeBytes() {
- return errors.Error("too larget object")
- }
- }
- if debug {
- log.Debugf("multipart upload part count %d part size %d", partCount, partSize)
- }
- uploadId, err := dstBucket.NewMultipartUpload(ctx, dstKey, srcObj.GetAcl(), srcObj.GetStorageClass(), MergeMeta(srcObj.GetMeta(), dstMeta))
- if err != nil {
- return errors.Wrap(err, "bucket.NewMultipartUpload")
- }
- etags := make([]string, partCount)
- var errs []error
- {
- if parallel < 1 {
- parallel = 1
- }
- queue := make(chan copyPartOfMultipartJob, parallel)
- var wg sync.WaitGroup
- for i := 0; i < parallel; i++ {
- wg.Add(1)
- go copyPartOfMultipartWorker(&wg, queue)
- }
- for i := 0; i < int(partCount); i += 1 {
- start := int64(i) * partSize
- blockSize := partSize
- if i == int(partCount)-1 {
- blockSize = sizeBytes - partSize*(partCount-1)
- }
- end := start + blockSize - 1
- rangeOpt := SGetObjectRange{
- Start: start,
- End: end,
- }
- partIndex := i
- job := copyPartOfMultipartJob{
- ctx: ctx,
- dstBucket: dstBucket,
- dstKey: dstKey,
- srcBucket: srcBucket,
- srcKey: srcKey,
- rangeOpt: &rangeOpt,
- sizeBytes: sizeBytes,
- uploadId: uploadId,
- partIndex: partIndex,
- debug: debug,
- etags: etags,
- errs: &errs,
- }
- queue <- job
- }
- close(queue)
- wg.Wait()
- }
- if len(errs) > 0 {
- // upload part error
- err2 := dstBucket.AbortMultipartUpload(ctx, dstKey, uploadId)
- if err2 != nil {
- log.Errorf("bucket.AbortMultipartUpload error %s", err2)
- errs = append(errs, err2)
- }
- return errors.Wrap(errors.NewAggregate(errs), "copyPartOfMultipart")
- }
- err = dstBucket.CompleteMultipartUpload(ctx, dstKey, uploadId, etags)
- if err != nil {
- err2 := dstBucket.AbortMultipartUpload(ctx, dstKey, uploadId)
- if err2 != nil {
- log.Errorf("bucket.AbortMultipartUpload error %s", err2)
- }
- return errors.Wrap(err, "CompleteMultipartUpload")
- }
- return nil
- }
// CopyPart copies the rangeOpt byte range of srcKey from iSrcBucket into part
// partNumber (0-based) of the multipart upload uploadId on iDstBucket and
// returns the part's etag.
// NOTE(review): totalSize is passed down as 0 here — confirm all providers
// tolerate a zero total size for UploadPart.
func CopyPart(ctx context.Context,
	iDstBucket ICloudBucket, dstKey string, uploadId string, partNumber int,
	iSrcBucket ICloudBucket, srcKey string, rangeOpt *SGetObjectRange,
) (string, error) {
	return copyPartOfMultipart(ctx, iDstBucket, dstKey, iSrcBucket, srcKey, rangeOpt, 0, uploadId, partNumber, false)
}
// ObjectSetMeta replaces an object's metadata by copying the object onto
// itself within its own bucket, supplying the new headers (the usual
// object-store idiom for metadata updates).
func ObjectSetMeta(ctx context.Context,
	bucket ICloudBucket, obj ICloudObject,
	meta http.Header,
) error {
	return bucket.CopyObject(ctx, obj.GetKey(), bucket.GetName(), obj.GetKey(), obj.GetAcl(), obj.GetStorageClass(), meta)
}
- func MetaToHttpHeader(metaPrefix string, meta http.Header) http.Header {
- hdr := http.Header{}
- for k, v := range meta {
- if len(v) == 0 || len(v[0]) == 0 {
- continue
- }
- k = http.CanonicalHeaderKey(k)
- switch k {
- case META_HEADER_CACHE_CONTROL,
- META_HEADER_CONTENT_TYPE,
- META_HEADER_CONTENT_DISPOSITION,
- META_HEADER_CONTENT_ENCODING,
- META_HEADER_CONTENT_LANGUAGE,
- META_HEADER_CONTENT_MD5:
- hdr.Set(k, v[0])
- default:
- hdr.Set(fmt.Sprintf("%s%s", metaPrefix, k), v[0])
- }
- }
- return hdr
- }
- func FetchMetaFromHttpHeader(metaPrefix string, headers http.Header) http.Header {
- metaPrefix = http.CanonicalHeaderKey(metaPrefix)
- meta := http.Header{}
- for hdr, vals := range headers {
- hdr = http.CanonicalHeaderKey(hdr)
- if strings.HasPrefix(hdr, metaPrefix) {
- for _, val := range vals {
- meta.Add(hdr[len(metaPrefix):], val)
- }
- }
- }
- for _, hdr := range []string{
- META_HEADER_CONTENT_TYPE,
- META_HEADER_CONTENT_ENCODING,
- META_HEADER_CONTENT_DISPOSITION,
- META_HEADER_CONTENT_LANGUAGE,
- META_HEADER_CACHE_CONTROL,
- } {
- val := headers.Get(hdr)
- if len(val) > 0 {
- meta.Set(hdr, val)
- }
- }
- return meta
- }
- func SetBucketCORS(ibucket ICloudBucket, rules []SBucketCORSRule) error {
- if len(rules) == 0 {
- return nil
- }
- oldRules, err := ibucket.GetCORSRules()
- if err != nil {
- return errors.Wrap(err, "ibucket.GetCORSRules()")
- }
- newSet := []SBucketCORSRule{}
- updateSet := map[int]SBucketCORSRule{}
- for i := range rules {
- index, err := strconv.Atoi(rules[i].Id)
- if err == nil && index < len(oldRules) {
- updateSet[index] = rules[i]
- } else {
- newSet = append(newSet, rules[i])
- }
- }
- updatedRules := []SBucketCORSRule{}
- for i := range oldRules {
- if _, ok := updateSet[i]; !ok {
- updatedRules = append(updatedRules, oldRules[i])
- } else {
- updatedRules = append(updatedRules, updateSet[i])
- }
- }
- updatedRules = append(updatedRules, newSet...)
- err = ibucket.SetCORS(updatedRules)
- if err != nil {
- return errors.Wrap(err, "ibucket.SetCORS(updatedRules)")
- }
- return nil
- }
- func DeleteBucketCORS(ibucket ICloudBucket, id []string) ([]SBucketCORSRule, error) {
- if len(id) == 0 {
- return nil, nil
- }
- deletedRules := []SBucketCORSRule{}
- oldRules, err := ibucket.GetCORSRules()
- if err != nil {
- return nil, errors.Wrap(err, "ibucket.GetCORSRules()")
- }
- excludeMap := map[int]bool{}
- for i := range id {
- index, err := strconv.Atoi(id[i])
- if err == nil && index < len(oldRules) {
- excludeMap[index] = true
- }
- }
- if len(excludeMap) == 0 {
- return nil, nil
- }
- newRules := []SBucketCORSRule{}
- for i := range oldRules {
- if _, ok := excludeMap[i]; !ok {
- newRules = append(newRules, oldRules[i])
- } else {
- deletedRules = append(deletedRules, oldRules[i])
- }
- }
- if len(newRules) == 0 {
- err = ibucket.DeleteCORS()
- if err != nil {
- return nil, errors.Wrapf(err, "ibucket.DeleteCORS()")
- }
- } else {
- err = ibucket.SetCORS(newRules)
- if err != nil {
- return nil, errors.Wrapf(err, "ibucket.SetBucketCORS(newRules)")
- }
- }
- return deletedRules, nil
- }
// SetBucketTags reconciles the bucket's tags with the desired set and reports
// the old/new pair. Providers that do not implement or support tags are
// treated as a successful no-op. Tags are replaced (SetTags with
// replace=true) only when they actually changed.
func SetBucketTags(ctx context.Context, iBucket ICloudBucket, mangerId string, tags map[string]string) (TagsUpdateInfo, error) {
	ret := TagsUpdateInfo{}
	old, err := iBucket.GetTags()
	if err != nil {
		// Missing tag support is not an error.
		if errors.Cause(err) == ErrNotImplemented || errors.Cause(err) == ErrNotSupported {
			return ret, nil
		}
		return ret, errors.Wrapf(err, "iBucket.GetTags")
	}
	ret.OldTags, ret.NewTags = old, tags
	if !ret.IsChanged() {
		// Nothing to update.
		return ret, nil
	}
	return ret, SetTags(ctx, iBucket, mangerId, tags, true)
}
// sOffsetWriter adapts an io.WriterAt into a sequential io.Writer that
// begins writing at a fixed offset and advances the offset on every write.
type sOffsetWriter struct {
	writerAt io.WriterAt
	offset   int64
}

// newWriter returns an io.Writer that writes to output starting at
// outputOffset, advancing sequentially with each successful Write.
func newWriter(output io.WriterAt, outputOffset int64) io.Writer {
	ow := &sOffsetWriter{}
	ow.writerAt = output
	ow.offset = outputOffset
	return ow
}

// Write copies p to the underlying WriterAt at the current offset and
// advances the offset by the number of bytes actually written, so partial
// writes resume at the right position.
func (ow *sOffsetWriter) Write(p []byte) (int, error) {
	written, err := ow.writerAt.WriteAt(p, ow.offset)
	ow.offset += int64(written)
	return written, err
}
// calculateRateMbps returns the transfer rate in megabits per second
// (decimal Mbps, i.e. 1 Mbps = 10^6 bits/s) for sizeBytes transferred over
// duration. A non-positive duration yields 0 instead of +Inf/NaN.
//
// The byte count is converted to float64 before scaling: the previous
// integer expression sizeBytes*8/1000/1000 truncated to whole megabits,
// reporting 0 Mbps for any transfer smaller than 125000 bytes.
func calculateRateMbps(sizeBytes int64, duration time.Duration) float64 {
	seconds := duration.Seconds()
	if seconds <= 0 {
		return 0
	}
	return float64(sizeBytes) * 8 / 1e6 / seconds
}
// downloadPartOfMultipartJob describes one unit of work for the parallel
// multipart download: fetch one byte range of an object and write it to the
// matching offset of the destination. Jobs are queued by
// DownloadObjectParallelWithProgress and consumed by
// downloadPartOfMultipartWorker.
type downloadPartOfMultipartJob struct {
	// ctx governs cancellation of this part's download
	ctx context.Context
	// bucket is the source bucket
	bucket ICloudBucket
	// key is the object key within the bucket
	key string
	// rangeOpt is the byte range of the object covered by this part
	rangeOpt *SGetObjectRange
	// output is a writer already positioned at this part's destination offset
	output io.Writer
	// partIndex is the zero-based index of this part
	partIndex int
	// debug enables verbose per-part logging
	debug bool
	// segSizes is shared across jobs; the worker stores this part's
	// downloaded size at segSizes[partIndex] (distinct indexes per job)
	segSizes []int64
	// errs is where the worker appends a failure for this part; appends are
	// NOT synchronized, so concurrent jobs must not share the same slice
	// (see downloadPartOfMultipartWorker)
	errs *[]error
	// callback receives progress updates from the stream copy
	callback func(saved int64, written int64)
}
// downloadPartOfMultipartWorker drains the job queue, downloading one part
// per job, until the queue is closed; it signals completion via wg.
// On success the part's size is recorded at job.segSizes[job.partIndex]
// (each job owns a distinct index, so concurrent workers do not conflict).
//
// NOTE(review): failures are appended through *job.errs without any
// synchronization. If several concurrently-running jobs carry the same errs
// pointer (as the multipart caller does), two failing parts race on the
// append. Callers must either give each job its own error slot or accept
// that concurrent failures may be lost/corrupt the slice.
func downloadPartOfMultipartWorker(wg *sync.WaitGroup, queue chan downloadPartOfMultipartJob) {
	defer wg.Done()
	for job := range queue {
		sz, err := downloadPartOfMultipart(job.ctx, job.bucket, job.key, job.rangeOpt, job.output, job.partIndex, job.debug, job.callback)
		if err != nil {
			*job.errs = append(*job.errs, err)
		} else {
			job.segSizes[job.partIndex] = sz
		}
	}
}
- func downloadPartOfMultipart(ctx context.Context, bucket ICloudBucket, key string, rangeOpt *SGetObjectRange, output io.Writer, partIndex int, debug bool, callback func(saved int64, written int64)) (int64, error) {
- partSize := rangeOpt.SizeBytes()
- var startAt time.Time
- if debug {
- startAt = time.Now()
- log.Debugf("downloadPart %d %d range: %s (%d)", partIndex+1, partSize, rangeOpt.String(), rangeOpt.SizeBytes())
- }
- stream, err := bucket.GetObject(ctx, key, rangeOpt)
- if err != nil {
- return 0, errors.Wrap(err, "bucket.GetObject")
- }
- defer stream.Close()
- prop, err := streamutils.StreamPipe2(stream, output, false, callback)
- if err != nil {
- return 0, errors.Wrap(err, "StreamPipe")
- }
- if debug {
- duration := time.Since(startAt)
- rateMbps := calculateRateMbps(partSize, duration)
- log.Debugf("End of downloadPart %d %d range: %s (%d) takes %f seconds at %fMbps", partIndex+1, partSize, rangeOpt.String(), rangeOpt.SizeBytes(), float64(duration)/float64(time.Second), rateMbps)
- }
- return prop.Size, nil
- }
// DownloadObjectParallel downloads (a range of) object key from bucket into
// output starting at outputOffset, splitting the transfer into blocksz-sized
// parts fetched by up to parallel concurrent workers. It is a convenience
// wrapper around DownloadObjectParallelWithProgress with no progress callback.
func DownloadObjectParallel(ctx context.Context, bucket ICloudBucket, key string, rangeOpt *SGetObjectRange, output io.WriterAt, outputOffset int64, blocksz int64, debug bool, parallel int) (int64, error) {
	return DownloadObjectParallelWithProgress(ctx, bucket, key, rangeOpt, output, outputOffset, blocksz, debug, parallel, nil)
}
// sDownloadProgresser tracks cumulative download progress, forwards updates
// to an optional user callback, and rate-limits its own progress logging.
type sDownloadProgresser struct {
	// totalSize is the expected total number of bytes to download
	totalSize int64
	// progress is the number of bytes downloaded so far
	progress int64
	// startTime marks when the download began (used for rate computation)
	startTime time.Time
	// reportTime is the last time a progress line was logged
	reportTime time.Time
	// callback, when non-nil, receives (percent, Mbps, total size in MB)
	// on every progress update
	callback func(progress float64, progressMbps float64, totalSizeMb int64)
}
- func newDownloadProgresser(totalSize int64, callback func(progress float64, progressMbps float64, totalSizeMb int64)) *sDownloadProgresser {
- return &sDownloadProgresser{
- totalSize: totalSize,
- startTime: time.Now(),
- reportTime: time.Now(),
- callback: callback,
- }
- }
- func (p *sDownloadProgresser) Progress(_ int64, written int64) {
- p.progress += written
- duration := time.Since(p.startTime)
- progress := float64(p.progress*100) / float64(p.totalSize)
- progressMbps := calculateRateMbps(p.progress, duration)
- if p.callback != nil {
- p.callback(progress, progressMbps, p.totalSize/1000/1000)
- }
- if time.Since(p.reportTime) > time.Second*5 {
- p.reportTime = time.Now()
- log.Infof("Download progress: %d/%d, %f%%, %fMbps", p.progress, p.totalSize, progress, progressMbps)
- }
- }
- func (p *sDownloadProgresser) Summary() {
- duration := time.Since(p.startTime)
- rateMbps := calculateRateMbps(p.progress, duration)
- log.Infof("End of download %d: downloaded %d takes %f seconds at %fMbps", p.totalSize, p.progress, float64(duration)/float64(time.Second), rateMbps)
- }
- func DownloadObjectParallelWithProgress(ctx context.Context, bucket ICloudBucket, key string, rangeOpt *SGetObjectRange, output io.WriterAt, outputOffset int64, blocksz int64, debug bool, parallel int, callback func(progress float64, progressMbps float64, totalSizeMb int64)) (int64, error) {
- if debug {
- log.Debugf("DownloadObjectParallelWithProgress bucket: %s key: %s offset: %d blocksz: %d parallel: %d", bucket.GetName(), key, outputOffset, blocksz, parallel)
- }
- obj, err := GetIObject(bucket, key)
- if err != nil {
- return 0, errors.Wrap(err, "GetIObject")
- }
- if blocksz <= 0 {
- blocksz = MAX_PUT_OBJECT_SIZEBYTES
- }
- sizeBytes := obj.GetSizeBytes()
- if sizeBytes < 0 {
- return 0, errors.Wrapf(errors.ErrServer, "object size is negative (%d)", sizeBytes)
- } else if sizeBytes == 0 {
- return 0, nil
- }
- if rangeOpt == nil {
- rangeOpt = &SGetObjectRange{
- Start: 0,
- End: sizeBytes - 1,
- }
- } else {
- if rangeOpt.End < rangeOpt.Start {
- tmp := rangeOpt.Start
- rangeOpt.Start = rangeOpt.End
- rangeOpt.End = tmp
- }
- if rangeOpt.End >= sizeBytes {
- rangeOpt.End = sizeBytes - 1
- }
- if rangeOpt.Start < 0 {
- rangeOpt.Start = 0
- }
- sizeBytes = rangeOpt.SizeBytes()
- }
- progresser := newDownloadProgresser(sizeBytes, callback)
- defer progresser.Summary()
- progressCallback := progresser.Progress
- if sizeBytes < blocksz {
- if debug {
- log.Debugf("too small, download object in one shot")
- }
- size, err := downloadPartOfMultipart(ctx, bucket, key, rangeOpt, newWriter(output, outputOffset), 0, true, progressCallback)
- if err != nil {
- return 0, errors.Wrap(err, "downloadPartOfMultipart")
- }
- return size, nil
- }
- partSize := blocksz
- partCount := sizeBytes / partSize
- if partCount*partSize < sizeBytes {
- partCount += 1
- }
- if debug {
- log.Debugf("multipart download part count %d part size %d", partCount, partSize)
- }
- var errs []error
- segSizes := make([]int64, partCount)
- {
- if parallel < 1 {
- parallel = 1
- }
- queue := make(chan downloadPartOfMultipartJob, parallel)
- wg := &sync.WaitGroup{}
- for i := 0; i < parallel; i++ {
- wg.Add(1)
- go downloadPartOfMultipartWorker(wg, queue)
- }
- for i := 0; i < int(partCount); i += 1 {
- dstOffset := outputOffset + int64(i)*partSize
- start := rangeOpt.Start + int64(i)*partSize
- if i == int(partCount)-1 {
- partSize = sizeBytes - partSize*(partCount-1)
- }
- end := start + partSize - 1
- srcRangeOpt := SGetObjectRange{
- Start: start,
- End: end,
- }
- partIndex := i
- job := downloadPartOfMultipartJob{
- ctx: ctx,
- bucket: bucket,
- key: key,
- rangeOpt: &srcRangeOpt,
- output: newWriter(output, dstOffset),
- partIndex: partIndex,
- debug: debug,
- segSizes: segSizes,
- errs: &errs,
- callback: progressCallback,
- }
- queue <- job
- }
- close(queue)
- wg.Wait()
- }
- if len(errs) > 0 {
- return 0, errors.Wrap(errors.NewAggregate(errs), "downloadPartOfMultipart")
- }
- totalSize := int64(0)
- for i := range segSizes {
- totalSize += segSizes[i]
- }
- return totalSize, nil
- }
|