| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package lbagent
- import (
- "context"
- "crypto/md5"
- "encoding/base64"
- "fmt"
- "io/ioutil"
- "net"
- "os"
- "os/exec"
- "path"
- "runtime"
- "strings"
- "sync"
- "time"
- "github.com/coreos/go-iptables/iptables"
- "github.com/vishvananda/netlink"
- "github.com/vishvananda/netns"
- "golang.org/x/sys/unix"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- computeapis "yunion.io/x/onecloud/pkg/apis/compute"
- agentmodels "yunion.io/x/onecloud/pkg/lbagent/models"
- "yunion.io/x/onecloud/pkg/util/iproute2"
- )
- const (
- ErrOvnService = errors.Error("ovn controller")
- ErrOvnConfig = errors.Error("ovn controller configuration")
- )
- type empty struct{}
- type iptRule struct {
- rule string
- comment string
- // computed
- key string
- }
- type Bar struct {
- ctx context.Context
- cancel context.CancelFunc
- wg *sync.WaitGroup
- }
- func NewBar() *Bar {
- ctx := context.Background()
- ctx, cancel := context.WithCancel(ctx)
- b := &Bar{
- ctx: ctx,
- cancel: cancel,
- wg: &sync.WaitGroup{},
- }
- return b
- }
- func (b *Bar) Init() {
- b.wg.Add(1)
- }
- func (b *Bar) Done() {
- b.wg.Done()
- }
- func (b *Bar) Cancelled() <-chan struct{} {
- return b.ctx.Done()
- }
- func (b *Bar) Cancel() {
- b.cancel()
- b.wg.Wait()
- }
- type OvnHost struct {
- lb *agentmodels.Loadbalancer
- ovnBridge string
- macaddr string
- ipaddr string
- masklen int
- gateway string
- vethAddrInner string
- vethAddrOuter string
- addrIdxInner uint16
- addrIdxOuter uint16
- vethAddrInnerPortMap map[string]int // key backendId
- portMap map[int]empty // key port
- portIdx int
- Bar *Bar
- refreshCh chan *ovnHostRefreshCmd
- ns netns.NsHandle
- }
- type ovnHostRefreshCmd struct {
- lb *agentmodels.Loadbalancer
- done chan error
- }
- func newOvnHost(bridge string) *OvnHost {
- ovnHost := &OvnHost{
- vethAddrInnerPortMap: map[string]int{},
- portMap: map[int]empty{},
- Bar: NewBar(),
- refreshCh: make(chan *ovnHostRefreshCmd),
- ns: -1,
- ovnBridge: bridge,
- }
- return ovnHost
- }
- func (ovnHost *OvnHost) SetAddrIdx(inner, outer uint16) {
- ovnHost.addrIdxInner = inner
- ovnHost.addrIdxOuter = outer
- ovnHost.vethAddrInner = fmt.Sprintf("169.254.%d.%d", (inner>>8)&0xff, inner&0xff)
- ovnHost.vethAddrOuter = fmt.Sprintf("169.254.%d.%d", (outer>>8)&0xff, outer&0xff)
- }
- func (ovnHost *OvnHost) Start(ctx context.Context) {
- ovnHost.Bar.Init()
- defer ovnHost.Bar.Done()
- tick := time.NewTicker(11 * time.Second)
- for {
- select {
- case <-tick.C:
- ovnHost.run(ctx)
- case cmd := <-ovnHost.refreshCh:
- err := ovnHost.refresh(ctx, cmd.lb)
- ovnHost.run(ctx)
- cmd.done <- err
- case <-ovnHost.Bar.Cancelled():
- ovnHost.stop()
- return
- case <-ctx.Done():
- return
- }
- }
- }
- // stop releases resources, cleans up modifications. It's reentrant
- func (ovnHost *OvnHost) stop() {
- ovnHost.cleanUp()
- }
- func (ovnHost *OvnHost) cleanUp() {
- ovnHost.cleanUpNetns()
- ovnHost.cleanUpIface()
- }
- func (ovnHost *OvnHost) cleanUpIface() {
- ovnHost.cleanUpBridge()
- ovnHost.cleanUpVethPair()
- }
- func (ovnHost *OvnHost) cleanUpNetns() {
- netns.DeleteNamed(ovnHost.netnsName())
- if int(ovnHost.ns) >= 0 {
- ovnHost.ns.Close()
- }
- }
- func (ovnHost *OvnHost) cleanUpBridge() {
- bridge := ovnHost.ovnBridge
- _, peer0 := ovnHost.ovnPairName()
- args := []string{
- "ovs-vsctl",
- "--", "--if-exists", "del-port", bridge, peer0,
- }
- var cancelFunc context.CancelFunc
- ctx := context.Background()
- ctx, cancelFunc = context.WithTimeout(ctx, time.Second*7)
- defer cancelFunc()
- cmd := exec.CommandContext(ctx, args[0], args[1:]...)
- _, err := cmd.Output()
- if err != nil {
- err = errors.Wrapf(err, "cleanup %q: %s", bridge, strings.Join(args, " "))
- log.Errorf("%v", err)
- }
- }
- func (ovnHost *OvnHost) cleanUpVethPair() {
- _, peer0 := ovnHost.ovnPairName()
- _, peer1 := ovnHost.haproxyPairName()
- for _, name := range []string{peer0, peer1} {
- link, err := netlink.LinkByName(name)
- if err != nil {
- if _, ok := err.(netlink.LinkNotFoundError); ok {
- continue
- }
- err = errors.Wrapf(err, "ovnHost.stop: LinkByName(%q)", name)
- log.Errorf("%v", err)
- continue
- }
- if err := netlink.LinkDel(link); err != nil {
- err = errors.Wrapf(err, "ovnHost.stop: LinkDel(%q)", name)
- log.Errorf("%v", err)
- }
- }
- }
- func (ovnHost *OvnHost) Stop() {
- ovnHost.Bar.Cancel()
- }
- func (ovnHost *OvnHost) netnsName() string {
- return fmt.Sprintf("lb-%s", ovnHost.lb.Id)
- }
- func (ovnHost *OvnHost) lspName() string {
- return fmt.Sprintf("iface/lb/%s", ovnHost.lb.Id)
- }
- func (ovnHost *OvnHost) vethPairName(p string) (string, string) {
- // - index 0 is for the ovn host side
- // - index 1 is for the outside side (initial namespace)
- pref := fmt.Sprintf("%s%s", p, ovnHost.lb.Id[:15-len(p)-1])
- return pref + "0", pref + "1"
- }
- func (ovnHost *OvnHost) ovnPairName() (string, string) {
- // o is for connecting to ovn virtual network
- return ovnHost.vethPairName("o")
- }
- func (ovnHost *OvnHost) haproxyPairName() (string, string) {
- // h is for use with haproxy
- return ovnHost.vethPairName("h")
- }
- func (ovnHost *OvnHost) run(ctx context.Context) {
- defer func() {
- if msg := recover(); msg != nil {
- log.Errorf("ovn host for loadbalancer %s: %s", ovnHost.lb.Id, msg)
- }
- }()
- // - peer is the one in initial net namespace
- me0, peer0 := ovnHost.ovnPairName()
- me1, peer1 := ovnHost.haproxyPairName()
- ovnHost.ensureNs(ctx)
- ovnHost.ensureCleanNs(ctx, me0, peer0)
- ovnHost.ensureCleanNs(ctx, me1, peer1)
- ovnHost.ensureVethPair(ctx, me0, peer0)
- ovnHost.ensureVethPair(ctx, me1, peer1)
- if err := ovnHost.nsRun(ctx, func(ctx context.Context) error {
- // ovn side
- if err := iproute2.NewLink(me0).Address(ovnHost.macaddr).Up().Err(); err != nil {
- panic(errors.Wrapf(err, "set link %q up", me0))
- }
- if err := iproute2.NewAddress(me0, fmt.Sprintf("%s/%d", ovnHost.ipaddr, ovnHost.masklen)).Exact().Err(); err != nil {
- panic(errors.Wrapf(err, "set address of link %q", me0))
- }
- if err := iproute2.NewRoute(me0).AddByCidr("0.0.0.0/0", ovnHost.gateway).Err(); err != nil {
- panic(errors.Wrapf(err, "add route to inner addr %q", ovnHost.vethAddrInner))
- }
- // haproxy side
- if err := iproute2.NewLink(me1).Up().Err(); err != nil {
- panic(errors.Wrapf(err, "set link %q up", me0))
- }
- if err := iproute2.NewAddress(me1, ovnHost.vethAddrInner+"/16").Exact().Err(); err != nil {
- panic(errors.Wrapf(err, "set address of link %q", me0))
- }
- var (
- // NOTE: make sure arguments do not contain blank chars
- // or such things to avoid bad things like injection
- preRules []iptRule
- postRules []iptRule
- )
- // iptables SNAT rule
- postRules = append(postRules, iptRule{
- rule: fmt.Sprintf("-o %s -j SNAT --to-source %s", me0, ovnHost.ipaddr),
- comment: "snat-ovn",
- })
- postRules = append(postRules, iptRule{
- rule: fmt.Sprintf("-o %s -j SNAT --to-source %s", me1, ovnHost.vethAddrInner),
- comment: "snat-haproxy",
- })
- // iptables listener rule
- for _, listener := range ovnHost.lb.Listeners {
- var proto string
- switch listener.ListenerType {
- case computeapis.LB_LISTENER_TYPE_TCP,
- computeapis.LB_LISTENER_TYPE_HTTP,
- computeapis.LB_LISTENER_TYPE_HTTPS:
- proto = "tcp"
- case computeapis.LB_LISTENER_TYPE_UDP:
- proto = "udp"
- default:
- panic(errors.Errorf("listener %s(%s) protocol: %q", listener.Name, listener.Id, listener.ListenerType))
- }
- preRules = append(preRules, iptRule{
- rule: fmt.Sprintf("-i %s -d %s -p %s --dport %d -j DNAT --to-destination %s:%d",
- me0, ovnHost.ipaddr, proto, listener.ListenerPort, ovnHost.vethAddrOuter, listener.ListenerPort),
- comment: fmt.Sprintf("listener/%s/%s/%d", listener.Id, listener.ListenerType, listener.ListenerPort),
- })
- }
- // iptables backend rule
- for _, backendGroup := range ovnHost.lb.BackendGroups {
- for _, backend := range backendGroup.Backends {
- port, ok := ovnHost.vethAddrInnerPortMap[backend.Id]
- if !ok {
- log.Warningf("cannot find netns port for backend %s(%s) %s:%d",
- backend.Name, backend.Id, backend.Address, backend.Port)
- continue
- }
- for _, proto := range []string{"tcp", "udp"} {
- preRules = append(preRules, iptRule{
- rule: fmt.Sprintf("-i %s -d %s -p %s --dport %d -j DNAT --to-destination %s:%d",
- me1, ovnHost.vethAddrInner, proto, port, backend.Address, backend.Port),
- comment: fmt.Sprintf("backend/%s/%s/%s/%d", backend.Id, proto, backend.Address, backend.Port),
- })
- }
- }
- }
- ovnHost.ensureIptRules(ctx, "nat", "PREROUTING", preRules)
- ovnHost.ensureIptRules(ctx, "nat", "POSTROUTING", postRules)
- return nil
- }); err != nil {
- panic(errors.Wrap(err, "run in netns"))
- }
- { // setup brvpc
- bridge := ovnHost.ovnBridge
- lsp := ovnHost.lspName()
- args := []string{
- "ovs-vsctl",
- "--", "--may-exist", "add-port", bridge, peer0,
- "--", "set", "Interface", peer0, "external_ids:iface-id=" + lsp,
- }
- cmd := exec.CommandContext(ctx, args[0], args[1:]...)
- _, err := cmd.Output()
- if err != nil {
- panic(errors.Wrap(err, strings.Join(args, " ")))
- }
- }
- // set peer links up
- for _, link := range []string{peer0, peer1} {
- if err := iproute2.NewLink(link).Up().Err(); err != nil {
- panic(errors.Wrapf(err, "set link %q up", link))
- }
- }
- // set address, route for peer1 link
- if peer1Addr, err := netlink.ParseAddr(ovnHost.vethAddrOuter + "/16"); err != nil {
- panic(errors.Wrapf(err, "parse addr %q for link %q", ovnHost.vethAddrOuter+"/16", peer1))
- } else {
- peer1Addr.Flags |= unix.IFA_F_NOPREFIXROUTE
- if err := iproute2.NewAddressEx(peer1, peer1Addr).Exact().Err(); err != nil {
- panic(errors.Wrapf(err, "set address of link %q", peer1))
- }
- }
- if err := iproute2.NewRoute(peer1).AddByCidr(ovnHost.vethAddrInner+"/32", "").Err(); err != nil {
- panic(errors.Wrapf(err, "add route to inner addr %q", ovnHost.vethAddrInner))
- }
- }
- func (ovnHost *OvnHost) ensureNs(ctx context.Context) {
- if int(ovnHost.ns) >= 0 {
- return
- }
- netnsName := ovnHost.netnsName()
- {
- p := path.Join("/var/run/netns", netnsName)
- if _, err := ioutil.ReadFile(p); err == nil {
- err := os.Remove(p)
- log.Warningf("removing possible leftover file: %s: %v", p, err)
- }
- }
- if ns, err := netns.GetFromName(netnsName); err == nil {
- ovnHost.ns = ns
- return
- }
- if err := ovnHost.nsRun_(ctx, func(ctx context.Context) error {
- var err error
- ovnHost.ns, err = netns.NewNamed(netnsName)
- if err != nil {
- return errors.Wrapf(err, "new netns")
- }
- return nil
- }); err != nil {
- panic(err)
- }
- }
- func (ovnHost *OvnHost) ensureCleanNs(ctx context.Context, me, peer string) {
- var meOk, peerOk bool
- ovnHost.nsRun(ctx, func(ctx context.Context) error {
- _, err := netlink.LinkByName(me)
- if err == nil {
- meOk = true
- }
- return nil
- })
- _, err := netlink.LinkByName(peer)
- if err == nil {
- peerOk = true
- }
- if !meOk && peerOk {
- log.Warningf("clean up iface because %q %v, %q %v", me, meOk, peer, peerOk)
- ovnHost.cleanUpIface()
- }
- }
- func (ovnHost *OvnHost) ensureVethPair(ctx context.Context, me, peer string) {
- link, err := netlink.LinkByName(peer)
- if err != nil {
- if _, ok := err.(netlink.LinkNotFoundError); !ok {
- panic(errors.Wrapf(err, "veth: LinkByName(%q)", peer))
- }
- } else if typ := link.Type(); typ != "veth" {
- panic(errors.Wrapf(err, "veth: LinkByName(%q) found but of type %s, not veth", peer, typ))
- } else {
- return
- }
- veth := &netlink.Veth{}
- veth.Name = me
- veth.PeerName = peer
- if err := netlink.LinkAdd(veth); err != nil {
- panic(errors.Wrapf(err, "add veth pair %q, %q", me, peer))
- }
- if err = netlink.LinkSetNsFd(veth, int(ovnHost.ns)); err != nil {
- panic(errors.Wrapf(err, "set ns of link %q", veth.Name))
- }
- }
- func (ovnHost *OvnHost) ensureIptRules(ctx context.Context, tbl, chain string, iptRules []iptRule) {
- ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv4)
- if err != nil {
- panic(errors.Wrap(err, "ipt client"))
- }
- oldLines, err := ipt.List(tbl, chain)
- if err != nil {
- panic(errors.Wrapf(err, "ipt list tbl %q chain %q", tbl, chain))
- }
- for i := len(iptRules) - 1; i >= 0; i-- {
- iptRule := &iptRules[i]
- h := md5.Sum([]byte(iptRule.rule + iptRule.comment))
- key := base64.RawStdEncoding.EncodeToString(h[:])
- iptRule.key = key
- for j := len(oldLines) - 1; j >= 0; j-- {
- line := oldLines[j]
- if strings.Contains(line, key) {
- // found a match
- iptRules = append(iptRules[:i], iptRules[i+1:]...)
- oldLines = append(oldLines[:j], oldLines[j+1:]...)
- break
- }
- }
- }
- // cleanup old lines
- for _, line := range oldLines {
- if !strings.HasPrefix(line, "-A ") {
- continue
- }
- args := strings.Fields(line)
- if len(args) < 2 {
- continue
- }
- args = args[2:]
- for i, arg := range args {
- if len(arg) >= 2 && strings.HasPrefix(arg, `"`) && strings.HasSuffix(arg, `"`) {
- args[i] = arg[1 : len(arg)-1]
- }
- }
- if err := ipt.Delete(tbl, chain, args...); err != nil {
- panic(errors.Wrapf(err, "ipt delete %q %q: %q", tbl, chain, line))
- }
- }
- // add missing ones
- for i := range iptRules {
- iptRule := &iptRules[i]
- rule := fmt.Sprintf("-m comment --comment %s:%s ", iptRule.key, iptRule.comment)
- rule += iptRule.rule
- args := strings.Fields(rule)
- if err := ipt.Append(tbl, chain, args...); err != nil {
- panic(errors.Wrapf(err, "ipt append %q %q: %q", tbl, chain, rule))
- }
- }
- }
- func (ovnHost *OvnHost) nsRun(ctx context.Context, f func(ctx context.Context) error) error {
- return ovnHost.nsRun_(ctx, func(ctx context.Context) error {
- if err := netns.Set(ovnHost.ns); err != nil {
- return errors.Wrapf(err, "nsRun: set netns %s", ovnHost.ns)
- }
- return f(ctx)
- })
- }
- func (ovnHost *OvnHost) nsRun_(ctx context.Context, f func(ctx context.Context) error) error {
- var (
- wg = &sync.WaitGroup{}
- err error
- )
- origNs, err := netns.Get()
- if err != nil {
- return errors.Wrap(err, "get current net ns")
- }
- defer origNs.Close()
- wg.Add(1)
- go func() {
- defer wg.Done()
- runtime.LockOSThread()
- defer runtime.UnlockOSThread()
- defer netns.Set(origNs)
- err = f(ctx)
- }()
- wg.Wait()
- return err
- }
- func (ovnHost *OvnHost) Refresh(ctx context.Context, lb *agentmodels.Loadbalancer) error {
- cmd := &ovnHostRefreshCmd{
- lb: lb,
- done: make(chan error),
- }
- select {
- case ovnHost.refreshCh <- cmd:
- case <-ctx.Done():
- return ctx.Err()
- }
- select {
- case <-ctx.Done():
- return ctx.Err()
- case err := <-cmd.done:
- return err
- }
- // return nil
- }
- func (ovnHost *OvnHost) refresh(ctx context.Context, lb *agentmodels.Loadbalancer) error {
- // We may allow detch/attach network for loadbalancer in the future
- lbnet := lb.LoadbalancerNetwork
- if len(lbnet.MacAddr) == 0 {
- return errors.Errorf("empty LoadbalancerNetwork MacAddr for lb %s(%s)? mismatch region version???", lb.Name, lb.Id)
- }
- lb.ListenAddress = ovnHost.vethAddrOuter
- ovnHost.lb = lb
- ovnHost.macaddr = lbnet.MacAddr
- ovnHost.ipaddr = lbnet.IpAddr
- ovnHost.masklen = int(lbnet.Network.GuestIpMask)
- ovnHost.gateway = lbnet.Network.GuestGateway
- backendGroups := lb.BackendGroups
- backends := map[string]*agentmodels.LoadbalancerBackend{}
- for _, backendGroup := range backendGroups {
- for _, backend := range backendGroup.Backends {
- backendId := backend.Id
- backends[backendId] = backend
- }
- }
- // release first
- for backendId, port := range ovnHost.vethAddrInnerPortMap {
- if _, ok := backends[backendId]; !ok {
- delete(ovnHost.portMap, port)
- delete(ovnHost.vethAddrInnerPortMap, backendId)
- }
- }
- // allocate
- for _, backend := range backends {
- backendId := backend.Id
- if p, ok := ovnHost.vethAddrInnerPortMap[backendId]; ok {
- backend.ConnectAddress = ovnHost.vethAddrInner
- backend.ConnectPort = p
- continue
- }
- p := ovnHost.portIdx
- for {
- p += 1
- p &= 0xffff
- if p <= 1024 {
- p = 1025
- }
- if p == ovnHost.portIdx {
- return errors.Errorf("no available port for mapping: loadbalancer %s(%s) has too many backends",
- lb.Name, lb.Id)
- }
- if _, ok := ovnHost.portMap[p]; !ok {
- ovnHost.portMap[p] = empty{}
- ovnHost.vethAddrInnerPortMap[backendId] = p
- backend.ConnectAddress = ovnHost.vethAddrInner
- backend.ConnectPort = p
- break
- }
- }
- ovnHost.portIdx = p
- }
- return nil
- }
- type OvnWorker struct {
- opts *Options
- lbMap map[string]*OvnHost // key loadbalancerId
- addrMap map[uint16]*OvnHost // key x.x in 169.254.x.x
- addrIdx uint16
- Bar *Bar
- refreshCh chan *ovnRefreshCmd
- }
- type ovnRefreshCmd struct {
- lbs agentmodels.Loadbalancers
- done chan error
- }
- func NewOvnWorker(opts *Options) *OvnWorker {
- ovn := &OvnWorker{
- opts: opts,
- lbMap: map[string]*OvnHost{},
- addrMap: map[uint16]*OvnHost{},
- Bar: NewBar(),
- refreshCh: make(chan *ovnRefreshCmd),
- }
- return ovn
- }
- func (ovn *OvnWorker) Start(ctx context.Context) {
- log.Infof("ovn worker started")
- ovn.Bar.Init()
- defer ovn.Bar.Done()
- defer log.Infof("ovn worker bye!")
- for {
- select {
- case cmd := <-ovn.refreshCh:
- err := ovn.refresh(ctx, cmd.lbs)
- cmd.done <- err
- case <-ovn.Bar.Cancelled():
- log.Infof("ovn worker stop on cancel signal")
- ovn.stop()
- return
- case <-ctx.Done():
- return
- }
- }
- }
- func (ovn *OvnWorker) stop() {
- for _, o := range ovn.lbMap {
- o.Stop()
- }
- }
- func (ovn *OvnWorker) Stop() {
- ovn.Bar.Cancel()
- }
- func (ovn *OvnWorker) findTwoAddrIdx() (uint16, uint16, error) {
- // fetch current addresses
- sysAddrs := map[uint16]empty{}
- {
- ifaddrs, err := net.InterfaceAddrs()
- if err != nil {
- return 0, 0, errors.Wrap(err, "fetch current system unicast addrs")
- }
- for _, ifaddr := range ifaddrs {
- ipaddr, ok := ifaddr.(*net.IPNet)
- if !ok {
- continue
- }
- ip := ipaddr.IP
- ip4 := ip.To4()
- if ip4 == nil {
- continue
- }
- if ip4[0] != 169 && ip4[1] != 254 {
- continue
- }
- i := uint16(ip4[2])<<8 | uint16(ip4[3])
- sysAddrs[i] = empty{}
- }
- }
- allocOne := func() (uint16, error) {
- i := ovn.addrIdx
- for {
- i += 1
- i &= 0xffff
- if i < 100 {
- i = 100
- }
- if i == ovn.addrIdx {
- return 0, errors.Error("169.254.x.x addrs run out")
- }
- if _, ok := sysAddrs[i]; ok {
- continue
- }
- if _, ok := ovn.addrMap[i]; !ok {
- break
- }
- }
- ovn.addrIdx = i
- return i, nil
- }
- inner, err := allocOne()
- if err != nil {
- return 0, 0, errors.Wrap(err, "inner address")
- }
- outer, err := allocOne()
- if err != nil {
- return 0, 0, errors.Wrap(err, "outer address")
- }
- return inner, outer, nil
- }
- func (ovn *OvnWorker) Refresh(ctx context.Context, lbs agentmodels.Loadbalancers) error {
- cmd := &ovnRefreshCmd{
- lbs: lbs,
- done: make(chan error),
- }
- select {
- case ovn.refreshCh <- cmd:
- case <-ctx.Done():
- return ctx.Err()
- }
- select {
- case <-ctx.Done():
- return ctx.Err()
- case err := <-cmd.done:
- return err
- }
- // return nil
- }
- func (ovn *OvnWorker) refresh(ctx context.Context, lbs agentmodels.Loadbalancers) error {
- m := ovn.lbMap
- // release deleted
- for lbId, ovnHost := range m {
- if _, ok := lbs[lbId]; ok {
- continue
- }
- delete(m, lbId)
- delete(ovn.addrMap, ovnHost.addrIdxInner)
- delete(ovn.addrMap, ovnHost.addrIdxOuter)
- ovnHost.Stop()
- }
- // allocate
- for _, lb := range lbs {
- if lb.NetworkType != computeapis.LB_NETWORK_TYPE_VPC {
- continue
- }
- if len(lb.LoadbalancerNetwork.MacAddr) == 0 {
- log.Errorf("empty LoadbalancerNetwork MacAddr for lb %s(%s)? mismatch region version???", lb.Name, lb.Id)
- continue
- }
- ovnHost, ok := m[lb.Id]
- if !ok {
- inner, outer, err := ovn.findTwoAddrIdx()
- if err != nil {
- return errors.Wrapf(err, "find 169.254.x.x for lb %s(%s)", lb.Name, lb.Id)
- }
- ovnHost = newOvnHost(ovn.opts.OvnIntegrationBridge)
- ovnHost.SetAddrIdx(inner, outer)
- ovn.addrMap[inner] = ovnHost
- ovn.addrMap[outer] = ovnHost
- ovn.lbMap[lb.Id] = ovnHost
- go ovnHost.Start(ctx)
- }
- if err := ovnHost.Refresh(ctx, lb); err != nil {
- return err
- }
- }
- return nil
- }
|