bucket.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package ksyun
  15. import (
  16. "context"
  17. "fmt"
  18. "io"
  19. "net/http"
  20. "strings"
  21. "time"
  22. "github.com/ks3sdklib/aws-sdk-go/aws"
  23. "github.com/ks3sdklib/aws-sdk-go/aws/credentials"
  24. "github.com/ks3sdklib/aws-sdk-go/service/s3"
  25. "yunion.io/x/jsonutils"
  26. "yunion.io/x/log"
  27. "yunion.io/x/pkg/errors"
  28. "yunion.io/x/pkg/util/fileutils"
  29. "yunion.io/x/cloudmux/pkg/cloudprovider"
  30. "yunion.io/x/cloudmux/pkg/multicloud"
  31. )
  32. type SBucket struct {
  33. multicloud.SBaseBucket
  34. SKsyunTags
  35. region *SRegion
  36. CreationDate time.Time
  37. Name string
  38. Region string
  39. Type string
  40. VisitType string
  41. DataRedundancyType string
  42. }
  43. func (b *SBucket) GetProjectId() string {
  44. return ""
  45. }
  46. func (b *SBucket) GetGlobalId() string {
  47. return b.Name
  48. }
  49. func (b *SBucket) GetName() string {
  50. return b.Name
  51. }
  52. func (b *SBucket) GetAcl() cloudprovider.TBucketACLType {
  53. return b.region.GetBucketAcl(b.Name)
  54. }
  55. func (region *SRegion) GetBucketAcl(bucket string) cloudprovider.TBucketACLType {
  56. svc := region.getS3Client()
  57. input := &s3.GetBucketACLInput{
  58. Bucket: &bucket,
  59. }
  60. resp, err := svc.GetBucketACL(input)
  61. if err != nil {
  62. return cloudprovider.ACLUnknown
  63. }
  64. return cloudprovider.TBucketACLType(s3.GetCannedACL(resp.Grants))
  65. }
  66. func (b *SBucket) GetLocation() string {
  67. return b.Region
  68. }
  69. func (b *SBucket) GetIRegion() cloudprovider.ICloudRegion {
  70. return b.region
  71. }
  72. func (b *SBucket) GetCreatedAt() time.Time {
  73. return b.CreationDate
  74. }
  75. func (b *SBucket) GetStorageClass() string {
  76. return b.DataRedundancyType
  77. }
  78. // https://docs.ksyun.com/documents/6761
  79. func (b *SBucket) GetAccessUrls() []cloudprovider.SBucketAccessUrl {
  80. ret := []cloudprovider.SBucketAccessUrl{
  81. {
  82. Url: fmt.Sprintf("%s.%s", b.Name, s3RegionEndpointMap[b.region.Region]),
  83. Description: "ExtranetEndpoint",
  84. Primary: true,
  85. },
  86. {
  87. Url: fmt.Sprintf("%s.%s", b.Name, strings.Replace(s3RegionEndpointMap[b.region.Region], ".ksyuncs.com", "-internal.ksyuncs.com", 1)),
  88. Description: "IntranetEndpoint",
  89. },
  90. }
  91. return ret
  92. }
  93. func (b *SBucket) GetStats() cloudprovider.SBucketStats {
  94. stats, err := cloudprovider.GetIBucketStats(b)
  95. if err != nil {
  96. log.Errorf("GetStats fail %s", err)
  97. }
  98. return stats
  99. }
  100. func (region *SRegion) SetBucketAcl(bucket string, acl cloudprovider.TBucketACLType) error {
  101. svc := region.getS3Client()
  102. aclStr := string(acl)
  103. input := &s3.PutBucketACLInput{
  104. Bucket: &bucket,
  105. ACL: &aclStr,
  106. }
  107. _, err := svc.PutBucketACL(input)
  108. return err
  109. }
  110. func (b *SBucket) SetAcl(aclStr cloudprovider.TBucketACLType) error {
  111. return b.region.SetBucketAcl(b.Name, aclStr)
  112. }
  113. func (b *SBucket) ListObjects(prefix string, marker string, delimiter string, maxCount int) (cloudprovider.SListObjectResult, error) {
  114. result := cloudprovider.SListObjectResult{}
  115. svc := b.region.getS3Client()
  116. input := &s3.ListObjectsInput{
  117. Bucket: &b.Name,
  118. }
  119. if len(prefix) > 0 {
  120. input.Prefix = &prefix
  121. }
  122. if len(delimiter) > 0 {
  123. input.Delimiter = &delimiter
  124. }
  125. if len(marker) > 0 {
  126. input.Marker = &marker
  127. }
  128. if maxCount > 0 {
  129. cnt := int64(maxCount)
  130. input.MaxKeys = &cnt
  131. }
  132. resp, err := svc.ListObjects(input)
  133. if err != nil {
  134. return result, err
  135. }
  136. result.Objects = make([]cloudprovider.ICloudObject, 0)
  137. for _, object := range resp.Contents {
  138. ksObj := cloudprovider.SBaseCloudObject{}
  139. if object.StorageClass != nil {
  140. ksObj.StorageClass = *object.StorageClass
  141. }
  142. if object.Key != nil {
  143. ksObj.Key = *object.Key
  144. }
  145. if object.Size != nil {
  146. ksObj.SizeBytes = *object.Size
  147. }
  148. if object.ETag != nil {
  149. ksObj.ETag = *object.ETag
  150. }
  151. if object.LastModified != nil {
  152. ksObj.LastModified = *object.LastModified
  153. }
  154. obj := &SObject{
  155. bucket: b,
  156. SBaseCloudObject: ksObj,
  157. }
  158. result.Objects = append(result.Objects, obj)
  159. }
  160. if resp.CommonPrefixes != nil {
  161. result.CommonPrefixes = make([]cloudprovider.ICloudObject, len(resp.CommonPrefixes))
  162. for i, commonPrefix := range resp.CommonPrefixes {
  163. result.CommonPrefixes[i] = &SObject{
  164. bucket: b,
  165. SBaseCloudObject: cloudprovider.SBaseCloudObject{Key: *commonPrefix.Prefix},
  166. }
  167. }
  168. }
  169. if resp.IsTruncated != nil {
  170. result.IsTruncated = *resp.IsTruncated
  171. }
  172. if resp.NextMarker != nil {
  173. result.NextMarker = *resp.NextMarker
  174. }
  175. return result, nil
  176. }
  177. func (b *SBucket) PutObject(ctx context.Context, key string, reader io.Reader, sizeBytes int64, cannedAcl cloudprovider.TBucketACLType, storageClassStr string, meta http.Header) error {
  178. svc := b.region.getS3Client()
  179. input := &s3.PutObjectInput{
  180. Bucket: &b.Name,
  181. Key: &key,
  182. }
  183. if sizeBytes > 0 {
  184. input.ContentLength = &sizeBytes
  185. }
  186. if meta != nil {
  187. input.Metadata = make(map[string]*string)
  188. for k, v := range meta {
  189. input.Metadata[k] = &v[0]
  190. }
  191. }
  192. if len(cannedAcl) == 0 {
  193. cannedAcl = b.GetAcl()
  194. }
  195. acl := string(cannedAcl)
  196. input.ACL = &acl
  197. if len(storageClassStr) > 0 {
  198. storageClass := string(storageClassStr)
  199. input.StorageClass = &storageClass
  200. }
  201. seeker, err := fileutils.NewReadSeeker(reader, sizeBytes)
  202. if err != nil {
  203. return errors.Wrap(err, "newFakeSeeker")
  204. }
  205. defer seeker.Close()
  206. input.Body = seeker
  207. _, err = svc.PutObject(input)
  208. if err != nil {
  209. return errors.Wrap(err, "PutObject")
  210. }
  211. return nil
  212. }
  213. func (b *SBucket) NewMultipartUpload(ctx context.Context, key string, cannedAcl cloudprovider.TBucketACLType, storageClassStr string, meta http.Header) (string, error) {
  214. svc := b.region.getS3Client()
  215. input := &s3.CreateMultipartUploadInput{
  216. Bucket: &b.Name,
  217. Key: &key,
  218. }
  219. if meta != nil {
  220. input.Metadata = make(map[string]*string)
  221. for k, v := range meta {
  222. input.Metadata[k] = &v[0]
  223. }
  224. }
  225. if len(cannedAcl) == 0 {
  226. cannedAcl = b.GetAcl()
  227. }
  228. acl := string(cannedAcl)
  229. input.ACL = &acl
  230. if len(storageClassStr) > 0 {
  231. storageClass := string(storageClassStr)
  232. input.StorageClass = &storageClass
  233. }
  234. output, err := svc.CreateMultipartUpload(input)
  235. if err != nil {
  236. return "", errors.Wrap(err, "CreateMultipartUpload")
  237. }
  238. return *output.UploadID, nil
  239. }
  240. func (b *SBucket) UploadPart(ctx context.Context, key string, uploadId string, partIndex int, reader io.Reader, partSize int64, offset, totalSize int64) (string, error) {
  241. svc := b.region.getS3Client()
  242. pn := int64(partIndex)
  243. input := &s3.UploadPartInput{
  244. Bucket: &b.Name,
  245. Key: &key,
  246. UploadID: &uploadId,
  247. PartNumber: &pn,
  248. }
  249. seeker, err := fileutils.NewReadSeeker(reader, partSize)
  250. if err != nil {
  251. return "", errors.Wrap(err, "newFakeSeeker")
  252. }
  253. defer seeker.Close()
  254. input.Body = seeker
  255. output, err := svc.UploadPart(input)
  256. if err != nil {
  257. return "", errors.Wrap(err, "UploadPart")
  258. }
  259. return *output.ETag, nil
  260. }
  261. func (b *SBucket) CompleteMultipartUpload(ctx context.Context, key string, uploadId string, partEtags []string) error {
  262. svc := b.region.getS3Client()
  263. input := &s3.CompleteMultipartUploadInput{
  264. Bucket: &b.Name,
  265. Key: &key,
  266. UploadID: &uploadId,
  267. MultipartUpload: &s3.CompletedMultipartUpload{},
  268. }
  269. parts := make([]*s3.CompletedPart, len(partEtags))
  270. for i := range partEtags {
  271. pn := int64(i + 1)
  272. parts[i] = &s3.CompletedPart{
  273. PartNumber: &pn,
  274. ETag: &partEtags[i],
  275. }
  276. }
  277. input.MultipartUpload.Parts = parts
  278. _, err := svc.CompleteMultipartUpload(input)
  279. return err
  280. }
  281. func (b *SBucket) AbortMultipartUpload(ctx context.Context, key string, uploadId string) error {
  282. svc := b.region.getS3Client()
  283. input := &s3.AbortMultipartUploadInput{
  284. Bucket: &b.Name,
  285. Key: &key,
  286. UploadID: &uploadId,
  287. }
  288. _, err := svc.AbortMultipartUpload(input)
  289. return err
  290. }
  291. func (b *SBucket) DeleteObject(ctx context.Context, key string) error {
  292. svc := b.region.getS3Client()
  293. input := &s3.DeleteObjectInput{
  294. Bucket: &b.Name,
  295. Key: &key,
  296. }
  297. _, err := svc.DeleteObject(input)
  298. return err
  299. }
  300. func (b *SBucket) GetTempUrl(method string, key string, expire time.Duration) (string, error) {
  301. svc := b.region.getS3Client()
  302. input := &s3.GeneratePresignedUrlInput{
  303. Bucket: &b.Name,
  304. Key: &key,
  305. }
  306. input.HTTPMethod = s3.HTTPMethod(method)
  307. input.Expires = int64(expire / time.Second)
  308. return svc.GeneratePresignedUrl(input)
  309. }
  310. func (b *SBucket) CopyObject(ctx context.Context, destKey string, srcBucket, srcKey string, cannedAcl cloudprovider.TBucketACLType, storageClassStr string, meta http.Header) error {
  311. svc := b.region.getS3Client()
  312. input := &s3.CopyObjectInput{
  313. Bucket: &b.Name,
  314. Key: &destKey,
  315. SourceBucket: &srcBucket,
  316. SourceKey: &srcKey,
  317. }
  318. if meta != nil {
  319. input.Metadata = make(map[string]*string)
  320. for k, v := range meta {
  321. input.Metadata[k] = &v[0]
  322. }
  323. }
  324. if cannedAcl != cloudprovider.ACLPrivate {
  325. acl := string(cannedAcl)
  326. input.ACL = &acl
  327. }
  328. if storageClassStr != "" {
  329. storageClass := string(storageClassStr)
  330. input.StorageClass = &storageClass
  331. }
  332. _, err := svc.CopyObject(input)
  333. return err
  334. }
  335. func (b *SBucket) GetObject(ctx context.Context, key string, rangeOpt *cloudprovider.SGetObjectRange) (io.ReadCloser, error) {
  336. svc := b.region.getS3Client()
  337. input := &s3.GetObjectInput{
  338. Bucket: &b.Name,
  339. Key: &key,
  340. }
  341. if rangeOpt != nil {
  342. rangeInput := rangeOpt.String()
  343. input.Range = &rangeInput
  344. }
  345. output, err := svc.GetObject(input)
  346. if err != nil {
  347. return nil, errors.Wrap(err, "GetObject")
  348. }
  349. return output.Body, nil
  350. }
  351. func (b *SBucket) CopyPart(ctx context.Context, key string, uploadId string, partNumber int, srcBucket string, srcKey string, srcOffset int64, srcLength int64) (string, error) {
  352. svc := b.region.getS3Client()
  353. pn := int64(partNumber)
  354. copySourceRange := fmt.Sprintf("bytes=%d-%d", srcOffset, srcOffset+srcLength-1)
  355. input := &s3.UploadPartCopyInput{
  356. Bucket: &b.Name,
  357. Key: &key,
  358. UploadID: &uploadId,
  359. PartNumber: &pn,
  360. SourceBucket: &srcBucket,
  361. SourceKey: &srcKey,
  362. CopySourceRange: &copySourceRange,
  363. }
  364. output, err := svc.UploadPartCopy(input)
  365. if err != nil {
  366. return "", errors.Wrap(err, "UploadPartCopy")
  367. }
  368. return *output.CopyPartResult.ETag, nil
  369. }
  370. func (b *SBucket) GetTags() (map[string]string, error) {
  371. svc := b.region.getS3Client()
  372. input := &s3.GetBucketTaggingInput{
  373. Bucket: &b.Name,
  374. }
  375. output, err := svc.GetBucketTagging(input)
  376. if err != nil {
  377. return nil, errors.Wrap(err, "GetBucketTagging")
  378. }
  379. result := map[string]string{}
  380. if output.Tagging == nil {
  381. return nil, nil
  382. }
  383. for _, tag := range output.Tagging.TagSet {
  384. result[*tag.Key] = *tag.Value
  385. }
  386. return result, nil
  387. }
  388. func (b *SBucket) SetTags(tags map[string]string, replace bool) error {
  389. svc := b.region.getS3Client()
  390. input := &s3.PutBucketTaggingInput{
  391. Bucket: &b.Name,
  392. }
  393. if replace {
  394. input.Tagging = &s3.Tagging{
  395. TagSet: make([]*s3.Tag, 0),
  396. }
  397. }
  398. for k, v := range tags {
  399. input.Tagging.TagSet = append(input.Tagging.TagSet, &s3.Tag{Key: &k, Value: &v})
  400. }
  401. _, err := svc.PutBucketTagging(input)
  402. log.Infof("put tagging %s error: %v", jsonutils.Marshal(input).String(), err)
  403. if err != nil {
  404. return errors.Wrapf(err, "PutBucketTagging(%s)", b.Name)
  405. }
  406. return nil
  407. }
  408. func (b *SBucket) ListMultipartUploads() ([]cloudprovider.SBucketMultipartUploads, error) {
  409. result := []cloudprovider.SBucketMultipartUploads{}
  410. svc := b.region.getS3Client()
  411. keyMarker := ""
  412. uploadIDMarker := ""
  413. for {
  414. input := &s3.ListMultipartUploadsInput{
  415. Bucket: &b.Name,
  416. KeyMarker: &keyMarker,
  417. UploadIDMarker: &uploadIDMarker,
  418. }
  419. output, err := svc.ListMultipartUploads(input)
  420. if err != nil {
  421. return nil, errors.Wrap(err, "ListMultipartUploads")
  422. }
  423. for _, upload := range output.Uploads {
  424. result = append(result, cloudprovider.SBucketMultipartUploads{
  425. ObjectName: *upload.Key,
  426. UploadID: *upload.UploadID,
  427. Initiated: *upload.Initiated,
  428. })
  429. }
  430. keyMarker = *output.NextKeyMarker
  431. uploadIDMarker = *output.NextUploadIDMarker
  432. if output.IsTruncated == nil || !*output.IsTruncated {
  433. break
  434. }
  435. }
  436. return result, nil
  437. }
  438. func (region *SRegion) GetIBuckets() ([]cloudprovider.ICloudBucket, error) {
  439. buckets, err := region.client.GetBuckets()
  440. if err != nil {
  441. return nil, err
  442. }
  443. ret := make([]cloudprovider.ICloudBucket, 0)
  444. for i := range buckets {
  445. buckets[i].region = region
  446. if strings.EqualFold(buckets[i].Region, s3RegionMap[region.Region]) {
  447. ret = append(ret, &buckets[i])
  448. }
  449. }
  450. return ret, nil
  451. }
  452. func (region *SRegion) getS3Client() *s3.S3 {
  453. return region.client.getS3Client(region.Region)
  454. }
  455. // https://docs.ksyun.com/documents/6761?type=3
  456. var s3RegionMap = map[string]string{
  457. "": "BEIJING",
  458. "ap-singapore-1": "SINGAPORE",
  459. "cn-beijing-6": "BEIJING",
  460. "cn-guangzhou-1": "GUANGZHOU",
  461. "cn-shanghai-2": "SHANGHAI",
  462. "cn-northwest-1": "QINGYANG",
  463. }
  464. var s3RegionEndpointMap = map[string]string{
  465. "": "ks3-cn-beijing.ksyuncs.com",
  466. "ap-singapore-1": "ks3-sgp.ksyuncs.com",
  467. "cn-beijing-6": "ks3-cn-beijing.ksyuncs.com",
  468. "cn-guangzhou-1": "ks3-cn-guangzhou.ksyuncs.com",
  469. "cn-shanghai-2": "ks3-cn-shanghai.ksyuncs.com",
  470. "cn-northwest-1": "ks3-cn-qingyang.ksyuncs.com",
  471. }
  472. func (cli *SKsyunClient) getS3Client(regionId string) *s3.S3 {
  473. aksk := credentials.NewStaticCredentials(cli.accessKeyId, cli.accessKeySecret, "")
  474. cfg := aws.Config{
  475. Region: s3RegionMap[regionId],
  476. Credentials: aksk,
  477. Endpoint: s3RegionEndpointMap[regionId],
  478. HTTPClient: cli.getDefaultClient(),
  479. SignerVersion: "V4_UNSIGNED_PAYLOAD_SIGNER",
  480. MaxRetries: 1,
  481. }
  482. if cli.debug {
  483. cfg.LogLevel = aws.Debug
  484. }
  485. return s3.New(&cfg)
  486. }
  487. func (cli *SKsyunClient) GetBuckets() ([]SBucket, error) {
  488. svc := cli.getS3Client("")
  489. input := &s3.ListBucketsInput{}
  490. resp, err := svc.ListBuckets(input)
  491. if err != nil {
  492. return nil, err
  493. }
  494. ret := make([]SBucket, 0)
  495. for _, b := range resp.Buckets {
  496. ret = append(ret, SBucket{
  497. Name: *b.Name,
  498. Region: *b.Region,
  499. Type: *b.Type,
  500. VisitType: *b.VisitType,
  501. DataRedundancyType: *b.DataRedundancyType,
  502. CreationDate: *b.CreationDate,
  503. })
  504. }
  505. return ret, nil
  506. }
  507. func (region *SRegion) CreateIBucket(name string, storageClassStr string, aclStr string) error {
  508. svc := region.getS3Client()
  509. input := &s3.CreateBucketInput{
  510. Bucket: &name,
  511. }
  512. if aclStr != "" {
  513. acl := string(aclStr)
  514. input.ACL = &acl
  515. }
  516. _, err := svc.CreateBucket(input)
  517. return err
  518. }
  519. func (region *SRegion) GetIBucketById(name string) (cloudprovider.ICloudBucket, error) {
  520. buckets, err := region.GetIBuckets()
  521. if err != nil {
  522. return nil, errors.Wrap(err, "region.GetBuckets")
  523. }
  524. for _, b := range buckets {
  525. if b.GetName() == name {
  526. return b, nil
  527. }
  528. }
  529. return nil, errors.Wrap(cloudprovider.ErrNotFound, "Bucket Not Found")
  530. }
  531. func (region *SRegion) GetIBucketByName(name string) (cloudprovider.ICloudBucket, error) {
  532. return region.GetIBucketById(name)
  533. }
  534. func (region *SRegion) DeleteIBucket(name string) error {
  535. svc := region.getS3Client()
  536. input := &s3.DeleteBucketInput{
  537. Bucket: &name,
  538. }
  539. _, err := svc.DeleteBucket(input)
  540. return err
  541. }
  542. func (region *SRegion) IBucketExist(name string) (bool, error) {
  543. svc := region.getS3Client()
  544. input := &s3.HeadBucketInput{
  545. Bucket: &name,
  546. }
  547. _, err := svc.HeadBucket(input)
  548. if err != nil {
  549. return false, err
  550. }
  551. return true, nil
  552. }