| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package google
- import (
- "context"
- "fmt"
- "io"
- "io/ioutil"
- "net/http"
- "net/url"
- "strings"
- "time"
- "cloud.google.com/go/storage"
- "yunion.io/x/jsonutils"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/utils"
- "yunion.io/x/cloudmux/pkg/cloudprovider"
- "yunion.io/x/cloudmux/pkg/multicloud"
- )
// SLifecycleRuleAction is the action half of a GCS bucket lifecycle rule.
// NOTE(review): only the action type (e.g. "Delete") is mapped; action
// parameters such as a target storage class are not modeled here.
type SLifecycleRuleAction struct {
	Type string
}

// SLifecycleRuleCondition is the condition half of a GCS lifecycle rule.
// NOTE(review): only the object age (days) condition is mapped.
type SLifecycleRuleCondition struct {
	Age int
}

// SLifecycleRule pairs one action with one condition, mirroring a single
// entry of the bucket's lifecycle.rule list in the GCS JSON API.
type SLifecycleRule struct {
	Action    SLifecycleRuleAction
	Condition SLifecycleRuleCondition
}

// SBucketPolicyOnly mirrors iamConfiguration.bucketPolicyOnly (the legacy
// name for uniform bucket-level access).
type SBucketPolicyOnly struct {
	Enabled bool
}

// SUniformBucketLevelAccess mirrors iamConfiguration.uniformBucketLevelAccess.
type SUniformBucketLevelAccess struct {
	Enabled bool
}

// SIamConfiguration mirrors the bucket's iamConfiguration resource.
type SIamConfiguration struct {
	BucketPolicyOnly         SBucketPolicyOnly
	UniformBucketLevelAccess SUniformBucketLevelAccess
}

// SLifecycle is the bucket lifecycle configuration: a list of rules.
type SLifecycle struct {
	Rule []SLifecycleRule
}
// SBucket wraps a GCS bucket resource (JSON API "buckets" object) together
// with the owning region so that method receivers can issue API calls.
type SBucket struct {
	multicloud.SBaseBucket
	GoogleTags

	// region is the SRegion this bucket was listed from; used for all
	// API calls made by the methods below.
	region *SRegion

	Kind             string
	SelfLink         string
	Name             string
	ProjectNumber    string
	Metageneration   string
	Location         string // upper-case in API responses, e.g. "US-EAST1"
	StorageClass     string
	Etag             string
	TimeCreated      time.Time
	Updated          time.Time
	Lifecycle        SLifecycle
	IamConfiguration SIamConfiguration
	LocationType     string
}
// GetProjectId returns the GCP project id of the owning region.
func (b *SBucket) GetProjectId() string {
	return b.region.GetProjectId()
}
- func (b *SBucket) GetAcl() cloudprovider.TBucketACLType {
- iam, err := b.region.GetBucketIam(b.Name)
- if err != nil {
- return cloudprovider.ACLUnknown
- }
- acl := cloudprovider.ACLPrivate
- allUsers := []SBucketBinding{}
- allAuthUsers := []SBucketBinding{}
- for _, binding := range iam.Bindings {
- if utils.IsInStringArray("allUsers", binding.Members) {
- allUsers = append(allUsers, binding)
- }
- if utils.IsInStringArray("allAuthenticatedUsers", binding.Members) {
- allAuthUsers = append(allAuthUsers, binding)
- }
- }
- for _, binding := range allUsers {
- switch binding.Role {
- case "roles/storage.admin", "roles/storage.objectAdmin":
- acl = cloudprovider.ACLPublicReadWrite
- case "roles/storage.objectViewer":
- if acl != cloudprovider.ACLPublicReadWrite {
- acl = cloudprovider.ACLPublicRead
- }
- }
- }
- for _, binding := range allAuthUsers {
- switch binding.Role {
- case "roles/storage.admin", "roles/storage.objectAdmin", "roles/storage.objectViewer":
- acl = cloudprovider.ACLAuthRead
- }
- }
- return acl
- }
- func (region *SRegion) SetBucketAcl(bucket string, acl cloudprovider.TBucketACLType) error {
- iam, err := region.GetBucketIam(bucket)
- if err != nil {
- return errors.Wrap(err, "GetBucketIam")
- }
- bindings := []SBucketBinding{}
- for _, binding := range iam.Bindings {
- if !utils.IsInStringArray(string(storage.AllUsers), binding.Members) && !utils.IsInStringArray(string(storage.AllAuthenticatedUsers), binding.Members) {
- bindings = append(bindings, binding)
- }
- }
- switch acl {
- case cloudprovider.ACLPrivate:
- if len(bindings) == len(iam.Bindings) {
- return nil
- }
- case cloudprovider.ACLAuthRead:
- bindings = append(bindings, SBucketBinding{
- Role: "roles/storage.objectViewer",
- Members: []string{"allAuthenticatedUsers"},
- })
- case cloudprovider.ACLPublicRead:
- bindings = append(bindings, SBucketBinding{
- Role: "roles/storage.objectViewer",
- Members: []string{"allUsers"},
- })
- case cloudprovider.ACLPublicReadWrite:
- bindings = append(bindings, SBucketBinding{
- Role: "roles/storage.objectAdmin",
- Members: []string{"allUsers"},
- })
- default:
- return fmt.Errorf("unknown acl %s", acl)
- }
- iam.Bindings = bindings
- _, err = region.SetBucketIam(bucket, iam)
- if err != nil {
- return errors.Wrap(err, "SetBucketIam")
- }
- return nil
- }
// SetAcl applies the canned ACL to this bucket via the region helper.
func (b *SBucket) SetAcl(acl cloudprovider.TBucketACLType) error {
	return b.region.SetBucketAcl(b.Name, acl)
}

