agent.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package agent
  15. import (
  16. "fmt"
  17. "net"
  18. "strings"
  19. "time"
  20. "yunion.io/x/jsonutils"
  21. "yunion.io/x/log"
  22. "yunion.io/x/pkg/errors"
  23. "yunion.io/x/pkg/object"
  24. "yunion.io/x/pkg/util/httputils"
  25. "yunion.io/x/pkg/util/version"
  26. api "yunion.io/x/onecloud/pkg/apis/compute"
  27. "yunion.io/x/onecloud/pkg/cloudcommon/agent/iagent"
  28. "yunion.io/x/onecloud/pkg/cloudcommon/consts"
  29. "yunion.io/x/onecloud/pkg/hostman/storageman"
  30. "yunion.io/x/onecloud/pkg/mcclient"
  31. modules "yunion.io/x/onecloud/pkg/mcclient/modules/compute"
  32. "yunion.io/x/onecloud/pkg/util/netutils2"
  33. )
  34. type SZoneInfo struct {
  35. Name string `json:"name"`
  36. Id string `json:"id"`
  37. }
  38. type SBaseAgent struct {
  39. object.SObject
  40. ListenInterface *net.Interface
  41. ListenIPs []net.IP
  42. AgentId string
  43. AgentName string
  44. Zone *SZoneInfo
  45. CachePath string
  46. CacheManager *storageman.SLocalImageCacheManager
  47. stop bool
  48. }
  49. func (agent *SBaseAgent) IAgent() iagent.IAgent {
  50. return agent.GetVirtualObject().(iagent.IAgent)
  51. }
  52. func (agent *SBaseAgent) Init(iagent iagent.IAgent, ifname string, cachePath string) error {
  53. iface, ips, err := netutils2.WaitIfaceIps(ifname)
  54. if err != nil {
  55. return errors.Wrap(err, "WaitIfaceIps")
  56. }
  57. if len(ips) == 0 {
  58. return fmt.Errorf("Interface %s ip address not found", ifname)
  59. }
  60. log.Debugf("Interface %s ip address: %v", iface.Name, ips)
  61. agent.SetVirtualObject(iagent)
  62. agent.ListenInterface = iface
  63. agent.ListenIPs = ips
  64. agent.CachePath = cachePath
  65. return nil
  66. }
  67. func (agent *SBaseAgent) GetListenIPs() []net.IP {
  68. return agent.ListenIPs
  69. }
  70. func (agent *SBaseAgent) FindListenIP(listenAddr string) (net.IP, error) {
  71. ips := agent.GetListenIPs()
  72. if listenAddr == "" {
  73. for i := range ips {
  74. ipstr := ips[i].String()
  75. if strings.HasPrefix(ipstr, netutils2.SECRET_PREFIX) {
  76. continue
  77. }
  78. return ips[i], nil
  79. }
  80. return nil, fmt.Errorf("Not Address on Interface %#v", agent.ListenInterface)
  81. }
  82. if listenAddr == "0.0.0.0" {
  83. return net.ParseIP(listenAddr), nil
  84. }
  85. for _, ip := range ips {
  86. if ip.String() == listenAddr {
  87. return ip, nil
  88. }
  89. }
  90. return nil, fmt.Errorf("Not found Address %s on Interface %#v", listenAddr, agent.ListenInterface)
  91. }
  92. func (agent *SBaseAgent) FindAccessIP(accessAddr string) (net.IP, error) {
  93. if accessAddr == "0.0.0.0" {
  94. return nil, fmt.Errorf("Access address must be specific, should not be 0.0.0.0")
  95. }
  96. return agent.FindListenIP(accessAddr)
  97. }
  98. func (agent *SBaseAgent) startRegister() error {
  99. // if agent.AgentId != "" {
  100. // return
  101. // }
  102. var delayRetryTime = 30 * time.Second
  103. var lastTry time.Time
  104. for !agent.stop {
  105. if time.Now().Sub(lastTry) >= delayRetryTime {
  106. session := agent.IAgent().GetAdminSession()
  107. err := agent.register(session)
  108. if err == nil {
  109. log.Infof("Register success!")
  110. return nil
  111. }
  112. log.Errorf("Register error: %v, retry after %s...", err, delayRetryTime)
  113. lastTry = time.Now()
  114. }
  115. time.Sleep(time.Second)
  116. }
  117. return fmt.Errorf("Error Stop")
  118. }
  119. func (agent *SBaseAgent) register(session *mcclient.ClientSession) error {
  120. var err error
  121. err = agent.fetchZone(session)
  122. if err != nil {
  123. return err
  124. }
  125. err = agent.createOrUpdateBaremetalAgent(session)
  126. if err != nil {
  127. return err
  128. }
  129. log.Infof("%s %s:%s register success, do offline", agent.IAgent().GetAgentType(), agent.AgentName, agent.AgentId)
  130. err = agent.doOffline(session)
  131. if err != nil {
  132. return err
  133. }
  134. return nil
  135. }
  136. func (agent *SBaseAgent) fetchZone(session *mcclient.ClientSession) error {
  137. zoneName := agent.IAgent().GetZoneId()
  138. var zoneInfoObj jsonutils.JSONObject
  139. var err error
  140. if zoneName != "" {
  141. zoneInfoObj, err = modules.Zones.Get(session, zoneName, nil)
  142. } else {
  143. zoneInfoObj, err = agent.getZoneByIP(session)
  144. }
  145. if err != nil {
  146. return err
  147. }
  148. zone := SZoneInfo{}
  149. err = zoneInfoObj.Unmarshal(&zone)
  150. if err != nil {
  151. return err
  152. }
  153. agent.Zone = &zone
  154. consts.SetZone(zone.Name)
  155. return nil
  156. }
  157. func (agent *SBaseAgent) getZoneByIP(session *mcclient.ClientSession) (jsonutils.JSONObject, error) {
  158. params := jsonutils.NewDict()
  159. listenIP, err := agent.IAgent().GetListenIP()
  160. if err != nil {
  161. return nil, err
  162. }
  163. params.Add(jsonutils.NewString(listenIP.String()), "ip")
  164. params.Add(jsonutils.JSONTrue, "is_classic")
  165. params.Add(jsonutils.NewString("system"), "scope")
  166. networks, err := modules.Networks.List(session, params)
  167. if err != nil {
  168. return nil, err
  169. }
  170. if len(networks.Data) == 0 {
  171. return nil, fmt.Errorf("Not found networks by agent listen ip: %s", listenIP)
  172. }
  173. wireId, err := networks.Data[0].GetString("wire_id")
  174. if err != nil {
  175. return nil, err
  176. }
  177. wire, err := modules.Wires.Get(session, wireId, nil)
  178. if err != nil {
  179. return nil, err
  180. }
  181. zoneId, err := wire.GetString("zone_id")
  182. if err != nil {
  183. return nil, err
  184. }
  185. zone, err := modules.Zones.Get(session, zoneId, nil)
  186. if err != nil {
  187. return nil, err
  188. }
  189. return zone, nil
  190. }
  191. func (agent *SBaseAgent) createOrUpdateBaremetalAgent(session *mcclient.ClientSession) error {
  192. params := jsonutils.NewDict()
  193. naccessIP, err := agent.IAgent().GetAccessIP()
  194. if err != nil {
  195. return err
  196. }
  197. if agent.IAgent().GetAgentType() != string(api.AgentTypeEsxi) {
  198. params.Add(jsonutils.NewString(naccessIP.String()), "access_ip")
  199. }
  200. params.Add(jsonutils.NewString(agent.IAgent().GetZoneId()), "zone_id")
  201. params.Add(jsonutils.NewString(agent.IAgent().GetAgentType()), "agent_type")
  202. ret, err := modules.Baremetalagents.List(session, params)
  203. if err != nil {
  204. return err
  205. }
  206. var (
  207. cloudObj jsonutils.JSONObject
  208. agentId string
  209. agentName string
  210. )
  211. // create or update BaremetalAgent
  212. if len(ret.Data) == 0 {
  213. cloudObj, err = agent.createBaremetalAgent(session)
  214. if err != nil {
  215. return err
  216. }
  217. } else {
  218. cloudBmAgent := ret.Data[0]
  219. agentId, _ := cloudBmAgent.GetString("id")
  220. cloudObj, err = agent.updateBaremetalAgent(session, agentId, "")
  221. if err != nil {
  222. return err
  223. }
  224. }
  225. agentId, err = cloudObj.GetString("id")
  226. if err != nil {
  227. return err
  228. }
  229. agentName, err = cloudObj.GetString("name")
  230. if err != nil {
  231. return err
  232. }
  233. agent.AgentId = agentId
  234. agent.AgentName = agentName
  235. storageCacheId, _ := cloudObj.GetString("storagecache_id")
  236. if len(storageCacheId) > 0 {
  237. err = agent.updateStorageCache(session, storageCacheId)
  238. if err != nil {
  239. if httputils.ErrorCode(errors.Cause(err)) == 404 {
  240. storageCacheId = ""
  241. } else {
  242. return err
  243. }
  244. }
  245. }
  246. if len(storageCacheId) == 0 {
  247. storageCacheId, err = agent.createStorageCache(session)
  248. if err != nil {
  249. return err
  250. }
  251. cloudObj, err = agent.updateBaremetalAgent(session, agentId, storageCacheId)
  252. if err != nil {
  253. return err
  254. }
  255. newStorageCacheId, _ := cloudObj.GetString("storagecache_id")
  256. if newStorageCacheId != storageCacheId {
  257. // cleanup the newly created storagecache
  258. modules.Storagecaches.Delete(session, storageCacheId, nil)
  259. return errors.Error("agent not support storagecache_id, region might not be up-to-date")
  260. }
  261. }
  262. agent.CacheManager = storageman.NewLocalImageCacheManager(agent.IAgent(), agent.CachePath, storageCacheId, nil)
  263. return nil
  264. }
  265. func (agent *SBaseAgent) GetManagerUri() string {
  266. accessIP, _ := agent.IAgent().GetAccessIP()
  267. proto := "http"
  268. if agent.IAgent().GetEnableSsl() {
  269. proto = "https"
  270. }
  271. if accessIP.To4() == nil { // ipv6 addr
  272. return fmt.Sprintf("%s://[%s]:%d", proto, accessIP, agent.IAgent().GetPort())
  273. }
  274. return fmt.Sprintf("%s://%s:%d", proto, accessIP, agent.IAgent().GetPort())
  275. }
  276. func (agent *SBaseAgent) GetListenUri() string {
  277. listenIP, _ := agent.IAgent().GetListenIP()
  278. proto := "http"
  279. if agent.IAgent().GetEnableSsl() {
  280. proto = "https"
  281. }
  282. return fmt.Sprintf("%s://%s:%d", proto, listenIP, agent.IAgent().GetPort())
  283. }
  284. func (agent *SBaseAgent) getCreateUpdateInfo() (jsonutils.JSONObject, error) {
  285. params := jsonutils.NewDict()
  286. if agent.AgentId == "" {
  287. agentName, err := agent.getName()
  288. if err != nil {
  289. return nil, errors.Wrap(err, "agent.getName")
  290. }
  291. params.Add(jsonutils.NewString(agentName), "name")
  292. }
  293. accessIP, err := agent.IAgent().GetAccessIP()
  294. if err != nil {
  295. return nil, errors.Wrap(err, "agent.IAgent().GetAccessIP()")
  296. }
  297. params.Add(jsonutils.NewString(accessIP.String()), "access_ip")
  298. params.Add(jsonutils.NewString(agent.GetManagerUri()), "manager_uri")
  299. params.Add(jsonutils.NewString(agent.Zone.Id), "zone_id")
  300. params.Add(jsonutils.NewString(agent.IAgent().GetAgentType()), "agent_type")
  301. params.Add(jsonutils.NewString(version.GetShortString()), "version")
  302. return params, nil
  303. }
  304. func (agent *SBaseAgent) getName() (string, error) {
  305. accessIP, err := agent.IAgent().GetAccessIP()
  306. if err != nil {
  307. return "", err
  308. }
  309. return fmt.Sprintf("%s-%s", agent.IAgent().GetAgentType(), accessIP), nil
  310. }
  311. func (agent *SBaseAgent) createStorageCache(session *mcclient.ClientSession) (string, error) {
  312. body := jsonutils.NewDict()
  313. agentName, err := agent.getName()
  314. if err != nil {
  315. return "", errors.Wrap(err, "agent.getName")
  316. }
  317. body.Set("name", jsonutils.NewString("imagecache-"+agentName))
  318. body.Set("path", jsonutils.NewString(agent.CachePath))
  319. body.Set("external_id", jsonutils.NewString(agent.AgentId))
  320. sc, err := modules.Storagecaches.Create(session, body)
  321. if err != nil {
  322. return "", errors.Wrap(err, "modules.Storagecaches.Create")
  323. }
  324. storageCacheId, err := sc.GetString("id")
  325. if err != nil {
  326. return "", errors.Wrap(err, "sc.GetString id")
  327. }
  328. return storageCacheId, nil
  329. }
  330. func (agent *SBaseAgent) updateStorageCache(session *mcclient.ClientSession, storageCacheId string) error {
  331. body := jsonutils.NewDict()
  332. body.Set("path", jsonutils.NewString(agent.CachePath))
  333. body.Set("external_id", jsonutils.NewString(agent.AgentId))
  334. _, err := modules.Storagecaches.Update(session, storageCacheId, body)
  335. if err != nil {
  336. return errors.Wrap(err, "modules.Storagecaches.Update")
  337. }
  338. return nil
  339. }
  340. func (agent *SBaseAgent) createBaremetalAgent(session *mcclient.ClientSession) (jsonutils.JSONObject, error) {
  341. params, err := agent.getCreateUpdateInfo()
  342. if err != nil {
  343. return nil, err
  344. }
  345. return modules.Baremetalagents.Create(session, params)
  346. }
  347. func (agent *SBaseAgent) updateBaremetalAgent(session *mcclient.ClientSession, id string, storageCacheId string) (jsonutils.JSONObject, error) {
  348. var params jsonutils.JSONObject
  349. var err error
  350. if len(storageCacheId) > 0 {
  351. params = jsonutils.NewDict()
  352. params.(*jsonutils.JSONDict).Set("storagecache_id", jsonutils.NewString(storageCacheId))
  353. } else {
  354. params, err = agent.getCreateUpdateInfo()
  355. if err != nil {
  356. return nil, err
  357. }
  358. }
  359. return modules.Baremetalagents.Update(session, id, params)
  360. }
  361. func (agent *SBaseAgent) doOffline(session *mcclient.ClientSession) error {
  362. _, err := modules.Baremetalagents.PerformAction(session, agent.AgentId, "offline", nil)
  363. return err
  364. }
  365. func (agent *SBaseAgent) DoOnline(session *mcclient.ClientSession) error {
  366. _, err := modules.Baremetalagents.PerformAction(session, agent.AgentId, "online", nil)
  367. return err
  368. }
  369. func (agent *SBaseAgent) Start() error {
  370. err := agent.startRegister()
  371. if err != nil {
  372. return err
  373. }
  374. agent.IAgent().TuneSystem()
  375. return agent.IAgent().StartService()
  376. }
  377. func (agent *SBaseAgent) Stop() {
  378. agent.stop = true
  379. agent.IAgent().StopService()
  380. }