| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464 |
- // Copyright 2023 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package volcengine
- import (
- "context"
- "fmt"
- "io"
- "net/http"
- "net/url"
- "time"
- tos "github.com/volcengine/ve-tos-golang-sdk/v2/tos"
- "github.com/volcengine/ve-tos-golang-sdk/v2/tos/enum"
- "yunion.io/x/cloudmux/pkg/cloudprovider"
- "yunion.io/x/cloudmux/pkg/multicloud"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/util/fileutils"
- )
- type SBucket struct {
- multicloud.SBaseBucket
- VolcEngineTags
- region *SRegion
- Name string
- Location string
- CreationDate time.Time
- }
- func (b *SBucket) GetProjectId() string {
- return ""
- }
- func (b *SBucket) GetGlobalId() string {
- return b.Name
- }
- func (b *SBucket) GetName() string {
- return b.Name
- }
- func (b *SBucket) GetLocation() string {
- return b.Location
- }
- func (b *SBucket) GetIRegion() cloudprovider.ICloudRegion {
- return b.region
- }
- func (b *SBucket) GetCreatedAt() time.Time {
- return b.CreationDate
- }
- func (b *SBucket) GetStorageClass() string {
- toscli, err := b.region.GetTosClient()
- if err != nil {
- return ""
- }
- input := &tos.HeadBucketInput{
- Bucket: b.Name,
- }
- output, err := toscli.HeadBucket(context.Background(), input)
- if err != nil {
- return ""
- }
- return string(output.StorageClass)
- }
- func (b *SBucket) GetStats() cloudprovider.SBucketStats {
- stats, _ := cloudprovider.GetIBucketStats(b)
- return stats
- }
- func (b *SBucket) GetAccessUrls() []cloudprovider.SBucketAccessUrl {
- ret := []cloudprovider.SBucketAccessUrl{
- {
- Url: fmt.Sprintf("%s.%s", b.Name, b.region.getTOSExternalDomain()),
- Description: "ExtranetEndpoint",
- Primary: true,
- },
- {
- Url: fmt.Sprintf("%s.%s", b.Name, b.region.getTOSInternalDomain()),
- Description: "IntranetEndpoint",
- },
- }
- return ret
- }
- func grantToCannedAcl(acls []tos.GrantV2) cloudprovider.TBucketACLType {
- isWrite, isRead := false, false
- for _, acl := range acls {
- if acl.GranteeV2.Type != enum.GranteeGroup || acl.GranteeV2.Canned != enum.CannedAllUsers {
- continue
- }
- switch acl.Permission {
- case enum.PermissionWrite:
- isWrite = true
- case enum.PermissionRead:
- isRead = true
- }
- }
- if isWrite && isRead {
- return cloudprovider.ACLPublicReadWrite
- }
- if isRead {
- return cloudprovider.ACLPublicRead
- }
- return cloudprovider.ACLPrivate
- }
- func (b *SBucket) GetAcl() cloudprovider.TBucketACLType {
- acl := cloudprovider.ACLPrivate
- toscli, err := b.region.GetTosClient()
- if err != nil {
- log.Errorf("GetTosClient fail %s", err)
- return acl
- }
- input := tos.GetBucketACLInput{}
- input.Bucket = b.Name
- output, err := toscli.GetBucketACL(context.Background(), &input)
- if err != nil {
- log.Errorf("toscli.GetBucketAcl fail %s", err)
- return acl
- }
- return grantToCannedAcl(output.Grants)
- }
- func (b *SBucket) SetAcl(aclStr cloudprovider.TBucketACLType) error {
- toscli, err := b.region.GetTosClient()
- if err != nil {
- return errors.Wrapf(err, "Get TosClient")
- }
- input := tos.PutBucketACLInput{}
- input.Bucket = b.Name
- input.ACLType = enum.ACLType(aclStr)
- _, err = toscli.PutBucketACL(context.Background(), &input)
- if err != nil {
- return errors.Wrapf(err, "PutBucketAcl")
- }
- return nil
- }
- func (b *SBucket) NewMultipartUpload(ctx context.Context, key string, cannedAcl cloudprovider.TBucketACLType, storageClassStr string, meta http.Header) (string, error) {
- toscli, err := b.region.GetTosClient()
- if err != nil {
- return "", errors.Wrapf(err, "GetTosClient")
- }
- input := &tos.CreateMultipartUploadV2Input{
- Bucket: b.Name,
- Key: key,
- ACL: enum.ACLType(cannedAcl),
- StorageClass: enum.StorageClassType(storageClassStr),
- Meta: map[string]string{},
- }
- for k := range meta {
- input.Meta[k] = meta.Get(k)
- }
- output, err := toscli.CreateMultipartUploadV2(ctx, input)
- if err != nil {
- return "", err
- }
- return output.UploadID, nil
- }
- func (b *SBucket) AbortMultipartUpload(ctx context.Context, key string, uploadId string) error {
- toscli, err := b.region.GetTosClient()
- if err != nil {
- return errors.Wrapf(err, "GetTosClient")
- }
- _, err = toscli.AbortMultipartUpload(ctx, &tos.AbortMultipartUploadInput{Bucket: b.Name, Key: key, UploadID: uploadId})
- if err != nil {
- return errors.Wrapf(err, "AbortMultipartUploadWithContext")
- }
- return nil
- }
- func (b *SBucket) CompleteMultipartUpload(ctx context.Context, key string, uploadId string, partEtags []string) error {
- toscli, err := b.region.GetTosClient()
- if err != nil {
- return errors.Wrap(err, "GetTosClient")
- }
- parts := make([]tos.UploadedPartV2, len(partEtags))
- for i := range partEtags {
- parts[i].PartNumber = int(i + 1)
- parts[i].ETag = partEtags[i]
- }
- _, err = toscli.CompleteMultipartUploadV2(ctx, &tos.CompleteMultipartUploadV2Input{Bucket: b.Name, Key: key, UploadID: uploadId, Parts: parts})
- if err != nil {
- return errors.Wrapf(err, "CompleteMultipartUploadV2")
- }
- return nil
- }
- func (b *SBucket) CopyObject(ctx context.Context, destKey string, srcBucket, srcKey string, cannedAcl cloudprovider.TBucketACLType, storageClassStr string, meta http.Header) error {
- toscli, err := b.region.GetTosClient()
- if err != nil {
- return errors.Wrap(err, "GetTosClient")
- }
- if len(cannedAcl) == 0 {
- cannedAcl = b.GetAcl()
- }
- var metaDir string
- metaHdr := make(map[string]string)
- cacheControl := ""
- if meta != nil {
- for k, v := range meta {
- if len(v) == 0 || len(v[0]) == 0 {
- continue
- }
- switch http.CanonicalHeaderKey(k) {
- case cloudprovider.META_HEADER_CACHE_CONTROL:
- cacheControl = v[0]
- case cloudprovider.META_HEADER_CONTENT_TYPE:
- cacheControl = v[0]
- case cloudprovider.META_HEADER_CONTENT_LANGUAGE:
- cacheControl = v[0]
- case cloudprovider.META_HEADER_CONTENT_ENCODING:
- cacheControl = v[0]
- case cloudprovider.META_HEADER_CONTENT_DISPOSITION:
- cacheControl = v[0]
- default:
- metaHdr[k] = v[0]
- }
- }
- metaDir = "REPLACE"
- } else {
- metaDir = "COPY"
- }
- input := tos.CopyObjectInput{SrcBucket: srcBucket, Bucket: b.Name, Key: destKey, SrcKey: url.PathEscape(srcKey), StorageClass: enum.StorageClassType(storageClassStr), ACL: enum.ACLType(cannedAcl), MetadataDirective: enum.MetadataDirectiveType(metaDir)}
- if len(cacheControl) > 0 {
- input.CacheControl = cacheControl
- }
- if len(metaHdr) > 0 {
- input.Meta = metaHdr
- }
- _, err = toscli.CopyObject(ctx, &input)
- if err != nil {
- return errors.Wrapf(err, "CopyObject")
- }
- return nil
- }
- func (b *SBucket) CopyPart(ctx context.Context, key string, uploadId string, partNumber int, srcBucket string, srcKey string, srcOffset int64, srcLength int64) (string, error) {
- toscli, err := b.region.GetTosClient()
- if err != nil {
- return "", errors.Wrap(err, "GetTosClient")
- }
- input := tos.UploadPartCopyV2Input{}
- input.Bucket = b.Name
- input.Key = key
- input.UploadID = uploadId
- input.PartNumber = partNumber
- input.SrcBucket = srcBucket
- input.SrcKey = srcKey
- if srcLength > 0 {
- input.CopySourceRange = fmt.Sprintf("bytes=%d-%d", srcOffset, srcOffset+srcLength-1)
- }
- output, err := toscli.UploadPartCopyV2(ctx, &input)
- if err != nil {
- return "", errors.Wrapf(err, "CopyPart")
- }
- return output.ETag, nil
- }
- func (b *SBucket) UploadPart(ctx context.Context, key string, uploadId string, partIndex int, part io.Reader, partSize int64, offset, totalSize int64) (string, error) {
- toscli, err := b.region.GetTosClient()
- if err != nil {
- return "", errors.Wrap(err, "GetTosClient")
- }
- input := tos.UploadPartV2Input{}
- input.Bucket = b.Name
- input.Key = key
- input.UploadID = uploadId
- input.PartNumber = int(partIndex)
- seeker, err := fileutils.NewReadSeeker(part, partSize)
- if err != nil {
- return "", errors.Wrap(err, "newFakeSeeker")
- }
- defer seeker.Close()
- input.Content = seeker
- input.ContentLength = partSize
- output, err := toscli.UploadPartV2(ctx, &input)
- if err != nil {
- return "", errors.Wrapf(err, "UploadPart")
- }
- return output.ETag, nil
- }
- func (b *SBucket) DeleteObject(ctx context.Context, key string) error {
- toscli, err := b.region.GetTosClient()
- if err != nil {
- return errors.Wrap(err, "GetTosClient")
- }
- input := tos.DeleteObjectV2Input{
- Bucket: b.Name,
- Key: key,
- }
- _, err = toscli.DeleteObjectV2(ctx, &input)
- if err != nil {
- return errors.Wrap(err, "DeleteObject")
- }
- return nil
- }
- func (b *SBucket) GetObject(ctx context.Context, key string, rangeOpt *cloudprovider.SGetObjectRange) (io.ReadCloser, error) {
- toscli, err := b.region.GetTosClient()
- if err != nil {
- return nil, errors.Wrap(err, "GetTosClient")
- }
- input := tos.GetObjectV2Input{
- Bucket: b.Name,
- Key: key,
- RangeStart: rangeOpt.Start,
- RangeEnd: rangeOpt.End,
- }
- output, err := toscli.GetObjectV2(ctx, &input)
- if err != nil {
- return nil, errors.Wrap(err, "DeleteObject")
- }
- return output.Content, nil
- }
- func (b *SBucket) ListObjects(prefix string, marker string, delimiter string, maxCount int) (cloudprovider.SListObjectResult, error) {
- result := cloudprovider.SListObjectResult{}
- toscli, err := b.region.GetTosClient()
- if err != nil {
- return result, errors.Wrap(err, "GetTosClient")
- }
- input := tos.ListObjectsV2Input{}
- input.Bucket = b.Name
- if len(prefix) > 0 {
- input.Prefix = prefix
- }
- if len(marker) > 0 {
- input.Marker = marker
- }
- if len(delimiter) > 0 {
- input.Delimiter = delimiter
- }
- if maxCount > 0 {
- input.MaxKeys = maxCount
- }
- output, err := toscli.ListObjectsV2(context.Background(), &input)
- if err != nil {
- return result, errors.Wrap(err, "ListObjects")
- }
- for _, object := range output.Contents {
- obj := &SObject{
- bucket: b,
- SBaseCloudObject: cloudprovider.SBaseCloudObject{
- StorageClass: string(object.StorageClass),
- Key: object.Key,
- SizeBytes: object.Size,
- ETag: object.ETag,
- LastModified: object.LastModified,
- },
- }
- result.Objects = append(result.Objects, obj)
- }
- if output.CommonPrefixes != nil {
- result.CommonPrefixes = make([]cloudprovider.ICloudObject, len(output.CommonPrefixes))
- for i, commonPrefix := range output.CommonPrefixes {
- result.CommonPrefixes[i] = &SObject{
- bucket: b,
- SBaseCloudObject: cloudprovider.SBaseCloudObject{Key: commonPrefix.Prefix},
- }
- }
- }
- result.IsTruncated = output.IsTruncated
- result.NextMarker = output.NextMarker
- return result, nil
- }
- func (b *SBucket) GetTempUrl(method string, key string, expire time.Duration) (string, error) {
- toscli, err := b.region.GetTosClient()
- if err != nil {
- return "", errors.Wrapf(err, "GetTosClient")
- }
- input := &tos.PreSignedURLInput{
- HTTPMethod: enum.HttpMethodGet,
- Bucket: b.Name,
- Key: key,
- Expires: int64(expire.Seconds()),
- }
- output, err := toscli.PreSignedURL(input)
- if err != nil {
- return "", err
- }
- return output.SignedUrl, nil
- }
- func (b *SBucket) PutObject(ctx context.Context, key string, body io.Reader, sizeBytes int64, cannedAcl cloudprovider.TBucketACLType, storageClassStr string, meta http.Header) error {
- if sizeBytes < 0 {
- return errors.Error("context length expected")
- }
- toscli, err := b.region.GetTosClient()
- if err != nil {
- return errors.Wrapf(err, "GetTosClient")
- }
- input := tos.PutObjectV2Input{}
- input.Bucket = b.Name
- input.Key = key
- seeker, err := fileutils.NewReadSeeker(body, sizeBytes)
- if err != nil {
- return errors.Wrap(err, "newFakeSeeker")
- }
- defer seeker.Close()
- input.Content = body
- input.ContentLength = sizeBytes
- if meta != nil {
- metaHdr := make(map[string]string)
- for k, v := range meta {
- if len(v) == 0 || len(v[0]) == 0 {
- continue
- }
- switch http.CanonicalHeaderKey(k) {
- case cloudprovider.META_HEADER_CACHE_CONTROL:
- input.CacheControl = v[0]
- case cloudprovider.META_HEADER_CONTENT_TYPE:
- input.ContentType = v[0]
- case cloudprovider.META_HEADER_CONTENT_MD5:
- input.ContentMD5 = v[0]
- case cloudprovider.META_HEADER_CONTENT_LANGUAGE:
- input.ContentEncoding = v[0]
- case cloudprovider.META_HEADER_CONTENT_ENCODING:
- input.ContentDisposition = v[0]
- default:
- metaHdr[k] = v[0]
- }
- }
- if len(metaHdr) > 0 {
- input.Meta = metaHdr
- }
- }
- if len(cannedAcl) > 0 {
- cannedAcl = b.GetAcl()
- }
- input.ACL = enum.ACLType(cannedAcl)
- if len(storageClassStr) > 0 {
- input.StorageClass = enum.StorageClassType(storageClassStr)
- }
- _, err = toscli.PutObjectV2(ctx, &input)
- if err != nil {
- return errors.Wrapf(err, "PutObject")
- }
- return nil
- }
|