// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google

import (
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"net/url"
	"strings"
	"time"

	"cloud.google.com/go/storage"

	"yunion.io/x/jsonutils"
	"yunion.io/x/log"
	"yunion.io/x/pkg/errors"
	"yunion.io/x/pkg/utils"

	"yunion.io/x/cloudmux/pkg/cloudprovider"
	"yunion.io/x/cloudmux/pkg/multicloud"
)
// SLifecycleRuleAction is the action half of a bucket lifecycle rule; only the
// action type name (e.g. "Delete") is captured from the API resource.
type SLifecycleRuleAction struct {
	Type string
}

// SLifecycleRuleCondition is the condition half of a lifecycle rule; only the
// object age condition (days) is captured here.
type SLifecycleRuleCondition struct {
	Age int
}

// SLifecycleRule pairs an action with the condition that triggers it.
type SLifecycleRule struct {
	Action    SLifecycleRuleAction
	Condition SLifecycleRuleCondition
}

// SBucketPolicyOnly mirrors the legacy bucketPolicyOnly flag of the bucket
// iamConfiguration resource.
type SBucketPolicyOnly struct {
	Enabled bool
}

// SUniformBucketLevelAccess mirrors the uniformBucketLevelAccess flag of the
// bucket iamConfiguration resource.
type SUniformBucketLevelAccess struct {
	Enabled bool
}

// SIamConfiguration groups the bucket access-control mode flags.
type SIamConfiguration struct {
	BucketPolicyOnly         SBucketPolicyOnly
	UniformBucketLevelAccess SUniformBucketLevelAccess
}

// SLifecycle is the bucket lifecycle configuration: a list of rules.
type SLifecycle struct {
	Rule []SLifecycleRule
}
// SBucket represents a Google Cloud Storage bucket as returned by the JSON
// API ("storage#bucket" resource), plus the owning region handle.
type SBucket struct {
	multicloud.SBaseBucket
	GoogleTags

	// region is the SRegion this bucket was fetched through (not serialized).
	region *SRegion

	Kind             string
	SelfLink         string
	Name             string
	ProjectNumber    string
	Metageneration   string
	Location         string
	StorageClass     string
	Etag             string
	TimeCreated      time.Time
	Updated          time.Time
	Lifecycle        SLifecycle
	IamConfiguration SIamConfiguration
	LocationType     string
}
// GetProjectId returns the GCP project id, delegating to the owning region.
func (b *SBucket) GetProjectId() string {
	return b.region.GetProjectId()
}
  76. func (b *SBucket) GetAcl() cloudprovider.TBucketACLType {
  77. iam, err := b.region.GetBucketIam(b.Name)
  78. if err != nil {
  79. return cloudprovider.ACLUnknown
  80. }
  81. acl := cloudprovider.ACLPrivate
  82. allUsers := []SBucketBinding{}
  83. allAuthUsers := []SBucketBinding{}
  84. for _, binding := range iam.Bindings {
  85. if utils.IsInStringArray("allUsers", binding.Members) {
  86. allUsers = append(allUsers, binding)
  87. }
  88. if utils.IsInStringArray("allAuthenticatedUsers", binding.Members) {
  89. allAuthUsers = append(allAuthUsers, binding)
  90. }
  91. }
  92. for _, binding := range allUsers {
  93. switch binding.Role {
  94. case "roles/storage.admin", "roles/storage.objectAdmin":
  95. acl = cloudprovider.ACLPublicReadWrite
  96. case "roles/storage.objectViewer":
  97. if acl != cloudprovider.ACLPublicReadWrite {
  98. acl = cloudprovider.ACLPublicRead
  99. }
  100. }
  101. }
  102. for _, binding := range allAuthUsers {
  103. switch binding.Role {
  104. case "roles/storage.admin", "roles/storage.objectAdmin", "roles/storage.objectViewer":
  105. acl = cloudprovider.ACLAuthRead
  106. }
  107. }
  108. return acl
  109. }
  110. func (region *SRegion) SetBucketAcl(bucket string, acl cloudprovider.TBucketACLType) error {
  111. iam, err := region.GetBucketIam(bucket)
  112. if err != nil {
  113. return errors.Wrap(err, "GetBucketIam")
  114. }
  115. bindings := []SBucketBinding{}
  116. for _, binding := range iam.Bindings {
  117. if !utils.IsInStringArray(string(storage.AllUsers), binding.Members) && !utils.IsInStringArray(string(storage.AllAuthenticatedUsers), binding.Members) {
  118. bindings = append(bindings, binding)
  119. }
  120. }
  121. switch acl {
  122. case cloudprovider.ACLPrivate:
  123. if len(bindings) == len(iam.Bindings) {
  124. return nil
  125. }
  126. case cloudprovider.ACLAuthRead:
  127. bindings = append(bindings, SBucketBinding{
  128. Role: "roles/storage.objectViewer",
  129. Members: []string{"allAuthenticatedUsers"},
  130. })
  131. case cloudprovider.ACLPublicRead:
  132. bindings = append(bindings, SBucketBinding{
  133. Role: "roles/storage.objectViewer",
  134. Members: []string{"allUsers"},
  135. })
  136. case cloudprovider.ACLPublicReadWrite:
  137. bindings = append(bindings, SBucketBinding{
  138. Role: "roles/storage.objectAdmin",
  139. Members: []string{"allUsers"},
  140. })
  141. default:
  142. return fmt.Errorf("unknown acl %s", acl)
  143. }
  144. iam.Bindings = bindings
  145. _, err = region.SetBucketIam(bucket, iam)
  146. if err != nil {
  147. return errors.Wrap(err, "SetBucketIam")
  148. }
  149. return nil
  150. }
