| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package worker
- import (
- "context"
- "runtime"
- "runtime/debug"
- "sync"
- "time"
- "github.com/vishvananda/netlink"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/util/version"
- "yunion.io/x/pkg/utils"
- "yunion.io/x/onecloud/pkg/apihelper"
- api "yunion.io/x/onecloud/pkg/apis/cloudproxy"
- common_options "yunion.io/x/onecloud/pkg/cloudcommon/options"
- agentmodels "yunion.io/x/onecloud/pkg/cloudproxy/agent/models"
- agentoptions "yunion.io/x/onecloud/pkg/cloudproxy/agent/options"
- agentssh "yunion.io/x/onecloud/pkg/cloudproxy/agent/ssh"
- "yunion.io/x/onecloud/pkg/mcclient/auth"
- cloudproxy_modules "yunion.io/x/onecloud/pkg/mcclient/modules/cloudproxy"
- "yunion.io/x/onecloud/pkg/mcclient/modules/yunionconf"
- "yunion.io/x/onecloud/pkg/util/netutils2"
- ssh_util "yunion.io/x/onecloud/pkg/util/ssh"
- )
- type Worker struct {
- commonOpts *common_options.CommonOptions
- opts *agentoptions.Options
- proxyAgentId string
- bindAddr string
- apih *apihelper.APIHelper
- clientSet *agentssh.ClientSet
- sessionCache *auth.SessionCache
- }
- func NewWorker(commonOpts *common_options.CommonOptions, opts *agentoptions.Options) *Worker {
- modelSets := agentmodels.NewModelSets()
- apiOpts := &apihelper.Options{
- CommonOptions: *commonOpts,
- SyncIntervalSeconds: opts.APISyncIntervalSeconds,
- ListBatchSize: opts.APIListBatchSize,
- }
- apih, err := apihelper.NewAPIHelper(apiOpts, modelSets)
- if err != nil {
- return nil
- }
- w := &Worker{
- commonOpts: commonOpts,
- opts: opts,
- proxyAgentId: opts.ProxyAgentId,
- apih: apih,
- clientSet: agentssh.NewClientSet(),
- sessionCache: &auth.SessionCache{
- Region: commonOpts.Region,
- UseAdminToken: true,
- EarlyRefresh: time.Hour,
- },
- }
- return w
- }
- func (w *Worker) initProxyAgent_(ctx context.Context) error {
- s := w.sessionCache.Get(ctx)
- var agentDetail api.ProxyAgentDetails
- {
- j, err := cloudproxy_modules.ProxyAgents.Get(s, w.proxyAgentId, nil)
- if err != nil {
- return errors.Wrapf(err, "fetch proxy agent %s", w.proxyAgentId)
- }
- if err := j.Unmarshal(&agentDetail); err != nil {
- return errors.Wrapf(err, "unmarshal proxy agent detail: %s", j.String())
- }
- if agentDetail.Id == "" {
- return errors.Error("proxy agent id is empty")
- }
- w.proxyAgentId = agentDetail.Id
- }
- bindAddrExist := func(addr string) bool {
- as, err := netlink.AddrList(nil, netlink.FAMILY_ALL)
- if err != nil {
- log.Fatalf("list system available addresses: %v", err)
- }
- for _, a := range as {
- ipstr := a.IPNet.IP.String()
- if addr == ipstr {
- return true
- }
- }
- return false
- }
- var (
- bindAddr string
- advertiseAddr string
- bindAddrUpdate = false
- advertiseAddrUpdate = false
- )
- if agentDetail.BindAddr == "" || !bindAddrExist(bindAddr) {
- var err error
- bindAddr, err = netutils2.MyIPSmart()
- if err != nil {
- return errors.Wrap(err, "find bind Addr")
- }
- bindAddrUpdate = true
- } else {
- bindAddr = agentDetail.BindAddr
- }
- w.bindAddr = bindAddr
- if agentDetail.AdvertiseAddr == "" || (bindAddrUpdate && agentDetail.AdvertiseAddr == agentDetail.BindAddr) {
- advertiseAddr = bindAddr
- advertiseAddrUpdate = true
- } else {
- advertiseAddr = agentDetail.AdvertiseAddr
- }
- if bindAddrUpdate || advertiseAddrUpdate {
- req := api.ProxyAgentUpdateInput{
- BindAddr: bindAddr,
- AdvertiseAddr: advertiseAddr,
- }
- reqJ := req.JSON(req)
- if _, err := cloudproxy_modules.ProxyAgents.Put(s, w.proxyAgentId, reqJ); err != nil {
- return errors.Wrapf(err, "update proxy agent addr: %s", reqJ.String())
- }
- }
- return nil
- }
- func (w *Worker) initProxyAgent(ctx context.Context) error {
- done, err := utils.NewFibonacciRetrierMaxElapse(
- w.opts.GetProxyAgentInitWaitDuration(),
- func(retrier utils.FibonacciRetrier) (bool, error) {
- err := w.initProxyAgent_(ctx)
- if err != nil {
- return false, err
- }
- return true, nil
- }).Start(ctx)
- if done {
- return nil
- }
- return err
- }
- func (w *Worker) Start(ctx context.Context) {
- wg := ctx.Value("wg").(*sync.WaitGroup)
- wg.Add(1)
- defer func() {
- log.Infoln("agent: worker bye")
- wg.Done()
- }()
- if err := w.initProxyAgent(ctx); err != nil {
- log.Errorf("init proxy agent: %v", err)
- return
- }
- go w.apih.Start(ctx, nil, "")
- const tickDur = 11 * time.Second
- var (
- mss *agentmodels.ModelSets
- tick = time.NewTicker(tickDur)
- )
- for {
- select {
- case imss := <-w.apih.ModelSets():
- log.Infof("agent: got new data from api helper")
- mss = imss.(*agentmodels.ModelSets)
- if err := w.run(ctx, mss); err != nil {
- log.Errorf("agent run: %v", err)
- }
- case <-tick.C:
- if mss != nil {
- if err := w.run(ctx, mss); err != nil {
- log.Errorf("agent refresh run: %v", err)
- }
- }
- case <-ctx.Done():
- return
- }
- }
- }
- func (w *Worker) run(ctx context.Context, mss *agentmodels.ModelSets) (err error) {
- defer func() {
- if panicVal := recover(); panicVal != nil {
- yunionconf.BugReport.SendBugReport(context.Background(), version.GetShortString(), string(debug.Stack()), errors.Errorf("%s", panicVal))
- if panicErr, ok := panicVal.(runtime.Error); ok {
- err = errors.Wrap(panicErr, string(debug.Stack()))
- } else if panicErr, ok := panicVal.(error); ok {
- err = panicErr
- } else {
- panic(panicVal)
- }
- }
- }()
- w.clientSet.ClearAllMark()
- for _, pep := range mss.ProxyEndpoints {
- cc := ssh_util.ClientConfig{
- Username: pep.User,
- Host: pep.Host,
- Port: pep.Port,
- PrivateKey: pep.PrivateKey,
- }
- if reset := w.clientSet.ResetIfChanged(ctx, pep.Id, cc); reset {
- log.Warningf("proxy endpoint %s changed, connections reset", pep.Id)
- } else if added := w.clientSet.AddIfNotExist(ctx, pep.Id, cc); added {
- log.Infof("proxy endpoint %s added", pep.Id)
- }
- }
- w.clientSet.ResetUnmarked(ctx)
- removes := w.clientSet.ForwardKeySet()
- adds := agentssh.ForwardKeySet{}
- for _, pep := range mss.ProxyEndpoints {
- for _, forward := range pep.Forwards {
- if forward.ProxyAgentId != w.proxyAgentId {
- continue
- }
- if forward.ProxyEndpointId == "" {
- continue
- }
- var (
- typ string
- addr string
- port int
- )
- switch forward.Type {
- case api.FORWARD_TYPE_LOCAL:
- addr = w.bindAddr
- port = forward.BindPort
- typ = agentssh.ForwardKeyTypeL
- case api.FORWARD_TYPE_REMOTE:
- addr = forward.ProxyEndpoint.IntranetIpAddr
- port = forward.BindPort
- typ = agentssh.ForwardKeyTypeR
- default:
- log.Warningf("unknown forward type %s", forward.Type)
- continue
- }
- fk := agentssh.ForwardKey{
- EpKey: forward.ProxyEndpointId,
- Type: typ,
- KeyAddr: addr,
- KeyPort: port,
- Value: forward,
- }
- if removes.Contains(fk) {
- removes.Remove(fk)
- } else {
- adds.Add(fk)
- }
- }
- }
- for _, fk := range removes {
- log.Infof("close forward %s", fk.Key())
- w.clientSet.CloseForward(ctx, fk)
- }
- for _, fk := range adds {
- log.Infof("open forward %s", fk.Key())
- forward := fk.Value.(*agentmodels.Forward)
- tick := tickDuration(forward.LastSeenTimeout)
- tickCb := heartbeatFunc(forward.Id, w.sessionCache)
- switch fk.Type {
- case agentssh.ForwardKeyTypeL:
- w.clientSet.LocalForward(ctx, fk.EpKey, agentssh.LocalForwardReq{
- LocalAddr: fk.KeyAddr,
- LocalPort: fk.KeyPort,
- RemoteAddr: forward.RemoteAddr,
- RemotePort: forward.RemotePort,
- Tick: tick,
- TickCb: tickCb,
- })
- case agentssh.ForwardKeyTypeR:
- w.clientSet.RemoteForward(ctx, fk.EpKey, agentssh.RemoteForwardReq{
- RemoteAddr: fk.KeyAddr,
- RemotePort: fk.KeyPort,
- LocalAddr: forward.RemoteAddr,
- LocalPort: forward.RemotePort,
- Tick: tick,
- TickCb: tickCb,
- })
- }
- }
- return nil
- }
|