| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package agent
- import (
- "fmt"
- "net"
- "strings"
- "time"
- "yunion.io/x/jsonutils"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/object"
- "yunion.io/x/pkg/util/httputils"
- "yunion.io/x/pkg/util/version"
- api "yunion.io/x/onecloud/pkg/apis/compute"
- "yunion.io/x/onecloud/pkg/cloudcommon/agent/iagent"
- "yunion.io/x/onecloud/pkg/cloudcommon/consts"
- "yunion.io/x/onecloud/pkg/hostman/storageman"
- "yunion.io/x/onecloud/pkg/mcclient"
- modules "yunion.io/x/onecloud/pkg/mcclient/modules/compute"
- "yunion.io/x/onecloud/pkg/util/netutils2"
- )
// SZoneInfo holds the two zone attributes the agent keeps after resolving its
// zone. The fields are decoded from the "name" and "id" JSON keys.
type SZoneInfo struct {
	Name string `json:"name"`
	Id   string `json:"id"`
}
- type SBaseAgent struct {
- object.SObject
- ListenInterface *net.Interface
- ListenIPs []net.IP
- AgentId string
- AgentName string
- Zone *SZoneInfo
- CachePath string
- CacheManager *storageman.SLocalImageCacheManager
- stop bool
- }
- func (agent *SBaseAgent) IAgent() iagent.IAgent {
- return agent.GetVirtualObject().(iagent.IAgent)
- }
- func (agent *SBaseAgent) Init(iagent iagent.IAgent, ifname string, cachePath string) error {
- iface, ips, err := netutils2.WaitIfaceIps(ifname)
- if err != nil {
- return errors.Wrap(err, "WaitIfaceIps")
- }
- if len(ips) == 0 {
- return fmt.Errorf("Interface %s ip address not found", ifname)
- }
- log.Debugf("Interface %s ip address: %v", iface.Name, ips)
- agent.SetVirtualObject(iagent)
- agent.ListenInterface = iface
- agent.ListenIPs = ips
- agent.CachePath = cachePath
- return nil
- }
- func (agent *SBaseAgent) GetListenIPs() []net.IP {
- return agent.ListenIPs
- }
- func (agent *SBaseAgent) FindListenIP(listenAddr string) (net.IP, error) {
- ips := agent.GetListenIPs()
- if listenAddr == "" {
- for i := range ips {
- ipstr := ips[i].String()
- if strings.HasPrefix(ipstr, netutils2.SECRET_PREFIX) {
- continue
- }
- return ips[i], nil
- }
- return nil, fmt.Errorf("Not Address on Interface %#v", agent.ListenInterface)
- }
- if listenAddr == "0.0.0.0" {
- return net.ParseIP(listenAddr), nil
- }
- for _, ip := range ips {
- if ip.String() == listenAddr {
- return ip, nil
- }
- }
- return nil, fmt.Errorf("Not found Address %s on Interface %#v", listenAddr, agent.ListenInterface)
- }
- func (agent *SBaseAgent) FindAccessIP(accessAddr string) (net.IP, error) {
- if accessAddr == "0.0.0.0" {
- return nil, fmt.Errorf("Access address must be specific, should not be 0.0.0.0")
- }
- return agent.FindListenIP(accessAddr)
- }
- func (agent *SBaseAgent) startRegister() error {
- // if agent.AgentId != "" {
- // return
- // }
- var delayRetryTime = 30 * time.Second
- var lastTry time.Time
- for !agent.stop {
- if time.Now().Sub(lastTry) >= delayRetryTime {
- session := agent.IAgent().GetAdminSession()
- err := agent.register(session)
- if err == nil {
- log.Infof("Register success!")
- return nil
- }
- log.Errorf("Register error: %v, retry after %s...", err, delayRetryTime)
- lastTry = time.Now()
- }
- time.Sleep(time.Second)
- }
- return fmt.Errorf("Error Stop")
- }
- func (agent *SBaseAgent) register(session *mcclient.ClientSession) error {
- var err error
- err = agent.fetchZone(session)
- if err != nil {
- return err
- }
- err = agent.createOrUpdateBaremetalAgent(session)
- if err != nil {
- return err
- }
- log.Infof("%s %s:%s register success, do offline", agent.IAgent().GetAgentType(), agent.AgentName, agent.AgentId)
- err = agent.doOffline(session)
- if err != nil {
- return err
- }
- return nil
- }
- func (agent *SBaseAgent) fetchZone(session *mcclient.ClientSession) error {
- zoneName := agent.IAgent().GetZoneId()
- var zoneInfoObj jsonutils.JSONObject
- var err error
- if zoneName != "" {
- zoneInfoObj, err = modules.Zones.Get(session, zoneName, nil)
- } else {
- zoneInfoObj, err = agent.getZoneByIP(session)
- }
- if err != nil {
- return err
- }
- zone := SZoneInfo{}
- err = zoneInfoObj.Unmarshal(&zone)
- if err != nil {
- return err
- }
- agent.Zone = &zone
- consts.SetZone(zone.Name)
- return nil
- }
- func (agent *SBaseAgent) getZoneByIP(session *mcclient.ClientSession) (jsonutils.JSONObject, error) {
- params := jsonutils.NewDict()
- listenIP, err := agent.IAgent().GetListenIP()
- if err != nil {
- return nil, err
- }
- params.Add(jsonutils.NewString(listenIP.String()), "ip")
- params.Add(jsonutils.JSONTrue, "is_classic")
- params.Add(jsonutils.NewString("system"), "scope")
- networks, err := modules.Networks.List(session, params)
- if err != nil {
- return nil, err
- }
- if len(networks.Data) == 0 {
- return nil, fmt.Errorf("Not found networks by agent listen ip: %s", listenIP)
- }
- wireId, err := networks.Data[0].GetString("wire_id")
- if err != nil {
- return nil, err
- }
- wire, err := modules.Wires.Get(session, wireId, nil)
- if err != nil {
- return nil, err
- }
- zoneId, err := wire.GetString("zone_id")
- if err != nil {
- return nil, err
- }
- zone, err := modules.Zones.Get(session, zoneId, nil)
- if err != nil {
- return nil, err
- }
- return zone, nil
- }
- func (agent *SBaseAgent) createOrUpdateBaremetalAgent(session *mcclient.ClientSession) error {
- params := jsonutils.NewDict()
- naccessIP, err := agent.IAgent().GetAccessIP()
- if err != nil {
- return err
- }
- if agent.IAgent().GetAgentType() != string(api.AgentTypeEsxi) {
- params.Add(jsonutils.NewString(naccessIP.String()), "access_ip")
- }
- params.Add(jsonutils.NewString(agent.IAgent().GetZoneId()), "zone_id")
- params.Add(jsonutils.NewString(agent.IAgent().GetAgentType()), "agent_type")
- ret, err := modules.Baremetalagents.List(session, params)
- if err != nil {
- return err
- }
- var (
- cloudObj jsonutils.JSONObject
- agentId string
- agentName string
- )
- // create or update BaremetalAgent
- if len(ret.Data) == 0 {
- cloudObj, err = agent.createBaremetalAgent(session)
- if err != nil {
- return err
- }
- } else {
- cloudBmAgent := ret.Data[0]
- agentId, _ := cloudBmAgent.GetString("id")
- cloudObj, err = agent.updateBaremetalAgent(session, agentId, "")
- if err != nil {
- return err
- }
- }
- agentId, err = cloudObj.GetString("id")
- if err != nil {
- return err
- }
- agentName, err = cloudObj.GetString("name")
- if err != nil {
- return err
- }
- agent.AgentId = agentId
- agent.AgentName = agentName
- storageCacheId, _ := cloudObj.GetString("storagecache_id")
- if len(storageCacheId) > 0 {
- err = agent.updateStorageCache(session, storageCacheId)
- if err != nil {
- if httputils.ErrorCode(errors.Cause(err)) == 404 {
- storageCacheId = ""
- } else {
- return err
- }
- }
- }
- if len(storageCacheId) == 0 {
- storageCacheId, err = agent.createStorageCache(session)
- if err != nil {
- return err
- }
- cloudObj, err = agent.updateBaremetalAgent(session, agentId, storageCacheId)
- if err != nil {
- return err
- }
- newStorageCacheId, _ := cloudObj.GetString("storagecache_id")
- if newStorageCacheId != storageCacheId {
- // cleanup the newly created storagecache
- modules.Storagecaches.Delete(session, storageCacheId, nil)
- return errors.Error("agent not support storagecache_id, region might not be up-to-date")
- }
- }
- agent.CacheManager = storageman.NewLocalImageCacheManager(agent.IAgent(), agent.CachePath, storageCacheId, nil)
- return nil
- }
- func (agent *SBaseAgent) GetManagerUri() string {
- accessIP, _ := agent.IAgent().GetAccessIP()
- proto := "http"
- if agent.IAgent().GetEnableSsl() {
- proto = "https"
- }
- if accessIP.To4() == nil { // ipv6 addr
- return fmt.Sprintf("%s://[%s]:%d", proto, accessIP, agent.IAgent().GetPort())
- }
- return fmt.Sprintf("%s://%s:%d", proto, accessIP, agent.IAgent().GetPort())
- }
- func (agent *SBaseAgent) GetListenUri() string {
- listenIP, _ := agent.IAgent().GetListenIP()
- proto := "http"
- if agent.IAgent().GetEnableSsl() {
- proto = "https"
- }
- return fmt.Sprintf("%s://%s:%d", proto, listenIP, agent.IAgent().GetPort())
- }
- func (agent *SBaseAgent) getCreateUpdateInfo() (jsonutils.JSONObject, error) {
- params := jsonutils.NewDict()
- if agent.AgentId == "" {
- agentName, err := agent.getName()
- if err != nil {
- return nil, errors.Wrap(err, "agent.getName")
- }
- params.Add(jsonutils.NewString(agentName), "name")
- }
- accessIP, err := agent.IAgent().GetAccessIP()
- if err != nil {
- return nil, errors.Wrap(err, "agent.IAgent().GetAccessIP()")
- }
- params.Add(jsonutils.NewString(accessIP.String()), "access_ip")
- params.Add(jsonutils.NewString(agent.GetManagerUri()), "manager_uri")
- params.Add(jsonutils.NewString(agent.Zone.Id), "zone_id")
- params.Add(jsonutils.NewString(agent.IAgent().GetAgentType()), "agent_type")
- params.Add(jsonutils.NewString(version.GetShortString()), "version")
- return params, nil
- }
- func (agent *SBaseAgent) getName() (string, error) {
- accessIP, err := agent.IAgent().GetAccessIP()
- if err != nil {
- return "", err
- }
- return fmt.Sprintf("%s-%s", agent.IAgent().GetAgentType(), accessIP), nil
- }
- func (agent *SBaseAgent) createStorageCache(session *mcclient.ClientSession) (string, error) {
- body := jsonutils.NewDict()
- agentName, err := agent.getName()
- if err != nil {
- return "", errors.Wrap(err, "agent.getName")
- }
- body.Set("name", jsonutils.NewString("imagecache-"+agentName))
- body.Set("path", jsonutils.NewString(agent.CachePath))
- body.Set("external_id", jsonutils.NewString(agent.AgentId))
- sc, err := modules.Storagecaches.Create(session, body)
- if err != nil {
- return "", errors.Wrap(err, "modules.Storagecaches.Create")
- }
- storageCacheId, err := sc.GetString("id")
- if err != nil {
- return "", errors.Wrap(err, "sc.GetString id")
- }
- return storageCacheId, nil
- }
- func (agent *SBaseAgent) updateStorageCache(session *mcclient.ClientSession, storageCacheId string) error {
- body := jsonutils.NewDict()
- body.Set("path", jsonutils.NewString(agent.CachePath))
- body.Set("external_id", jsonutils.NewString(agent.AgentId))
- _, err := modules.Storagecaches.Update(session, storageCacheId, body)
- if err != nil {
- return errors.Wrap(err, "modules.Storagecaches.Update")
- }
- return nil
- }
- func (agent *SBaseAgent) createBaremetalAgent(session *mcclient.ClientSession) (jsonutils.JSONObject, error) {
- params, err := agent.getCreateUpdateInfo()
- if err != nil {
- return nil, err
- }
- return modules.Baremetalagents.Create(session, params)
- }
- func (agent *SBaseAgent) updateBaremetalAgent(session *mcclient.ClientSession, id string, storageCacheId string) (jsonutils.JSONObject, error) {
- var params jsonutils.JSONObject
- var err error
- if len(storageCacheId) > 0 {
- params = jsonutils.NewDict()
- params.(*jsonutils.JSONDict).Set("storagecache_id", jsonutils.NewString(storageCacheId))
- } else {
- params, err = agent.getCreateUpdateInfo()
- if err != nil {
- return nil, err
- }
- }
- return modules.Baremetalagents.Update(session, id, params)
- }
- func (agent *SBaseAgent) doOffline(session *mcclient.ClientSession) error {
- _, err := modules.Baremetalagents.PerformAction(session, agent.AgentId, "offline", nil)
- return err
- }
- func (agent *SBaseAgent) DoOnline(session *mcclient.ClientSession) error {
- _, err := modules.Baremetalagents.PerformAction(session, agent.AgentId, "online", nil)
- return err
- }
- func (agent *SBaseAgent) Start() error {
- err := agent.startRegister()
- if err != nil {
- return err
- }
- agent.IAgent().TuneSystem()
- return agent.IAgent().StartService()
- }
- func (agent *SBaseAgent) Stop() {
- agent.stop = true
- agent.IAgent().StopService()
- }
|