// SetAcl applies a canned ACL to this bucket via the region helper.
func (b *SBucket) SetAcl(acl cloudprovider.TBucketACLType) error {
	return b.region.SetBucketAcl(b.Name, acl)
}
// GetGlobalId returns the bucket name; GCS bucket names are globally unique.
func (b *SBucket) GetGlobalId() string {
	return b.Name
}

// GetName returns the bucket name.
func (b *SBucket) GetName() string {
	return b.Name
}

// GetLocation returns the bucket location, lower-cased for consistency with
// other providers (GCS reports locations upper-case, e.g. "US-CENTRAL1").
func (b *SBucket) GetLocation() string {
	return strings.ToLower(b.Location)
}

// GetIRegion returns the region this bucket was fetched through.
func (b *SBucket) GetIRegion() cloudprovider.ICloudRegion {
	return b.region
}

// GetCreatedAt returns the bucket creation timestamp.
func (b *SBucket) GetCreatedAt() time.Time {
	return b.TimeCreated
}

// GetStorageClass returns the bucket's default storage class.
func (b *SBucket) GetStorageClass() string {
	return b.StorageClass
}
  172. func (b *SBucket) GetAccessUrls() []cloudprovider.SBucketAccessUrl {
  173. return []cloudprovider.SBucketAccessUrl{
  174. {
  175. Url: fmt.Sprintf("https://www.googleapis.com/storage/v1/b/%s", b.Name),
  176. Description: "bucket domain",
  177. Primary: true,
  178. },
  179. {
  180. Url: fmt.Sprintf("https://www.googleapis.com/upload/storage/v1/b/%s/o", b.Name),
  181. Description: "object upload endpoint",
  182. },
  183. {
  184. Url: fmt.Sprintf("https://www.googleapis.com/batch/storage/v1/b/%s", b.Name),
  185. Description: "batch operation",
  186. },
  187. }
  188. }
