service_url.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package utils
  15. import (
  16. "context"
  17. "fmt"
  18. "net/url"
  19. "time"
  20. "yunion.io/x/jsonutils"
  21. "yunion.io/x/log"
  22. "yunion.io/x/pkg/errors"
  23. "yunion.io/x/pkg/util/httputils"
  24. "yunion.io/x/pkg/util/sets"
  25. "yunion.io/x/onecloud/pkg/apis"
  26. ansible_api "yunion.io/x/onecloud/pkg/apis/ansible"
  27. proxy_api "yunion.io/x/onecloud/pkg/apis/cloudproxy"
  28. comapi "yunion.io/x/onecloud/pkg/apis/compute"
  29. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  30. "yunion.io/x/onecloud/pkg/httperrors"
  31. "yunion.io/x/onecloud/pkg/mcclient/auth"
  32. ansible_modules "yunion.io/x/onecloud/pkg/mcclient/modules/ansible"
  33. "yunion.io/x/onecloud/pkg/mcclient/modules/cloudproxy"
  34. "yunion.io/x/onecloud/pkg/mcclient/modules/compute"
  35. "yunion.io/x/onecloud/pkg/mcclient/modules/identity"
  36. "yunion.io/x/onecloud/pkg/util/ansible"
  37. )
  38. type Service struct {
  39. Name string
  40. Url string
  41. }
  42. func serviceComplete(serviceName, address string, port int) (url, checkUrl string, expectedCode int) {
  43. switch serviceName {
  44. case apis.SERVICE_TYPE_INFLUXDB, apis.SERVICE_TYPE_VICTORIA_METRICS:
  45. return fmt.Sprintf("https://%s:%d", address, port), fmt.Sprintf("https://%s:%d/ping", address, port), 204
  46. case "repo":
  47. return fmt.Sprintf("http://%s:%d", address, port), fmt.Sprintf("http://%s:%d", address, port), 200
  48. default:
  49. return fmt.Sprintf("http://%s:%d", address, port), fmt.Sprintf("http://%s:%d", address, port), 200
  50. }
  51. }
  52. func serviceComplete2(service Service) (completeUrl string, expectedCode int) {
  53. switch service.Name {
  54. case apis.SERVICE_TYPE_INFLUXDB, apis.SERVICE_TYPE_VICTORIA_METRICS:
  55. return fmt.Sprintf("%s/ping", service.Url), 204
  56. case "repo":
  57. return service.Url, 200
  58. default:
  59. return service.Url, 200
  60. }
  61. }
  62. func proxyEndpoints(ctx context.Context, proxyEndpointId string, info sServerInfo) ([]sProxyEndpoint, error) {
  63. pes := make([]sProxyEndpoint, 0)
  64. session := auth.GetAdminSession(ctx, "")
  65. if len(proxyEndpointId) > 0 {
  66. ep, err := cloudproxy.ProxyEndpoints.Get(session, proxyEndpointId, nil)
  67. if err != nil {
  68. return nil, errors.Wrapf(err, "unable to get proxy endpoint %s", proxyEndpointId)
  69. }
  70. address, _ := ep.GetString("intranet_ip_addr")
  71. pes = append(pes, sProxyEndpoint{proxyEndpointId, address})
  72. return pes, nil
  73. }
  74. proxyEndpointIds := sets.NewString()
  75. for _, netId := range info.NetworkIds {
  76. filter := jsonutils.NewDict()
  77. filter.Set("network_id", jsonutils.NewString(netId))
  78. filter.Set("scope", jsonutils.NewString("system"))
  79. lr, err := cloudproxy.ProxyEndpoints.List(session, filter)
  80. if err != nil {
  81. return nil, errors.Wrapf(err, "unable to list proxy endpoint in network %q", netId)
  82. }
  83. for i := range lr.Data {
  84. proxyEndpointId, _ := lr.Data[i].GetString("id")
  85. address, _ := lr.Data[i].GetString("intranet_ip_addr")
  86. if proxyEndpointIds.Has(proxyEndpointId) {
  87. continue
  88. }
  89. pes = append(pes, sProxyEndpoint{proxyEndpointId, address})
  90. proxyEndpointIds.Insert(proxyEndpointId)
  91. }
  92. }
  93. filter := jsonutils.NewDict()
  94. filter.Set("vpc_id", jsonutils.NewString(info.VpcId))
  95. filter.Set("scope", jsonutils.NewString("system"))
  96. lr, err := cloudproxy.ProxyEndpoints.List(session, filter)
  97. if err != nil {
  98. return nil, errors.Wrapf(err, "unable to list proxy endpoint in vpc %q", info.VpcId)
  99. }
  100. for i := range lr.Data {
  101. proxyEndpointId, _ := lr.Data[i].GetString("id")
  102. address, _ := lr.Data[i].GetString("intranet_ip_addr")
  103. if proxyEndpointIds.Has(proxyEndpointId) {
  104. continue
  105. }
  106. pes = append(pes, sProxyEndpoint{proxyEndpointId, address})
  107. proxyEndpointIds.Insert(proxyEndpointId)
  108. }
  109. return pes, nil
  110. }
  111. func serviceUrlDirect(ctx context.Context, service Service, proxyEndpointId string, info sServerInfo, host *ansible_api.AnsibleHost) (string, error) {
  112. url, code := serviceComplete2(service)
  113. ok, err := checkUrl(ctx, url, code, host)
  114. if err != nil {
  115. return "", err
  116. }
  117. if ok {
  118. return service.Url, nil
  119. }
  120. return "", nil
  121. }
  122. func GetServerInfo(ctx context.Context, serverId string) (sServerInfo, error) {
  123. // check server
  124. session := auth.GetAdminSession(ctx, "")
  125. data, err := compute.Servers.Get(session, serverId, nil)
  126. if err != nil {
  127. if httputils.ErrorCode(err) == 404 {
  128. return sServerInfo{}, httperrors.NewInputParameterError("no such server %s", serverId)
  129. }
  130. return sServerInfo{}, fmt.Errorf("unable to get server %s: %s", serverId, httputils.ErrorMsg(err))
  131. }
  132. info := sServerInfo{}
  133. var serverDetails comapi.ServerDetails
  134. err = data.Unmarshal(&serverDetails)
  135. if err != nil {
  136. return info, errors.Wrap(err, "unable to unmarshal serverDetails")
  137. }
  138. if serverDetails.Status != comapi.VM_RUNNING {
  139. return info, httperrors.NewInputParameterError("can only apply scripts to %s server", comapi.VM_RUNNING)
  140. }
  141. info.serverDetails = &serverDetails
  142. info.ServerId = serverDetails.Id
  143. networkIds := sets.NewString()
  144. for _, nic := range serverDetails.Nics {
  145. networkIds.Insert(nic.NetworkId)
  146. info.VpcId = nic.VpcId
  147. }
  148. info.NetworkIds = networkIds.UnsortedList()
  149. return info, nil
  150. }
  151. func serviceUrlViaProxyEndpoint(ctx context.Context, service Service, proxyEndpointId string, info sServerInfo, host *ansible_api.AnsibleHost) (string, error) {
  152. if len(proxyEndpointId) > 0 {
  153. pes, err := proxyEndpoints(ctx, proxyEndpointId, info)
  154. if err != nil {
  155. return "", err
  156. }
  157. url, err := checkProxyEndpoint(ctx, service, proxyEndpointId, pes[0].Address, host)
  158. if err != nil {
  159. return "", err
  160. }
  161. if len(url) > 0 {
  162. return url, nil
  163. }
  164. }
  165. pes, err := proxyEndpoints(ctx, "", info)
  166. if err != nil {
  167. return "", err
  168. }
  169. for _, pe := range pes {
  170. url, err := checkProxyEndpoint(ctx, service, pe.Id, pe.Address, host)
  171. if err != nil {
  172. return "", err
  173. }
  174. if len(url) > 0 {
  175. return url, nil
  176. }
  177. }
  178. return "", nil
  179. }
  180. func FindValidServiceUrl(ctx context.Context, service Service, proxyEndpointId string, info sServerInfo, host *ansible_api.AnsibleHost) (string, error) {
  181. findFuncs := []func(ctx context.Context, service Service, proxyEndpointId string, info sServerInfo, host *ansible_api.AnsibleHost) (string, error){}
  182. if info.serverDetails.Hypervisor == comapi.HYPERVISOR_KVM || info.serverDetails.Hypervisor == comapi.HYPERVISOR_BAREMETAL {
  183. findFuncs = append(findFuncs, serviceUrlDirect, serviceUrlViaProxyEndpoint)
  184. } else {
  185. findFuncs = append(findFuncs, serviceUrlViaProxyEndpoint, serviceUrlDirect)
  186. }
  187. for _, find := range findFuncs {
  188. url, err := find(ctx, service, proxyEndpointId, info, host)
  189. if err != nil {
  190. return "", err
  191. }
  192. if len(url) > 0 {
  193. return url, nil
  194. }
  195. }
  196. return "", nil
  197. }
  198. func checkProxyEndpoint(ctx context.Context, service Service, proxyEndpointId, address string, host *ansible_api.AnsibleHost) (string, error) {
  199. port, recycle, err := convertServiceUrl(ctx, service, proxyEndpointId)
  200. if err != nil {
  201. return "", err
  202. }
  203. url, cUrl, code := serviceComplete(service.Name, address, int(port))
  204. ok, err := checkUrl(ctx, cUrl, code, host)
  205. if err != nil {
  206. return "", errors.Wrapf(err, "check url %q", cUrl)
  207. }
  208. if !ok {
  209. if recycle != nil {
  210. err := recycle()
  211. if err != nil {
  212. return "", errors.Wrapf(err, "unble to recycle remote forward of proxyEndpoint %s", proxyEndpointId)
  213. }
  214. }
  215. return "", nil
  216. }
  217. return url, nil
  218. }
  219. type sServerInfo struct {
  220. ServerId string
  221. VpcId string
  222. NetworkIds []string
  223. serverDetails *comapi.ServerDetails
  224. }
  225. var serviceUrls map[string]string = map[string]string{}
  226. func GetServiceUrl(ctx context.Context, serviceName string) (string, error) {
  227. if url, ok := serviceUrls[serviceName]; ok {
  228. return url, nil
  229. }
  230. session := auth.GetAdminSession(ctx, "")
  231. params := jsonutils.NewDict()
  232. params.Set("interface", jsonutils.NewString("public"))
  233. params.Set("service", jsonutils.NewString(serviceName))
  234. ret, err := identity.EndpointsV3.List(session, params)
  235. if err != nil {
  236. return "", err
  237. }
  238. log.Infof("params to list endpoint: %v", params)
  239. log.Infof("ret to list endpoint: %s", jsonutils.Marshal(ret))
  240. if len(ret.Data) == 0 {
  241. return "", fmt.Errorf("no sucn endpoint with 'internal' interface and 'influxdb' service")
  242. }
  243. url, _ := ret.Data[0].GetString("url")
  244. serviceUrls[serviceName] = url
  245. return url, nil
  246. }
  247. func convertServiceUrl(ctx context.Context, service Service, endpointId string) (port int64, recycle func() error, err error) {
  248. session := auth.AdminSessionWithInternal(ctx, "", "")
  249. filter := jsonutils.NewDict()
  250. filter.Set("proxy_endpoint_id", jsonutils.NewString(endpointId))
  251. filter.Set("opaque", jsonutils.NewString(service.Url))
  252. filter.Set("scope", jsonutils.NewString("system"))
  253. lr, err := cloudproxy.Forwards.List(session, filter)
  254. if err != nil {
  255. return 0, nil, errors.Wrap(err, "failed to list forward")
  256. }
  257. var forwardId string
  258. var lastSeen string
  259. if len(lr.Data) > 0 {
  260. port, _ = lr.Data[0].Int("bind_port")
  261. forwardId, _ = lr.Data[0].GetString("id")
  262. lastSeen, _ = lr.Data[0].GetString("last_seen")
  263. } else {
  264. var rUrl *url.URL
  265. rUrl, err = url.Parse(service.Url)
  266. if err != nil {
  267. err = errors.Wrap(err, "invalid serviceUrl?")
  268. return
  269. }
  270. // create one
  271. createP := jsonutils.NewDict()
  272. createP.Set("proxy_endpoint", jsonutils.NewString(endpointId))
  273. createP.Set("type", jsonutils.NewString(proxy_api.FORWARD_TYPE_REMOTE))
  274. createP.Set("remote_addr", jsonutils.NewString(rUrl.Hostname()))
  275. createP.Set("remote_port", jsonutils.NewString(rUrl.Port()))
  276. createP.Set("generate_name", jsonutils.NewString(service.Name+" proxy"))
  277. createP.Set("opaque", jsonutils.NewString(service.Url))
  278. var forward jsonutils.JSONObject
  279. forward, err = cloudproxy.Forwards.Create(session, createP)
  280. if err != nil {
  281. err = errors.Wrapf(err, "unable to create forward with create params %s", createP.String())
  282. return
  283. }
  284. forwardId, _ = forward.GetString("id")
  285. lastSeen, _ = forward.GetString("last_seen")
  286. recycle = func() error {
  287. _, err := cloudproxy.Forwards.Delete(session, forwardId, nil)
  288. return err
  289. }
  290. port, _ = forward.Int("bind_port")
  291. }
  292. // wait forward last seen not empty
  293. times, waitTime := 0, time.Second
  294. var data jsonutils.JSONObject
  295. for lastSeen == "" && times < 10 {
  296. time.Sleep(waitTime)
  297. times += 1
  298. waitTime += time.Second * time.Duration(times)
  299. data, err = cloudproxy.Forwards.GetSpecific(session, forwardId, "lastseen", nil)
  300. if err != nil {
  301. err = errors.Wrapf(err, "unable to check last_seen for forward %s", forwardId)
  302. return
  303. }
  304. log.Infof("data of last seen: %s", data)
  305. lastSeen, _ = data.GetString("last_seen")
  306. }
  307. if lastSeen == "" {
  308. err = errors.Wrapf(err, "last_seen of forward %s always is empty, something wrong", forwardId)
  309. }
  310. return
  311. }
  312. func checkUrl(ctx context.Context, completeUrl string, expectedCode int, host *ansible_api.AnsibleHost) (bool, error) {
  313. session := auth.GetAdminSession(ctx, "")
  314. ahost := ansible.Host{}
  315. ahost.Name = host.IP
  316. ahost.Vars = map[string]string{
  317. "ansible_port": fmt.Sprintf("%d", host.Port),
  318. "ansible_user": host.User,
  319. }
  320. if host.OsType == "Windows" {
  321. ahost.Vars["ansible_password"] = host.Password
  322. ahost.Vars["ansible_connection"] = "winrm"
  323. ahost.Vars["ansible_winrm_server_cert_validation"] = "ignore"
  324. ahost.Vars["ansible_winrm_transport"] = "ntlm"
  325. }
  326. modulename := "uri"
  327. if host.OsType == "Windows" {
  328. modulename = "ansible.windows.win_uri"
  329. }
  330. mod := ansible.Module{
  331. Name: modulename,
  332. Args: []string{
  333. fmt.Sprintf("url=%s", completeUrl),
  334. "method=GET",
  335. fmt.Sprintf("status_code=%d", expectedCode),
  336. "validate_certs=no",
  337. },
  338. }
  339. playbook := ansible.NewPlaybook()
  340. playbook.Inventory = ansible.Inventory{
  341. Hosts: []ansible.Host{
  342. ahost,
  343. },
  344. }
  345. playbook.Modules = []ansible.Module{
  346. mod,
  347. }
  348. apCreateInput := ansible_api.AnsiblePlaybookCreateInput{
  349. Name: db.DefaultUUIDGenerator(),
  350. Playbook: *playbook,
  351. }
  352. apb, err := ansible_modules.AnsiblePlaybooks.Create(session, apCreateInput.JSON(apCreateInput))
  353. if err != nil {
  354. return false, errors.Wrap(err, "create ansible playbook")
  355. }
  356. id, _ := apb.GetString("id")
  357. defer func() {
  358. _, err := ansible_modules.AnsiblePlaybooks.Delete(session, id, nil)
  359. if err != nil {
  360. log.Errorf("unable to delete ansibleplaybook %s: %v", id, err)
  361. }
  362. }()
  363. times, waitTimes := 0, time.Second
  364. for times < 10 {
  365. time.Sleep(waitTimes)
  366. times++
  367. waitTimes += time.Second * time.Duration(times)
  368. apd, err := ansible_modules.AnsiblePlaybooks.GetSpecific(session, id, "status", nil)
  369. if err != nil {
  370. return false, errors.Wrapf(err, "unable to get ansibleplaybook %s status", id)
  371. }
  372. status, _ := apd.GetString("status")
  373. switch status {
  374. case ansible_api.AnsiblePlaybookStatusInit, ansible_api.AnsiblePlaybookStatusRunning:
  375. continue
  376. case ansible_api.AnsiblePlaybookStatusFailed, ansible_api.AnsiblePlaybookStatusCanceled, ansible_api.AnsiblePlaybookStatusUnknown:
  377. return false, nil
  378. case ansible_api.AnsiblePlaybookStatusSucceeded:
  379. return true, nil
  380. }
  381. }
  382. return false, nil
  383. }