// GetGlobalId returns the bucket name; GCS bucket names are globally unique.
func (b *SBucket) GetGlobalId() string {
	return b.Name
}

// GetName returns the bucket name.
func (b *SBucket) GetName() string {
	return b.Name
}

// GetLocation returns the bucket location lower-cased (the API reports it
// upper-case, e.g. "US-EAST1").
func (b *SBucket) GetLocation() string {
	return strings.ToLower(b.Location)
}

// GetIRegion returns the region this bucket belongs to.
func (b *SBucket) GetIRegion() cloudprovider.ICloudRegion {
	return b.region
}

// GetCreatedAt returns the bucket creation timestamp.
func (b *SBucket) GetCreatedAt() time.Time {
	return b.TimeCreated
}

// GetStorageClass returns the bucket's default storage class.
func (b *SBucket) GetStorageClass() string {
	return b.StorageClass
}
- func (b *SBucket) GetAccessUrls() []cloudprovider.SBucketAccessUrl {
- return []cloudprovider.SBucketAccessUrl{
- {
- Url: fmt.Sprintf("https://www.googleapis.com/storage/v1/b/%s", b.Name),
- Description: "bucket domain",
- Primary: true,
- },
- {
- Url: fmt.Sprintf("https://www.googleapis.com/upload/storage/v1/b/%s/o", b.Name),
- Description: "object upload endpoint",
- },
- {
- Url: fmt.Sprintf("https://www.googleapis.com/batch/storage/v1/b/%s", b.Name),
- Description: "batch operation",
- },
- }
- }
// GetStats returns aggregate object statistics for the bucket. The error
// from GetIBucketStats is deliberately ignored; on failure the zero-value
// stats are returned (best-effort semantics).
func (b *SBucket) GetStats() cloudprovider.SBucketStats {
	stats, _ := cloudprovider.GetIBucketStats(b)
	return stats
}
// AbortMultipartUpload cancels a GCS resumable upload session identified by
// uploadId. The key parameter is unused because the session id alone
// identifies the upload.
func (b *SBucket) AbortMultipartUpload(ctx context.Context, key string, uploadId string) error {
	resource := fmt.Sprintf("b/%s/o?uploadType=resumable&upload_id=%s", b.Name, uploadId)
	return b.region.client.storageAbortUpload(resource)
}
- func (b *SBucket) CompleteMultipartUpload(ctx context.Context, key string, uploadId string, partEtags []string) error {
- resource := fmt.Sprintf("b/%s/o/%s", b.Name, url.PathEscape(key))
- err := b.region.StorageGet(resource, nil)
- if err != nil {
- return errors.Wrapf(err, "failed to get object %s", key)
- }
- return nil
- }
- func (b *SBucket) CopyObject(ctx context.Context, destKey string, srcBucket, srcKey string, cannedAcl cloudprovider.TBucketACLType, storageClassStr string, meta http.Header) error {
- resource := fmt.Sprintf("b/%s/o/%s", srcBucket, url.PathEscape(srcKey))
- action := fmt.Sprintf("copyTo/b/%s/o/%s", b.Name, url.PathEscape(destKey))
- err := b.region.StorageDo(resource, action, nil, nil)
- if err != nil {
- return errors.Wrap(err, "CopyObject")
- }
- err = b.region.SetObjectAcl(b.Name, destKey, cannedAcl)
- if err != nil {
- return errors.Wrapf(err, "AddObjectAcl(%s)", cannedAcl)
- }
- err = b.region.SetObjectMeta(b.Name, destKey, meta)
- if err != nil {
- return errors.Wrap(err, "SetObjectMeta")
- }
- return nil
- }
- func (b *SBucket) CopyPart(ctx context.Context, key string, uploadId string, partNumber int, srcBucket string, srcKey string, srcOffset int64, srcLength int64) (string, error) {
- return "", cloudprovider.ErrNotSupported
- }
- func (region *SRegion) DeleteObject(bucket, key string) error {
- resource := fmt.Sprintf("b/%s/o/%s", bucket, url.PathEscape(key))
- return region.StorageDelete(resource)
- }
- func (b *SBucket) DeleteObject(ctx context.Context, key string) error {
- return b.region.DeleteObject(b.Name, key)
- }
- func (region *SRegion) DownloadObjectRange(bucket, object string, start, end int64) (io.ReadCloser, error) {
- resource := fmt.Sprintf("b/%s/o/%s?alt=media", bucket, url.PathEscape(object))
- header := http.Header{}
- if start <= 0 {
- if end > 0 {
- header.Set("Range", fmt.Sprintf("bytes=0-%d", end))
- } else {
- header.Set("Range", "bytes=-1")
- }
- } else {
- if end > start {
- header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, end))
- } else {
- header.Set("Range", fmt.Sprintf("bytes=%d-", start))
- }
- }
- return region.client.storageDownload(resource, header)
- }
- func (b *SBucket) GetObject(ctx context.Context, key string, rangeOpt *cloudprovider.SGetObjectRange) (io.ReadCloser, error) {
- return b.region.DownloadObjectRange(b.Name, key, rangeOpt.Start, rangeOpt.End)
- }
- func (region *SRegion) SingedUrl(bucket, key string, method string, expire time.Duration) (string, error) {
- if expire > time.Hour*24*7 {
- return "", fmt.Errorf(`Expiration Time can\'t be longer than 604800 seconds (7 days)`)
- }
- opts := &storage.SignedURLOptions{
- Scheme: storage.SigningSchemeV4,
- Method: method,
- GoogleAccessID: region.client.clientEmail,
- PrivateKey: []byte(region.client.privateKey),
- Expires: time.Now().Add(expire),
- }
- switch method {
- case "GET":
- case "PUT":
- opts.Headers = []string{"Content-Type:application/octet-stream"}
- default:
- return "", errors.Wrapf(cloudprovider.ErrNotSupported, "Not support method %s", method)
- }
- return storage.SignedURL(bucket, key, opts)
- }
// GetTempUrl returns a pre-signed URL for the object, delegating to the
// region's SingedUrl (sic) helper.
func (b *SBucket) GetTempUrl(method string, key string, expire time.Duration) (string, error) {
	return b.region.SingedUrl(b.Name, key, method, expire)
}
- func (b *SBucket) ListObjects(prefix string, marker string, delimiter string, maxCount int) (cloudprovider.SListObjectResult, error) {
- result := cloudprovider.SListObjectResult{}
- objs, err := b.region.GetObjects(b.Name, prefix, marker, delimiter, maxCount)
- if err != nil {
- return result, errors.Wrap(err, "GetObjects")
- }
- result.NextMarker = objs.NextPageToken
- log.Errorf("obj count: %d", len(objs.Items))
- result.Objects = []cloudprovider.ICloudObject{}
- result.CommonPrefixes = []cloudprovider.ICloudObject{}
- for i := range objs.Items {
- if strings.HasSuffix(objs.Items[i].Name, "/") {
- continue
- }
- objs.Items[i].bucket = b
- result.Objects = append(result.Objects, &objs.Items[i])
- }
- for i := range objs.Prefixes {
- obj := &SObject{
- bucket: b,
- Name: objs.Prefixes[i],
- }
- result.CommonPrefixes = append(result.CommonPrefixes, obj)
- }
- return result, nil
- }
- func (region *SRegion) NewMultipartUpload(bucket, key string, cannedAcl cloudprovider.TBucketACLType, storageClassStr string, meta http.Header) (string, error) {
- body := map[string]string{"name": key}
- if len(storageClassStr) > 0 {
- body["storageClass"] = storageClassStr
- }
- for k := range meta {
- switch k {
- case cloudprovider.META_HEADER_CONTENT_TYPE:
- body["contentType"] = meta.Get(k)
- case cloudprovider.META_HEADER_CONTENT_ENCODING:
- body["contentEncoding"] = meta.Get(k)
- case cloudprovider.META_HEADER_CONTENT_DISPOSITION:
- body["contentDisposition"] = meta.Get(k)
- case cloudprovider.META_HEADER_CONTENT_LANGUAGE:
- body["contentLanguage"] = meta.Get(k)
- case cloudprovider.META_HEADER_CACHE_CONTROL:
- body["cacheControl"] = meta.Get(k)
- default:
- body[fmt.Sprintf("metadata.%s", k)] = meta.Get(k)
- }
- }
- switch cannedAcl {
- case cloudprovider.ACLPrivate:
- case cloudprovider.ACLAuthRead:
- body["predefinedAcl"] = "authenticatedRead"
- case cloudprovider.ACLPublicRead:
- body["predefinedAcl"] = "publicRead"
- case cloudprovider.ACLPublicReadWrite:
- return "", cloudprovider.ErrNotSupported
- }
- resource := fmt.Sprintf("b/%s/o?uploadType=resumable", bucket)
- input := strings.NewReader(jsonutils.Marshal(body).String())
- header := http.Header{}
- header.Set("Content-Type", "application/json; charset=UTF-8")
- header.Set("Content-Length", fmt.Sprintf("%d", input.Len()))
- resp, err := region.client.storageUpload(resource, header, input)
- if err != nil {
- return "", errors.Wrap(err, "storageUpload")
- }
- defer resp.Body.Close()
- location := resp.Header.Get("Location")
- query, err := url.ParseQuery(location)
- if err != nil {
- return "", errors.Wrapf(err, "url.ParseQuery(%s)", location)
- }
- return query.Get("upload_id"), nil
- }
// NewMultipartUpload starts a resumable upload session for this bucket and
// returns the GCS upload_id to be used by UploadPart/CompleteMultipartUpload.
func (b *SBucket) NewMultipartUpload(ctx context.Context, key string, cannedAcl cloudprovider.TBucketACLType, storageClassStr string, meta http.Header) (string, error) {
	return b.region.NewMultipartUpload(b.Name, key, cannedAcl, storageClassStr, meta)
}
- func (region *SRegion) UploadPart(bucket, uploadId string, partIndex int, offset int64, part io.Reader, partSize int64, totalSize int64) error {
- resource := fmt.Sprintf("b/%s/o?uploadType=resumable&upload_id=%s", bucket, uploadId)
- header := http.Header{}
- header.Set("Content-Length", fmt.Sprintf("%d", partSize))
- header.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", offset, offset+partSize-1, totalSize))
- resp, err := region.client.storageUploadPart(resource, header, part)
- if err != nil {
- return errors.Wrap(err, "storageUploadPart")
- }
- if resp.StatusCode >= 500 {
- content, _ := ioutil.ReadAll(resp.Body)
- return fmt.Errorf("status code: %d %s", resp.StatusCode, content)
- }
- defer resp.Body.Close()
- return nil
- }
- func (region *SRegion) CheckUploadRange(bucket string, uploadId string) error {
- resource := fmt.Sprintf("b/%s/o?uploadType=resumable&upload_id=%s", bucket, uploadId)
- header := http.Header{}
- header.Set("Content-Range", "bytes */*")
- resp, err := region.client.storageUploadPart(resource, header, nil)
- if err != nil {
- return errors.Wrap(err, "storageUploadPart")
- }
- defer resp.Body.Close()
- content, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return errors.Wrap(err, "ReadAll")
- }
- fmt.Println("content: ", string(content))
- for k, v := range resp.Header {
- fmt.Println("k: ", k, "v: ", v)
- }
- fmt.Println("status code: ", resp.StatusCode)
- return nil
- }
// UploadPart uploads one chunk of a resumable upload. The returned etag is
// always empty: GCS addresses chunks by byte offset, not by part etag.
func (b *SBucket) UploadPart(ctx context.Context, key string, uploadId string, partIndex int, part io.Reader, partSize int64, offset, totalSize int64) (string, error) {
	return "", b.region.UploadPart(b.Name, uploadId, partIndex, offset, part, partSize, totalSize)
}