// GetStats returns bucket statistics via the generic cloudprovider helper.
// The helper's error is deliberately ignored: on failure the zero-value
// stats are returned (best effort).
func (b *SBucket) GetStats() cloudprovider.SBucketStats {
	stats, _ := cloudprovider.GetIBucketStats(b)
	return stats
}
  193. func (b *SBucket) AbortMultipartUpload(ctx context.Context, key string, uploadId string) error {
  194. resource := fmt.Sprintf("b/%s/o?uploadType=resumable&upload_id=%s", b.Name, uploadId)
  195. return b.region.client.storageAbortUpload(resource)
  196. }
  197. func (b *SBucket) CompleteMultipartUpload(ctx context.Context, key string, uploadId string, partEtags []string) error {
  198. resource := fmt.Sprintf("b/%s/o/%s", b.Name, url.PathEscape(key))
  199. err := b.region.StorageGet(resource, nil)
  200. if err != nil {
  201. return errors.Wrapf(err, "failed to get object %s", key)
  202. }
  203. return nil
  204. }
  205. func (b *SBucket) CopyObject(ctx context.Context, destKey string, srcBucket, srcKey string, cannedAcl cloudprovider.TBucketACLType, storageClassStr string, meta http.Header) error {
  206. resource := fmt.Sprintf("b/%s/o/%s", srcBucket, url.PathEscape(srcKey))
  207. action := fmt.Sprintf("copyTo/b/%s/o/%s", b.Name, url.PathEscape(destKey))
  208. err := b.region.StorageDo(resource, action, nil, nil)
  209. if err != nil {
  210. return errors.Wrap(err, "CopyObject")
  211. }
  212. err = b.region.SetObjectAcl(b.Name, destKey, cannedAcl)
  213. if err != nil {
  214. return errors.Wrapf(err, "AddObjectAcl(%s)", cannedAcl)
  215. }
  216. err = b.region.SetObjectMeta(b.Name, destKey, meta)
  217. if err != nil {
  218. return errors.Wrap(err, "SetObjectMeta")
  219. }
  220. return nil
  221. }
// CopyPart is not implemented for GCS; server-side part copy into a
// resumable upload session is not supported by this driver.
func (b *SBucket) CopyPart(ctx context.Context, key string, uploadId string, partNumber int, srcBucket string, srcKey string, srcOffset int64, srcLength int64) (string, error) {
	return "", cloudprovider.ErrNotSupported
}
  225. func (region *SRegion) DeleteObject(bucket, key string) error {
  226. resource := fmt.Sprintf("b/%s/o/%s", bucket, url.PathEscape(key))
  227. return region.StorageDelete(resource)
  228. }
  229. func (b *SBucket) DeleteObject(ctx context.Context, key string) error {
  230. return b.region.DeleteObject(b.Name, key)
  231. }
  232. func (region *SRegion) DownloadObjectRange(bucket, object string, start, end int64) (io.ReadCloser, error) {
  233. resource := fmt.Sprintf("b/%s/o/%s?alt=media", bucket, url.PathEscape(object))
  234. header := http.Header{}
  235. if start <= 0 {
  236. if end > 0 {
  237. header.Set("Range", fmt.Sprintf("bytes=0-%d", end))
  238. } else {
  239. header.Set("Range", "bytes=-1")
  240. }
  241. } else {
  242. if end > start {
  243. header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, end))
  244. } else {
  245. header.Set("Range", fmt.Sprintf("bytes=%d-", start))
  246. }
  247. }
  248. return region.client.storageDownload(resource, header)
  249. }
  250. func (b *SBucket) GetObject(ctx context.Context, key string, rangeOpt *cloudprovider.SGetObjectRange) (io.ReadCloser, error) {
  251. return b.region.DownloadObjectRange(b.Name, key, rangeOpt.Start, rangeOpt.End)
  252. }
  253. func (region *SRegion) SingedUrl(bucket, key string, method string, expire time.Duration) (string, error) {
  254. if expire > time.Hour*24*7 {
  255. return "", fmt.Errorf(`Expiration Time can\'t be longer than 604800 seconds (7 days)`)
  256. }
  257. opts := &storage.SignedURLOptions{
  258. Scheme: storage.SigningSchemeV4,
  259. Method: method,
  260. GoogleAccessID: region.client.clientEmail,
  261. PrivateKey: []byte(region.client.privateKey),
  262. Expires: time.Now().Add(expire),
  263. }
  264. switch method {
  265. case "GET":
  266. case "PUT":
  267. opts.Headers = []string{"Content-Type:application/octet-stream"}
  268. default:
  269. return "", errors.Wrapf(cloudprovider.ErrNotSupported, "Not support method %s", method)
  270. }
  271. return storage.SignedURL(bucket, key, opts)
  272. }
