ovn.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package lbagent
  15. import (
  16. "context"
  17. "crypto/md5"
  18. "encoding/base64"
  19. "fmt"
  20. "io/ioutil"
  21. "net"
  22. "os"
  23. "os/exec"
  24. "path"
  25. "runtime"
  26. "strings"
  27. "sync"
  28. "time"
  29. "github.com/coreos/go-iptables/iptables"
  30. "github.com/vishvananda/netlink"
  31. "github.com/vishvananda/netns"
  32. "golang.org/x/sys/unix"
  33. "yunion.io/x/log"
  34. "yunion.io/x/pkg/errors"
  35. computeapis "yunion.io/x/onecloud/pkg/apis/compute"
  36. agentmodels "yunion.io/x/onecloud/pkg/lbagent/models"
  37. "yunion.io/x/onecloud/pkg/util/iproute2"
  38. )
  39. const (
  40. ErrOvnService = errors.Error("ovn controller")
  41. ErrOvnConfig = errors.Error("ovn controller configuration")
  42. )
  43. type empty struct{}
  44. type iptRule struct {
  45. rule string
  46. comment string
  47. // computed
  48. key string
  49. }
  50. type Bar struct {
  51. ctx context.Context
  52. cancel context.CancelFunc
  53. wg *sync.WaitGroup
  54. }
  55. func NewBar() *Bar {
  56. ctx := context.Background()
  57. ctx, cancel := context.WithCancel(ctx)
  58. b := &Bar{
  59. ctx: ctx,
  60. cancel: cancel,
  61. wg: &sync.WaitGroup{},
  62. }
  63. return b
  64. }
  65. func (b *Bar) Init() {
  66. b.wg.Add(1)
  67. }
  68. func (b *Bar) Done() {
  69. b.wg.Done()
  70. }
  71. func (b *Bar) Cancelled() <-chan struct{} {
  72. return b.ctx.Done()
  73. }
  74. func (b *Bar) Cancel() {
  75. b.cancel()
  76. b.wg.Wait()
  77. }
  78. type OvnHost struct {
  79. lb *agentmodels.Loadbalancer
  80. ovnBridge string
  81. macaddr string
  82. ipaddr string
  83. masklen int
  84. gateway string
  85. vethAddrInner string
  86. vethAddrOuter string
  87. addrIdxInner uint16
  88. addrIdxOuter uint16
  89. vethAddrInnerPortMap map[string]int // key backendId
  90. portMap map[int]empty // key port
  91. portIdx int
  92. Bar *Bar
  93. refreshCh chan *ovnHostRefreshCmd
  94. ns netns.NsHandle
  95. }
  96. type ovnHostRefreshCmd struct {
  97. lb *agentmodels.Loadbalancer
  98. done chan error
  99. }
  100. func newOvnHost(bridge string) *OvnHost {
  101. ovnHost := &OvnHost{
  102. vethAddrInnerPortMap: map[string]int{},
  103. portMap: map[int]empty{},
  104. Bar: NewBar(),
  105. refreshCh: make(chan *ovnHostRefreshCmd),
  106. ns: -1,
  107. ovnBridge: bridge,
  108. }
  109. return ovnHost
  110. }
  111. func (ovnHost *OvnHost) SetAddrIdx(inner, outer uint16) {
  112. ovnHost.addrIdxInner = inner
  113. ovnHost.addrIdxOuter = outer
  114. ovnHost.vethAddrInner = fmt.Sprintf("169.254.%d.%d", (inner>>8)&0xff, inner&0xff)
  115. ovnHost.vethAddrOuter = fmt.Sprintf("169.254.%d.%d", (outer>>8)&0xff, outer&0xff)
  116. }
  117. func (ovnHost *OvnHost) Start(ctx context.Context) {
  118. ovnHost.Bar.Init()
  119. defer ovnHost.Bar.Done()
  120. tick := time.NewTicker(11 * time.Second)
  121. for {
  122. select {
  123. case <-tick.C:
  124. ovnHost.run(ctx)
  125. case cmd := <-ovnHost.refreshCh:
  126. err := ovnHost.refresh(ctx, cmd.lb)
  127. ovnHost.run(ctx)
  128. cmd.done <- err
  129. case <-ovnHost.Bar.Cancelled():
  130. ovnHost.stop()
  131. return
  132. case <-ctx.Done():
  133. return
  134. }
  135. }
  136. }
  137. // stop releases resources, cleans up modifications. It's reentrant
  138. func (ovnHost *OvnHost) stop() {
  139. ovnHost.cleanUp()
  140. }
  141. func (ovnHost *OvnHost) cleanUp() {
  142. ovnHost.cleanUpNetns()
  143. ovnHost.cleanUpIface()
  144. }
  145. func (ovnHost *OvnHost) cleanUpIface() {
  146. ovnHost.cleanUpBridge()
  147. ovnHost.cleanUpVethPair()
  148. }
  149. func (ovnHost *OvnHost) cleanUpNetns() {
  150. netns.DeleteNamed(ovnHost.netnsName())
  151. if int(ovnHost.ns) >= 0 {
  152. ovnHost.ns.Close()
  153. }
  154. }
  155. func (ovnHost *OvnHost) cleanUpBridge() {
  156. bridge := ovnHost.ovnBridge
  157. _, peer0 := ovnHost.ovnPairName()
  158. args := []string{
  159. "ovs-vsctl",
  160. "--", "--if-exists", "del-port", bridge, peer0,
  161. }
  162. var cancelFunc context.CancelFunc
  163. ctx := context.Background()
  164. ctx, cancelFunc = context.WithTimeout(ctx, time.Second*7)
  165. defer cancelFunc()
  166. cmd := exec.CommandContext(ctx, args[0], args[1:]...)
  167. _, err := cmd.Output()
  168. if err != nil {
  169. err = errors.Wrapf(err, "cleanup %q: %s", bridge, strings.Join(args, " "))
  170. log.Errorf("%v", err)
  171. }
  172. }
  173. func (ovnHost *OvnHost) cleanUpVethPair() {
  174. _, peer0 := ovnHost.ovnPairName()
  175. _, peer1 := ovnHost.haproxyPairName()
  176. for _, name := range []string{peer0, peer1} {
  177. link, err := netlink.LinkByName(name)
  178. if err != nil {
  179. if _, ok := err.(netlink.LinkNotFoundError); ok {
  180. continue
  181. }
  182. err = errors.Wrapf(err, "ovnHost.stop: LinkByName(%q)", name)
  183. log.Errorf("%v", err)
  184. continue
  185. }
  186. if err := netlink.LinkDel(link); err != nil {
  187. err = errors.Wrapf(err, "ovnHost.stop: LinkDel(%q)", name)
  188. log.Errorf("%v", err)
  189. }
  190. }
  191. }
  192. func (ovnHost *OvnHost) Stop() {
  193. ovnHost.Bar.Cancel()
  194. }
  195. func (ovnHost *OvnHost) netnsName() string {
  196. return fmt.Sprintf("lb-%s", ovnHost.lb.Id)
  197. }
  198. func (ovnHost *OvnHost) lspName() string {
  199. return fmt.Sprintf("iface/lb/%s", ovnHost.lb.Id)
  200. }
  201. func (ovnHost *OvnHost) vethPairName(p string) (string, string) {
  202. // - index 0 is for the ovn host side
  203. // - index 1 is for the outside side (initial namespace)
  204. pref := fmt.Sprintf("%s%s", p, ovnHost.lb.Id[:15-len(p)-1])
  205. return pref + "0", pref + "1"
  206. }
  207. func (ovnHost *OvnHost) ovnPairName() (string, string) {
  208. // o is for connecting to ovn virtual network
  209. return ovnHost.vethPairName("o")
  210. }
  211. func (ovnHost *OvnHost) haproxyPairName() (string, string) {
  212. // h is for use with haproxy
  213. return ovnHost.vethPairName("h")
  214. }
  215. func (ovnHost *OvnHost) run(ctx context.Context) {
  216. defer func() {
  217. if msg := recover(); msg != nil {
  218. log.Errorf("ovn host for loadbalancer %s: %s", ovnHost.lb.Id, msg)
  219. }
  220. }()
  221. // - peer is the one in initial net namespace
  222. me0, peer0 := ovnHost.ovnPairName()
  223. me1, peer1 := ovnHost.haproxyPairName()
  224. ovnHost.ensureNs(ctx)
  225. ovnHost.ensureCleanNs(ctx, me0, peer0)
  226. ovnHost.ensureCleanNs(ctx, me1, peer1)
  227. ovnHost.ensureVethPair(ctx, me0, peer0)
  228. ovnHost.ensureVethPair(ctx, me1, peer1)
  229. if err := ovnHost.nsRun(ctx, func(ctx context.Context) error {
  230. // ovn side
  231. if err := iproute2.NewLink(me0).Address(ovnHost.macaddr).Up().Err(); err != nil {
  232. panic(errors.Wrapf(err, "set link %q up", me0))
  233. }
  234. if err := iproute2.NewAddress(me0, fmt.Sprintf("%s/%d", ovnHost.ipaddr, ovnHost.masklen)).Exact().Err(); err != nil {
  235. panic(errors.Wrapf(err, "set address of link %q", me0))
  236. }
  237. if err := iproute2.NewRoute(me0).AddByCidr("0.0.0.0/0", ovnHost.gateway).Err(); err != nil {
  238. panic(errors.Wrapf(err, "add route to inner addr %q", ovnHost.vethAddrInner))
  239. }
  240. // haproxy side
  241. if err := iproute2.NewLink(me1).Up().Err(); err != nil {
  242. panic(errors.Wrapf(err, "set link %q up", me0))
  243. }
  244. if err := iproute2.NewAddress(me1, ovnHost.vethAddrInner+"/16").Exact().Err(); err != nil {
  245. panic(errors.Wrapf(err, "set address of link %q", me0))
  246. }
  247. var (
  248. // NOTE: make sure arguments do not contain blank chars
  249. // or such things to avoid bad things like injection
  250. preRules []iptRule
  251. postRules []iptRule
  252. )
  253. // iptables SNAT rule
  254. postRules = append(postRules, iptRule{
  255. rule: fmt.Sprintf("-o %s -j SNAT --to-source %s", me0, ovnHost.ipaddr),
  256. comment: "snat-ovn",
  257. })
  258. postRules = append(postRules, iptRule{
  259. rule: fmt.Sprintf("-o %s -j SNAT --to-source %s", me1, ovnHost.vethAddrInner),
  260. comment: "snat-haproxy",
  261. })
  262. // iptables listener rule
  263. for _, listener := range ovnHost.lb.Listeners {
  264. var proto string
  265. switch listener.ListenerType {
  266. case computeapis.LB_LISTENER_TYPE_TCP,
  267. computeapis.LB_LISTENER_TYPE_HTTP,
  268. computeapis.LB_LISTENER_TYPE_HTTPS:
  269. proto = "tcp"
  270. case computeapis.LB_LISTENER_TYPE_UDP:
  271. proto = "udp"
  272. default:
  273. panic(errors.Errorf("listener %s(%s) protocol: %q", listener.Name, listener.Id, listener.ListenerType))
  274. }
  275. preRules = append(preRules, iptRule{
  276. rule: fmt.Sprintf("-i %s -d %s -p %s --dport %d -j DNAT --to-destination %s:%d",
  277. me0, ovnHost.ipaddr, proto, listener.ListenerPort, ovnHost.vethAddrOuter, listener.ListenerPort),
  278. comment: fmt.Sprintf("listener/%s/%s/%d", listener.Id, listener.ListenerType, listener.ListenerPort),
  279. })
  280. }
  281. // iptables backend rule
  282. for _, backendGroup := range ovnHost.lb.BackendGroups {
  283. for _, backend := range backendGroup.Backends {
  284. port, ok := ovnHost.vethAddrInnerPortMap[backend.Id]
  285. if !ok {
  286. log.Warningf("cannot find netns port for backend %s(%s) %s:%d",
  287. backend.Name, backend.Id, backend.Address, backend.Port)
  288. continue
  289. }
  290. for _, proto := range []string{"tcp", "udp"} {
  291. preRules = append(preRules, iptRule{
  292. rule: fmt.Sprintf("-i %s -d %s -p %s --dport %d -j DNAT --to-destination %s:%d",
  293. me1, ovnHost.vethAddrInner, proto, port, backend.Address, backend.Port),
  294. comment: fmt.Sprintf("backend/%s/%s/%s/%d", backend.Id, proto, backend.Address, backend.Port),
  295. })
  296. }
  297. }
  298. }
  299. ovnHost.ensureIptRules(ctx, "nat", "PREROUTING", preRules)
  300. ovnHost.ensureIptRules(ctx, "nat", "POSTROUTING", postRules)
  301. return nil
  302. }); err != nil {
  303. panic(errors.Wrap(err, "run in netns"))
  304. }
  305. { // setup brvpc
  306. bridge := ovnHost.ovnBridge
  307. lsp := ovnHost.lspName()
  308. args := []string{
  309. "ovs-vsctl",
  310. "--", "--may-exist", "add-port", bridge, peer0,
  311. "--", "set", "Interface", peer0, "external_ids:iface-id=" + lsp,
  312. }
  313. cmd := exec.CommandContext(ctx, args[0], args[1:]...)
  314. _, err := cmd.Output()
  315. if err != nil {
  316. panic(errors.Wrap(err, strings.Join(args, " ")))
  317. }
  318. }
  319. // set peer links up
  320. for _, link := range []string{peer0, peer1} {
  321. if err := iproute2.NewLink(link).Up().Err(); err != nil {
  322. panic(errors.Wrapf(err, "set link %q up", link))
  323. }
  324. }
  325. // set address, route for peer1 link
  326. if peer1Addr, err := netlink.ParseAddr(ovnHost.vethAddrOuter + "/16"); err != nil {
  327. panic(errors.Wrapf(err, "parse addr %q for link %q", ovnHost.vethAddrOuter+"/16", peer1))
  328. } else {
  329. peer1Addr.Flags |= unix.IFA_F_NOPREFIXROUTE
  330. if err := iproute2.NewAddressEx(peer1, peer1Addr).Exact().Err(); err != nil {
  331. panic(errors.Wrapf(err, "set address of link %q", peer1))
  332. }
  333. }
  334. if err := iproute2.NewRoute(peer1).AddByCidr(ovnHost.vethAddrInner+"/32", "").Err(); err != nil {
  335. panic(errors.Wrapf(err, "add route to inner addr %q", ovnHost.vethAddrInner))
  336. }
  337. }
  338. func (ovnHost *OvnHost) ensureNs(ctx context.Context) {
  339. if int(ovnHost.ns) >= 0 {
  340. return
  341. }
  342. netnsName := ovnHost.netnsName()
  343. {
  344. p := path.Join("/var/run/netns", netnsName)
  345. if _, err := ioutil.ReadFile(p); err == nil {
  346. err := os.Remove(p)
  347. log.Warningf("removing possible leftover file: %s: %v", p, err)
  348. }
  349. }
  350. if ns, err := netns.GetFromName(netnsName); err == nil {
  351. ovnHost.ns = ns
  352. return
  353. }
  354. if err := ovnHost.nsRun_(ctx, func(ctx context.Context) error {
  355. var err error
  356. ovnHost.ns, err = netns.NewNamed(netnsName)
  357. if err != nil {
  358. return errors.Wrapf(err, "new netns")
  359. }
  360. return nil
  361. }); err != nil {
  362. panic(err)
  363. }
  364. }
  365. func (ovnHost *OvnHost) ensureCleanNs(ctx context.Context, me, peer string) {
  366. var meOk, peerOk bool
  367. ovnHost.nsRun(ctx, func(ctx context.Context) error {
  368. _, err := netlink.LinkByName(me)
  369. if err == nil {
  370. meOk = true
  371. }
  372. return nil
  373. })
  374. _, err := netlink.LinkByName(peer)
  375. if err == nil {
  376. peerOk = true
  377. }
  378. if !meOk && peerOk {
  379. log.Warningf("clean up iface because %q %v, %q %v", me, meOk, peer, peerOk)
  380. ovnHost.cleanUpIface()
  381. }
  382. }
  383. func (ovnHost *OvnHost) ensureVethPair(ctx context.Context, me, peer string) {
  384. link, err := netlink.LinkByName(peer)
  385. if err != nil {
  386. if _, ok := err.(netlink.LinkNotFoundError); !ok {
  387. panic(errors.Wrapf(err, "veth: LinkByName(%q)", peer))
  388. }
  389. } else if typ := link.Type(); typ != "veth" {
  390. panic(errors.Wrapf(err, "veth: LinkByName(%q) found but of type %s, not veth", peer, typ))
  391. } else {
  392. return
  393. }
  394. veth := &netlink.Veth{}
  395. veth.Name = me
  396. veth.PeerName = peer
  397. if err := netlink.LinkAdd(veth); err != nil {
  398. panic(errors.Wrapf(err, "add veth pair %q, %q", me, peer))
  399. }
  400. if err = netlink.LinkSetNsFd(veth, int(ovnHost.ns)); err != nil {
  401. panic(errors.Wrapf(err, "set ns of link %q", veth.Name))
  402. }
  403. }
  404. func (ovnHost *OvnHost) ensureIptRules(ctx context.Context, tbl, chain string, iptRules []iptRule) {
  405. ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv4)
  406. if err != nil {
  407. panic(errors.Wrap(err, "ipt client"))
  408. }
  409. oldLines, err := ipt.List(tbl, chain)
  410. if err != nil {
  411. panic(errors.Wrapf(err, "ipt list tbl %q chain %q", tbl, chain))
  412. }
  413. for i := len(iptRules) - 1; i >= 0; i-- {
  414. iptRule := &iptRules[i]
  415. h := md5.Sum([]byte(iptRule.rule + iptRule.comment))
  416. key := base64.RawStdEncoding.EncodeToString(h[:])
  417. iptRule.key = key
  418. for j := len(oldLines) - 1; j >= 0; j-- {
  419. line := oldLines[j]
  420. if strings.Contains(line, key) {
  421. // found a match
  422. iptRules = append(iptRules[:i], iptRules[i+1:]...)
  423. oldLines = append(oldLines[:j], oldLines[j+1:]...)
  424. break
  425. }
  426. }
  427. }
  428. // cleanup old lines
  429. for _, line := range oldLines {
  430. if !strings.HasPrefix(line, "-A ") {
  431. continue
  432. }
  433. args := strings.Fields(line)
  434. if len(args) < 2 {
  435. continue
  436. }
  437. args = args[2:]
  438. for i, arg := range args {
  439. if len(arg) >= 2 && strings.HasPrefix(arg, `"`) && strings.HasSuffix(arg, `"`) {
  440. args[i] = arg[1 : len(arg)-1]
  441. }
  442. }
  443. if err := ipt.Delete(tbl, chain, args...); err != nil {
  444. panic(errors.Wrapf(err, "ipt delete %q %q: %q", tbl, chain, line))
  445. }
  446. }
  447. // add missing ones
  448. for i := range iptRules {
  449. iptRule := &iptRules[i]
  450. rule := fmt.Sprintf("-m comment --comment %s:%s ", iptRule.key, iptRule.comment)
  451. rule += iptRule.rule
  452. args := strings.Fields(rule)
  453. if err := ipt.Append(tbl, chain, args...); err != nil {
  454. panic(errors.Wrapf(err, "ipt append %q %q: %q", tbl, chain, rule))
  455. }
  456. }
  457. }
  458. func (ovnHost *OvnHost) nsRun(ctx context.Context, f func(ctx context.Context) error) error {
  459. return ovnHost.nsRun_(ctx, func(ctx context.Context) error {
  460. if err := netns.Set(ovnHost.ns); err != nil {
  461. return errors.Wrapf(err, "nsRun: set netns %s", ovnHost.ns)
  462. }
  463. return f(ctx)
  464. })
  465. }
  466. func (ovnHost *OvnHost) nsRun_(ctx context.Context, f func(ctx context.Context) error) error {
  467. var (
  468. wg = &sync.WaitGroup{}
  469. err error
  470. )
  471. origNs, err := netns.Get()
  472. if err != nil {
  473. return errors.Wrap(err, "get current net ns")
  474. }
  475. defer origNs.Close()
  476. wg.Add(1)
  477. go func() {
  478. defer wg.Done()
  479. runtime.LockOSThread()
  480. defer runtime.UnlockOSThread()
  481. defer netns.Set(origNs)
  482. err = f(ctx)
  483. }()
  484. wg.Wait()
  485. return err
  486. }
  487. func (ovnHost *OvnHost) Refresh(ctx context.Context, lb *agentmodels.Loadbalancer) error {
  488. cmd := &ovnHostRefreshCmd{
  489. lb: lb,
  490. done: make(chan error),
  491. }
  492. select {
  493. case ovnHost.refreshCh <- cmd:
  494. case <-ctx.Done():
  495. return ctx.Err()
  496. }
  497. select {
  498. case <-ctx.Done():
  499. return ctx.Err()
  500. case err := <-cmd.done:
  501. return err
  502. }
  503. // return nil
  504. }
  505. func (ovnHost *OvnHost) refresh(ctx context.Context, lb *agentmodels.Loadbalancer) error {
  506. // We may allow detch/attach network for loadbalancer in the future
  507. lbnet := lb.LoadbalancerNetwork
  508. if len(lbnet.MacAddr) == 0 {
  509. return errors.Errorf("empty LoadbalancerNetwork MacAddr for lb %s(%s)? mismatch region version???", lb.Name, lb.Id)
  510. }
  511. lb.ListenAddress = ovnHost.vethAddrOuter
  512. ovnHost.lb = lb
  513. ovnHost.macaddr = lbnet.MacAddr
  514. ovnHost.ipaddr = lbnet.IpAddr
  515. ovnHost.masklen = int(lbnet.Network.GuestIpMask)
  516. ovnHost.gateway = lbnet.Network.GuestGateway
  517. backendGroups := lb.BackendGroups
  518. backends := map[string]*agentmodels.LoadbalancerBackend{}
  519. for _, backendGroup := range backendGroups {
  520. for _, backend := range backendGroup.Backends {
  521. backendId := backend.Id
  522. backends[backendId] = backend
  523. }
  524. }
  525. // release first
  526. for backendId, port := range ovnHost.vethAddrInnerPortMap {
  527. if _, ok := backends[backendId]; !ok {
  528. delete(ovnHost.portMap, port)
  529. delete(ovnHost.vethAddrInnerPortMap, backendId)
  530. }
  531. }
  532. // allocate
  533. for _, backend := range backends {
  534. backendId := backend.Id
  535. if p, ok := ovnHost.vethAddrInnerPortMap[backendId]; ok {
  536. backend.ConnectAddress = ovnHost.vethAddrInner
  537. backend.ConnectPort = p
  538. continue
  539. }
  540. p := ovnHost.portIdx
  541. for {
  542. p += 1
  543. p &= 0xffff
  544. if p <= 1024 {
  545. p = 1025
  546. }
  547. if p == ovnHost.portIdx {
  548. return errors.Errorf("no available port for mapping: loadbalancer %s(%s) has too many backends",
  549. lb.Name, lb.Id)
  550. }
  551. if _, ok := ovnHost.portMap[p]; !ok {
  552. ovnHost.portMap[p] = empty{}
  553. ovnHost.vethAddrInnerPortMap[backendId] = p
  554. backend.ConnectAddress = ovnHost.vethAddrInner
  555. backend.ConnectPort = p
  556. break
  557. }
  558. }
  559. ovnHost.portIdx = p
  560. }
  561. return nil
  562. }
  563. type OvnWorker struct {
  564. opts *Options
  565. lbMap map[string]*OvnHost // key loadbalancerId
  566. addrMap map[uint16]*OvnHost // key x.x in 169.254.x.x
  567. addrIdx uint16
  568. Bar *Bar
  569. refreshCh chan *ovnRefreshCmd
  570. }
  571. type ovnRefreshCmd struct {
  572. lbs agentmodels.Loadbalancers
  573. done chan error
  574. }
  575. func NewOvnWorker(opts *Options) *OvnWorker {
  576. ovn := &OvnWorker{
  577. opts: opts,
  578. lbMap: map[string]*OvnHost{},
  579. addrMap: map[uint16]*OvnHost{},
  580. Bar: NewBar(),
  581. refreshCh: make(chan *ovnRefreshCmd),
  582. }
  583. return ovn
  584. }
  585. func (ovn *OvnWorker) Start(ctx context.Context) {
  586. log.Infof("ovn worker started")
  587. ovn.Bar.Init()
  588. defer ovn.Bar.Done()
  589. defer log.Infof("ovn worker bye!")
  590. for {
  591. select {
  592. case cmd := <-ovn.refreshCh:
  593. err := ovn.refresh(ctx, cmd.lbs)
  594. cmd.done <- err
  595. case <-ovn.Bar.Cancelled():
  596. log.Infof("ovn worker stop on cancel signal")
  597. ovn.stop()
  598. return
  599. case <-ctx.Done():
  600. return
  601. }
  602. }
  603. }
  604. func (ovn *OvnWorker) stop() {
  605. for _, o := range ovn.lbMap {
  606. o.Stop()
  607. }
  608. }
  609. func (ovn *OvnWorker) Stop() {
  610. ovn.Bar.Cancel()
  611. }
  612. func (ovn *OvnWorker) findTwoAddrIdx() (uint16, uint16, error) {
  613. // fetch current addresses
  614. sysAddrs := map[uint16]empty{}
  615. {
  616. ifaddrs, err := net.InterfaceAddrs()
  617. if err != nil {
  618. return 0, 0, errors.Wrap(err, "fetch current system unicast addrs")
  619. }
  620. for _, ifaddr := range ifaddrs {
  621. ipaddr, ok := ifaddr.(*net.IPNet)
  622. if !ok {
  623. continue
  624. }
  625. ip := ipaddr.IP
  626. ip4 := ip.To4()
  627. if ip4 == nil {
  628. continue
  629. }
  630. if ip4[0] != 169 && ip4[1] != 254 {
  631. continue
  632. }
  633. i := uint16(ip4[2])<<8 | uint16(ip4[3])
  634. sysAddrs[i] = empty{}
  635. }
  636. }
  637. allocOne := func() (uint16, error) {
  638. i := ovn.addrIdx
  639. for {
  640. i += 1
  641. i &= 0xffff
  642. if i < 100 {
  643. i = 100
  644. }
  645. if i == ovn.addrIdx {
  646. return 0, errors.Error("169.254.x.x addrs run out")
  647. }
  648. if _, ok := sysAddrs[i]; ok {
  649. continue
  650. }
  651. if _, ok := ovn.addrMap[i]; !ok {
  652. break
  653. }
  654. }
  655. ovn.addrIdx = i
  656. return i, nil
  657. }
  658. inner, err := allocOne()
  659. if err != nil {
  660. return 0, 0, errors.Wrap(err, "inner address")
  661. }
  662. outer, err := allocOne()
  663. if err != nil {
  664. return 0, 0, errors.Wrap(err, "outer address")
  665. }
  666. return inner, outer, nil
  667. }
  668. func (ovn *OvnWorker) Refresh(ctx context.Context, lbs agentmodels.Loadbalancers) error {
  669. cmd := &ovnRefreshCmd{
  670. lbs: lbs,
  671. done: make(chan error),
  672. }
  673. select {
  674. case ovn.refreshCh <- cmd:
  675. case <-ctx.Done():
  676. return ctx.Err()
  677. }
  678. select {
  679. case <-ctx.Done():
  680. return ctx.Err()
  681. case err := <-cmd.done:
  682. return err
  683. }
  684. // return nil
  685. }
  686. func (ovn *OvnWorker) refresh(ctx context.Context, lbs agentmodels.Loadbalancers) error {
  687. m := ovn.lbMap
  688. // release deleted
  689. for lbId, ovnHost := range m {
  690. if _, ok := lbs[lbId]; ok {
  691. continue
  692. }
  693. delete(m, lbId)
  694. delete(ovn.addrMap, ovnHost.addrIdxInner)
  695. delete(ovn.addrMap, ovnHost.addrIdxOuter)
  696. ovnHost.Stop()
  697. }
  698. // allocate
  699. for _, lb := range lbs {
  700. if lb.NetworkType != computeapis.LB_NETWORK_TYPE_VPC {
  701. continue
  702. }
  703. if len(lb.LoadbalancerNetwork.MacAddr) == 0 {
  704. log.Errorf("empty LoadbalancerNetwork MacAddr for lb %s(%s)? mismatch region version???", lb.Name, lb.Id)
  705. continue
  706. }
  707. ovnHost, ok := m[lb.Id]
  708. if !ok {
  709. inner, outer, err := ovn.findTwoAddrIdx()
  710. if err != nil {
  711. return errors.Wrapf(err, "find 169.254.x.x for lb %s(%s)", lb.Name, lb.Id)
  712. }
  713. ovnHost = newOvnHost(ovn.opts.OvnIntegrationBridge)
  714. ovnHost.SetAddrIdx(inner, outer)
  715. ovn.addrMap[inner] = ovnHost
  716. ovn.addrMap[outer] = ovnHost
  717. ovn.lbMap[lb.Id] = ovnHost
  718. go ovnHost.Start(ctx)
  719. }
  720. if err := ovnHost.Refresh(ctx, lb); err != nil {
  721. return err
  722. }
  723. }
  724. return nil
  725. }