bucket.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464
  1. // Copyright 2023 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package volcengine
  15. import (
  16. "context"
  17. "fmt"
  18. "io"
  19. "net/http"
  20. "net/url"
  21. "time"
  22. tos "github.com/volcengine/ve-tos-golang-sdk/v2/tos"
  23. "github.com/volcengine/ve-tos-golang-sdk/v2/tos/enum"
  24. "yunion.io/x/cloudmux/pkg/cloudprovider"
  25. "yunion.io/x/cloudmux/pkg/multicloud"
  26. "yunion.io/x/log"
  27. "yunion.io/x/pkg/errors"
  28. "yunion.io/x/pkg/util/fileutils"
  29. )
  30. type SBucket struct {
  31. multicloud.SBaseBucket
  32. VolcEngineTags
  33. region *SRegion
  34. Name string
  35. Location string
  36. CreationDate time.Time
  37. }
  38. func (b *SBucket) GetProjectId() string {
  39. return ""
  40. }
  41. func (b *SBucket) GetGlobalId() string {
  42. return b.Name
  43. }
  44. func (b *SBucket) GetName() string {
  45. return b.Name
  46. }
  47. func (b *SBucket) GetLocation() string {
  48. return b.Location
  49. }
  50. func (b *SBucket) GetIRegion() cloudprovider.ICloudRegion {
  51. return b.region
  52. }
  53. func (b *SBucket) GetCreatedAt() time.Time {
  54. return b.CreationDate
  55. }
  56. func (b *SBucket) GetStorageClass() string {
  57. toscli, err := b.region.GetTosClient()
  58. if err != nil {
  59. return ""
  60. }
  61. input := &tos.HeadBucketInput{
  62. Bucket: b.Name,
  63. }
  64. output, err := toscli.HeadBucket(context.Background(), input)
  65. if err != nil {
  66. return ""
  67. }
  68. return string(output.StorageClass)
  69. }
  70. func (b *SBucket) GetStats() cloudprovider.SBucketStats {
  71. stats, _ := cloudprovider.GetIBucketStats(b)
  72. return stats
  73. }
  74. func (b *SBucket) GetAccessUrls() []cloudprovider.SBucketAccessUrl {
  75. ret := []cloudprovider.SBucketAccessUrl{
  76. {
  77. Url: fmt.Sprintf("%s.%s", b.Name, b.region.getTOSExternalDomain()),
  78. Description: "ExtranetEndpoint",
  79. Primary: true,
  80. },
  81. {
  82. Url: fmt.Sprintf("%s.%s", b.Name, b.region.getTOSInternalDomain()),
  83. Description: "IntranetEndpoint",
  84. },
  85. }
  86. return ret
  87. }
  88. func grantToCannedAcl(acls []tos.GrantV2) cloudprovider.TBucketACLType {
  89. isWrite, isRead := false, false
  90. for _, acl := range acls {
  91. if acl.GranteeV2.Type != enum.GranteeGroup || acl.GranteeV2.Canned != enum.CannedAllUsers {
  92. continue
  93. }
  94. switch acl.Permission {
  95. case enum.PermissionWrite:
  96. isWrite = true
  97. case enum.PermissionRead:
  98. isRead = true
  99. }
  100. }
  101. if isWrite && isRead {
  102. return cloudprovider.ACLPublicReadWrite
  103. }
  104. if isRead {
  105. return cloudprovider.ACLPublicRead
  106. }
  107. return cloudprovider.ACLPrivate
  108. }
  109. func (b *SBucket) GetAcl() cloudprovider.TBucketACLType {
  110. acl := cloudprovider.ACLPrivate
  111. toscli, err := b.region.GetTosClient()
  112. if err != nil {
  113. log.Errorf("GetTosClient fail %s", err)
  114. return acl
  115. }
  116. input := tos.GetBucketACLInput{}
  117. input.Bucket = b.Name
  118. output, err := toscli.GetBucketACL(context.Background(), &input)
  119. if err != nil {
  120. log.Errorf("toscli.GetBucketAcl fail %s", err)
  121. return acl
  122. }
  123. return grantToCannedAcl(output.Grants)
  124. }
  125. func (b *SBucket) SetAcl(aclStr cloudprovider.TBucketACLType) error {
  126. toscli, err := b.region.GetTosClient()
  127. if err != nil {
  128. return errors.Wrapf(err, "Get TosClient")
  129. }
  130. input := tos.PutBucketACLInput{}
  131. input.Bucket = b.Name
  132. input.ACLType = enum.ACLType(aclStr)
  133. _, err = toscli.PutBucketACL(context.Background(), &input)
  134. if err != nil {
  135. return errors.Wrapf(err, "PutBucketAcl")
  136. }
  137. return nil
  138. }
  139. func (b *SBucket) NewMultipartUpload(ctx context.Context, key string, cannedAcl cloudprovider.TBucketACLType, storageClassStr string, meta http.Header) (string, error) {
  140. toscli, err := b.region.GetTosClient()
  141. if err != nil {
  142. return "", errors.Wrapf(err, "GetTosClient")
  143. }
  144. input := &tos.CreateMultipartUploadV2Input{
  145. Bucket: b.Name,
  146. Key: key,
  147. ACL: enum.ACLType(cannedAcl),
  148. StorageClass: enum.StorageClassType(storageClassStr),
  149. Meta: map[string]string{},
  150. }
  151. for k := range meta {
  152. input.Meta[k] = meta.Get(k)
  153. }
  154. output, err := toscli.CreateMultipartUploadV2(ctx, input)
  155. if err != nil {
  156. return "", err
  157. }
  158. return output.UploadID, nil
  159. }
  160. func (b *SBucket) AbortMultipartUpload(ctx context.Context, key string, uploadId string) error {
  161. toscli, err := b.region.GetTosClient()
  162. if err != nil {
  163. return errors.Wrapf(err, "GetTosClient")
  164. }
  165. _, err = toscli.AbortMultipartUpload(ctx, &tos.AbortMultipartUploadInput{Bucket: b.Name, Key: key, UploadID: uploadId})
  166. if err != nil {
  167. return errors.Wrapf(err, "AbortMultipartUploadWithContext")
  168. }
  169. return nil
  170. }
  171. func (b *SBucket) CompleteMultipartUpload(ctx context.Context, key string, uploadId string, partEtags []string) error {
  172. toscli, err := b.region.GetTosClient()
  173. if err != nil {
  174. return errors.Wrap(err, "GetTosClient")
  175. }
  176. parts := make([]tos.UploadedPartV2, len(partEtags))
  177. for i := range partEtags {
  178. parts[i].PartNumber = int(i + 1)
  179. parts[i].ETag = partEtags[i]
  180. }
  181. _, err = toscli.CompleteMultipartUploadV2(ctx, &tos.CompleteMultipartUploadV2Input{Bucket: b.Name, Key: key, UploadID: uploadId, Parts: parts})
  182. if err != nil {
  183. return errors.Wrapf(err, "CompleteMultipartUploadV2")
  184. }
  185. return nil
  186. }
  187. func (b *SBucket) CopyObject(ctx context.Context, destKey string, srcBucket, srcKey string, cannedAcl cloudprovider.TBucketACLType, storageClassStr string, meta http.Header) error {
  188. toscli, err := b.region.GetTosClient()
  189. if err != nil {
  190. return errors.Wrap(err, "GetTosClient")
  191. }
  192. if len(cannedAcl) == 0 {
  193. cannedAcl = b.GetAcl()
  194. }
  195. var metaDir string
  196. metaHdr := make(map[string]string)
  197. cacheControl := ""
  198. if meta != nil {
  199. for k, v := range meta {
  200. if len(v) == 0 || len(v[0]) == 0 {
  201. continue
  202. }
  203. switch http.CanonicalHeaderKey(k) {
  204. case cloudprovider.META_HEADER_CACHE_CONTROL:
  205. cacheControl = v[0]
  206. case cloudprovider.META_HEADER_CONTENT_TYPE:
  207. cacheControl = v[0]
  208. case cloudprovider.META_HEADER_CONTENT_LANGUAGE:
  209. cacheControl = v[0]
  210. case cloudprovider.META_HEADER_CONTENT_ENCODING:
  211. cacheControl = v[0]
  212. case cloudprovider.META_HEADER_CONTENT_DISPOSITION:
  213. cacheControl = v[0]
  214. default:
  215. metaHdr[k] = v[0]
  216. }
  217. }
  218. metaDir = "REPLACE"
  219. } else {
  220. metaDir = "COPY"
  221. }
  222. input := tos.CopyObjectInput{SrcBucket: srcBucket, Bucket: b.Name, Key: destKey, SrcKey: url.PathEscape(srcKey), StorageClass: enum.StorageClassType(storageClassStr), ACL: enum.ACLType(cannedAcl), MetadataDirective: enum.MetadataDirectiveType(metaDir)}
  223. if len(cacheControl) > 0 {
  224. input.CacheControl = cacheControl
  225. }
  226. if len(metaHdr) > 0 {
  227. input.Meta = metaHdr
  228. }
  229. _, err = toscli.CopyObject(ctx, &input)
  230. if err != nil {
  231. return errors.Wrapf(err, "CopyObject")
  232. }
  233. return nil
  234. }
  235. func (b *SBucket) CopyPart(ctx context.Context, key string, uploadId string, partNumber int, srcBucket string, srcKey string, srcOffset int64, srcLength int64) (string, error) {
  236. toscli, err := b.region.GetTosClient()
  237. if err != nil {
  238. return "", errors.Wrap(err, "GetTosClient")
  239. }
  240. input := tos.UploadPartCopyV2Input{}
  241. input.Bucket = b.Name
  242. input.Key = key
  243. input.UploadID = uploadId
  244. input.PartNumber = partNumber
  245. input.SrcBucket = srcBucket
  246. input.SrcKey = srcKey
  247. if srcLength > 0 {
  248. input.CopySourceRange = fmt.Sprintf("bytes=%d-%d", srcOffset, srcOffset+srcLength-1)
  249. }
  250. output, err := toscli.UploadPartCopyV2(ctx, &input)
  251. if err != nil {
  252. return "", errors.Wrapf(err, "CopyPart")
  253. }
  254. return output.ETag, nil
  255. }
  256. func (b *SBucket) UploadPart(ctx context.Context, key string, uploadId string, partIndex int, part io.Reader, partSize int64, offset, totalSize int64) (string, error) {
  257. toscli, err := b.region.GetTosClient()
  258. if err != nil {
  259. return "", errors.Wrap(err, "GetTosClient")
  260. }
  261. input := tos.UploadPartV2Input{}
  262. input.Bucket = b.Name
  263. input.Key = key
  264. input.UploadID = uploadId
  265. input.PartNumber = int(partIndex)
  266. seeker, err := fileutils.NewReadSeeker(part, partSize)
  267. if err != nil {
  268. return "", errors.Wrap(err, "newFakeSeeker")
  269. }
  270. defer seeker.Close()
  271. input.Content = seeker
  272. input.ContentLength = partSize
  273. output, err := toscli.UploadPartV2(ctx, &input)
  274. if err != nil {
  275. return "", errors.Wrapf(err, "UploadPart")
  276. }
  277. return output.ETag, nil
  278. }
  279. func (b *SBucket) DeleteObject(ctx context.Context, key string) error {
  280. toscli, err := b.region.GetTosClient()
  281. if err != nil {
  282. return errors.Wrap(err, "GetTosClient")
  283. }
  284. input := tos.DeleteObjectV2Input{
  285. Bucket: b.Name,
  286. Key: key,
  287. }
  288. _, err = toscli.DeleteObjectV2(ctx, &input)
  289. if err != nil {
  290. return errors.Wrap(err, "DeleteObject")
  291. }
  292. return nil
  293. }
  294. func (b *SBucket) GetObject(ctx context.Context, key string, rangeOpt *cloudprovider.SGetObjectRange) (io.ReadCloser, error) {
  295. toscli, err := b.region.GetTosClient()
  296. if err != nil {
  297. return nil, errors.Wrap(err, "GetTosClient")
  298. }
  299. input := tos.GetObjectV2Input{
  300. Bucket: b.Name,
  301. Key: key,
  302. RangeStart: rangeOpt.Start,
  303. RangeEnd: rangeOpt.End,
  304. }
  305. output, err := toscli.GetObjectV2(ctx, &input)
  306. if err != nil {
  307. return nil, errors.Wrap(err, "DeleteObject")
  308. }
  309. return output.Content, nil
  310. }
  311. func (b *SBucket) ListObjects(prefix string, marker string, delimiter string, maxCount int) (cloudprovider.SListObjectResult, error) {
  312. result := cloudprovider.SListObjectResult{}
  313. toscli, err := b.region.GetTosClient()
  314. if err != nil {
  315. return result, errors.Wrap(err, "GetTosClient")
  316. }
  317. input := tos.ListObjectsV2Input{}
  318. input.Bucket = b.Name
  319. if len(prefix) > 0 {
  320. input.Prefix = prefix
  321. }
  322. if len(marker) > 0 {
  323. input.Marker = marker
  324. }
  325. if len(delimiter) > 0 {
  326. input.Delimiter = delimiter
  327. }
  328. if maxCount > 0 {
  329. input.MaxKeys = maxCount
  330. }
  331. output, err := toscli.ListObjectsV2(context.Background(), &input)
  332. if err != nil {
  333. return result, errors.Wrap(err, "ListObjects")
  334. }
  335. for _, object := range output.Contents {
  336. obj := &SObject{
  337. bucket: b,
  338. SBaseCloudObject: cloudprovider.SBaseCloudObject{
  339. StorageClass: string(object.StorageClass),
  340. Key: object.Key,
  341. SizeBytes: object.Size,
  342. ETag: object.ETag,
  343. LastModified: object.LastModified,
  344. },
  345. }
  346. result.Objects = append(result.Objects, obj)
  347. }
  348. if output.CommonPrefixes != nil {
  349. result.CommonPrefixes = make([]cloudprovider.ICloudObject, len(output.CommonPrefixes))
  350. for i, commonPrefix := range output.CommonPrefixes {
  351. result.CommonPrefixes[i] = &SObject{
  352. bucket: b,
  353. SBaseCloudObject: cloudprovider.SBaseCloudObject{Key: commonPrefix.Prefix},
  354. }
  355. }
  356. }
  357. result.IsTruncated = output.IsTruncated
  358. result.NextMarker = output.NextMarker
  359. return result, nil
  360. }
  361. func (b *SBucket) GetTempUrl(method string, key string, expire time.Duration) (string, error) {
  362. toscli, err := b.region.GetTosClient()
  363. if err != nil {
  364. return "", errors.Wrapf(err, "GetTosClient")
  365. }
  366. input := &tos.PreSignedURLInput{
  367. HTTPMethod: enum.HttpMethodGet,
  368. Bucket: b.Name,
  369. Key: key,
  370. Expires: int64(expire.Seconds()),
  371. }
  372. output, err := toscli.PreSignedURL(input)
  373. if err != nil {
  374. return "", err
  375. }
  376. return output.SignedUrl, nil
  377. }
  378. func (b *SBucket) PutObject(ctx context.Context, key string, body io.Reader, sizeBytes int64, cannedAcl cloudprovider.TBucketACLType, storageClassStr string, meta http.Header) error {
  379. if sizeBytes < 0 {
  380. return errors.Error("context length expected")
  381. }
  382. toscli, err := b.region.GetTosClient()
  383. if err != nil {
  384. return errors.Wrapf(err, "GetTosClient")
  385. }
  386. input := tos.PutObjectV2Input{}
  387. input.Bucket = b.Name
  388. input.Key = key
  389. seeker, err := fileutils.NewReadSeeker(body, sizeBytes)
  390. if err != nil {
  391. return errors.Wrap(err, "newFakeSeeker")
  392. }
  393. defer seeker.Close()
  394. input.Content = body
  395. input.ContentLength = sizeBytes
  396. if meta != nil {
  397. metaHdr := make(map[string]string)
  398. for k, v := range meta {
  399. if len(v) == 0 || len(v[0]) == 0 {
  400. continue
  401. }
  402. switch http.CanonicalHeaderKey(k) {
  403. case cloudprovider.META_HEADER_CACHE_CONTROL:
  404. input.CacheControl = v[0]
  405. case cloudprovider.META_HEADER_CONTENT_TYPE:
  406. input.ContentType = v[0]
  407. case cloudprovider.META_HEADER_CONTENT_MD5:
  408. input.ContentMD5 = v[0]
  409. case cloudprovider.META_HEADER_CONTENT_LANGUAGE:
  410. input.ContentEncoding = v[0]
  411. case cloudprovider.META_HEADER_CONTENT_ENCODING:
  412. input.ContentDisposition = v[0]
  413. default:
  414. metaHdr[k] = v[0]
  415. }
  416. }
  417. if len(metaHdr) > 0 {
  418. input.Meta = metaHdr
  419. }
  420. }
  421. if len(cannedAcl) > 0 {
  422. cannedAcl = b.GetAcl()
  423. }
  424. input.ACL = enum.ACLType(cannedAcl)
  425. if len(storageClassStr) > 0 {
  426. input.StorageClass = enum.StorageClassType(storageClassStr)
  427. }
  428. _, err = toscli.PutObjectV2(ctx, &input)
  429. if err != nil {
  430. return errors.Wrapf(err, "PutObject")
  431. }
  432. return nil
  433. }