// GetTempUrl returns a presigned URL for key valid for expire; see SingedUrl
// for the supported methods and the 7-day lifetime limit.
func (b *SBucket) GetTempUrl(method string, key string, expire time.Duration) (string, error) {
	return b.region.SingedUrl(b.Name, key, method, expire)
}
  276. func (b *SBucket) ListObjects(prefix string, marker string, delimiter string, maxCount int) (cloudprovider.SListObjectResult, error) {
  277. result := cloudprovider.SListObjectResult{}
  278. objs, err := b.region.GetObjects(b.Name, prefix, marker, delimiter, maxCount)
  279. if err != nil {
  280. return result, errors.Wrap(err, "GetObjects")
  281. }
  282. result.NextMarker = objs.NextPageToken
  283. log.Errorf("obj count: %d", len(objs.Items))
  284. result.Objects = []cloudprovider.ICloudObject{}
  285. result.CommonPrefixes = []cloudprovider.ICloudObject{}
  286. for i := range objs.Items {
  287. if strings.HasSuffix(objs.Items[i].Name, "/") {
  288. continue
  289. }
  290. objs.Items[i].bucket = b
  291. result.Objects = append(result.Objects, &objs.Items[i])
  292. }
  293. for i := range objs.Prefixes {
  294. obj := &SObject{
  295. bucket: b,
  296. Name: objs.Prefixes[i],
  297. }
  298. result.CommonPrefixes = append(result.CommonPrefixes, obj)
  299. }
  300. return result, nil
  301. }
  302. func (region *SRegion) NewMultipartUpload(bucket, key string, cannedAcl cloudprovider.TBucketACLType, storageClassStr string, meta http.Header) (string, error) {
  303. body := map[string]string{"name": key}
  304. if len(storageClassStr) > 0 {
  305. body["storageClass"] = storageClassStr
  306. }
  307. for k := range meta {
  308. switch k {
  309. case cloudprovider.META_HEADER_CONTENT_TYPE:
  310. body["contentType"] = meta.Get(k)
  311. case cloudprovider.META_HEADER_CONTENT_ENCODING:
  312. body["contentEncoding"] = meta.Get(k)
  313. case cloudprovider.META_HEADER_CONTENT_DISPOSITION:
  314. body["contentDisposition"] = meta.Get(k)
  315. case cloudprovider.META_HEADER_CONTENT_LANGUAGE:
  316. body["contentLanguage"] = meta.Get(k)
  317. case cloudprovider.META_HEADER_CACHE_CONTROL:
  318. body["cacheControl"] = meta.Get(k)
  319. default:
  320. body[fmt.Sprintf("metadata.%s", k)] = meta.Get(k)
  321. }
  322. }
  323. switch cannedAcl {
  324. case cloudprovider.ACLPrivate:
  325. case cloudprovider.ACLAuthRead:
  326. body["predefinedAcl"] = "authenticatedRead"
  327. case cloudprovider.ACLPublicRead:
  328. body["predefinedAcl"] = "publicRead"
  329. case cloudprovider.ACLPublicReadWrite:
  330. return "", cloudprovider.ErrNotSupported
  331. }
  332. resource := fmt.Sprintf("b/%s/o?uploadType=resumable", bucket)
  333. input := strings.NewReader(jsonutils.Marshal(body).String())
  334. header := http.Header{}
  335. header.Set("Content-Type", "application/json; charset=UTF-8")
  336. header.Set("Content-Length", fmt.Sprintf("%d", input.Len()))
  337. resp, err := region.client.storageUpload(resource, header, input)
  338. if err != nil {
  339. return "", errors.Wrap(err, "storageUpload")
  340. }
  341. defer resp.Body.Close()
  342. location := resp.Header.Get("Location")
  343. query, err := url.ParseQuery(location)
  344. if err != nil {
  345. return "", errors.Wrapf(err, "url.ParseQuery(%s)", location)
  346. }
  347. return query.Get("upload_id"), nil
  348. }
