| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package cephfs
- import (
- "context"
- "crypto/tls"
- "fmt"
- "net/http"
- "net/url"
- "strings"
- "sync"
- api "yunion.io/x/cloudmux/pkg/apis/compute"
- "yunion.io/x/cloudmux/pkg/cloudprovider"
- "yunion.io/x/cloudmux/pkg/multicloud"
- "yunion.io/x/jsonutils"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/gotypes"
- "yunion.io/x/pkg/util/httputils"
- )
- const (
- CLOUD_PROVIDER_CEPHFS = api.CLOUD_PROVIDER_CEPHFS
- )
- type CephFSClientConfig struct {
- cpcfg cloudprovider.ProviderConfig
- host string
- port int
- username string
- password string
- fsId string
- debug bool
- }
- func (cfg *CephFSClientConfig) CloudproviderConfig(cpcfg cloudprovider.ProviderConfig) *CephFSClientConfig {
- cfg.cpcfg = cpcfg
- return cfg
- }
- func (cfg *CephFSClientConfig) Debug(debug bool) *CephFSClientConfig {
- cfg.debug = debug
- return cfg
- }
- func NewCephFSClientConfig(host string, port int, username, password, fsId string) *CephFSClientConfig {
- cfg := &CephFSClientConfig{
- host: host,
- port: port,
- username: username,
- password: password,
- fsId: fsId,
- }
- return cfg
- }
- type SCephFSClient struct {
- *CephFSClientConfig
- cloudprovider.SFakeOnPremiseRegion
- multicloud.SNoObjectStorageRegion
- multicloud.SRegion
- lock sync.Mutex
- client *http.Client
- token string
- }
- func NewCephFSClient(cfg *CephFSClientConfig) (*SCephFSClient, error) {
- client := &SCephFSClient{
- CephFSClientConfig: cfg,
- }
- return client, client.auth()
- }
- func (cli *SCephFSClient) GetI18n() cloudprovider.SModelI18nTable {
- table := cloudprovider.SModelI18nTable{}
- table["name"] = cloudprovider.NewSModelI18nEntry(cli.GetName()).CN(cli.GetName())
- return table
- }
- func (cli *SCephFSClient) getDefaultClient() *http.Client {
- cli.lock.Lock()
- defer cli.lock.Unlock()
- if !gotypes.IsNil(cli.client) {
- return cli.client
- }
- cli.client = httputils.GetAdaptiveTimeoutClient()
- httputils.SetClientProxyFunc(cli.client, cli.cpcfg.ProxyFunc)
- ts, _ := cli.client.Transport.(*http.Transport)
- ts.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
- cli.client.Transport = cloudprovider.GetCheckTransport(ts, func(req *http.Request) (func(resp *http.Response) error, error) {
- if cli.cpcfg.ReadOnly {
- if req.Method == "GET" || strings.HasSuffix(req.URL.Path, "/auth") {
- return nil, nil
- }
- return nil, errors.Wrapf(cloudprovider.ErrAccountReadOnly, "%s %s", req.Method, req.URL.Path)
- }
- return nil, nil
- })
- return cli.client
- }
- func (cli *SCephFSClient) baseUrl() string {
- protocol := "http"
- if strings.Contains(fmt.Sprintf("%d", cli.port), "443") {
- protocol = "https"
- }
- return fmt.Sprintf("%s://%s:%d/api", protocol, cli.host, cli.port)
- }
- func (cli *SCephFSClient) auth() error {
- client := cli.getDefaultClient()
- url := fmt.Sprintf("%s/auth", cli.baseUrl())
- header := http.Header{}
- header.Set("Accept", "application/vnd.ceph.api.v1.0+json")
- header.Set("Accept-Encoding", "gzip")
- body := jsonutils.Marshal(map[string]interface{}{
- "username": cli.username,
- "password": cli.password,
- })
- _, resp, err := httputils.JSONRequest(client, context.Background(), httputils.POST, url, header, body, cli.debug)
- if err != nil {
- return errors.Wrapf(err, "auth")
- }
- if gotypes.IsNil(resp) {
- return fmt.Errorf("empty response")
- }
- cli.token, err = resp.GetString("token")
- return err
- }
- func (cli *SCephFSClient) GetCapabilities() []string {
- return []string{
- cloudprovider.CLOUD_CAPABILITY_NAS,
- }
- }
- func (cli *SCephFSClient) list(res string, params url.Values) (jsonutils.JSONObject, error) {
- client := cli.getDefaultClient()
- url := fmt.Sprintf("%s/%s", cli.baseUrl(), res)
- if len(params) > 0 {
- url = fmt.Sprintf("%s/%s?%s", cli.baseUrl(), res, params.Encode())
- }
- header := http.Header{}
- header.Set("Accept", "application/vnd.ceph.api.v1.0+json")
- header.Set("Accept-Encoding", "gzip")
- header.Set("Authorization", fmt.Sprintf("Bearer %s", cli.token))
- _, resp, err := httputils.JSONRequest(client, context.Background(), httputils.GET, url, header, nil, cli.debug)
- return resp, err
- }
- func (cli *SCephFSClient) post(res string, params map[string]interface{}) (jsonutils.JSONObject, error) {
- client := cli.getDefaultClient()
- url := fmt.Sprintf("%s/%s", cli.baseUrl(), res)
- header := http.Header{}
- header.Set("Accept", "application/vnd.ceph.api.v1.0+json")
- header.Set("Accept-Encoding", "gzip")
- header.Set("Authorization", fmt.Sprintf("Bearer %s", cli.token))
- _, resp, err := httputils.JSONRequest(client, context.Background(), httputils.POST, url, header, jsonutils.Marshal(params), cli.debug)
- return resp, err
- }
- func (cli *SCephFSClient) delete(res string, params map[string]interface{}) (jsonutils.JSONObject, error) {
- client := cli.getDefaultClient()
- url := fmt.Sprintf("%s/%s", cli.baseUrl(), res)
- header := http.Header{}
- header.Set("Accept", "application/vnd.ceph.api.v1.0+json")
- header.Set("Accept-Encoding", "gzip")
- header.Set("Authorization", fmt.Sprintf("Bearer %s", cli.token))
- _, resp, err := httputils.JSONRequest(client, context.Background(), httputils.DELETE, url, header, jsonutils.Marshal(params), cli.debug)
- return resp, err
- }
- func (cli *SCephFSClient) put(res string, params map[string]interface{}) (jsonutils.JSONObject, error) {
- client := cli.getDefaultClient()
- url := fmt.Sprintf("%s/%s", cli.baseUrl(), res)
- header := http.Header{}
- header.Set("Accept", "application/vnd.ceph.api.v1.0+json")
- header.Set("Accept-Encoding", "gzip")
- header.Set("Authorization", fmt.Sprintf("Bearer %s", cli.token))
- _, resp, err := httputils.JSONRequest(client, context.Background(), httputils.PUT, url, header, jsonutils.Marshal(params), cli.debug)
- return resp, err
- }
- func (cli *SCephFSClient) GetProvider() string {
- return api.CLOUD_PROVIDER_CEPHFS
- }
- func (cli *SCephFSClient) GetSubAccounts() ([]cloudprovider.SSubAccount, error) {
- fss, err := cli.GetCephFSs()
- if err != nil {
- return nil, err
- }
- ret := []cloudprovider.SSubAccount{}
- for _, fs := range fss {
- subAccount := cloudprovider.SSubAccount{
- Id: cli.cpcfg.Id,
- Account: fmt.Sprintf("%s/%s", cli.username, fs.Id),
- Name: fs.Mdsmap.FsName,
- HealthStatus: api.CLOUD_PROVIDER_HEALTH_NORMAL,
- }
- ret = append(ret, subAccount)
- }
- return ret, nil
- }
- func (cli *SCephFSClient) GetAccountId() string {
- return fmt.Sprintf("%s@%s:%d", cli.username, cli.host, cli.port)
- }
|