bucket.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package baidu
  15. import (
  16. "context"
  17. "fmt"
  18. "io"
  19. "net/http"
  20. "net/url"
  21. "strconv"
  22. "strings"
  23. "time"
  24. "yunion.io/x/cloudmux/pkg/cloudprovider"
  25. "yunion.io/x/cloudmux/pkg/multicloud"
  26. "yunion.io/x/jsonutils"
  27. "yunion.io/x/pkg/errors"
  28. "yunion.io/x/pkg/gotypes"
  29. "yunion.io/x/pkg/util/httputils"
  30. )
  31. type SBucket struct {
  32. multicloud.SBaseBucket
  33. multicloud.STagBase
  34. region *SRegion
  35. Name string
  36. Location string
  37. CreationDate time.Time
  38. }
  39. func (b *SBucket) GetId() string {
  40. return b.Name
  41. }
  42. func (b *SBucket) GetGlobalId() string {
  43. return b.Name
  44. }
  45. func (b *SBucket) GetName() string {
  46. return b.Name
  47. }
  48. func (b *SBucket) GetLocation() string {
  49. return b.Location
  50. }
  51. func (b *SBucket) GetCreatedAt() time.Time {
  52. return b.CreationDate
  53. }
  54. func (b *SBucket) GetStorageClass() string {
  55. storageClass, err := b.region.GetBucketStorageClass(b.Name)
  56. if err != nil {
  57. return ""
  58. }
  59. return storageClass
  60. }
  61. func (b *SBucket) GetAcl() cloudprovider.TBucketACLType {
  62. acl, err := b.region.GetBucketAcl(b.Name)
  63. if err != nil {
  64. return cloudprovider.ACLUnknown
  65. }
  66. return acl.GetAcl()
  67. }
  68. func (b *SBucket) SetAcl(acl cloudprovider.TBucketACLType) error {
  69. return b.region.SetBucketAcl(b.Name, acl)
  70. }
  71. func (b *SBucket) GetAccessUrls() []cloudprovider.SBucketAccessUrl {
  72. return []cloudprovider.SBucketAccessUrl{
  73. {
  74. Url: fmt.Sprintf("https://%s.%s.bcebos.com", b.Name, b.region.GetId()),
  75. Description: "ExtranetEndpoint",
  76. Primary: true,
  77. },
  78. }
  79. }
  80. func (b *SBucket) GetTags() (map[string]string, error) {
  81. params := url.Values{}
  82. params.Set("tagging", "")
  83. resp, err := b.region.bosRequest(httputils.GET, b.Name, "", params, http.Header{}, nil)
  84. if err != nil {
  85. return nil, errors.Wrap(err, "GetTags")
  86. }
  87. _, body, err := httputils.ParseJSONResponse("", resp, nil, b.region.client.debug)
  88. if err != nil {
  89. return nil, errors.Wrap(err, "ParseJSONResponse")
  90. }
  91. if gotypes.IsNil(body) {
  92. return nil, nil
  93. }
  94. ret := struct {
  95. Tag []struct {
  96. TagKey string `json:"tagKey"`
  97. TagValue string `json:"tagValue"`
  98. } `json:"tag"`
  99. }{}
  100. err = body.Unmarshal(&ret)
  101. if err != nil {
  102. return nil, errors.Wrap(err, "Unmarshal")
  103. }
  104. res := map[string]string{}
  105. for _, tag := range ret.Tag {
  106. res[tag.TagKey] = tag.TagValue
  107. }
  108. return res, nil
  109. }
  110. func (b *SBucket) SetTags(tags map[string]string, replace bool) error {
  111. params := url.Values{}
  112. params.Set("tagging", "")
  113. _, err := b.region.bosRequest(httputils.DELETE, b.Name, "", params, http.Header{}, nil)
  114. if err != nil {
  115. return errors.Wrap(err, "DeleteTagging")
  116. }
  117. if len(tags) == 0 {
  118. return nil
  119. }
  120. input := []map[string]string{}
  121. for k, v := range tags {
  122. input = append(input, map[string]string{
  123. "tagKey": k,
  124. "tagValue": v,
  125. })
  126. }
  127. body := strings.NewReader(jsonutils.Marshal(map[string]interface{}{"tags": input}).String())
  128. _, err = b.region.bosRequest(httputils.PUT, b.Name, "", params, http.Header{}, body)
  129. if err != nil {
  130. return errors.Wrap(err, "SetTags")
  131. }
  132. return nil
  133. }
  134. func (b *SBucket) GetStats() cloudprovider.SBucketStats {
  135. stat, _ := cloudprovider.GetIBucketStats(b)
  136. return stat
  137. }
  138. func (b *SBucket) SetLimit(limit cloudprovider.SBucketStats) error {
  139. return b.region.SetBucketLimit(b.Name, limit)
  140. }
  141. func (region *SRegion) SetBucketLimit(bucketName string, limit cloudprovider.SBucketStats) error {
  142. params := url.Values{}
  143. params.Set("quota", "")
  144. body := map[string]interface{}{
  145. "maxObjectCount": limit.ObjectCount,
  146. "maxCapacityMegaBytes": limit.SizeBytes,
  147. }
  148. _, err := region.bosUpdate(bucketName, "", params, body)
  149. return err
  150. }
  151. func (b *SBucket) LimitSupport() cloudprovider.SBucketStats {
  152. ret, err := b.region.GetBucketLimit(b.Name)
  153. if err != nil {
  154. return cloudprovider.SBucketStats{
  155. ObjectCount: -1,
  156. SizeBytes: -1,
  157. }
  158. }
  159. return ret
  160. }
  161. func (region *SRegion) GetBucketLimit(bucketName string) (cloudprovider.SBucketStats, error) {
  162. params := url.Values{}
  163. params.Set("quota", "")
  164. resp, err := region.bosList(bucketName, "", params)
  165. if err != nil {
  166. return cloudprovider.SBucketStats{}, err
  167. }
  168. ret := struct {
  169. MaxObjectCount int
  170. MaxCapacityMegaBytes int
  171. }{}
  172. err = resp.Unmarshal(&ret)
  173. if err != nil {
  174. return cloudprovider.SBucketStats{
  175. ObjectCount: -1,
  176. SizeBytes: -1,
  177. }, err
  178. }
  179. return cloudprovider.SBucketStats{
  180. ObjectCount: ret.MaxObjectCount,
  181. SizeBytes: int64(ret.MaxCapacityMegaBytes),
  182. }, nil
  183. }
  184. func (b *SBucket) CopyObject(ctx context.Context, destKey string, srcBucket, srcKey string, cannedAcl cloudprovider.TBucketACLType, storageClassStr string, meta http.Header) error {
  185. header := http.Header{}
  186. if len(cannedAcl) > 0 {
  187. header.Set("x-bce-acl", string(cannedAcl))
  188. }
  189. if len(storageClassStr) > 0 {
  190. header.Set("x-bce-storage-class", storageClassStr)
  191. }
  192. if len(srcBucket) > 0 {
  193. header.Set("x-bce-copy-source", fmt.Sprintf("%s/%s", url.PathEscape(srcBucket), url.PathEscape(srcKey)))
  194. }
  195. for k := range meta {
  196. header.Set(k, meta.Get(k))
  197. }
  198. resp, err := b.region.bosRequest(httputils.PUT, b.Name, destKey, url.Values{}, header, nil)
  199. if err != nil {
  200. return errors.Wrapf(err, "CopyObject %s %s %s %s", b.Name, destKey, srcBucket, srcKey)
  201. }
  202. defer httputils.CloseResponse(resp)
  203. return nil
  204. }
  205. func (b *SBucket) GetObject(ctx context.Context, key string, rangeOpt *cloudprovider.SGetObjectRange) (io.ReadCloser, error) {
  206. header := http.Header{}
  207. if rangeOpt != nil {
  208. header.Set("Range", rangeOpt.String())
  209. }
  210. resp, err := b.region.bosRequest(httputils.GET, b.Name, key, url.Values{}, header, nil)
  211. if err != nil {
  212. return nil, errors.Wrapf(err, "GetObject %s %s", b.Name, key)
  213. }
  214. return resp.Body, nil
  215. }
  216. func (b *SBucket) DeleteObject(ctx context.Context, key string) error {
  217. return b.region.DeleteObject(b.Name, key)
  218. }
  219. func (region *SRegion) DeleteObject(bucketName, key string) error {
  220. resp, err := region.bosRequest(httputils.DELETE, bucketName, key, url.Values{}, http.Header{}, nil)
  221. if err != nil {
  222. return errors.Wrapf(err, "DeleteObject %s %s", bucketName, key)
  223. }
  224. httputils.CloseResponse(resp)
  225. return nil
  226. }
  227. func (b *SBucket) GetTempUrl(method string, key string, expire time.Duration) (string, error) {
  228. uri, err := url.Parse(fmt.Sprintf("https://%s.%s.bcebos.com/%s", b.Name, b.region.GetId(), url.PathEscape(key)))
  229. if err != nil {
  230. return "", errors.Wrapf(err, "Parse %s %s", b.Name, b.region.GetId())
  231. }
  232. header := http.Header{}
  233. sign, err := b.region.client._sign(uri, method, header, int(expire.Seconds()))
  234. if err != nil {
  235. return "", errors.Wrapf(err, "Sign %s %s", b.Name, key)
  236. }
  237. query := url.Values{}
  238. query.Set("authorization", sign)
  239. return fmt.Sprintf("%s?%s", uri.String(), query.Encode()), nil
  240. }
  241. func (b *SBucket) PutObject(ctx context.Context, key string, input io.Reader, sizeBytes int64, cannedAcl cloudprovider.TBucketACLType, storageClassStr string, meta http.Header) error {
  242. header := http.Header{}
  243. if len(cannedAcl) > 0 {
  244. header.Set("x-bce-acl", string(cannedAcl))
  245. }
  246. if len(storageClassStr) > 0 {
  247. header.Set("x-bce-storage-class", storageClassStr)
  248. }
  249. for k := range meta {
  250. header.Set(fmt.Sprintf("x-bce-meta-%s", k), meta.Get(k))
  251. }
  252. if sizeBytes > 0 {
  253. header.Set("Content-Length", strconv.FormatInt(sizeBytes, 10))
  254. }
  255. resp, err := b.region.bosRequest(httputils.PUT, b.Name, key, url.Values{}, header, input)
  256. if err != nil {
  257. return errors.Wrapf(err, "PutObject %s %s", b.Name, key)
  258. }
  259. httputils.CloseResponse(resp)
  260. return nil
  261. }
  262. const (
  263. THRESHOLD_100_CONTINUE = 1 << 20
  264. )
  265. func (b *SBucket) NewMultipartUpload(ctx context.Context, key string, cannedAcl cloudprovider.TBucketACLType, storageClassStr string, meta http.Header) (string, error) {
  266. header := http.Header{}
  267. if len(cannedAcl) > 0 {
  268. header.Set("x-bce-acl", string(cannedAcl))
  269. }
  270. if len(storageClassStr) > 0 {
  271. header.Set("x-bce-storage-class", storageClassStr)
  272. }
  273. for k := range meta {
  274. header.Set(fmt.Sprintf("x-bce-meta-%s", k), meta.Get(k))
  275. }
  276. params := url.Values{}
  277. params.Set("uploads", "")
  278. resp, err := b.region.bosRequest(httputils.POST, b.Name, key, params, header, nil)
  279. if err != nil {
  280. return "", errors.Wrapf(err, "NewMultipartUpload %s %s", b.Name, key)
  281. }
  282. _, body, err := httputils.ParseJSONResponse("", resp, nil, b.region.client.debug)
  283. if err != nil {
  284. return "", errors.Wrapf(err, "ParseJSONResponse %s %s", b.Name, key)
  285. }
  286. if gotypes.IsNil(body) {
  287. return "", errors.Errorf("empty response for NewMultipartUpload %s %s", b.Name, key)
  288. }
  289. return body.GetString("uploadId")
  290. }
  291. func (b *SBucket) UploadPart(ctx context.Context, key string, uploadId string, partIndex int, input io.Reader, partSize int64, offset, totalSize int64) (string, error) {
  292. header := http.Header{}
  293. header.Set("Content-Length", strconv.FormatInt(partSize, 10))
  294. params := url.Values{}
  295. params.Set("uploadId", uploadId)
  296. params.Set("partNumber", strconv.Itoa(partIndex))
  297. if partSize > THRESHOLD_100_CONTINUE {
  298. header.Set("Expect", "100-continue")
  299. }
  300. resp, err := b.region.bosRequest(httputils.PUT, b.Name, key, params, header, input)
  301. if err != nil {
  302. return "", errors.Wrapf(err, "UploadPart %s %s %s", b.Name, key, uploadId)
  303. }
  304. defer httputils.CloseResponse(resp)
  305. return strings.Trim(resp.Header.Get("ETag"), "\""), nil
  306. }
  307. func (b *SBucket) CompleteMultipartUpload(ctx context.Context, key string, uploadId string, partEtags []string) error {
  308. header := http.Header{}
  309. params := url.Values{}
  310. params.Set("uploadId", uploadId)
  311. parts := []map[string]string{}
  312. for i, etag := range partEtags {
  313. parts = append(parts, map[string]string{
  314. "partNumber": strconv.Itoa(i + 1),
  315. "eTag": etag,
  316. })
  317. }
  318. body := strings.NewReader(jsonutils.Marshal(map[string]interface{}{"parts": parts}).String())
  319. resp, err := b.region.bosRequest(httputils.POST, b.Name, key, params, header, body)
  320. if err != nil {
  321. return errors.Wrapf(err, "CompleteMultipartUpload %s %s %s", b.Name, key, uploadId)
  322. }
  323. httputils.CloseResponse(resp)
  324. return nil
  325. }
  326. func (b *SBucket) AbortMultipartUpload(ctx context.Context, key string, uploadId string) error {
  327. params := url.Values{}
  328. params.Set("uploadId", uploadId)
  329. resp, err := b.region.bosRequest(httputils.DELETE, b.Name, key, params, http.Header{}, nil)
  330. if err != nil {
  331. return errors.Wrapf(err, "AbortMultipartUpload %s %s %s", b.Name, key, uploadId)
  332. }
  333. httputils.CloseResponse(resp)
  334. return nil
  335. }
  336. func (b *SBucket) CopyPart(ctx context.Context, key string, uploadId string, partIndex int, srcBucket string, srcKey string, srcOffset int64, srcLength int64) (string, error) {
  337. header := http.Header{}
  338. params := url.Values{}
  339. params.Set("uploadId", uploadId)
  340. params.Set("partNumber", strconv.Itoa(partIndex))
  341. header.Set("x-bce-copy-source", fmt.Sprintf("%s/%s", url.PathEscape(srcBucket), url.PathEscape(srcKey)))
  342. if srcLength > 0 {
  343. header.Set("x-bce-copy-source-range", fmt.Sprintf("bytes=%d-%d", srcOffset, srcOffset+srcLength-1))
  344. }
  345. resp, err := b.region.bosRequest(httputils.PUT, b.Name, key, params, header, nil)
  346. if err != nil {
  347. return "", errors.Wrapf(err, "CopyPart %s %s %s %s %d %d", b.Name, key, uploadId, srcBucket, srcOffset, srcLength)
  348. }
  349. _, body, err := httputils.ParseJSONResponse("", resp, nil, b.region.client.debug)
  350. if err != nil {
  351. return "", errors.Wrapf(err, "ParseJSONResponse %s %s %s", b.Name, key, uploadId)
  352. }
  353. if gotypes.IsNil(body) {
  354. return "", errors.Errorf("empty response for CopyPart %s %s %s %s %d %d", b.Name, key, uploadId, srcBucket, srcOffset, srcLength)
  355. }
  356. return body.GetString("eTag")
  357. }
  358. func (b *SBucket) GetIRegion() cloudprovider.ICloudRegion {
  359. return b.region
  360. }
  361. func (b *SBucket) GetProjectId() string {
  362. return ""
  363. }
  364. func (b *SBucket) ListObjects(prefix string, marker string, delimiter string, maxCount int) (cloudprovider.SListObjectResult, error) {
  365. params := url.Values{}
  366. if len(delimiter) > 0 {
  367. params.Set("delimiter", delimiter)
  368. }
  369. if len(prefix) > 0 {
  370. params.Set("prefix", prefix)
  371. }
  372. if maxCount > 0 {
  373. params.Set("maxKeys", strconv.Itoa(maxCount))
  374. }
  375. if len(marker) > 0 {
  376. params.Set("marker", marker)
  377. }
  378. resp, err := b.region.bosList(b.Name, "", params)
  379. if err != nil {
  380. return cloudprovider.SListObjectResult{}, err
  381. }
  382. ret := struct {
  383. Name string
  384. Prefix string
  385. IsTruncated bool
  386. Marker string
  387. CommonPrefixes []struct {
  388. Prefix string
  389. }
  390. Contents []SObject
  391. }{}
  392. err = resp.Unmarshal(&ret)
  393. if err != nil {
  394. return cloudprovider.SListObjectResult{}, err
  395. }
  396. result := cloudprovider.SListObjectResult{
  397. Objects: make([]cloudprovider.ICloudObject, 0),
  398. CommonPrefixes: make([]cloudprovider.ICloudObject, 0),
  399. }
  400. for _, content := range ret.Contents {
  401. content.bucket = b
  402. result.Objects = append(result.Objects, &content)
  403. }
  404. for _, commonPrefix := range ret.CommonPrefixes {
  405. obj := &SObject{
  406. bucket: b,
  407. Key: commonPrefix.Prefix,
  408. }
  409. result.CommonPrefixes = append(result.CommonPrefixes, obj)
  410. }
  411. result.IsTruncated = ret.IsTruncated
  412. result.NextMarker = ret.Marker
  413. return result, nil
  414. }
  415. func (region *SRegion) ListBuckets() ([]SBucket, error) {
  416. resp, err := region.bosList("", "/", nil)
  417. if err != nil {
  418. return nil, err
  419. }
  420. buckets := []SBucket{}
  421. err = resp.Unmarshal(&buckets, "buckets")
  422. if err != nil {
  423. return nil, err
  424. }
  425. return buckets, nil
  426. }
  427. func (region *SRegion) CreateBucket(name string, storageClassStr string, aclStr string) error {
  428. _, err := region.bosUpdate(name, "/", nil, nil)
  429. if err != nil {
  430. return err
  431. }
  432. if len(storageClassStr) > 0 {
  433. err = region.SetBucketStorageClass(name, storageClassStr)
  434. if err != nil {
  435. return err
  436. }
  437. }
  438. if len(aclStr) > 0 {
  439. err = region.SetBucketAcl(name, cloudprovider.TBucketACLType(aclStr))
  440. if err != nil {
  441. return err
  442. }
  443. }
  444. return nil
  445. }
  446. func (region *SRegion) DeleteBucket(name string) error {
  447. _, err := region.bosDelete(name, "/", nil)
  448. return err
  449. }
  450. func (region *SRegion) bosList(bucketName, res string, params url.Values) (jsonutils.JSONObject, error) {
  451. return region.client.bosList(region.GetId(), bucketName, res, params)
  452. }
  453. func (region *SRegion) bosDelete(bucketName, res string, params url.Values) (jsonutils.JSONObject, error) {
  454. return region.client.bosDelete(region.GetId(), bucketName, res, params)
  455. }
  456. func (region *SRegion) bosUpdate(bucketName, res string, params url.Values, body map[string]interface{}) (jsonutils.JSONObject, error) {
  457. return region.client.bosUpdate(region.GetId(), bucketName, res, params, body)
  458. }
  459. func (region *SRegion) bosRequest(method httputils.THttpMethod, bucketName, res string, params url.Values, header http.Header, body io.Reader) (*http.Response, error) {
  460. return region.client.bosRequest(method, region.GetId(), bucketName, res, params, header, body)
  461. }
  462. func (region *SRegion) GetBucketStorageClass(bucketName string) (string, error) {
  463. params := url.Values{}
  464. params.Set("storageClass", "")
  465. resp, err := region.bosList(bucketName, "", params)
  466. if err != nil {
  467. return "", err
  468. }
  469. return resp.GetString("storageClass")
  470. }
  471. func (region *SRegion) SetBucketStorageClass(bucketName string, storageClass string) error {
  472. params := url.Values{}
  473. params.Set("storageClass", "")
  474. body := map[string]interface{}{
  475. "storageClass": storageClass,
  476. }
  477. _, err := region.bosUpdate(bucketName, "", params, body)
  478. return err
  479. }
  480. type SAccessControl struct {
  481. AccessControlList []SAccessControlList
  482. Owner struct {
  483. Id string
  484. }
  485. }
  486. func (acl *SAccessControl) GetAcl() cloudprovider.TBucketACLType {
  487. aclType := cloudprovider.ACLUnknown
  488. switch len(acl.AccessControlList) {
  489. case 1:
  490. if acl.AccessControlList[0].Grantee[0].Id == acl.Owner.Id && acl.AccessControlList[0].Permission[0] == "FULL_CONTROL" {
  491. aclType = cloudprovider.ACLPrivate
  492. }
  493. case 2:
  494. isRead, isWrite := false, false
  495. for _, g := range acl.AccessControlList {
  496. if g.Grantee[0].Id == "*" {
  497. for _, permission := range g.Permission {
  498. if strings.EqualFold(permission, "READ") {
  499. isRead = true
  500. }
  501. if strings.EqualFold(permission, "WRITE") {
  502. isWrite = true
  503. }
  504. }
  505. }
  506. }
  507. if isRead && isWrite {
  508. aclType = cloudprovider.ACLPublicReadWrite
  509. } else if isRead {
  510. aclType = cloudprovider.ACLPublicRead
  511. }
  512. }
  513. return aclType
  514. }
  515. type SAccessControlList struct {
  516. Grantee []struct {
  517. Id string
  518. }
  519. Permission []string
  520. }
  521. func (region *SRegion) GetBucketAcl(bucketName string) (*SAccessControl, error) {
  522. params := url.Values{}
  523. params.Set("acl", "")
  524. resp, err := region.bosList(bucketName, "", params)
  525. if err != nil {
  526. return nil, err
  527. }
  528. ret := &SAccessControl{}
  529. err = resp.Unmarshal(ret)
  530. if err != nil {
  531. return nil, err
  532. }
  533. return ret, nil
  534. }
  535. func (region *SRegion) SetBucketAcl(bucketName string, acl cloudprovider.TBucketACLType) error {
  536. params := url.Values{}
  537. params.Set("acl", "")
  538. header := http.Header{}
  539. header.Set("x-bce-acl", string(acl))
  540. resp, err := region.bosRequest(httputils.PUT, bucketName, "", params, header, nil)
  541. if err != nil {
  542. return errors.Wrapf(err, "SetBucketAcl %s %s", bucketName, acl)
  543. }
  544. httputils.CloseResponse(resp)
  545. return nil
  546. }