object.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package object
  15. import (
  16. "context"
  17. "fmt"
  18. "io"
  19. "net/http"
  20. "net/url"
  21. "os"
  22. "strings"
  23. "time"
  24. "yunion.io/x/cloudmux/pkg/cloudprovider"
  25. "yunion.io/x/cloudmux/pkg/multicloud/objectstore"
  26. "yunion.io/x/log"
  27. "yunion.io/x/pkg/errors"
  28. "yunion.io/x/pkg/util/streamutils"
  29. )
  30. type SObjectBackupStorage struct {
  31. BackupStorageId string
  32. bucket string
  33. extBucket string
  34. store *objectstore.SObjectStoreClient
  35. extStore *objectstore.SObjectStoreClient
  36. }
  37. func newObjectBackupStorage(backupStorageId, bucketUrl, accessKey, secret string, signVer objectstore.S3SignVersion, bucketUrlExt string) (*SObjectBackupStorage, error) {
  38. bucket, endpoint, err := parseBucketUrl(bucketUrl)
  39. if err != nil {
  40. return nil, errors.Wrapf(err, "parseBucketUrl %s", bucketUrl)
  41. }
  42. cfg := objectstore.NewObjectStoreClientConfig(endpoint, accessKey, secret)
  43. if len(signVer) > 0 {
  44. cfg = cfg.SignVersion(signVer)
  45. }
  46. store, err := objectstore.NewObjectStoreClient(cfg)
  47. if err != nil {
  48. return nil, errors.Wrap(err, "NewObjectStoreClient")
  49. }
  50. var extStore *objectstore.SObjectStoreClient
  51. var extBucket string
  52. if len(bucketUrlExt) > 0 {
  53. b, extEndpoint, err := parseBucketUrl(bucketUrlExt)
  54. if err != nil {
  55. log.Errorf("parseBucketUrl %s: %v", bucketUrlExt, err)
  56. } else {
  57. extCfg := objectstore.NewObjectStoreClientConfig(extEndpoint, accessKey, secret)
  58. extStore, _ = objectstore.NewObjectStoreClient(extCfg)
  59. if extStore != nil {
  60. extBucket = b
  61. }
  62. }
  63. }
  64. return &SObjectBackupStorage{
  65. BackupStorageId: backupStorageId,
  66. bucket: bucket,
  67. store: store,
  68. extBucket: extBucket,
  69. extStore: extStore,
  70. }, nil
  71. }
  72. func parseBucketUrl(bucketUrl string) (string, string, error) {
  73. bu, err := url.Parse(bucketUrl)
  74. if err != nil {
  75. return "", "", errors.Wrapf(err, "ur.Parse %s", bucketUrl)
  76. }
  77. for len(bu.Path) > 0 && bu.Path[0] == '/' {
  78. bu.Path = bu.Path[1:]
  79. }
  80. if len(bu.Path) > 0 {
  81. bucket := strings.TrimRight(bu.Path, "/")
  82. bu.Path = ""
  83. return bucket, fmt.Sprintf("%s://%s", bu.Scheme, bu.Host), nil
  84. } else {
  85. parts := strings.Split(bu.Host, ".")
  86. if len(parts) < 3 {
  87. return "", "", errors.Wrapf(errors.ErrInvalidFormat, "host %s should have at least 3 segments", bu.Host)
  88. }
  89. return parts[0], fmt.Sprintf("%s//%s", bu.Scheme, bu.Host), nil
  90. }
  91. }
  92. const backupPathPrefix = "backups"
  93. const backupInstancePathPrefix = "backuppacks"
  94. func (s *SObjectBackupStorage) getBackupKey(backupId string) string {
  95. return fmt.Sprintf("%s/%s", backupPathPrefix, backupId)
  96. }
  97. func (s *SObjectBackupStorage) getBackupInstanceKey(backupInstancePackName string) string {
  98. return fmt.Sprintf("%s/%s", backupInstancePathPrefix, backupInstancePackName)
  99. }
  100. func (s *SObjectBackupStorage) getBucket() (cloudprovider.ICloudBucket, error) {
  101. bucket, err := s.store.GetIRegion().GetIBucketByName(s.bucket)
  102. if err != nil {
  103. return nil, errors.Wrap(err, "IBucketExist")
  104. }
  105. return bucket, nil
  106. }
  107. func (s *SObjectBackupStorage) getExtBucket() (cloudprovider.ICloudBucket, error) {
  108. if s.extStore == nil || len(s.extBucket) == 0 {
  109. return nil, errors.Wrap(errors.ErrInvalidStatus, "extStore is nil or extBucket is empty")
  110. }
  111. bucket, err := s.extStore.GetIRegion().GetIBucketByName(s.extBucket)
  112. if err != nil {
  113. return nil, errors.Wrap(err, "GetIBucketByName")
  114. }
  115. return bucket, nil
  116. }
  117. func (s *SObjectBackupStorage) SaveBackupFrom(ctx context.Context, srcFile io.Reader, fileSize int64, backupId string) error {
  118. return s.saveObject(ctx, srcFile, fileSize, backupId, s.getBackupKey)
  119. }
  120. func (s *SObjectBackupStorage) SaveBackupInstanceFrom(ctx context.Context, srcFile io.Reader, fileSize int64, backupId string) error {
  121. return s.saveObject(ctx, srcFile, fileSize, backupId, s.getBackupInstanceKey)
  122. }
  123. func (s *SObjectBackupStorage) saveObject(ctx context.Context, srcFile io.Reader, fileSize int64, id string, getKeyFunc func(string) string) error {
  124. bucket, err := s.getBucket()
  125. if err != nil {
  126. return errors.Wrap(err, "getBucket")
  127. }
  128. err = cloudprovider.UploadObject(ctx, bucket, getKeyFunc(id), 200*1024*1024, srcFile, fileSize, cloudprovider.ACLPrivate, "", nil, false)
  129. if err != nil {
  130. return errors.Wrapf(err, "UploadObject %d %s", fileSize, getKeyFunc(id))
  131. }
  132. return nil
  133. }
  134. func (s *SObjectBackupStorage) RestoreBackupTo(ctx context.Context, targetFilename string, backupId string) error {
  135. return s.restoreObject(ctx, targetFilename, backupId, s.getBackupKey)
  136. }
  137. func (s *SObjectBackupStorage) RestoreBackupInstanceTo(ctx context.Context, targetFilename string, backupId string) error {
  138. return s.restoreObject(ctx, targetFilename, backupId, s.getBackupInstanceKey)
  139. }
  140. func (s *SObjectBackupStorage) restoreObject(ctx context.Context, targetFilename string, id string, getKeyFunc func(string) string) error {
  141. bucket, err := s.getBucket()
  142. if err != nil {
  143. return errors.Wrap(err, "getBucket")
  144. }
  145. reader, err := bucket.GetObject(ctx, getKeyFunc(id), nil)
  146. if err != nil {
  147. return errors.Wrap(err, "GetObject")
  148. }
  149. file, err := os.OpenFile(targetFilename, os.O_CREATE|os.O_WRONLY, 0600)
  150. if err != nil {
  151. return errors.Wrapf(err, "OpenFile %s", targetFilename)
  152. }
  153. defer file.Close()
  154. _, err = streamutils.StreamPipe(reader, file, false, nil)
  155. if err != nil {
  156. return errors.Wrap(err, "StreamPipe")
  157. }
  158. return nil
  159. }
  160. func (s *SObjectBackupStorage) RemoveBackup(ctx context.Context, backupId string) error {
  161. return s.removeObject(ctx, backupId, s.getBackupKey)
  162. }
  163. func (s *SObjectBackupStorage) RemoveBackupInstance(ctx context.Context, backupId string) error {
  164. return s.removeObject(ctx, backupId, s.getBackupInstanceKey)
  165. }
  166. func (s *SObjectBackupStorage) removeObject(ctx context.Context, id string, getKeyFunc func(string) string) error {
  167. bucket, err := s.getBucket()
  168. if err != nil {
  169. return errors.Wrap(err, "getBucket")
  170. }
  171. err = bucket.DeleteObject(ctx, getKeyFunc(id))
  172. if err != nil {
  173. return errors.Wrap(err, "DeleteObject")
  174. }
  175. return nil
  176. }
  177. func (s *SObjectBackupStorage) IsBackupExists(backupId string) (bool, string, error) {
  178. return s.isObjectExists(backupId, s.getBackupKey)
  179. }
  180. func (s *SObjectBackupStorage) IsBackupInstanceExists(backupId string) (bool, string, error) {
  181. return s.isObjectExists(backupId, s.getBackupInstanceKey)
  182. }
  183. func (s *SObjectBackupStorage) isObjectExists(id string, getKeyFunc func(string) string) (bool, string, error) {
  184. bucket, err := s.getBucket()
  185. if err != nil {
  186. return false, "", errors.Wrap(err, "getBucket")
  187. }
  188. _, err = cloudprovider.GetIObject(bucket, getKeyFunc(id))
  189. if err != nil {
  190. if errors.Cause(err) == errors.ErrNotFound {
  191. return false, "", nil
  192. }
  193. return false, "", errors.Wrap(err, "GetIObject")
  194. }
  195. return true, "", nil
  196. }
  197. func (s *SObjectBackupStorage) IsOnline() (bool, string, error) {
  198. exist, err := s.store.GetIRegion().IBucketExist(s.bucket)
  199. if err != nil {
  200. return false, "", errors.Wrap(err, "IBucketExist")
  201. }
  202. return exist, "", nil
  203. }
  204. func (s *SObjectBackupStorage) GetExternalAccessUrl(backupId string) (string, error) {
  205. var bucket cloudprovider.ICloudBucket
  206. var err error
  207. bucket, err = s.getExtBucket()
  208. if err != nil {
  209. log.Errorf("SObjectBackupStorage.GetExternalAccessUrl.getExtBucket: %v", err)
  210. bucket, err = s.getBucket()
  211. if err != nil {
  212. return "", errors.Wrap(err, "getBucket")
  213. }
  214. }
  215. url, err := bucket.GetTempUrl(http.MethodGet, s.getBackupKey(backupId), 6*time.Hour)
  216. if err != nil {
  217. return "", errors.Wrap(err, "GetTempUrl")
  218. }
  219. return url, nil
  220. }