// NewMultipartUpload starts a resumable upload session for key in this
// bucket and returns its upload id; ctx is currently unused.
func (b *SBucket) NewMultipartUpload(ctx context.Context, key string, cannedAcl cloudprovider.TBucketACLType, storageClassStr string, meta http.Header) (string, error) {
	return b.region.NewMultipartUpload(b.Name, key, cannedAcl, storageClassStr, meta)
}
  352. func (region *SRegion) UploadPart(bucket, uploadId string, partIndex int, offset int64, part io.Reader, partSize int64, totalSize int64) error {
  353. resource := fmt.Sprintf("b/%s/o?uploadType=resumable&upload_id=%s", bucket, uploadId)
  354. header := http.Header{}
  355. header.Set("Content-Length", fmt.Sprintf("%d", partSize))
  356. header.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", offset, offset+partSize-1, totalSize))
  357. resp, err := region.client.storageUploadPart(resource, header, part)
  358. if err != nil {
  359. return errors.Wrap(err, "storageUploadPart")
  360. }
  361. if resp.StatusCode >= 500 {
  362. content, _ := ioutil.ReadAll(resp.Body)
  363. return fmt.Errorf("status code: %d %s", resp.StatusCode, content)
  364. }
  365. defer resp.Body.Close()
  366. return nil
  367. }
  368. func (region *SRegion) CheckUploadRange(bucket string, uploadId string) error {
  369. resource := fmt.Sprintf("b/%s/o?uploadType=resumable&upload_id=%s", bucket, uploadId)
  370. header := http.Header{}
  371. header.Set("Content-Range", "bytes */*")
  372. resp, err := region.client.storageUploadPart(resource, header, nil)
  373. if err != nil {
  374. return errors.Wrap(err, "storageUploadPart")
  375. }
  376. defer resp.Body.Close()
  377. content, err := ioutil.ReadAll(resp.Body)
  378. if err != nil {
  379. return errors.Wrap(err, "ReadAll")
  380. }
  381. fmt.Println("content: ", string(content))
  382. for k, v := range resp.Header {
  383. fmt.Println("k: ", k, "v: ", v)
  384. }
  385. fmt.Println("status code: ", resp.StatusCode)
  386. return nil
  387. }
// UploadPart uploads one chunk of the resumable session uploadId. The etag
// return is always empty: GCS does not use per-part etags. ctx and key are
// currently unused.
func (b *SBucket) UploadPart(ctx context.Context, key string, uploadId string, partIndex int, part io.Reader, partSize int64, offset, totalSize int64) (string, error) {
	return "", b.region.UploadPart(b.Name, uploadId, partIndex, offset, part, partSize, totalSize)
}
// PutObject uploads body as key in a single request. Note that
// storageClassStr is accepted for interface compatibility but not forwarded
// to the region-level PutObject; ctx is currently unused.
func (b *SBucket) PutObject(ctx context.Context, key string, body io.Reader, sizeBytes int64, cannedAcl cloudprovider.TBucketACLType, storageClassStr string, meta http.Header) error {
	return b.region.PutObject(b.Name, key, body, sizeBytes, cannedAcl, meta)
}
  394. func (region *SRegion) GetBucket(name string) (*SBucket, error) {
  395. resource := "b/" + name
  396. bucket := &SBucket{region: region}
  397. err := region.StorageGet(resource, bucket)
  398. if err != nil {
  399. return nil, errors.Wrap(err, "GetBucket")
  400. }
  401. return bucket, nil
  402. }
  403. func (region *SRegion) GetBuckets(maxResults int, pageToken string) ([]SBucket, error) {
  404. buckets := []SBucket{}
  405. params := map[string]string{
  406. "project": region.GetProjectId(),
  407. }
  408. err := region.StorageList("b", params, maxResults, pageToken, &buckets)
  409. if err != nil {
  410. return nil, err
  411. }
  412. return buckets, nil
  413. }
  414. func (region *SRegion) CreateBucket(name string, storageClass string, acl cloudprovider.TBucketACLType) (*SBucket, error) {
  415. body := map[string]interface{}{
  416. "name": name,
  417. "location": region.Name,
  418. }
  419. if len(storageClass) > 0 {
  420. body["storageClass"] = storageClass
  421. }
  422. params := url.Values{}
  423. params.Set("predefinedDefaultObjectAcl", "private")
  424. switch acl {
  425. case cloudprovider.ACLPrivate, cloudprovider.ACLUnknown:
  426. params.Set("predefinedAcl", "private")
  427. case cloudprovider.ACLAuthRead:
  428. params.Set("predefinedAcl", "authenticatedRead")
  429. case cloudprovider.ACLPublicRead:
  430. params.Set("predefinedAcl", "publicRead")
  431. case cloudprovider.ACLPublicReadWrite:
  432. params.Set("predefinedAcl", "publicReadWrite")
  433. }
  434. params.Set("project", region.GetProjectId())
  435. bucket := &SBucket{region: region}
  436. resource := fmt.Sprintf("b?%s", params.Encode())
  437. err := region.StorageInsert(resource, jsonutils.Marshal(body), bucket)
  438. if err != nil {
  439. return nil, err
  440. }
  441. return bucket, nil
  442. }
  443. func (region *SRegion) UploadObject(bucket string, params url.Values, header http.Header, input io.Reader) error {
  444. resource := fmt.Sprintf("b/%s/o", bucket)
  445. if len(params) > 0 {
  446. resource = fmt.Sprintf("%s?%s", resource, params.Encode())
  447. }
  448. resp, err := region.client.storageUpload(resource, header, input)
  449. if err != nil {
  450. return errors.Wrap(err, "storageUpload")
  451. }
  452. defer resp.Body.Close()
  453. return nil
  454. }
  455. func (region *SRegion) PutObject(bucket string, name string, input io.Reader, sizeBytes int64, cannedAcl cloudprovider.TBucketACLType, meta http.Header) error {
  456. params := url.Values{}
  457. params.Set("name", name)
  458. params.Set("uploadType", "media")
  459. header := http.Header{}
  460. header.Set("Content-Length", fmt.Sprintf("%v", sizeBytes))
  461. err := region.UploadObject(bucket, params, header, input)
  462. if err != nil {
  463. return errors.Wrap(err, "UploadObject")
  464. }
  465. err = region.SetObjectAcl(bucket, name, cannedAcl)
  466. if err != nil {
  467. return errors.Wrap(err, "SetObjectAcl")
  468. }
  469. return region.SetObjectMeta(bucket, name, meta)
  470. }