// PutObject uploads a complete object in a single request.
// NOTE(review): storageClassStr is dropped here — region.PutObject takes no
// storage-class argument, so the requested class is silently ignored.
func (b *SBucket) PutObject(ctx context.Context, key string, body io.Reader, sizeBytes int64, cannedAcl cloudprovider.TBucketACLType, storageClassStr string, meta http.Header) error {
	return b.region.PutObject(b.Name, key, body, sizeBytes, cannedAcl, meta)
}
- func (region *SRegion) GetBucket(name string) (*SBucket, error) {
- resource := "b/" + name
- bucket := &SBucket{region: region}
- err := region.StorageGet(resource, bucket)
- if err != nil {
- return nil, errors.Wrap(err, "GetBucket")
- }
- return bucket, nil
- }
- func (region *SRegion) GetBuckets(maxResults int, pageToken string) ([]SBucket, error) {
- buckets := []SBucket{}
- params := map[string]string{
- "project": region.GetProjectId(),
- }
- err := region.StorageList("b", params, maxResults, pageToken, &buckets)
- if err != nil {
- return nil, err
- }
- return buckets, nil
- }
- func (region *SRegion) CreateBucket(name string, storageClass string, acl cloudprovider.TBucketACLType) (*SBucket, error) {
- body := map[string]interface{}{
- "name": name,
- "location": region.Name,
- }
- if len(storageClass) > 0 {
- body["storageClass"] = storageClass
- }
- params := url.Values{}
- params.Set("predefinedDefaultObjectAcl", "private")
- switch acl {
- case cloudprovider.ACLPrivate, cloudprovider.ACLUnknown:
- params.Set("predefinedAcl", "private")
- case cloudprovider.ACLAuthRead:
- params.Set("predefinedAcl", "authenticatedRead")
- case cloudprovider.ACLPublicRead:
- params.Set("predefinedAcl", "publicRead")
- case cloudprovider.ACLPublicReadWrite:
- params.Set("predefinedAcl", "publicReadWrite")
- }
- params.Set("project", region.GetProjectId())
- bucket := &SBucket{region: region}
- resource := fmt.Sprintf("b?%s", params.Encode())
- err := region.StorageInsert(resource, jsonutils.Marshal(body), bucket)
- if err != nil {
- return nil, err
- }
- return bucket, nil
- }
- func (region *SRegion) UploadObject(bucket string, params url.Values, header http.Header, input io.Reader) error {
- resource := fmt.Sprintf("b/%s/o", bucket)
- if len(params) > 0 {
- resource = fmt.Sprintf("%s?%s", resource, params.Encode())
- }
- resp, err := region.client.storageUpload(resource, header, input)
- if err != nil {
- return errors.Wrap(err, "storageUpload")
- }
- defer resp.Body.Close()
- return nil
- }
- func (region *SRegion) PutObject(bucket string, name string, input io.Reader, sizeBytes int64, cannedAcl cloudprovider.TBucketACLType, meta http.Header) error {
- params := url.Values{}
- params.Set("name", name)
- params.Set("uploadType", "media")
- header := http.Header{}
- header.Set("Content-Length", fmt.Sprintf("%v", sizeBytes))
- err := region.UploadObject(bucket, params, header, input)
- if err != nil {
- return errors.Wrap(err, "UploadObject")
- }
- err = region.SetObjectAcl(bucket, name, cannedAcl)
- if err != nil {
- return errors.Wrap(err, "SetObjectAcl")
- }
- return region.SetObjectMeta(bucket, name, meta)
- }
// DeleteBucket deletes the named bucket. NOTE(review): GCS only deletes
// empty buckets; no emptying is performed here.
func (region *SRegion) DeleteBucket(name string) error {
	return region.StorageDelete("b/" + name)
}
// SBucketCORSRule is the wrapper returned by "GET b/<name>?fields=cors":
// the bucket resource reduced to its cors list.
type SBucketCORSRule struct {
	Cors []SCORSDetails
}

