| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package ovn
- import (
- "context"
- "runtime"
- "runtime/debug"
- "sync"
- "time"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/util/httputils"
- "yunion.io/x/onecloud/pkg/apihelper"
- apis "yunion.io/x/onecloud/pkg/apis/compute"
- "yunion.io/x/onecloud/pkg/appsrv"
- "yunion.io/x/onecloud/pkg/mcclient/auth"
- mcclient_modules "yunion.io/x/onecloud/pkg/mcclient/modules/compute"
- "yunion.io/x/onecloud/pkg/util/ovsutils"
- agentmodels "yunion.io/x/onecloud/pkg/vpcagent/models"
- "yunion.io/x/onecloud/pkg/vpcagent/options"
- "yunion.io/x/onecloud/pkg/vpcagent/ovnutil"
- "yunion.io/x/onecloud/pkg/vpcagent/worker"
- )
- type Worker struct {
- opts *options.Options
- apih *apihelper.APIHelper
- }
- func NewWorker(opts *options.Options) worker.IWorker {
- modelSets := agentmodels.NewModelSets()
- apiOpts := &apihelper.Options{
- CommonOptions: opts.CommonOptions,
- SyncIntervalSeconds: opts.APISyncIntervalSeconds,
- RunDelayMilliseconds: opts.APIRunDelayMilliseconds,
- ListBatchSize: opts.APIListBatchSize,
- FetchFromComputeService: opts.FetchDataFromComputeService,
- IncludeDetails: false,
- IncludeOtherCloudEnv: false,
- }
- apih, err := apihelper.NewAPIHelper(apiOpts, modelSets)
- if err != nil {
- return nil
- }
- w := &Worker{
- opts: opts,
- apih: apih,
- }
- return w
- }
- func (w *Worker) Start(ctx context.Context, app *appsrv.Application, prefix string) {
- wg := ctx.Value("wg").(*sync.WaitGroup)
- defer func() {
- log.Infoln("ovn: worker bye")
- wg.Done()
- }()
- wg.Add(1)
- go w.apih.Start(ctx, app, httputils.JoinPath(prefix, "api"))
- tickDuration := time.Duration(w.opts.OvnWorkerCheckInterval) * time.Second
- tick := time.NewTimer(tickDuration)
- defer tick.Stop()
- var mss *agentmodels.ModelSets
- for {
- select {
- case imss := <-w.apih.ModelSets():
- log.Infof("ovn: got new data from api helper")
- mss = imss.(*agentmodels.ModelSets)
- if err := w.run(ctx, mss); err != nil {
- log.Errorf("ovn: %v", err)
- }
- case <-tick.C:
- if mss != nil {
- log.Infof("ovn: tick check")
- if err := w.run(ctx, mss); err != nil {
- log.Errorf("ovn: %v", err)
- }
- }
- tick.Reset(tickDuration)
- case <-ctx.Done():
- return
- }
- }
- }
- func (w *Worker) run(ctx context.Context, mss *agentmodels.ModelSets) (err error) {
- defer func() {
- if panicVal := recover(); panicVal != nil {
- if panicErr, ok := panicVal.(runtime.Error); ok {
- err = errors.Wrap(panicErr, string(debug.Stack()))
- } else if panicErr, ok := panicVal.(error); ok {
- err = panicErr
- } else {
- panic(panicVal)
- }
- }
- }()
- dbUrl := w.opts.OvnNorthDatabase
- if db, err := ovsutils.NormalizeDbHost(dbUrl); err != nil {
- return err
- } else {
- dbUrl = db
- }
- log.Infof("ovn: connect to ovn north database %s", dbUrl)
- ovnnbctl := ovnutil.NewOvnNbCtl(dbUrl)
- ovndb, err := DumpOVNNorthbound(ctx, ovnnbctl)
- if err != nil {
- return err
- }
- ovndb.Mark(ctx)
- for _, vpc := range mss.Vpcs {
- if vpc.Id == apis.DEFAULT_VPC_ID {
- continue
- }
- ovndb.ClaimVpc(ctx, vpc)
- if vpcHasEipgw(vpc) {
- ovndb.ClaimVpcEipgw(ctx, vpc)
- }
- for _, network := range vpc.Networks {
- ovndb.ClaimNetwork(ctx, network, w.opts)
- for _, guestnetwork := range network.Guestnetworks {
- if guestnetwork.Guest == nil {
- continue
- }
- if vpcHasDistgw(vpc) {
- var (
- guest = guestnetwork.Guest
- network = guestnetwork.Network
- vpc = network.Vpc
- host = guest.Host
- )
- if host.OvnVersion == "" {
- // Just in case. This should never happen
- log.Errorf("host %s(%s) of vpc guestnetwork (%s,%s) has no ovn support",
- host.Id, host.Name, guestnetwork.NetworkId, guestnetwork.IpAddr)
- continue
- }
- if host.OvnMappedIpAddr == "" {
- // trigger ovn mapped ip addr allocation
- // apiVersion := "v2"
- s := auth.GetAdminSession(ctx, w.opts.Region)
- j, err := mcclient_modules.Hosts.Update(s, host.Id, nil)
- if err != nil {
- log.Errorf("host %s(%s) dummy update err: %v", host.Id, host.Name, err)
- continue
- }
- j.Unmarshal(host) // update local copy in place
- if host.OvnMappedIpAddr == "" {
- log.Errorf("host %s(%s) has no mapped addr", host.Id, host.Name)
- continue
- }
- }
- ovndb.ClaimVpcHost(ctx, vpc, host)
- }
- ovndb.ClaimGuestnetwork(ctx, guestnetwork, w.opts)
- }
- for _, groupnetwork := range network.Groupnetworks {
- ovndb.ClaimGroupnetwork(ctx, groupnetwork)
- }
- for _, loadbalancerNetwork := range network.LoadbalancerNetworks {
- ovndb.ClaimLoadbalancerNetwork(ctx, loadbalancerNetwork)
- }
- }
- routes := resolveRoutes(vpc, mss)
- ovndb.ClaimRoutes(ctx, vpc, routes)
- }
- for _, vpc := range mss.Vpcs {
- if vpc.Id == apis.DEFAULT_VPC_ID {
- continue
- }
- ovndb.ClaimVpcGuestDnsRecords(ctx, vpc)
- }
- ovndb.ClaimDnsRecords(ctx, mss.Vpcs, mss.DnsRecords)
- ovndb.Sweep(ctx)
- return nil
- }
|