loadbalanceragents.go 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package models
  15. import (
  16. "context"
  17. "database/sql"
  18. "encoding/base64"
  19. "net/url"
  20. "reflect"
  21. "text/template"
  22. "time"
  23. "yunion.io/x/jsonutils"
  24. "yunion.io/x/log"
  25. "yunion.io/x/pkg/errors"
  26. "yunion.io/x/pkg/gotypes"
  27. "yunion.io/x/sqlchemy"
  28. "yunion.io/x/onecloud/pkg/apis"
  29. api "yunion.io/x/onecloud/pkg/apis/compute"
  30. identity_apis "yunion.io/x/onecloud/pkg/apis/identity"
  31. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  32. "yunion.io/x/onecloud/pkg/cloudcommon/db/lockman"
  33. "yunion.io/x/onecloud/pkg/cloudcommon/tsdb"
  34. "yunion.io/x/onecloud/pkg/cloudcommon/validators"
  35. "yunion.io/x/onecloud/pkg/compute/options"
  36. "yunion.io/x/onecloud/pkg/httperrors"
  37. "yunion.io/x/onecloud/pkg/mcclient"
  38. "yunion.io/x/onecloud/pkg/mcclient/auth"
  39. "yunion.io/x/onecloud/pkg/util/logclient"
  40. "yunion.io/x/onecloud/pkg/util/stringutils2"
  41. )
  42. // +onecloud:swagger-gen-model-singular=loadbalanceragent
  43. // +onecloud:swagger-gen-model-plural=loadbalanceragents
  44. type SLoadbalancerAgentManager struct {
  45. SLoadbalancerLogSkipper
  46. db.SStandaloneResourceBaseManager
  47. SLoadbalancerClusterResourceBaseManager
  48. }
  49. var LoadbalancerAgentManager *SLoadbalancerAgentManager
  50. func init() {
  51. gotypes.RegisterSerializable(reflect.TypeOf(&SLoadbalancerAgentParams{}), func() gotypes.ISerializable {
  52. return &SLoadbalancerAgentParams{}
  53. })
  54. LoadbalancerAgentManager = &SLoadbalancerAgentManager{
  55. SStandaloneResourceBaseManager: db.NewStandaloneResourceBaseManager(
  56. SLoadbalancerAgent{},
  57. "loadbalanceragents_tbl",
  58. "loadbalanceragent",
  59. "loadbalanceragents",
  60. ),
  61. }
  62. LoadbalancerAgentManager.SetVirtualObject(LoadbalancerAgentManager)
  63. }
  64. // TODO
  65. //
  66. // - scrub stale backends: Guests with deleted=1
  67. // - agent configuration params
  68. type SLoadbalancerAgent struct {
  69. db.SStandaloneResourceBase
  70. SLoadbalancerClusterResourceBase `width:"36" charset:"ascii" nullable:"true" list:"user" update:"admin"`
  71. Version string `width:"64" nullable:"true" list:"admin" create:"required" update:"admin"`
  72. IP string `width:"32" charset:"ascii" nullable:"true" list:"admin" create:"required"`
  73. Interface string `width:"17" charset:"ascii" nullable:"true" list:"admin" create:"required" update:"admin"`
  74. Priority int `nullable:"true" list:"user" update:"admin"`
  75. HaState string `width:"32" nullable:"true" list:"admin" update:"admin" default:"UNKNOWN"` // LB_HA_STATE_UNKNOWN
  76. HbLastSeen time.Time `nullable:"true" list:"admin" update:"admin"`
  77. HbTimeout int `nullable:"true" list:"admin" update:"admin" create:"optional" default:"3600"`
  78. Params *SLoadbalancerAgentParams `create:"optional" list:"admin" get:"admin"`
  79. Networks time.Time `nullable:"true" list:"admin" update:"admin"`
  80. LoadbalancerNetworks time.Time `nullable:"true" list:"admin" update:"admin"`
  81. Loadbalancers time.Time `nullable:"true" list:"admin" update:"admin"`
  82. LoadbalancerListeners time.Time `nullable:"true" list:"admin" update:"admin"`
  83. LoadbalancerListenerRules time.Time `nullable:"true" list:"admin" update:"admin"`
  84. LoadbalancerBackendGroups time.Time `nullable:"true" list:"admin" update:"admin"`
  85. LoadbalancerBackends time.Time `nullable:"true" list:"admin" update:"admin"`
  86. LoadbalancerAcls time.Time `nullable:"true" list:"admin" update:"admin"`
  87. LoadbalancerCertificates time.Time `nullable:"true" list:"admin" update:"admin"`
  88. Deployment *SLoadbalancerAgentDeployment `create:"optional" list:"admin" get:"admin"`
  89. // ClusterId string `width:"36" charset:"ascii" nullable:"false" list:"user" create:"required"`
  90. }
  91. type SLoadbalancerAgentParamsVrrp struct {
  92. SLoadbalancerClusterParams
  93. Priority int `json:",omitzero"`
  94. Interface string
  95. }
  96. const (
  97. lbagentVrrpDefaultVrid = 17
  98. lbagentVrrpDefaultPrio = 100
  99. )
  100. type SLoadbalancerAgentParamsHaproxy struct {
  101. GlobalLog string
  102. GlobalNbthread int `json:",omitzero"`
  103. LogHttp bool
  104. LogTcp bool
  105. LogNormal bool
  106. TuneHttpMaxhdr int `json:",omitzero"`
  107. }
  108. type SLoadbalancerAgentParamsTelegraf struct {
  109. InfluxDbOutputUrl string
  110. InfluxDbOutputName string
  111. InfluxDbOutputUnsafeSsl bool
  112. HaproxyInputInterval int `json:",omitzero"`
  113. }
  114. type SLoadbalancerAgentParams struct {
  115. KeepalivedConfTmpl string
  116. HaproxyConfTmpl string
  117. TelegrafConfTmpl string
  118. Vrrp SLoadbalancerAgentParamsVrrp
  119. Haproxy SLoadbalancerAgentParamsHaproxy
  120. Telegraf SLoadbalancerAgentParamsTelegraf
  121. }
  122. func (p *SLoadbalancerAgentParamsVrrp) Validate(data *jsonutils.JSONDict) error {
  123. /*if len(p.Interface) == 0 || len(p.Interface) > 16 {
  124. // TODO printable exclude white space
  125. return httperrors.NewInputParameterError("invalid vrrp interface %q", p.Interface)
  126. }
  127. if len(p.Pass) == 0 || len(p.Pass) > 8 {
  128. // TODO printable exclude white space
  129. return httperrors.NewInputParameterError("invalid vrrp authentication pass size: %d, want [1,8]", len(p.Pass))
  130. }
  131. if p.Priority < 1 || p.Priority > 255 {
  132. return httperrors.NewInputParameterError("invalid vrrp priority %d: want [1,255]", p.Priority)
  133. }
  134. if p.VirtualRouterId < 1 || p.VirtualRouterId > 255 {
  135. return httperrors.NewInputParameterError("invalid vrrp virtual_router_id %d: want [1,255]", p.VirtualRouterId)
  136. }
  137. if p.AdvertInt < 1 || p.AdvertInt > 255 {
  138. return httperrors.NewInputParameterError("invalid vrrp advert_int %d: want [1,255]", p.AdvertInt)
  139. }*/
  140. return nil
  141. }
  142. /*
  143. func (p *SLoadbalancerAgentParamsVrrp) validatePeer(pp *SLoadbalancerAgentParamsVrrp) error {
  144. if p.Priority == pp.Priority {
  145. return fmt.Errorf("vrrp priority of peer lbagents must be different, got %d", p.Priority)
  146. }
  147. return nil
  148. }
  149. func (p *SLoadbalancerAgentParamsVrrp) setByPeer(pp *SLoadbalancerAgentParamsVrrp) {
  150. p.VirtualRouterId = pp.VirtualRouterId
  151. p.AdvertInt = pp.AdvertInt
  152. p.Preempt = pp.Preempt
  153. p.Pass = pp.Pass
  154. }
  155. func (p *SLoadbalancerAgentParamsVrrp) needsUpdatePeer(pp *SLoadbalancerAgentParamsVrrp) bool {
  156. // properties no need to check: Priority
  157. if p.VirtualRouterId != pp.VirtualRouterId {
  158. return true
  159. }
  160. if p.AdvertInt != pp.AdvertInt {
  161. return true
  162. }
  163. if p.Preempt != pp.Preempt {
  164. return true
  165. }
  166. if p.Pass != pp.Pass {
  167. return true
  168. }
  169. return false
  170. }
  171. func (p *SLoadbalancerAgentParamsVrrp) updateBy(pp *SLoadbalancerAgentParamsVrrp) {
  172. p.VirtualRouterId = pp.VirtualRouterId
  173. p.AdvertInt = pp.AdvertInt
  174. p.Preempt = pp.Preempt
  175. p.Pass = pp.Pass
  176. }
  177. func (p *SLoadbalancerAgentParamsVrrp) initDefault(data *jsonutils.JSONDict) {
  178. if !data.Contains("params", "vrrp", "interface") {
  179. p.Interface = "eth0"
  180. }
  181. if !data.Contains("params", "vrrp", "virtual_router_id") {
  182. p.VirtualRouterId = lbagentVrrpDefaultVrid
  183. }
  184. if !data.Contains("params", "vrrp", "advert_int") {
  185. p.AdvertInt = 5
  186. }
  187. if !data.Contains("params", "vrrp", "garp_master_refresh") {
  188. p.GarpMasterRefresh = 27
  189. }
  190. if !data.Contains("params", "vrrp", "pass") {
  191. p.Pass = "YunionLB"
  192. }
  193. }*/
  194. func (p *SLoadbalancerAgentParamsHaproxy) Validate(data *jsonutils.JSONDict) error {
  195. if p.GlobalNbthread < 1 {
  196. p.GlobalNbthread = 1
  197. }
  198. if p.GlobalNbthread > 64 {
  199. // This is a limit imposed by haproxy and arch word size
  200. p.GlobalNbthread = 64
  201. }
  202. if p.TuneHttpMaxhdr < 0 {
  203. p.TuneHttpMaxhdr = 0
  204. }
  205. if p.TuneHttpMaxhdr > 32767 {
  206. p.TuneHttpMaxhdr = 32767
  207. }
  208. return nil
  209. }
  210. func (p *SLoadbalancerAgentParamsHaproxy) needsUpdatePeer(pp *SLoadbalancerAgentParamsHaproxy) bool {
  211. return *p != *pp
  212. }
  213. func (p *SLoadbalancerAgentParamsHaproxy) updateBy(pp *SLoadbalancerAgentParamsHaproxy) {
  214. *p = *pp
  215. }
  216. func (p *SLoadbalancerAgentParamsHaproxy) initDefault(data *jsonutils.JSONDict) {
  217. if !data.Contains("params", "haproxy", "global_nbthread") {
  218. p.GlobalNbthread = 1
  219. }
  220. if !data.Contains("params", "haproxy", "global_log") {
  221. p.GlobalLog = "log /dev/log local0 info"
  222. }
  223. if !data.Contains("params", "haproxy", "log_http") {
  224. p.LogHttp = true
  225. }
  226. if !data.Contains("params", "haproxy", "log_normal") {
  227. p.LogNormal = true
  228. }
  229. }
  230. func (p *SLoadbalancerAgentParamsTelegraf) Validate(data *jsonutils.JSONDict) error {
  231. if p.InfluxDbOutputUrl != "" {
  232. _, err := url.Parse(p.InfluxDbOutputUrl)
  233. if err != nil {
  234. return httperrors.NewInputParameterError("telegraf params: invalid influxdb url: %s", err)
  235. }
  236. }
  237. if p.HaproxyInputInterval <= 0 {
  238. p.HaproxyInputInterval = 5
  239. }
  240. if p.InfluxDbOutputName == "" {
  241. p.InfluxDbOutputName = "telegraf"
  242. }
  243. return nil
  244. }
  245. func (p *SLoadbalancerAgentParamsTelegraf) needsUpdatePeer(pp *SLoadbalancerAgentParamsTelegraf) bool {
  246. return *p != *pp
  247. }
  248. func (p *SLoadbalancerAgentParamsTelegraf) updateBy(pp *SLoadbalancerAgentParamsTelegraf) {
  249. *p = *pp
  250. }
  251. func (p *SLoadbalancerAgentParamsTelegraf) initDefault(data *jsonutils.JSONDict) {
  252. if p.InfluxDbOutputUrl == "" {
  253. baseOpts := &options.Options
  254. u, _ := tsdb.GetDefaultServiceSourceURL(auth.GetAdminSession(context.Background(), baseOpts.Region), identity_apis.EndpointInterfacePublic)
  255. p.InfluxDbOutputUrl = u
  256. p.InfluxDbOutputUnsafeSsl = true
  257. }
  258. if p.HaproxyInputInterval == 0 {
  259. p.HaproxyInputInterval = 5
  260. }
  261. if p.InfluxDbOutputName == "" {
  262. p.InfluxDbOutputName = "telegraf"
  263. }
  264. }
  265. func (p *SLoadbalancerAgentParams) validateTmpl(k, s string) error {
  266. d, err := base64.StdEncoding.DecodeString(s)
  267. if err != nil {
  268. return httperrors.NewInputParameterError("%s: bad base64 encoded string: %s", k, err)
  269. }
  270. s = string(d)
  271. _, err = template.New("").Parse(s)
  272. if err != nil {
  273. return httperrors.NewInputParameterError("%s: bad template: %s", k, err)
  274. }
  275. return nil
  276. }
  277. func (p *SLoadbalancerAgentParams) initDefault(data *jsonutils.JSONDict) {
  278. if p.KeepalivedConfTmpl == "" {
  279. p.KeepalivedConfTmpl = loadbalancerKeepalivedConfTmplDefaultEncoded
  280. }
  281. if p.HaproxyConfTmpl == "" {
  282. p.HaproxyConfTmpl = loadbalancerHaproxyConfTmplDefaultEncoded
  283. }
  284. if p.TelegrafConfTmpl == "" {
  285. p.TelegrafConfTmpl = loadbalancerTelegrafConfTmplDefaultEncoded
  286. }
  287. //p.Vrrp.initDefault(data)
  288. p.Haproxy.initDefault(data)
  289. p.Telegraf.initDefault(data)
  290. }
  291. func (p *SLoadbalancerAgentParams) Validate(data *jsonutils.JSONDict) error {
  292. p.initDefault(data)
  293. if err := p.validateTmpl("keepalived_conf_tmpl", p.KeepalivedConfTmpl); err != nil {
  294. return err
  295. }
  296. if err := p.validateTmpl("haproxy_conf_tmpl", p.HaproxyConfTmpl); err != nil {
  297. return err
  298. }
  299. if err := p.validateTmpl("telegraf_conf_tmpl", p.TelegrafConfTmpl); err != nil {
  300. return err
  301. }
  302. /*if err := p.Vrrp.Validate(data); err != nil {
  303. return err
  304. }*/
  305. if err := p.Haproxy.Validate(data); err != nil {
  306. return err
  307. }
  308. if err := p.Telegraf.Validate(data); err != nil {
  309. return err
  310. }
  311. return nil
  312. }
  313. func (p *SLoadbalancerAgentParams) needsUpdatePeer(pp *SLoadbalancerAgentParams) bool {
  314. if p.KeepalivedConfTmpl != pp.KeepalivedConfTmpl ||
  315. p.HaproxyConfTmpl != pp.HaproxyConfTmpl ||
  316. p.TelegrafConfTmpl != pp.TelegrafConfTmpl {
  317. return true
  318. }
  319. return p.Haproxy.needsUpdatePeer(&pp.Haproxy) ||
  320. p.Telegraf.needsUpdatePeer(&pp.Telegraf)
  321. }
  322. func (p *SLoadbalancerAgentParams) updateBy(pp *SLoadbalancerAgentParams) {
  323. p.KeepalivedConfTmpl = pp.KeepalivedConfTmpl
  324. p.HaproxyConfTmpl = pp.HaproxyConfTmpl
  325. p.TelegrafConfTmpl = pp.TelegrafConfTmpl
  326. // p.Vrrp.updateBy(&pp.Vrrp)
  327. p.Haproxy.updateBy(&pp.Haproxy)
  328. p.Telegraf.updateBy(&pp.Telegraf)
  329. }
  330. func (p *SLoadbalancerAgentParams) String() string {
  331. return jsonutils.Marshal(p).String()
  332. }
  333. func (p *SLoadbalancerAgentParams) IsZero() bool {
  334. if *p == (SLoadbalancerAgentParams{}) {
  335. return true
  336. }
  337. return false
  338. }
  339. func (man *SLoadbalancerAgentManager) GetPropertyDefaultParams(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject) (jsonutils.JSONObject, error) {
  340. params := SLoadbalancerAgentParams{}
  341. params.initDefault(jsonutils.NewDict())
  342. {
  343. clusterV := validators.NewModelIdOrNameValidator("cluster", "loadbalancercluster", userCred)
  344. clusterV.Optional(true)
  345. if err := clusterV.Validate(ctx, query.(*jsonutils.JSONDict)); err != nil {
  346. return nil, err
  347. }
  348. if clusterV.Model != nil {
  349. cluster := clusterV.Model.(*SLoadbalancerCluster)
  350. lbagents, err := LoadbalancerClusterManager.getLoadbalancerAgents(cluster.Id)
  351. if err != nil {
  352. return nil, httperrors.NewGeneralError(err)
  353. }
  354. if len(lbagents) > 0 {
  355. lbagent := lbagents[0]
  356. params.updateBy(lbagent.Params)
  357. }
  358. }
  359. }
  360. paramsObj := jsonutils.Marshal(params)
  361. r := jsonutils.NewDict()
  362. r.Set("params", paramsObj)
  363. return r, nil
  364. }
  365. func (man *SLoadbalancerAgentManager) ValidateCreateData(ctx context.Context, userCred mcclient.TokenCredential, ownerId mcclient.IIdentityProvider, query jsonutils.JSONObject, data *jsonutils.JSONDict) (*jsonutils.JSONDict, error) {
  366. // clusterV := validators.NewModelIdOrNameValidator("cluster", "loadbalancercluster", ownerId)
  367. {
  368. keyV := map[string]validators.IValidator{
  369. "hb_timeout": validators.NewNonNegativeValidator("hb_timeout").Default(3600),
  370. // "cluster": clusterV,
  371. }
  372. for _, v := range keyV {
  373. if err := v.Validate(ctx, data); err != nil {
  374. return nil, err
  375. }
  376. }
  377. }
  378. input := apis.StandaloneResourceCreateInput{}
  379. err := data.Unmarshal(&input)
  380. if err != nil {
  381. return nil, httperrors.NewInternalServerError("unmarshal StandaloneResourceCreateInput fail %s", err)
  382. }
  383. input, err = man.SStandaloneResourceBaseManager.ValidateCreateData(ctx, userCred, ownerId, query, input)
  384. if err != nil {
  385. return nil, err
  386. }
  387. data.Update(jsonutils.Marshal(input))
  388. return data, nil
  389. }
  390. func (agent *SLoadbalancerAgent) PostCreate(
  391. ctx context.Context,
  392. userCred mcclient.TokenCredential,
  393. ownerId mcclient.IIdentityProvider,
  394. query jsonutils.JSONObject,
  395. data jsonutils.JSONObject,
  396. ) {
  397. params := SLoadbalancerAgentParams{}
  398. params.initDefault(data.(*jsonutils.JSONDict))
  399. _, err := db.Update(agent, func() error {
  400. agent.Params = &params
  401. return nil
  402. })
  403. if err != nil {
  404. log.Errorf("init params fail: %s", err)
  405. }
  406. }
  407. // 负载均衡Agent列表
  408. func (man *SLoadbalancerAgentManager) ListItemFilter(
  409. ctx context.Context,
  410. q *sqlchemy.SQuery,
  411. userCred mcclient.TokenCredential,
  412. query api.LoadbalancerAgentListInput,
  413. ) (*sqlchemy.SQuery, error) {
  414. q, err := man.SStandaloneResourceBaseManager.ListItemFilter(ctx, q, userCred, query.StandaloneResourceListInput)
  415. if err != nil {
  416. return nil, errors.Wrap(err, "SStandaloneResourceBaseManager.ListItemFilter")
  417. }
  418. q, err = man.SLoadbalancerClusterResourceBaseManager.ListItemFilter(ctx, q, userCred, query.LoadbalancerClusterFilterListInput)
  419. if err != nil {
  420. return nil, errors.Wrap(err, "SLoadbalancerClusterResourceBaseManager.ListItemFilter")
  421. }
  422. if len(query.Version) > 0 {
  423. q = q.In("version", query.Version)
  424. }
  425. if len(query.IP) > 0 {
  426. q = q.In("ip", query.IP)
  427. }
  428. if len(query.HaState) > 0 {
  429. q = q.In("ha_state", query.HaState)
  430. }
  431. return q, nil
  432. }
  433. func (man *SLoadbalancerAgentManager) OrderByExtraFields(
  434. ctx context.Context,
  435. q *sqlchemy.SQuery,
  436. userCred mcclient.TokenCredential,
  437. query api.LoadbalancerAgentListInput,
  438. ) (*sqlchemy.SQuery, error) {
  439. var err error
  440. q, err = man.SStandaloneResourceBaseManager.OrderByExtraFields(ctx, q, userCred, query.StandaloneResourceListInput)
  441. if err != nil {
  442. return nil, errors.Wrap(err, "SStandaloneResourceBaseManager.OrderByExtraFields")
  443. }
  444. q, err = man.SLoadbalancerClusterResourceBaseManager.ListItemFilter(ctx, q, userCred, query.LoadbalancerClusterFilterListInput)
  445. if err != nil {
  446. return nil, errors.Wrap(err, "SLoadbalancerClusterResourceBaseManager.ListItemFilter")
  447. }
  448. return q, nil
  449. }
  450. func (man *SLoadbalancerAgentManager) QueryDistinctExtraField(q *sqlchemy.SQuery, field string) (*sqlchemy.SQuery, error) {
  451. var err error
  452. q, err = man.SStandaloneResourceBaseManager.QueryDistinctExtraField(q, field)
  453. if err == nil {
  454. return q, nil
  455. }
  456. q, err = man.SLoadbalancerClusterResourceBaseManager.QueryDistinctExtraField(q, field)
  457. if err == nil {
  458. return q, nil
  459. }
  460. return q, httperrors.ErrNotFound
  461. }
  462. func (lbagent *SLoadbalancerAgent) ValidateUpdateData(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data *jsonutils.JSONDict) (*jsonutils.JSONDict, error) {
  463. if data.Contains("cluster_id") {
  464. data.Remove("cluster_id")
  465. }
  466. {
  467. keyV := map[string]validators.IValidator{
  468. "hb_timeout": validators.NewNonNegativeValidator("hb_timeout").Optional(true),
  469. }
  470. for _, v := range keyV {
  471. if err := v.Validate(ctx, data); err != nil {
  472. return nil, err
  473. }
  474. }
  475. }
  476. keys := map[string]time.Time{
  477. "networks": lbagent.Networks,
  478. "loadbalancer_networks": lbagent.LoadbalancerNetworks,
  479. "loadbalancers": lbagent.Loadbalancers,
  480. "loadbalancer_listeners": lbagent.LoadbalancerListeners,
  481. "loadbalancer_listener_rules": lbagent.LoadbalancerListenerRules,
  482. "loadbalancer_backend_groups": lbagent.LoadbalancerBackendGroups,
  483. "loadbalancer_backends": lbagent.LoadbalancerBackends,
  484. "loadbalancer_acls": lbagent.LoadbalancerAcls,
  485. "loadbalancer_certificates": lbagent.LoadbalancerCertificates,
  486. }
  487. for k, curValue := range keys {
  488. if !data.Contains(k) {
  489. continue
  490. }
  491. newValue, err := data.GetTime(k)
  492. if err != nil {
  493. return nil, httperrors.NewInputParameterError("%s: time error: %s", k, err)
  494. }
  495. if newValue.Before(curValue) {
  496. // this is possible with objects deleted
  497. data.Remove(k)
  498. continue
  499. }
  500. if now := time.Now(); newValue.After(now) {
  501. return nil, httperrors.NewInputParameterError("%s: new time is in the future: %s > %s",
  502. k, newValue, now)
  503. }
  504. }
  505. data.Set("hb_last_seen", jsonutils.NewTimeString(time.Now()))
  506. return data, nil
  507. }
  508. func (agent *SLoadbalancerAgent) ValidateDeleteCondition(ctx context.Context, info jsonutils.JSONObject) error {
  509. if len(agent.ClusterId) > 0 {
  510. return errors.Wrap(httperrors.ErrResourceBusy, "agent join a cluster")
  511. }
  512. return agent.SStandaloneResourceBase.ValidateDeleteCondition(ctx, info)
  513. }
  514. func (manager *SLoadbalancerAgentManager) FetchCustomizeColumns(
  515. ctx context.Context,
  516. userCred mcclient.TokenCredential,
  517. query jsonutils.JSONObject,
  518. objs []interface{},
  519. fields stringutils2.SSortedStrings,
  520. isList bool,
  521. ) []api.LoadbalancerAgentDetails {
  522. rows := make([]api.LoadbalancerAgentDetails, len(objs))
  523. stdRows := manager.SStandaloneResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
  524. clusterRows := manager.SLoadbalancerClusterResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
  525. for i := range rows {
  526. rows[i] = api.LoadbalancerAgentDetails{
  527. StandaloneResourceDetails: stdRows[i],
  528. LoadbalancerClusterResourceInfo: clusterRows[i],
  529. }
  530. }
  531. return rows
  532. }
  533. /*func (manager *SLoadbalancerAgentManager) QueryDistinctExtraField(q *sqlchemy.SQuery, field string) (*sqlchemy.SQuery, error) {
  534. var err error
  535. q, err = manager.SStandaloneResourceBaseManager.QueryDistinctExtraField(q, field)
  536. if err == nil {
  537. return q, nil
  538. }
  539. switch field {
  540. case "cluster":
  541. clusterQuery := LoadbalancerClusterManager.Query("name", "id").Distinct().SubQuery()
  542. q = q.Join(clusterQuery, sqlchemy.Equals(q.Field("cluster_id"), clusterQuery.Field("id")))
  543. q.GroupBy(clusterQuery.Field("name"))
  544. q.AppendField(clusterQuery.Field("name", "cluster"))
  545. default:
  546. return q, httperrors.NewBadRequestError("unsupport field %s", field)
  547. }
  548. return q, nil
  549. }*/
  550. func (manager *SLoadbalancerAgentManager) ListItemExportKeys(ctx context.Context,
  551. q *sqlchemy.SQuery,
  552. userCred mcclient.TokenCredential,
  553. keys stringutils2.SSortedStrings,
  554. ) (*sqlchemy.SQuery, error) {
  555. var err error
  556. q, err = manager.SStandaloneResourceBaseManager.ListItemExportKeys(ctx, q, userCred, keys)
  557. if err != nil {
  558. return nil, errors.Wrap(err, "SStandaloneResourceBaseManager.ListItemExportKeys")
  559. }
  560. if keys.ContainsAny(manager.SLoadbalancerClusterResourceBaseManager.GetExportKeys()...) {
  561. q, err = manager.SLoadbalancerClusterResourceBaseManager.ListItemExportKeys(ctx, q, userCred, keys)
  562. if err != nil {
  563. return nil, errors.Wrap(err, "SLoadbalancerClusterResourceBaseManager.ListItemExportKeys")
  564. }
  565. }
  566. return q, nil
  567. }
  568. func (man *SLoadbalancerAgentManager) getByClusterId(clusterId string) ([]SLoadbalancerAgent, error) {
  569. r := []SLoadbalancerAgent{}
  570. q := man.Query().Equals("cluster_id", clusterId)
  571. if err := db.FetchModelObjects(man, q, &r); err != nil {
  572. return nil, errors.Wrap(err, "FetchModelObjects")
  573. }
  574. return r, nil
  575. }
  576. func (lbagent *SLoadbalancerAgent) getCluster() *SLoadbalancerCluster {
  577. if len(lbagent.ClusterId) == 0 {
  578. return nil
  579. }
  580. clusterObj, err := LoadbalancerClusterManager.FetchById(lbagent.ClusterId)
  581. if err != nil {
  582. log.Errorf("SLoadbalancerAgent.getCluster error %s", err)
  583. return nil
  584. }
  585. return clusterObj.(*SLoadbalancerCluster)
  586. }
  587. func (lbagent *SLoadbalancerAgent) PerformHb(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data *jsonutils.JSONDict) (*jsonutils.JSONDict, error) {
  588. ipV := validators.NewIPv4AddrValidator("ip")
  589. haStateV := validators.NewStringChoicesValidator("ha_state", api.LB_HA_STATES)
  590. {
  591. keyV := map[string]validators.IValidator{
  592. "ip": ipV,
  593. "ha_state": haStateV,
  594. }
  595. for _, v := range keyV {
  596. v.Optional(true)
  597. if err := v.Validate(ctx, data); err != nil {
  598. return nil, err
  599. }
  600. }
  601. }
  602. diff, err := lbagent.GetModelManager().TableSpec().Update(ctx, lbagent, func() error {
  603. lbagent.HbLastSeen = time.Now()
  604. if jVer, err := data.Get("version"); err == nil {
  605. if jVerStr, ok := jVer.(*jsonutils.JSONString); ok {
  606. lbagent.Version, _ = jVerStr.GetString()
  607. }
  608. }
  609. if ipV.IP != nil {
  610. lbagent.IP = ipV.IP.String()
  611. }
  612. if haStateV.Value != "" {
  613. lbagent.HaState = haStateV.Value
  614. }
  615. return nil
  616. })
  617. if err != nil {
  618. return nil, err
  619. }
  620. if len(diff) > 1 {
  621. // other things changed besides hb_last_seen
  622. log.Infof("lbagent %s(%s) state changed: %s", lbagent.Name, lbagent.Id, diff)
  623. db.OpsLog.LogEvent(lbagent, db.ACT_UPDATE, diff, userCred)
  624. }
  625. return nil, nil
  626. }
  627. func (lbagent *SLoadbalancerAgent) IsActive() bool {
  628. if lbagent.HbLastSeen.IsZero() {
  629. return false
  630. }
  631. duration := time.Since(lbagent.HbLastSeen).Seconds()
  632. if int(duration) >= lbagent.HbTimeout {
  633. return false
  634. }
  635. return true
  636. }
  637. func (lbagent *SLoadbalancerAgent) PerformJoinCluster(
  638. ctx context.Context,
  639. userCred mcclient.TokenCredential,
  640. query jsonutils.JSONObject,
  641. input api.LoadbalancerAgentJoinClusterInput,
  642. ) (*jsonutils.JSONDict, error) {
  643. if len(lbagent.ClusterId) > 0 {
  644. return nil, errors.Wrap(httperrors.ErrConflict, "lbagent has been join cluster")
  645. }
  646. clusterObj, err := LoadbalancerClusterManager.FetchByIdOrName(ctx, userCred, input.ClusterId)
  647. if err != nil {
  648. if errors.Cause(err) == sql.ErrNoRows {
  649. return nil, errors.Wrapf(httperrors.ErrNotFound, "%s %s", LoadbalancerClusterManager.Keyword(), input.ClusterId)
  650. } else {
  651. return nil, errors.Wrap(err, "LoadbalancerClusterManager.FetchById")
  652. }
  653. }
  654. cluster := clusterObj.(*SLoadbalancerCluster)
  655. lockman.LockObject(ctx, cluster)
  656. defer lockman.ReleaseObject(ctx, cluster)
  657. peerAgents, err := LoadbalancerAgentManager.getByClusterId(cluster.Id)
  658. if err != nil {
  659. return nil, errors.Wrap(err, "LoadbalancerAgentManager.getByClusterId")
  660. }
  661. if len(peerAgents) >= 2 {
  662. return nil, errors.Wrap(httperrors.ErrTooLarge, "too many agents")
  663. }
  664. priority := 255
  665. if input.Priority > 0 {
  666. for i := range peerAgents {
  667. if input.Priority == peerAgents[i].Priority {
  668. return nil, errors.Wrap(httperrors.ErrDuplicateId, "duplicate priority in same cluster")
  669. }
  670. }
  671. priority = input.Priority
  672. } else {
  673. for i := range peerAgents {
  674. if priority >= peerAgents[i].Priority {
  675. priority = peerAgents[i].Priority - 1
  676. }
  677. }
  678. }
  679. var params SLoadbalancerAgentParams
  680. if lbagent.Params != nil {
  681. params = *lbagent.Params
  682. }
  683. params.Vrrp.SLoadbalancerClusterParams = *cluster.Params
  684. _, err = db.Update(lbagent, func() error {
  685. lbagent.ClusterId = cluster.Id
  686. lbagent.Priority = priority
  687. lbagent.Params = &params
  688. return nil
  689. })
  690. if err != nil {
  691. return nil, errors.Wrap(err, "Update")
  692. } else {
  693. notes := struct {
  694. ClusterId string
  695. Priority int
  696. }{
  697. ClusterId: cluster.Id,
  698. Priority: priority,
  699. }
  700. logclient.AddActionLogWithContext(ctx, lbagent, logclient.ACT_ATTACH_HOST, notes, userCred, true)
  701. }
  702. return nil, nil
  703. }
  704. func (lbagent *SLoadbalancerAgent) PerformLeaveCluster(
  705. ctx context.Context,
  706. userCred mcclient.TokenCredential,
  707. query jsonutils.JSONObject,
  708. data api.LoadbalancerAgentLeaveClusterInput,
  709. ) (*jsonutils.JSONDict, error) {
  710. if len(lbagent.ClusterId) == 0 {
  711. return nil, errors.Wrap(httperrors.ErrInvalidStatus, "lbagent not belong to any cluster")
  712. }
  713. oldClusterId := lbagent.ClusterId
  714. _, err := db.Update(lbagent, func() error {
  715. lbagent.ClusterId = ""
  716. lbagent.HaState = api.LB_HA_STATE_UNKNOWN
  717. return nil
  718. })
  719. if err != nil {
  720. return nil, errors.Wrap(err, "Update")
  721. } else {
  722. notes := struct {
  723. ClusterId string
  724. }{
  725. ClusterId: oldClusterId,
  726. }
  727. logclient.AddActionLogWithContext(ctx, lbagent, logclient.ACT_DETACH_HOST, notes, userCred, true)
  728. }
  729. return nil, nil
  730. }
  731. func (lbagent *SLoadbalancerAgent) PerformParamsPatch(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data *jsonutils.JSONDict) (*jsonutils.JSONDict, error) {
  732. oldParams := lbagent.Params
  733. params := gotypes.DeepCopy(*lbagent.Params).(SLoadbalancerAgentParams)
  734. d := jsonutils.NewDict()
  735. d.Set("params", data)
  736. paramsV := validators.NewStructValidator("params", &params)
  737. if err := paramsV.Validate(ctx, d); err != nil {
  738. return nil, err
  739. }
  740. {
  741. diff, err := db.Update(lbagent, func() error {
  742. lbagent.Params = &params
  743. return nil
  744. })
  745. if err != nil {
  746. return nil, err
  747. }
  748. db.OpsLog.LogEvent(lbagent, db.ACT_UPDATE, diff, userCred)
  749. }
  750. if oldParams.needsUpdatePeer(&params) {
  751. lbagents, err := LoadbalancerClusterManager.getLoadbalancerAgents(lbagent.ClusterId)
  752. if err != nil {
  753. return nil, httperrors.NewGeneralError(err)
  754. }
  755. log.Infof("updating peer lbagents' vrrp params by those from %s(%s)", lbagent.Name, lbagent.Id)
  756. for i := range lbagents {
  757. peerLbagent := &lbagents[i]
  758. if lbagent.Id != peerLbagent.Id {
  759. diff, err := db.Update(peerLbagent, func() error {
  760. peerLbagent.Params.updateBy(&params)
  761. return nil
  762. })
  763. if err != nil {
  764. return nil, err
  765. }
  766. db.OpsLog.LogEvent(peerLbagent, db.ACT_UPDATE, diff, userCred)
  767. }
  768. }
  769. }
  770. return nil, nil
  771. }
  772. const (
  773. loadbalancerKeepalivedConfTmplDefault = `
  774. global_defs {
  775. router_id {{ .agent.id }}
  776. #vrrp_strict
  777. vrrp_skip_check_adv_addr
  778. enable_script_security
  779. }
  780. vrrp_instance YunionLB {
  781. interface {{ .vrrp.interface }}
  782. virtual_router_id {{ .vrrp.virtual_router_id }}
  783. authentication {
  784. auth_type PASS
  785. auth_pass {{ .vrrp.pass }}
  786. }
  787. {{ if .vrrp.notify_script -}} notify {{ .vrrp.notify_script }} root {{- end }}
  788. {{ if .vrrp.unicast_peer -}} unicast_peer { {{- println }}
  789. {{- range .vrrp.unicast_peer }} {{ println . }} {{- end }}
  790. }
  791. {{- end }}
  792. priority {{ .vrrp.priority }}
  793. advert_int {{ .vrrp.advert_int }}
  794. garp_master_refresh {{ .vrrp.garp_master_refresh }}
  795. {{ if .vrrp.preempt -}} preempt {{- else -}} nopreempt {{- end }}
  796. virtual_ipaddress {
  797. {{- printf "\n" }}
  798. {{- range .vrrp.addresses }} {{ println . }} {{- end }}
  799. {{- printf "\t" -}}
  800. }
  801. }
  802. `
  803. loadbalancerHaproxyConfTmplDefault = `
  804. global
  805. maxconn 20480
  806. tune.ssl.default-dh-param 2048
  807. {{- println }}
  808. {{- if .haproxy.tune_http_maxhdr }} tune.http.maxhdr {{ println .haproxy.tune_http_maxhdr }} {{- end }}
  809. {{- if .haproxy.global_stats_socket }} {{ println .haproxy.global_stats_socket }} {{- end }}
  810. {{- if .haproxy.global_nbthread }} nbthread {{ println .haproxy.global_nbthread }} {{- end }}
  811. {{- if .haproxy.global_log }} {{ println .haproxy.global_log }} {{- end }}
  812. defaults
  813. timeout connect 10s
  814. timeout client 60s
  815. timeout server 60s
  816. timeout tunnel 1h
  817. {{- println }}
  818. {{- if .haproxy.global_log }} {{ println "log global" }} {{- end }}
  819. {{- if not .haproxy.log_normal }} {{ println "option dontlog-normal" }} {{- end }}
  820. listen stats
  821. mode http
  822. bind :778
  823. stats enable
  824. stats hide-version
  825. stats realm "Haproxy Statistics"
  826. stats auth Yunion:LBStats
  827. stats uri /
  828. `
  829. loadbalancerTelegrafConfTmplDefault = `
  830. [[outputs.influxdb]]
  831. urls = ["{{ .telegraf.influx_db_output_url }}"]
  832. database = "{{ .telegraf.influx_db_output_name }}"
  833. insecure_skip_verify = {{ .telegraf.influx_db_output_unsafe_ssl }}
  834. [[inputs.haproxy]]
  835. interval = "{{ .telegraf.haproxy_input_interval }}s"
  836. servers = ["{{ .telegraf.haproxy_input_stats_socket }}"]
  837. keep_field_names = true
  838. `
  839. )
  840. var (
  841. loadbalancerKeepalivedConfTmplDefaultEncoded = base64.StdEncoding.EncodeToString([]byte(loadbalancerKeepalivedConfTmplDefault))
  842. loadbalancerHaproxyConfTmplDefaultEncoded = base64.StdEncoding.EncodeToString([]byte(loadbalancerHaproxyConfTmplDefault))
  843. loadbalancerTelegrafConfTmplDefaultEncoded = base64.StdEncoding.EncodeToString([]byte(loadbalancerTelegrafConfTmplDefault))
  844. )