// SCORSDetails mirrors one entry of the bucket's CORS configuration in the
// GCS JSON API.
type SCORSDetails struct {
	Origin         []string `json:"origin"`
	Method         []string `json:"method"`
	ResponseHeader []string `json:"responseHeader"`
	MaxAgeSeconds  int      `json:"maxAgeSeconds"`
}
- func (bucket *SBucket) GetCORSRules() ([]cloudprovider.SBucketCORSRule, error) {
- res := []cloudprovider.SBucketCORSRule{}
- corss := SBucketCORSRule{}
- err := bucket.region.StorageGet(fmt.Sprintf("b/%s?fields=cors", bucket.Name), &corss)
- if err != nil {
- return nil, errors.Wrap(err, "StorageGet cors")
- }
- for _, cors := range corss.Cors {
- temp := cloudprovider.SBucketCORSRule{}
- temp.AllowedHeaders = cors.ResponseHeader
- temp.AllowedMethods = cors.Method
- temp.AllowedOrigins = cors.Origin
- temp.MaxAgeSeconds = cors.MaxAgeSeconds
- res = append(res, temp)
- }
- return res, nil
- }
- func (b *SBucket) SetCORS(rules []cloudprovider.SBucketCORSRule) error {
- params := []map[string]interface{}{}
- for _, rule := range rules {
- params = append(params, map[string]interface{}{
- "origin": rule.AllowedOrigins,
- "method": rule.AllowedMethods,
- "responseHeader": rule.AllowedHeaders,
- "maxAgeSeconds": rule.MaxAgeSeconds,
- })
- }
- return b.region.StoragePut(fmt.Sprintf("b/%s?fields=cors", b.Name), jsonutils.Marshal(map[string]interface{}{"cors": params}), nil)
- }
// DeleteCORS clears the bucket's CORS configuration by PUTting an empty
// body against the cors field.
func (b *SBucket) DeleteCORS() error {
	return b.region.StoragePut(fmt.Sprintf("b/%s?fields=cors", b.Name), nil, nil)
}
|