| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package lbagent
- import (
- "context"
- "fmt"
- "strings"
- "sync"
- "time"
- "yunion.io/x/jsonutils"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/util/version"
- "yunion.io/x/onecloud/pkg/apihelper"
- api "yunion.io/x/onecloud/pkg/apis/compute"
- computemodels "yunion.io/x/onecloud/pkg/compute/models"
- "yunion.io/x/onecloud/pkg/hostman/guestfs/fsdriver"
- agentmodels "yunion.io/x/onecloud/pkg/lbagent/models"
- agentutils "yunion.io/x/onecloud/pkg/lbagent/utils"
- "yunion.io/x/onecloud/pkg/mcclient"
- "yunion.io/x/onecloud/pkg/mcclient/auth"
- modules "yunion.io/x/onecloud/pkg/mcclient/modules/compute"
- options "yunion.io/x/onecloud/pkg/mcclient/options/compute"
- "yunion.io/x/onecloud/pkg/util/netutils2"
- )
- type ApiHelper struct {
- opts *Options
- lbagentId string
- dataDirMan *agentutils.ConfigDirManager
- apih *apihelper.APIHelper
- corpus *agentmodels.LoadbalancerCorpus
- agentParams *agentmodels.AgentParams
- haState string
- haStateProvider HaStateProvider
- mcclientSession *mcclient.ClientSession
- ovn *OvnWorker
- }
- func NewApiHelper(opts *Options, lbagentId string) (*ApiHelper, error) {
- corpus := agentmodels.NewEmptyLoadbalancerCorpus()
- apiOpts := &apihelper.Options{
- CommonOptions: opts.CommonOptions,
- SyncIntervalSeconds: opts.ApiSyncIntervalSeconds,
- RunDelayMilliseconds: opts.ApiRunDelayMilliseconds,
- ListBatchSize: opts.ApiListBatchSize,
- }
- apih, err := apihelper.NewAPIHelper(apiOpts, corpus.ModelSets)
- if err != nil {
- return nil, errors.Wrap(err, "new apihelper")
- }
- helper := &ApiHelper{
- opts: opts,
- lbagentId: lbagentId,
- dataDirMan: agentutils.NewConfigDirManager(opts.apiDataStoreDir),
- apih: apih,
- corpus: corpus,
- haState: api.LB_HA_STATE_UNKNOWN,
- }
- return helper, nil
- }
- func (h *ApiHelper) deployAdminAuthorizedKeys(ctx context.Context) {
- err := fsdriver.DeployAdminAuthorizedKeys(h.adminClientSession(ctx))
- if err != nil {
- log.Errorf("DeployAdminAuthorizedKeys %s", err)
- }
- }
- func (h *ApiHelper) Run(ctx context.Context) {
- // deploy host admin key
- h.deployAdminAuthorizedKeys(ctx)
- wg := ctx.Value("wg").(*sync.WaitGroup)
- defer func() {
- wg.Done()
- log.Infof("api helper bye")
- }()
- h.haState = <-h.haStateProvider.StateChannel()
- log.Infof("initial haState: %s", h.haState)
- switch h.haState {
- case api.LB_HA_STATE_BACKUP:
- default:
- h.startOvnWorker(ctx)
- }
- wg.Add(1)
- go h.apih.Start(ctx, nil, "")
- hbTicker := time.NewTicker(time.Duration(h.opts.ApiLbagentHbInterval) * time.Second)
- agentParamsSyncTicker := time.NewTicker(time.Duration(h.opts.ApiSyncIntervalSeconds) * time.Second)
- defer hbTicker.Stop()
- defer agentParamsSyncTicker.Stop()
- for {
- select {
- case <-hbTicker.C:
- _, err := h.doHb(ctx)
- if err != nil {
- log.Errorf("heartbeat: %s", err)
- }
- case imss := <-h.apih.ModelSets():
- log.Infof("got new data from api helper")
- mss := imss.(*agentmodels.ModelSets)
- h.corpus.ModelSets = mss
- h.doUseCorpus(ctx)
- h.agentUpdateSeen(ctx)
- err := h.saveCorpus(ctx)
- if err != nil {
- log.Errorf("save corpus failed: %s", err)
- } else {
- if err := h.dataDirMan.Prune(h.opts.DataPreserveN); err != nil {
- log.Errorf("prune corpus data dir failed: %s", err)
- }
- }
- case <-agentParamsSyncTicker.C:
- changed := h.doSyncAgentParams(ctx)
- if changed {
- log.Infof("agent params changed")
- h.doUseCorpus(ctx)
- }
- case state := <-h.haStateProvider.StateChannel():
- log.Infof("current state: %s ha_state: %s", h.haState, state)
- switch state {
- case api.LB_HA_STATE_BACKUP:
- h.stopOvnWorker()
- h.doStopDaemons(ctx)
- default:
- if state != h.haState {
- // try your best to make things up
- h.startOvnWorker(ctx)
- h.doUseCorpus(ctx)
- }
- }
- h.haState = state
- case <-ctx.Done():
- return
- }
- }
- }
- func (h *ApiHelper) SetHaStateProvider(hsp HaStateProvider) {
- h.haStateProvider = hsp
- }
- func (h *ApiHelper) startOvnWorker(ctx context.Context) {
- if h.ovn == nil && !h.opts.DisableLocalVpc {
- h.ovn = NewOvnWorker(h.opts)
- go h.ovn.Start(ctx)
- }
- }
- func (h *ApiHelper) stopOvnWorker() {
- if h.ovn != nil {
- h.ovn.Stop()
- h.ovn = nil
- }
- }
- func (h *ApiHelper) adminClientSession(ctx context.Context) *mcclient.ClientSession {
- s := h.mcclientSession
- if s != nil {
- token := s.GetToken()
- expires := token.GetExpires()
- if time.Now().Add(time.Hour).After(expires) {
- return s
- }
- }
- region := h.opts.CommonOptions.Region
- h.mcclientSession = auth.GetAdminSession(ctx, region)
- return h.mcclientSession
- }
- func (h *ApiHelper) agentPeekOnce(ctx context.Context) (*computemodels.SLoadbalancerAgent, error) {
- s := h.adminClientSession(ctx)
- params := jsonutils.NewDict()
- params.Set(api.LBAGENT_QUERY_ORIG_KEY, jsonutils.NewString(api.LBAGENT_QUERY_ORIG_VAL))
- data, err := modules.LoadbalancerAgents.Get(s, h.lbagentId, params)
- if err != nil {
- err := fmt.Errorf("agent get error: %s", err)
- return nil, err
- }
- agent := &computemodels.SLoadbalancerAgent{}
- err = data.Unmarshal(agent)
- if err != nil {
- err := fmt.Errorf("agent data unmarshal error: %s", err)
- return nil, err
- }
- return agent, nil
- }
- func (h *ApiHelper) agentPeekPeers(ctx context.Context, agent *computemodels.SLoadbalancerAgent) ([]*computemodels.SLoadbalancerAgent, error) {
- vri := agent.Params.Vrrp.VirtualRouterId
- clusterId := agent.ClusterId
- s := h.adminClientSession(ctx)
- params := jsonutils.NewDict()
- params.Set(api.LBAGENT_QUERY_ORIG_KEY, jsonutils.NewString(api.LBAGENT_QUERY_ORIG_VAL))
- params.Set("cluster_id", jsonutils.NewString(clusterId))
- listResult, err := modules.LoadbalancerAgents.List(s, params)
- if err != nil {
- err := fmt.Errorf("agent listing error: %s", err)
- return nil, err
- }
- peers := []*computemodels.SLoadbalancerAgent{}
- for _, data := range listResult.Data {
- peerAgent := &computemodels.SLoadbalancerAgent{}
- err := data.Unmarshal(peerAgent)
- if err != nil {
- err := fmt.Errorf("agent data unmarshal error: %s", err)
- return nil, err
- }
- // just in case
- if peerAgent.ClusterId != clusterId {
- continue
- }
- if peerAgent.Params.Vrrp.VirtualRouterId != vri {
- continue
- }
- peers = append(peers, peerAgent)
- }
- return peers, nil
- }
- type agentPeekResult computemodels.SLoadbalancerAgent
- func (r *agentPeekResult) staleInFuture(s int) bool {
- if r.HbLastSeen.IsZero() {
- return true
- }
- duration := time.Since(r.HbLastSeen).Seconds()
- if int(duration) < s {
- return true
- }
- return false
- }
- /*
- func (h *ApiHelper) agentPeek(ctx context.Context) *agentPeekResult {
- doPeekWithLog := func() *computemodels.SLoadbalancerAgent {
- agent, err := h.agentPeekOnce(ctx)
- if err != nil {
- log.Errorf("agent peek failed: %s", err)
- }
- return agent
- }
- agent := doPeekWithLog()
- if agent == nil {
- initHbTicker := time.NewTicker(time.Duration(3) * time.Second)
- defer initHbTicker.Stop()
- initHbDone:
- for {
- select {
- case <-initHbTicker.C:
- agent = doPeekWithLog()
- if agent != nil {
- break initHbDone
- }
- case <-ctx.Done():
- return nil
- }
- }
- }
- return (*agentPeekResult)(agent)
- }
- */
- func (h *ApiHelper) agentUpdateSeen(ctx context.Context) *computemodels.SLoadbalancerAgent {
- s := h.adminClientSession(ctx)
- params := h.corpus.MaxSeenUpdatedAtParams()
- data, err := modules.LoadbalancerAgents.Update(s, h.lbagentId, params)
- if err != nil {
- log.Errorf("agent get error: %s", err)
- return nil
- }
- agent := &computemodels.SLoadbalancerAgent{}
- err = data.Unmarshal(agent)
- if err != nil {
- log.Errorf("agent data unmarshal error: %s", err)
- return nil
- }
- return agent
- }
- func (h *ApiHelper) newAgentHbParams(ctx context.Context) (*jsonutils.JSONDict, error) {
- ip, err := netutils2.MyIPSmart()
- if err != nil {
- return nil, err
- }
- state := h.haState
- version := version.Get().GitVersion
- opts := &options.LoadbalancerAgentActionHbOptions{
- IP: ip,
- HaState: state,
- Version: version,
- }
- params, err := opts.Params()
- if err != nil {
- return nil, err
- }
- return params, nil
- }
- func (h *ApiHelper) doHb(ctx context.Context) (*computemodels.SLoadbalancerAgent, error) {
- // TODO check if things changed recently
- s := h.adminClientSession(ctx)
- params, err := h.newAgentHbParams(ctx)
- if err != nil {
- return nil, fmt.Errorf("heartbeat: making params: %s", err)
- }
- data, err := modules.LoadbalancerAgents.PerformAction(s, h.lbagentId, "hb", params)
- if err != nil {
- err := fmt.Errorf("heartbeat api error: %s", err)
- return nil, err
- }
- agent := &computemodels.SLoadbalancerAgent{}
- err = data.Unmarshal(agent)
- if err != nil {
- err := fmt.Errorf("heartbeat data unmarshal error: %s", err)
- return nil, err
- }
- return agent, nil
- }
- func (h *ApiHelper) saveCorpus(ctx context.Context) error {
- _, err := h.dataDirMan.NewDir(func(dir string) error {
- err := h.corpus.SaveDir(dir)
- if err != nil {
- return fmt.Errorf("save to dir %s: %s", dir, err)
- }
- return nil
- })
- return err
- }
- func (h *ApiHelper) doSyncAgentParams(ctx context.Context) bool {
- agent, err := h.agentPeekOnce(ctx)
- if err != nil {
- log.Errorf("agent params get failure: %s", err)
- return false
- }
- peers, err := h.agentPeekPeers(ctx, agent)
- if err != nil {
- log.Errorf("agent get peers failure: %s", err)
- return false
- }
- unicastPeer := []string{}
- for _, peer := range peers {
- if peer.Id == agent.Id {
- continue
- }
- if peer.IP == "" {
- log.Warningf("agent %s(%s) has no ip, use multicast vrrp", peer.Name, peer.Id)
- break
- }
- unicastPeer = append(unicastPeer, peer.IP)
- }
- useUnicast := len(unicastPeer) == len(peers)-1
- agentParams, err := agentmodels.NewAgentParams(agent)
- if err != nil {
- log.Errorf("agent params prepare failure: %s", err)
- return false
- }
- agentParams.SetVrrpParams("notify_script", h.haStateProvider.StateScript())
- if useUnicast {
- agentParams.SetVrrpParams("unicast_peer", unicastPeer)
- }
- if !agentParams.Equals(h.agentParams) {
- if useUnicast {
- log.Infof("use unicast vrrp from %s to %s", agent.IP, strings.Join(unicastPeer, ","))
- }
- h.agentParams = agentParams
- return true
- }
- return false
- }
- func (h *ApiHelper) doUseCorpus(ctx context.Context) {
- if h.corpus == nil || h.corpus.ModelSets == nil {
- log.Warningf("agent corpus nil")
- return
- }
- if h.agentParams == nil {
- log.Warningf("agent params nil")
- return
- }
- if h.ovn != nil {
- if err := h.ovn.Refresh(ctx, h.corpus.ModelSets.Loadbalancers); err != nil {
- log.Errorf("ovn refresh: %v", err)
- }
- }
- log.Infof("make effect new corpus and params")
- cmdData := &LbagentCmdUseCorpusData{
- Corpus: h.corpus,
- AgentParams: h.agentParams,
- }
- cmdData.Wg.Add(1)
- cmd := &LbagentCmd{
- Type: LbagentCmdUseCorpus,
- Data: cmdData,
- }
- cmdChan := ctx.Value("cmdChan").(chan *LbagentCmd)
- select {
- case cmdChan <- cmd:
- cmdData.Wg.Wait()
- case <-ctx.Done():
- return
- }
- }
- func (h *ApiHelper) doStopDaemons(ctx context.Context) {
- cmd := &LbagentCmd{
- Type: LbagentCmdStopDaemons,
- }
- cmdChan := ctx.Value("cmdChan").(chan *LbagentCmd)
- select {
- case cmdChan <- cmd:
- case <-ctx.Done():
- }
- }
|