worker.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package ovn
  15. import (
  16. "context"
  17. "runtime"
  18. "runtime/debug"
  19. "sync"
  20. "time"
  21. "yunion.io/x/log"
  22. "yunion.io/x/pkg/errors"
  23. "yunion.io/x/pkg/util/httputils"
  24. "yunion.io/x/onecloud/pkg/apihelper"
  25. apis "yunion.io/x/onecloud/pkg/apis/compute"
  26. "yunion.io/x/onecloud/pkg/appsrv"
  27. "yunion.io/x/onecloud/pkg/mcclient/auth"
  28. mcclient_modules "yunion.io/x/onecloud/pkg/mcclient/modules/compute"
  29. "yunion.io/x/onecloud/pkg/util/ovsutils"
  30. agentmodels "yunion.io/x/onecloud/pkg/vpcagent/models"
  31. "yunion.io/x/onecloud/pkg/vpcagent/options"
  32. "yunion.io/x/onecloud/pkg/vpcagent/ovnutil"
  33. "yunion.io/x/onecloud/pkg/vpcagent/worker"
  34. )
  35. type Worker struct {
  36. opts *options.Options
  37. apih *apihelper.APIHelper
  38. }
  39. func NewWorker(opts *options.Options) worker.IWorker {
  40. modelSets := agentmodels.NewModelSets()
  41. apiOpts := &apihelper.Options{
  42. CommonOptions: opts.CommonOptions,
  43. SyncIntervalSeconds: opts.APISyncIntervalSeconds,
  44. RunDelayMilliseconds: opts.APIRunDelayMilliseconds,
  45. ListBatchSize: opts.APIListBatchSize,
  46. FetchFromComputeService: opts.FetchDataFromComputeService,
  47. IncludeDetails: false,
  48. IncludeOtherCloudEnv: false,
  49. }
  50. apih, err := apihelper.NewAPIHelper(apiOpts, modelSets)
  51. if err != nil {
  52. return nil
  53. }
  54. w := &Worker{
  55. opts: opts,
  56. apih: apih,
  57. }
  58. return w
  59. }
  60. func (w *Worker) Start(ctx context.Context, app *appsrv.Application, prefix string) {
  61. wg := ctx.Value("wg").(*sync.WaitGroup)
  62. defer func() {
  63. log.Infoln("ovn: worker bye")
  64. wg.Done()
  65. }()
  66. wg.Add(1)
  67. go w.apih.Start(ctx, app, httputils.JoinPath(prefix, "api"))
  68. tickDuration := time.Duration(w.opts.OvnWorkerCheckInterval) * time.Second
  69. tick := time.NewTimer(tickDuration)
  70. defer tick.Stop()
  71. var mss *agentmodels.ModelSets
  72. for {
  73. select {
  74. case imss := <-w.apih.ModelSets():
  75. log.Infof("ovn: got new data from api helper")
  76. mss = imss.(*agentmodels.ModelSets)
  77. if err := w.run(ctx, mss); err != nil {
  78. log.Errorf("ovn: %v", err)
  79. }
  80. case <-tick.C:
  81. if mss != nil {
  82. log.Infof("ovn: tick check")
  83. if err := w.run(ctx, mss); err != nil {
  84. log.Errorf("ovn: %v", err)
  85. }
  86. }
  87. tick.Reset(tickDuration)
  88. case <-ctx.Done():
  89. return
  90. }
  91. }
  92. }
  93. func (w *Worker) run(ctx context.Context, mss *agentmodels.ModelSets) (err error) {
  94. defer func() {
  95. if panicVal := recover(); panicVal != nil {
  96. if panicErr, ok := panicVal.(runtime.Error); ok {
  97. err = errors.Wrap(panicErr, string(debug.Stack()))
  98. } else if panicErr, ok := panicVal.(error); ok {
  99. err = panicErr
  100. } else {
  101. panic(panicVal)
  102. }
  103. }
  104. }()
  105. dbUrl := w.opts.OvnNorthDatabase
  106. if db, err := ovsutils.NormalizeDbHost(dbUrl); err != nil {
  107. return err
  108. } else {
  109. dbUrl = db
  110. }
  111. log.Infof("ovn: connect to ovn north database %s", dbUrl)
  112. ovnnbctl := ovnutil.NewOvnNbCtl(dbUrl)
  113. ovndb, err := DumpOVNNorthbound(ctx, ovnnbctl)
  114. if err != nil {
  115. return err
  116. }
  117. ovndb.Mark(ctx)
  118. for _, vpc := range mss.Vpcs {
  119. if vpc.Id == apis.DEFAULT_VPC_ID {
  120. continue
  121. }
  122. ovndb.ClaimVpc(ctx, vpc)
  123. if vpcHasEipgw(vpc) {
  124. ovndb.ClaimVpcEipgw(ctx, vpc)
  125. }
  126. for _, network := range vpc.Networks {
  127. ovndb.ClaimNetwork(ctx, network, w.opts)
  128. for _, guestnetwork := range network.Guestnetworks {
  129. if guestnetwork.Guest == nil {
  130. continue
  131. }
  132. if vpcHasDistgw(vpc) {
  133. var (
  134. guest = guestnetwork.Guest
  135. network = guestnetwork.Network
  136. vpc = network.Vpc
  137. host = guest.Host
  138. )
  139. if host.OvnVersion == "" {
  140. // Just in case. This should never happen
  141. log.Errorf("host %s(%s) of vpc guestnetwork (%s,%s) has no ovn support",
  142. host.Id, host.Name, guestnetwork.NetworkId, guestnetwork.IpAddr)
  143. continue
  144. }
  145. if host.OvnMappedIpAddr == "" {
  146. // trigger ovn mapped ip addr allocation
  147. // apiVersion := "v2"
  148. s := auth.GetAdminSession(ctx, w.opts.Region)
  149. j, err := mcclient_modules.Hosts.Update(s, host.Id, nil)
  150. if err != nil {
  151. log.Errorf("host %s(%s) dummy update err: %v", host.Id, host.Name, err)
  152. continue
  153. }
  154. j.Unmarshal(host) // update local copy in place
  155. if host.OvnMappedIpAddr == "" {
  156. log.Errorf("host %s(%s) has no mapped addr", host.Id, host.Name)
  157. continue
  158. }
  159. }
  160. ovndb.ClaimVpcHost(ctx, vpc, host)
  161. }
  162. ovndb.ClaimGuestnetwork(ctx, guestnetwork, w.opts)
  163. }
  164. for _, groupnetwork := range network.Groupnetworks {
  165. ovndb.ClaimGroupnetwork(ctx, groupnetwork)
  166. }
  167. for _, loadbalancerNetwork := range network.LoadbalancerNetworks {
  168. ovndb.ClaimLoadbalancerNetwork(ctx, loadbalancerNetwork)
  169. }
  170. }
  171. routes := resolveRoutes(vpc, mss)
  172. ovndb.ClaimRoutes(ctx, vpc, routes)
  173. }
  174. for _, vpc := range mss.Vpcs {
  175. if vpc.Id == apis.DEFAULT_VPC_ID {
  176. continue
  177. }
  178. ovndb.ClaimVpcGuestDnsRecords(ctx, vpc)
  179. }
  180. ovndb.ClaimDnsRecords(ctx, mss.Vpcs, mss.DnsRecords)
  181. ovndb.Sweep(ctx)
  182. return nil
  183. }