client.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package cephfs
  15. import (
  16. "context"
  17. "crypto/tls"
  18. "fmt"
  19. "net/http"
  20. "net/url"
  21. "strings"
  22. "sync"
  23. api "yunion.io/x/cloudmux/pkg/apis/compute"
  24. "yunion.io/x/cloudmux/pkg/cloudprovider"
  25. "yunion.io/x/cloudmux/pkg/multicloud"
  26. "yunion.io/x/jsonutils"
  27. "yunion.io/x/pkg/errors"
  28. "yunion.io/x/pkg/gotypes"
  29. "yunion.io/x/pkg/util/httputils"
  30. )
  31. const (
  32. CLOUD_PROVIDER_CEPHFS = api.CLOUD_PROVIDER_CEPHFS
  33. )
  34. type CephFSClientConfig struct {
  35. cpcfg cloudprovider.ProviderConfig
  36. host string
  37. port int
  38. username string
  39. password string
  40. fsId string
  41. debug bool
  42. }
  43. func (cfg *CephFSClientConfig) CloudproviderConfig(cpcfg cloudprovider.ProviderConfig) *CephFSClientConfig {
  44. cfg.cpcfg = cpcfg
  45. return cfg
  46. }
  47. func (cfg *CephFSClientConfig) Debug(debug bool) *CephFSClientConfig {
  48. cfg.debug = debug
  49. return cfg
  50. }
  51. func NewCephFSClientConfig(host string, port int, username, password, fsId string) *CephFSClientConfig {
  52. cfg := &CephFSClientConfig{
  53. host: host,
  54. port: port,
  55. username: username,
  56. password: password,
  57. fsId: fsId,
  58. }
  59. return cfg
  60. }
  61. type SCephFSClient struct {
  62. *CephFSClientConfig
  63. cloudprovider.SFakeOnPremiseRegion
  64. multicloud.SNoObjectStorageRegion
  65. multicloud.SRegion
  66. lock sync.Mutex
  67. client *http.Client
  68. token string
  69. }
  70. func NewCephFSClient(cfg *CephFSClientConfig) (*SCephFSClient, error) {
  71. client := &SCephFSClient{
  72. CephFSClientConfig: cfg,
  73. }
  74. return client, client.auth()
  75. }
  76. func (cli *SCephFSClient) GetI18n() cloudprovider.SModelI18nTable {
  77. table := cloudprovider.SModelI18nTable{}
  78. table["name"] = cloudprovider.NewSModelI18nEntry(cli.GetName()).CN(cli.GetName())
  79. return table
  80. }
  81. func (cli *SCephFSClient) getDefaultClient() *http.Client {
  82. cli.lock.Lock()
  83. defer cli.lock.Unlock()
  84. if !gotypes.IsNil(cli.client) {
  85. return cli.client
  86. }
  87. cli.client = httputils.GetAdaptiveTimeoutClient()
  88. httputils.SetClientProxyFunc(cli.client, cli.cpcfg.ProxyFunc)
  89. ts, _ := cli.client.Transport.(*http.Transport)
  90. ts.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
  91. cli.client.Transport = cloudprovider.GetCheckTransport(ts, func(req *http.Request) (func(resp *http.Response) error, error) {
  92. if cli.cpcfg.ReadOnly {
  93. if req.Method == "GET" || strings.HasSuffix(req.URL.Path, "/auth") {
  94. return nil, nil
  95. }
  96. return nil, errors.Wrapf(cloudprovider.ErrAccountReadOnly, "%s %s", req.Method, req.URL.Path)
  97. }
  98. return nil, nil
  99. })
  100. return cli.client
  101. }
  102. func (cli *SCephFSClient) baseUrl() string {
  103. protocol := "http"
  104. if strings.Contains(fmt.Sprintf("%d", cli.port), "443") {
  105. protocol = "https"
  106. }
  107. return fmt.Sprintf("%s://%s:%d/api", protocol, cli.host, cli.port)
  108. }
  109. func (cli *SCephFSClient) auth() error {
  110. client := cli.getDefaultClient()
  111. url := fmt.Sprintf("%s/auth", cli.baseUrl())
  112. header := http.Header{}
  113. header.Set("Accept", "application/vnd.ceph.api.v1.0+json")
  114. header.Set("Accept-Encoding", "gzip")
  115. body := jsonutils.Marshal(map[string]interface{}{
  116. "username": cli.username,
  117. "password": cli.password,
  118. })
  119. _, resp, err := httputils.JSONRequest(client, context.Background(), httputils.POST, url, header, body, cli.debug)
  120. if err != nil {
  121. return errors.Wrapf(err, "auth")
  122. }
  123. if gotypes.IsNil(resp) {
  124. return fmt.Errorf("empty response")
  125. }
  126. cli.token, err = resp.GetString("token")
  127. return err
  128. }
  129. func (cli *SCephFSClient) GetCapabilities() []string {
  130. return []string{
  131. cloudprovider.CLOUD_CAPABILITY_NAS,
  132. }
  133. }
  134. func (cli *SCephFSClient) list(res string, params url.Values) (jsonutils.JSONObject, error) {
  135. client := cli.getDefaultClient()
  136. url := fmt.Sprintf("%s/%s", cli.baseUrl(), res)
  137. if len(params) > 0 {
  138. url = fmt.Sprintf("%s/%s?%s", cli.baseUrl(), res, params.Encode())
  139. }
  140. header := http.Header{}
  141. header.Set("Accept", "application/vnd.ceph.api.v1.0+json")
  142. header.Set("Accept-Encoding", "gzip")
  143. header.Set("Authorization", fmt.Sprintf("Bearer %s", cli.token))
  144. _, resp, err := httputils.JSONRequest(client, context.Background(), httputils.GET, url, header, nil, cli.debug)
  145. return resp, err
  146. }
  147. func (cli *SCephFSClient) post(res string, params map[string]interface{}) (jsonutils.JSONObject, error) {
  148. client := cli.getDefaultClient()
  149. url := fmt.Sprintf("%s/%s", cli.baseUrl(), res)
  150. header := http.Header{}
  151. header.Set("Accept", "application/vnd.ceph.api.v1.0+json")
  152. header.Set("Accept-Encoding", "gzip")
  153. header.Set("Authorization", fmt.Sprintf("Bearer %s", cli.token))
  154. _, resp, err := httputils.JSONRequest(client, context.Background(), httputils.POST, url, header, jsonutils.Marshal(params), cli.debug)
  155. return resp, err
  156. }
  157. func (cli *SCephFSClient) delete(res string, params map[string]interface{}) (jsonutils.JSONObject, error) {
  158. client := cli.getDefaultClient()
  159. url := fmt.Sprintf("%s/%s", cli.baseUrl(), res)
  160. header := http.Header{}
  161. header.Set("Accept", "application/vnd.ceph.api.v1.0+json")
  162. header.Set("Accept-Encoding", "gzip")
  163. header.Set("Authorization", fmt.Sprintf("Bearer %s", cli.token))
  164. _, resp, err := httputils.JSONRequest(client, context.Background(), httputils.DELETE, url, header, jsonutils.Marshal(params), cli.debug)
  165. return resp, err
  166. }
  167. func (cli *SCephFSClient) put(res string, params map[string]interface{}) (jsonutils.JSONObject, error) {
  168. client := cli.getDefaultClient()
  169. url := fmt.Sprintf("%s/%s", cli.baseUrl(), res)
  170. header := http.Header{}
  171. header.Set("Accept", "application/vnd.ceph.api.v1.0+json")
  172. header.Set("Accept-Encoding", "gzip")
  173. header.Set("Authorization", fmt.Sprintf("Bearer %s", cli.token))
  174. _, resp, err := httputils.JSONRequest(client, context.Background(), httputils.PUT, url, header, jsonutils.Marshal(params), cli.debug)
  175. return resp, err
  176. }
  177. func (cli *SCephFSClient) GetProvider() string {
  178. return api.CLOUD_PROVIDER_CEPHFS
  179. }
  180. func (cli *SCephFSClient) GetSubAccounts() ([]cloudprovider.SSubAccount, error) {
  181. fss, err := cli.GetCephFSs()
  182. if err != nil {
  183. return nil, err
  184. }
  185. ret := []cloudprovider.SSubAccount{}
  186. for _, fs := range fss {
  187. subAccount := cloudprovider.SSubAccount{
  188. Id: cli.cpcfg.Id,
  189. Account: fmt.Sprintf("%s/%s", cli.username, fs.Id),
  190. Name: fs.Mdsmap.FsName,
  191. HealthStatus: api.CLOUD_PROVIDER_HEALTH_NORMAL,
  192. }
  193. ret = append(ret, subAccount)
  194. }
  195. return ret, nil
  196. }
  197. func (cli *SCephFSClient) GetAccountId() string {
  198. return fmt.Sprintf("%s@%s:%d", cli.username, cli.host, cli.port)
  199. }