| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package object
- import (
- "context"
- "fmt"
- "io"
- "net/http"
- "net/url"
- "os"
- "strings"
- "time"
- "yunion.io/x/cloudmux/pkg/cloudprovider"
- "yunion.io/x/cloudmux/pkg/multicloud/objectstore"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/util/streamutils"
- )
- type SObjectBackupStorage struct {
- BackupStorageId string
- bucket string
- extBucket string
- store *objectstore.SObjectStoreClient
- extStore *objectstore.SObjectStoreClient
- }
- func newObjectBackupStorage(backupStorageId, bucketUrl, accessKey, secret string, signVer objectstore.S3SignVersion, bucketUrlExt string) (*SObjectBackupStorage, error) {
- bucket, endpoint, err := parseBucketUrl(bucketUrl)
- if err != nil {
- return nil, errors.Wrapf(err, "parseBucketUrl %s", bucketUrl)
- }
- cfg := objectstore.NewObjectStoreClientConfig(endpoint, accessKey, secret)
- if len(signVer) > 0 {
- cfg = cfg.SignVersion(signVer)
- }
- store, err := objectstore.NewObjectStoreClient(cfg)
- if err != nil {
- return nil, errors.Wrap(err, "NewObjectStoreClient")
- }
- var extStore *objectstore.SObjectStoreClient
- var extBucket string
- if len(bucketUrlExt) > 0 {
- b, extEndpoint, err := parseBucketUrl(bucketUrlExt)
- if err != nil {
- log.Errorf("parseBucketUrl %s: %v", bucketUrlExt, err)
- } else {
- extCfg := objectstore.NewObjectStoreClientConfig(extEndpoint, accessKey, secret)
- extStore, _ = objectstore.NewObjectStoreClient(extCfg)
- if extStore != nil {
- extBucket = b
- }
- }
- }
- return &SObjectBackupStorage{
- BackupStorageId: backupStorageId,
- bucket: bucket,
- store: store,
- extBucket: extBucket,
- extStore: extStore,
- }, nil
- }
- func parseBucketUrl(bucketUrl string) (string, string, error) {
- bu, err := url.Parse(bucketUrl)
- if err != nil {
- return "", "", errors.Wrapf(err, "ur.Parse %s", bucketUrl)
- }
- for len(bu.Path) > 0 && bu.Path[0] == '/' {
- bu.Path = bu.Path[1:]
- }
- if len(bu.Path) > 0 {
- bucket := strings.TrimRight(bu.Path, "/")
- bu.Path = ""
- return bucket, fmt.Sprintf("%s://%s", bu.Scheme, bu.Host), nil
- } else {
- parts := strings.Split(bu.Host, ".")
- if len(parts) < 3 {
- return "", "", errors.Wrapf(errors.ErrInvalidFormat, "host %s should have at least 3 segments", bu.Host)
- }
- return parts[0], fmt.Sprintf("%s//%s", bu.Scheme, bu.Host), nil
- }
- }
// Key prefixes under which objects are stored in the bucket.
const (
	// backupPathPrefix is prepended by getBackupKey.
	backupPathPrefix = "backups"
	// backupInstancePathPrefix is prepended by getBackupInstanceKey.
	backupInstancePathPrefix = "backuppacks"
)
- func (s *SObjectBackupStorage) getBackupKey(backupId string) string {
- return fmt.Sprintf("%s/%s", backupPathPrefix, backupId)
- }
- func (s *SObjectBackupStorage) getBackupInstanceKey(backupInstancePackName string) string {
- return fmt.Sprintf("%s/%s", backupInstancePathPrefix, backupInstancePackName)
- }
- func (s *SObjectBackupStorage) getBucket() (cloudprovider.ICloudBucket, error) {
- bucket, err := s.store.GetIRegion().GetIBucketByName(s.bucket)
- if err != nil {
- return nil, errors.Wrap(err, "IBucketExist")
- }
- return bucket, nil
- }
- func (s *SObjectBackupStorage) getExtBucket() (cloudprovider.ICloudBucket, error) {
- if s.extStore == nil || len(s.extBucket) == 0 {
- return nil, errors.Wrap(errors.ErrInvalidStatus, "extStore is nil or extBucket is empty")
- }
- bucket, err := s.extStore.GetIRegion().GetIBucketByName(s.extBucket)
- if err != nil {
- return nil, errors.Wrap(err, "GetIBucketByName")
- }
- return bucket, nil
- }
- func (s *SObjectBackupStorage) SaveBackupFrom(ctx context.Context, srcFile io.Reader, fileSize int64, backupId string) error {
- return s.saveObject(ctx, srcFile, fileSize, backupId, s.getBackupKey)
- }
- func (s *SObjectBackupStorage) SaveBackupInstanceFrom(ctx context.Context, srcFile io.Reader, fileSize int64, backupId string) error {
- return s.saveObject(ctx, srcFile, fileSize, backupId, s.getBackupInstanceKey)
- }
- func (s *SObjectBackupStorage) saveObject(ctx context.Context, srcFile io.Reader, fileSize int64, id string, getKeyFunc func(string) string) error {
- bucket, err := s.getBucket()
- if err != nil {
- return errors.Wrap(err, "getBucket")
- }
- err = cloudprovider.UploadObject(ctx, bucket, getKeyFunc(id), 200*1024*1024, srcFile, fileSize, cloudprovider.ACLPrivate, "", nil, false)
- if err != nil {
- return errors.Wrapf(err, "UploadObject %d %s", fileSize, getKeyFunc(id))
- }
- return nil
- }
- func (s *SObjectBackupStorage) RestoreBackupTo(ctx context.Context, targetFilename string, backupId string) error {
- return s.restoreObject(ctx, targetFilename, backupId, s.getBackupKey)
- }
- func (s *SObjectBackupStorage) RestoreBackupInstanceTo(ctx context.Context, targetFilename string, backupId string) error {
- return s.restoreObject(ctx, targetFilename, backupId, s.getBackupInstanceKey)
- }
- func (s *SObjectBackupStorage) restoreObject(ctx context.Context, targetFilename string, id string, getKeyFunc func(string) string) error {
- bucket, err := s.getBucket()
- if err != nil {
- return errors.Wrap(err, "getBucket")
- }
- reader, err := bucket.GetObject(ctx, getKeyFunc(id), nil)
- if err != nil {
- return errors.Wrap(err, "GetObject")
- }
- file, err := os.OpenFile(targetFilename, os.O_CREATE|os.O_WRONLY, 0600)
- if err != nil {
- return errors.Wrapf(err, "OpenFile %s", targetFilename)
- }
- defer file.Close()
- _, err = streamutils.StreamPipe(reader, file, false, nil)
- if err != nil {
- return errors.Wrap(err, "StreamPipe")
- }
- return nil
- }
- func (s *SObjectBackupStorage) RemoveBackup(ctx context.Context, backupId string) error {
- return s.removeObject(ctx, backupId, s.getBackupKey)
- }
- func (s *SObjectBackupStorage) RemoveBackupInstance(ctx context.Context, backupId string) error {
- return s.removeObject(ctx, backupId, s.getBackupInstanceKey)
- }
- func (s *SObjectBackupStorage) removeObject(ctx context.Context, id string, getKeyFunc func(string) string) error {
- bucket, err := s.getBucket()
- if err != nil {
- return errors.Wrap(err, "getBucket")
- }
- err = bucket.DeleteObject(ctx, getKeyFunc(id))
- if err != nil {
- return errors.Wrap(err, "DeleteObject")
- }
- return nil
- }
- func (s *SObjectBackupStorage) IsBackupExists(backupId string) (bool, string, error) {
- return s.isObjectExists(backupId, s.getBackupKey)
- }
- func (s *SObjectBackupStorage) IsBackupInstanceExists(backupId string) (bool, string, error) {
- return s.isObjectExists(backupId, s.getBackupInstanceKey)
- }
- func (s *SObjectBackupStorage) isObjectExists(id string, getKeyFunc func(string) string) (bool, string, error) {
- bucket, err := s.getBucket()
- if err != nil {
- return false, "", errors.Wrap(err, "getBucket")
- }
- _, err = cloudprovider.GetIObject(bucket, getKeyFunc(id))
- if err != nil {
- if errors.Cause(err) == errors.ErrNotFound {
- return false, "", nil
- }
- return false, "", errors.Wrap(err, "GetIObject")
- }
- return true, "", nil
- }
- func (s *SObjectBackupStorage) IsOnline() (bool, string, error) {
- exist, err := s.store.GetIRegion().IBucketExist(s.bucket)
- if err != nil {
- return false, "", errors.Wrap(err, "IBucketExist")
- }
- return exist, "", nil
- }
- func (s *SObjectBackupStorage) GetExternalAccessUrl(backupId string) (string, error) {
- var bucket cloudprovider.ICloudBucket
- var err error
- bucket, err = s.getExtBucket()
- if err != nil {
- log.Errorf("SObjectBackupStorage.GetExternalAccessUrl.getExtBucket: %v", err)
- bucket, err = s.getBucket()
- if err != nil {
- return "", errors.Wrap(err, "getBucket")
- }
- }
- url, err := bucket.GetTempUrl(http.MethodGet, s.getBackupKey(backupId), 6*time.Hour)
- if err != nil {
- return "", errors.Wrap(err, "GetTempUrl")
- }
- return url, nil
- }
|