// DeleteBucket removes the bucket named name (GCS requires it to be empty).
func (region *SRegion) DeleteBucket(name string) error {
	return region.StorageDelete("b/" + name)
}
// SBucketCORSRule wraps the "cors" field of the bucket resource.
type SBucketCORSRule struct {
	Cors []SCORSDetails
}

// SCORSDetails is one CORS entry of a bucket, matching the GCS JSON schema.
type SCORSDetails struct {
	Origin         []string `json:"origin"`
	Method         []string `json:"method"`
	ResponseHeader []string `json:"responseHeader"`
	MaxAgeSeconds  int      `json:"maxAgeSeconds"`
}
  483. func (bucket *SBucket) GetCORSRules() ([]cloudprovider.SBucketCORSRule, error) {
  484. res := []cloudprovider.SBucketCORSRule{}
  485. corss := SBucketCORSRule{}
  486. err := bucket.region.StorageGet(fmt.Sprintf("b/%s?fields=cors", bucket.Name), &corss)
  487. if err != nil {
  488. return nil, errors.Wrap(err, "StorageGet cors")
  489. }
  490. for _, cors := range corss.Cors {
  491. temp := cloudprovider.SBucketCORSRule{}
  492. temp.AllowedHeaders = cors.ResponseHeader
  493. temp.AllowedMethods = cors.Method
  494. temp.AllowedOrigins = cors.Origin
  495. temp.MaxAgeSeconds = cors.MaxAgeSeconds
  496. res = append(res, temp)
  497. }
  498. return res, nil
  499. }
  500. func (b *SBucket) SetCORS(rules []cloudprovider.SBucketCORSRule) error {
  501. params := []map[string]interface{}{}
  502. for _, rule := range rules {
  503. params = append(params, map[string]interface{}{
  504. "origin": rule.AllowedOrigins,
  505. "method": rule.AllowedMethods,
  506. "responseHeader": rule.AllowedHeaders,
  507. "maxAgeSeconds": rule.MaxAgeSeconds,
  508. })
  509. }
  510. return b.region.StoragePut(fmt.Sprintf("b/%s?fields=cors", b.Name), jsonutils.Marshal(map[string]interface{}{"cors": params}), nil)
  511. }
// DeleteCORS clears the bucket's CORS configuration by PUTting an empty
// cors field.
func (b *SBucket) DeleteCORS() error {
	return b.region.StoragePut(fmt.Sprintf("b/%s?fields=cors", b.Name), nil, nil)
}