worker.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package worker
  15. import (
  16. "context"
  17. "runtime"
  18. "runtime/debug"
  19. "sync"
  20. "time"
  21. "github.com/vishvananda/netlink"
  22. "yunion.io/x/log"
  23. "yunion.io/x/pkg/errors"
  24. "yunion.io/x/pkg/util/version"
  25. "yunion.io/x/pkg/utils"
  26. "yunion.io/x/onecloud/pkg/apihelper"
  27. api "yunion.io/x/onecloud/pkg/apis/cloudproxy"
  28. common_options "yunion.io/x/onecloud/pkg/cloudcommon/options"
  29. agentmodels "yunion.io/x/onecloud/pkg/cloudproxy/agent/models"
  30. agentoptions "yunion.io/x/onecloud/pkg/cloudproxy/agent/options"
  31. agentssh "yunion.io/x/onecloud/pkg/cloudproxy/agent/ssh"
  32. "yunion.io/x/onecloud/pkg/mcclient/auth"
  33. cloudproxy_modules "yunion.io/x/onecloud/pkg/mcclient/modules/cloudproxy"
  34. "yunion.io/x/onecloud/pkg/mcclient/modules/yunionconf"
  35. "yunion.io/x/onecloud/pkg/util/netutils2"
  36. ssh_util "yunion.io/x/onecloud/pkg/util/ssh"
  37. )
  38. type Worker struct {
  39. commonOpts *common_options.CommonOptions
  40. opts *agentoptions.Options
  41. proxyAgentId string
  42. bindAddr string
  43. apih *apihelper.APIHelper
  44. clientSet *agentssh.ClientSet
  45. sessionCache *auth.SessionCache
  46. }
  47. func NewWorker(commonOpts *common_options.CommonOptions, opts *agentoptions.Options) *Worker {
  48. modelSets := agentmodels.NewModelSets()
  49. apiOpts := &apihelper.Options{
  50. CommonOptions: *commonOpts,
  51. SyncIntervalSeconds: opts.APISyncIntervalSeconds,
  52. ListBatchSize: opts.APIListBatchSize,
  53. }
  54. apih, err := apihelper.NewAPIHelper(apiOpts, modelSets)
  55. if err != nil {
  56. return nil
  57. }
  58. w := &Worker{
  59. commonOpts: commonOpts,
  60. opts: opts,
  61. proxyAgentId: opts.ProxyAgentId,
  62. apih: apih,
  63. clientSet: agentssh.NewClientSet(),
  64. sessionCache: &auth.SessionCache{
  65. Region: commonOpts.Region,
  66. UseAdminToken: true,
  67. EarlyRefresh: time.Hour,
  68. },
  69. }
  70. return w
  71. }
  72. func (w *Worker) initProxyAgent_(ctx context.Context) error {
  73. s := w.sessionCache.Get(ctx)
  74. var agentDetail api.ProxyAgentDetails
  75. {
  76. j, err := cloudproxy_modules.ProxyAgents.Get(s, w.proxyAgentId, nil)
  77. if err != nil {
  78. return errors.Wrapf(err, "fetch proxy agent %s", w.proxyAgentId)
  79. }
  80. if err := j.Unmarshal(&agentDetail); err != nil {
  81. return errors.Wrapf(err, "unmarshal proxy agent detail: %s", j.String())
  82. }
  83. if agentDetail.Id == "" {
  84. return errors.Error("proxy agent id is empty")
  85. }
  86. w.proxyAgentId = agentDetail.Id
  87. }
  88. bindAddrExist := func(addr string) bool {
  89. as, err := netlink.AddrList(nil, netlink.FAMILY_ALL)
  90. if err != nil {
  91. log.Fatalf("list system available addresses: %v", err)
  92. }
  93. for _, a := range as {
  94. ipstr := a.IPNet.IP.String()
  95. if addr == ipstr {
  96. return true
  97. }
  98. }
  99. return false
  100. }
  101. var (
  102. bindAddr string
  103. advertiseAddr string
  104. bindAddrUpdate = false
  105. advertiseAddrUpdate = false
  106. )
  107. if agentDetail.BindAddr == "" || !bindAddrExist(bindAddr) {
  108. var err error
  109. bindAddr, err = netutils2.MyIPSmart()
  110. if err != nil {
  111. return errors.Wrap(err, "find bind Addr")
  112. }
  113. bindAddrUpdate = true
  114. } else {
  115. bindAddr = agentDetail.BindAddr
  116. }
  117. w.bindAddr = bindAddr
  118. if agentDetail.AdvertiseAddr == "" || (bindAddrUpdate && agentDetail.AdvertiseAddr == agentDetail.BindAddr) {
  119. advertiseAddr = bindAddr
  120. advertiseAddrUpdate = true
  121. } else {
  122. advertiseAddr = agentDetail.AdvertiseAddr
  123. }
  124. if bindAddrUpdate || advertiseAddrUpdate {
  125. req := api.ProxyAgentUpdateInput{
  126. BindAddr: bindAddr,
  127. AdvertiseAddr: advertiseAddr,
  128. }
  129. reqJ := req.JSON(req)
  130. if _, err := cloudproxy_modules.ProxyAgents.Put(s, w.proxyAgentId, reqJ); err != nil {
  131. return errors.Wrapf(err, "update proxy agent addr: %s", reqJ.String())
  132. }
  133. }
  134. return nil
  135. }
  136. func (w *Worker) initProxyAgent(ctx context.Context) error {
  137. done, err := utils.NewFibonacciRetrierMaxElapse(
  138. w.opts.GetProxyAgentInitWaitDuration(),
  139. func(retrier utils.FibonacciRetrier) (bool, error) {
  140. err := w.initProxyAgent_(ctx)
  141. if err != nil {
  142. return false, err
  143. }
  144. return true, nil
  145. }).Start(ctx)
  146. if done {
  147. return nil
  148. }
  149. return err
  150. }
  151. func (w *Worker) Start(ctx context.Context) {
  152. wg := ctx.Value("wg").(*sync.WaitGroup)
  153. wg.Add(1)
  154. defer func() {
  155. log.Infoln("agent: worker bye")
  156. wg.Done()
  157. }()
  158. if err := w.initProxyAgent(ctx); err != nil {
  159. log.Errorf("init proxy agent: %v", err)
  160. return
  161. }
  162. go w.apih.Start(ctx, nil, "")
  163. const tickDur = 11 * time.Second
  164. var (
  165. mss *agentmodels.ModelSets
  166. tick = time.NewTicker(tickDur)
  167. )
  168. for {
  169. select {
  170. case imss := <-w.apih.ModelSets():
  171. log.Infof("agent: got new data from api helper")
  172. mss = imss.(*agentmodels.ModelSets)
  173. if err := w.run(ctx, mss); err != nil {
  174. log.Errorf("agent run: %v", err)
  175. }
  176. case <-tick.C:
  177. if mss != nil {
  178. if err := w.run(ctx, mss); err != nil {
  179. log.Errorf("agent refresh run: %v", err)
  180. }
  181. }
  182. case <-ctx.Done():
  183. return
  184. }
  185. }
  186. }
  187. func (w *Worker) run(ctx context.Context, mss *agentmodels.ModelSets) (err error) {
  188. defer func() {
  189. if panicVal := recover(); panicVal != nil {
  190. yunionconf.BugReport.SendBugReport(context.Background(), version.GetShortString(), string(debug.Stack()), errors.Errorf("%s", panicVal))
  191. if panicErr, ok := panicVal.(runtime.Error); ok {
  192. err = errors.Wrap(panicErr, string(debug.Stack()))
  193. } else if panicErr, ok := panicVal.(error); ok {
  194. err = panicErr
  195. } else {
  196. panic(panicVal)
  197. }
  198. }
  199. }()
  200. w.clientSet.ClearAllMark()
  201. for _, pep := range mss.ProxyEndpoints {
  202. cc := ssh_util.ClientConfig{
  203. Username: pep.User,
  204. Host: pep.Host,
  205. Port: pep.Port,
  206. PrivateKey: pep.PrivateKey,
  207. }
  208. if reset := w.clientSet.ResetIfChanged(ctx, pep.Id, cc); reset {
  209. log.Warningf("proxy endpoint %s changed, connections reset", pep.Id)
  210. } else if added := w.clientSet.AddIfNotExist(ctx, pep.Id, cc); added {
  211. log.Infof("proxy endpoint %s added", pep.Id)
  212. }
  213. }
  214. w.clientSet.ResetUnmarked(ctx)
  215. removes := w.clientSet.ForwardKeySet()
  216. adds := agentssh.ForwardKeySet{}
  217. for _, pep := range mss.ProxyEndpoints {
  218. for _, forward := range pep.Forwards {
  219. if forward.ProxyAgentId != w.proxyAgentId {
  220. continue
  221. }
  222. if forward.ProxyEndpointId == "" {
  223. continue
  224. }
  225. var (
  226. typ string
  227. addr string
  228. port int
  229. )
  230. switch forward.Type {
  231. case api.FORWARD_TYPE_LOCAL:
  232. addr = w.bindAddr
  233. port = forward.BindPort
  234. typ = agentssh.ForwardKeyTypeL
  235. case api.FORWARD_TYPE_REMOTE:
  236. addr = forward.ProxyEndpoint.IntranetIpAddr
  237. port = forward.BindPort
  238. typ = agentssh.ForwardKeyTypeR
  239. default:
  240. log.Warningf("unknown forward type %s", forward.Type)
  241. continue
  242. }
  243. fk := agentssh.ForwardKey{
  244. EpKey: forward.ProxyEndpointId,
  245. Type: typ,
  246. KeyAddr: addr,
  247. KeyPort: port,
  248. Value: forward,
  249. }
  250. if removes.Contains(fk) {
  251. removes.Remove(fk)
  252. } else {
  253. adds.Add(fk)
  254. }
  255. }
  256. }
  257. for _, fk := range removes {
  258. log.Infof("close forward %s", fk.Key())
  259. w.clientSet.CloseForward(ctx, fk)
  260. }
  261. for _, fk := range adds {
  262. log.Infof("open forward %s", fk.Key())
  263. forward := fk.Value.(*agentmodels.Forward)
  264. tick := tickDuration(forward.LastSeenTimeout)
  265. tickCb := heartbeatFunc(forward.Id, w.sessionCache)
  266. switch fk.Type {
  267. case agentssh.ForwardKeyTypeL:
  268. w.clientSet.LocalForward(ctx, fk.EpKey, agentssh.LocalForwardReq{
  269. LocalAddr: fk.KeyAddr,
  270. LocalPort: fk.KeyPort,
  271. RemoteAddr: forward.RemoteAddr,
  272. RemotePort: forward.RemotePort,
  273. Tick: tick,
  274. TickCb: tickCb,
  275. })
  276. case agentssh.ForwardKeyTypeR:
  277. w.clientSet.RemoteForward(ctx, fk.EpKey, agentssh.RemoteForwardReq{
  278. RemoteAddr: fk.KeyAddr,
  279. RemotePort: fk.KeyPort,
  280. LocalAddr: forward.RemoteAddr,
  281. LocalPort: forward.RemotePort,
  282. Tick: tick,
  283. TickCb: tickCb,
  284. })
  285. }
  286. }
  287. return nil
  288. }