| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package balancer
- import (
- "context"
- "fmt"
- "math"
- "sort"
- "sync"
- "yunion.io/x/jsonutils"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/utils"
- computeapi "yunion.io/x/onecloud/pkg/apis/compute"
- "yunion.io/x/onecloud/pkg/apis/monitor"
- "yunion.io/x/onecloud/pkg/mcclient"
- "yunion.io/x/onecloud/pkg/mcclient/modules/compute"
- compute_options "yunion.io/x/onecloud/pkg/mcclient/options/compute"
- "yunion.io/x/onecloud/pkg/monitor/alerting"
- "yunion.io/x/onecloud/pkg/monitor/datasource"
- "yunion.io/x/onecloud/pkg/monitor/models"
- "yunion.io/x/onecloud/pkg/monitor/tsdb"
- )
- func init() {
- for _, drv := range []IMetricDriver{
- newMemAvailable(),
- newCPUUsageActive(),
- } {
- GetMetricDrivers().register(drv)
- }
- }
- var (
- drivers *MetricDrivers
- )
- type MetricDrivers struct {
- *sync.Map
- }
- func NewMetricDrivers() *MetricDrivers {
- return &MetricDrivers{
- Map: new(sync.Map),
- }
- }
- func (d *MetricDrivers) register(id IMetricDriver) *MetricDrivers {
- d.Store(id.GetType(), id)
- return d
- }
- func (d *MetricDrivers) Get(mT monitor.MigrationAlertMetricType) (IMetricDriver, error) {
- drv, ok := d.Load(mT)
- if !ok {
- return nil, errors.Errorf("Not found driver by %q", mT)
- }
- return drv.(IMetricDriver), nil
- }
- func GetMetricDrivers() *MetricDrivers {
- if drivers == nil {
- drivers = NewMetricDrivers()
- }
- return drivers
- }
- type IMetricDriver interface {
- GetType() monitor.MigrationAlertMetricType
- GetTsdbQuery() *TsdbQuery
- GetCandidate(gst jsonutils.JSONObject, host IHost, ds *tsdb.DataSource) (ICandidate, error)
- SetHostCurrent(h IHost, values map[string]float64) error
- GetTarget(host jsonutils.JSONObject) (ITarget, error)
- GetCondition(s *monitor.AlertSetting) (ICondition, error)
- }
- type ICondition interface {
- GetThreshold() float64
- // GetSourceThresholdDelta must > 0
- GetSourceThresholdDelta(threshold float64, srcHost IHost) float64
- IsFitTarget(settings *monitor.MigrationAlertSettings, t ITarget, c ICandidate) error
- }
- type Rules struct {
- Alert *models.SMigrationAlert
- Condtion ICondition
- Source *SourceRule
- Target *TargetRule
- ResultMustPair bool
- }
- func (r *Rules) GetAlert() *models.SMigrationAlert {
- return r.Alert
- }
- func NewRules(_ *alerting.EvalContext, m *monitor.EvalMatch, alert *models.SMigrationAlert, drv IMetricDriver, resultMustPair bool) (*Rules, error) {
- hostId, ok := m.Tags["host_id"]
- if !ok {
- return nil, errors.Errorf("Not found host_id in tags: %#v", m.Tags)
- }
- ok, hObjs := models.MonitorResourceManager.GetResourceObjByResType(monitor.METRIC_RES_TYPE_HOST)
- if !ok {
- return nil, errors.Errorf("GetResourceObjByResType host returns false")
- }
- var srcHostObj jsonutils.JSONObject = nil
- for _, obj := range hObjs {
- id, err := obj.GetString("id")
- if err != nil {
- return nil, errors.Wrapf(err, "get host obj id: %s", obj)
- }
- if id == hostId {
- srcHostObj = obj
- break
- }
- }
- if srcHostObj == nil {
- return nil, errors.Errorf("Not found source host object by id: %q, %q", hostId, srcHostObj)
- }
- srcHost, err := drv.GetTarget(srcHostObj)
- if err != nil {
- return nil, errors.Wrap(err, "new host")
- }
- allHosts := []IResource{srcHost}
- msettings, _ := alert.GetMigrationSettings()
- targetHosts, err := filterTargetHosts(drv, srcHost, hObjs, msettings)
- if err != nil {
- return nil, errors.Wrap(err, "filterTargetHosts")
- }
- for _, oh := range targetHosts {
- allHosts = append(allHosts, oh)
- }
- ds, err := datasource.GetDefaultSource("telegraf")
- if err != nil {
- return nil, errors.Wrapf(err, "Get default DataSource")
- }
- // find guests to filtered by source setting of source host alerted
- cds, err := findGuestsOfHost(drv, srcHost, ds, msettings)
- if err != nil {
- return nil, errors.Wrapf(err, "findGuestsOfHost %s", srcHost.GetName())
- }
- metrics, err := InfluxdbQuery(ds, "host_id", allHosts, drv.GetTsdbQuery())
- if err != nil {
- return nil, errors.Wrapf(err, "InfluxdbQuery all hosts metrics")
- }
- for _, host := range allHosts {
- m := metrics.Get(host.GetId())
- if m == nil {
- return nil, errors.Errorf("Influxdb metrics of %s(%s) not found", host.GetName(), host.GetId())
- }
- if err := drv.SetHostCurrent(host.(IHost), m.Values); err != nil {
- return nil, errors.Wrapf(err, "SetHostCurrent %q", host.GetName())
- }
- }
- settings, err := alert.GetSettings()
- if err != nil {
- return nil, errors.Wrapf(err, "Get alert settings")
- }
- cond, err := drv.GetCondition(settings)
- if err != nil {
- return nil, errors.Wrapf(err, "Get Condtion")
- }
- rs := &Rules{
- Alert: alert,
- Condtion: cond,
- ResultMustPair: resultMustPair,
- }
- rs.Source = NewSourceRule(srcHost, cds)
- rs.Target = NewTargetRule(targetHosts)
- return rs, nil
- }
- func filterTargetHosts(drv IMetricDriver, srcHost IHost, allHost []jsonutils.JSONObject, ms *monitor.MigrationAlertSettings) ([]ITarget, error) {
- specifyTargetHostIds := []string{}
- specifySrcHostIds := []string{}
- if ms != nil && ms.Target != nil {
- specifyTargetHostIds = ms.Target.HostIds
- }
- if ms != nil && ms.Source != nil {
- specifySrcHostIds = ms.Source.HostIds
- }
- srcHostId := srcHost.GetId()
- srcHostObj := srcHost.GetObject()
- srcHostType, err := srcHostObj.GetString("host_type")
- if err != nil {
- return nil, errors.Wrap(err, "get source host_type")
- }
- srcArch, err := srcHostObj.GetString("cpu_architecture")
- if err != nil {
- return nil, errors.Wrapf(err, "get source cpu_architecture")
- }
- targets := make([]ITarget, 0)
- for _, obj := range allHost {
- id, err := obj.GetString("id")
- if err != nil {
- return nil, errors.Wrapf(err, "get host obj id: %s", obj)
- }
- if id == srcHostId {
- continue
- }
- if len(specifySrcHostIds) != 0 {
- if utils.IsInStringArray(id, specifySrcHostIds) {
- // filter target host if it in source specified hosts
- continue
- }
- }
- if len(specifyTargetHostIds) != 0 {
- // filter target host if it not in target specified hosts
- if !utils.IsInStringArray(id, specifyTargetHostIds) {
- continue
- }
- }
- hostType, _ := obj.GetString("host_type")
- if hostType != srcHostType {
- continue
- }
- arch, _ := obj.GetString("cpu_architecture")
- if arch != srcArch {
- continue
- }
- enabled, _ := obj.Bool("enabled")
- if !enabled {
- continue
- }
- hostStatus, _ := obj.GetString("host_status")
- if hostStatus != "online" {
- continue
- }
- th, err := drv.GetTarget(obj)
- if err != nil {
- return nil, errors.Wrapf(err, "drv.GetTarget %s", obj)
- }
- targets = append(targets, th)
- }
- return targets, nil
- }
- func findGuestsOfHost(drv IMetricDriver, host IHost, ds *tsdb.DataSource, ms *monitor.MigrationAlertSettings) ([]ICandidate, error) {
- ok, objs := models.MonitorResourceManager.GetResourceObjByResType(monitor.METRIC_RES_TYPE_GUEST)
- if !ok {
- return nil, errors.Errorf("GetResourceObjByResType by guest return false")
- }
- specifyGuestIds := []string{}
- if ms != nil && ms.Source != nil {
- specifyGuestIds = ms.Source.GuestIds
- }
- ret := make([]ICandidate, 0)
- found := false
- errs := []error{}
- for _, obj := range objs {
- gHostId, err := obj.GetString("host_id")
- if err != nil {
- return nil, errors.Wrapf(err, "get host_id from cache guest %s", obj)
- }
- if gHostId == host.GetId() {
- status, err := obj.GetString("status")
- if err != nil {
- return nil, errors.Wrapf(err, "get status of guest: %s", obj)
- }
- name, _ := obj.GetString("name")
- // filter running guest
- if status != computeapi.VM_RUNNING {
- log.Debugf("ignore guest %s cause status is %s", name, status)
- continue
- }
- gId, _ := obj.GetString("id")
- if len(specifyGuestIds) != 0 {
- if !utils.IsInStringArray(gId, specifyGuestIds) {
- log.Debugf("ignore guest %s(%s) cause not in specified ids %v", name, gId, specifyGuestIds)
- continue
- }
- }
- c, err := drv.GetCandidate(obj, host, ds)
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "drv.GetCandidate of guest %s", obj))
- continue
- }
- if c.GetScore() == 0 {
- log.Debugf("ignore guest %s cause %s score is 0", c.GetName(), drv.GetType())
- continue
- }
- ret = append(ret, c)
- found = true
- }
- }
- if !found {
- return nil, errors.NewAggregate(errs)
- }
- if len(errs) != 0 {
- log.Warningf("not all guests found: %s", errors.NewAggregate(errs))
- }
- return ret, nil
- }
- // SourceRule 定义触发了报警的宿主机和上面可以迁移的虚拟机
- type SourceRule struct {
- Host IHost
- Candidates []ICandidate
- }
- func NewSourceRule(host IHost, cds []ICandidate) *SourceRule {
- return &SourceRule{
- Host: host,
- Candidates: cds,
- }
- }
- type IHost interface {
- IResource
- GetHostResource() *HostResource
- GetCurrent() float64
- SetCurrent(float64) IHost
- Compare(oh IHost) bool
- }
- // TargetRule 定义可以选择迁移的宿主机
- type TargetRule struct {
- Items []ITarget
- }
- func NewTargetRule(hosts []ITarget) *TargetRule {
- return &TargetRule{
- Items: hosts,
- }
- }
- type ItemType string
- const (
- ItemTypeHost = "host"
- ItemTypeGuest = "guest"
- )
- type ITarget interface {
- IHost
- Selected(c ICandidate) ITarget
- }
- type iTargets []ITarget
- func (i iTargets) Len() int {
- return len(i)
- }
- func (ts iTargets) Less(i, j int) bool {
- a, b := ts[i], ts[j]
- return a.Compare(b)
- }
- func (ts iTargets) Swap(i, j int) {
- ts[i], ts[j] = ts[j], ts[i]
- }
- func RecoverInProcessAlerts(ctx context.Context, s *mcclient.ClientSession) error {
- alerts, err := models.GetMigrationAlertManager().GetInMigrationAlerts()
- if err != nil {
- return errors.Wrap(err, "GetInMigrationAlerts")
- }
- recorder := NewRecorder()
- errs := make([]error, 0)
- for _, alert := range alerts {
- notes, err := alert.GetMigrateNotes()
- if err != nil {
- errs = append(errs, errors.Wrapf(err, "GetMigrateNotes for %s", alert.GetId()))
- continue
- }
- for _, note := range notes {
- log.Infof("Start recover alert %s(%s) note %s", alert.GetName(), alert.GetId(), jsonutils.Marshal(note))
- notePrt := ¬e
- recorder.StartWatchMigratingProcess(ctx, s, alert, notePrt)
- }
- }
- return errors.NewAggregate(errs)
- }
- func DoBalance(ctx context.Context, s *mcclient.ClientSession, rules *Rules, recorder IRecorder) error {
- // check whether having migration in process
- alerts, err := models.GetMigrationAlertManager().GetInMigrationAlerts()
- if err != nil {
- return errors.Wrap(err, "GetInMigrationAlerts")
- }
- if len(alerts) != 0 {
- ids := make([]string, len(alerts))
- for i := range alerts {
- alert := alerts[i]
- ids[i] = fmt.Sprintf("%s(%s)", alert.GetName(), alert.GetId())
- }
- return errors.Errorf("Others migration alerts in process: %v", ids)
- }
- rst, err := findResult(rules)
- if err != nil {
- err = errors.Wrapf(err, "find result to migrate")
- recorder.RecordError(s.GetToken(), rules.GetAlert(), err, EventActionFindResultFail)
- return err
- }
- if err := doMigrate(ctx, s, rules, rst, recorder); err != nil {
- return errors.Wrapf(err, "do migrate for result %#v", rst)
- }
- return nil
- }
- type result struct {
- pairs []*resultPair
- }
- type resultPair struct {
- source ICandidate
- target ITarget
- }
- func findResult(rules *Rules) (*result, error) {
- // 找到 rules.Source 里面可以迁移的虚拟机
- // guests, err := findCandidates(rules.Source, rules.Condtion)
- // if err != nil {
- // return nil, errors.Wrap(err, "find source candidates to migrate")
- // }
- // TODO
- // 将找到的 guests 进行配对,调用 scheduler-forecast 接口判断能否迁移到宿主机
- // 如果不能迁就提出这些 guests,重新 findCandidates
- // 将找到的虚拟机分配到对应的宿主机,形成 1-1 配对
- settings, _ := rules.GetAlert().GetMigrationSettings()
- return pairMigratResult(settings, rules.Source, rules.Target, rules.Condtion, rules.ResultMustPair)
- }
- type IResource interface {
- GetId() string
- GetName() string
- GetObject() jsonutils.JSONObject
- }
- type ICandidate interface {
- IResource
- GetHostName() string
- GetScore() float64
- }
- func getCScores(css []ICandidate) []float64 {
- ret := make([]float64, len(css))
- for i := range css {
- ret[i] = css[i].GetScore()
- }
- return ret
- }
- func findFitCandidates(css []ICandidate, delta float64) ([]ICandidate, error) {
- if len(css) == 0 {
- return nil, errors.Errorf("Not found fit guest candidate for delta %f", delta)
- }
- first := css[0]
- rest := css[1:]
- if first.GetScore() >= delta {
- return []ICandidate{first}, nil
- }
- rRests, err := findFitCandidates(rest, delta-first.GetScore())
- if err != nil {
- return nil, errors.Wrapf(err, "Found in rest %v", rest)
- }
- ret := []ICandidate{first}
- ret = append(ret, rRests...)
- return ret, nil
- }
- type candidatesThresholdSort struct {
- cds []ICandidate
- threshold float64
- }
- func newCandidatesThresholdSort(cds []ICandidate, th float64) *candidatesThresholdSort {
- return &candidatesThresholdSort{
- cds: cds,
- threshold: th,
- }
- }
- func (s *candidatesThresholdSort) Len() int {
- return len(s.cds)
- }
- func (s *candidatesThresholdSort) Swap(i, j int) {
- s.cds[i], s.cds[j] = s.cds[j], s.cds[i]
- }
- func (s *candidatesThresholdSort) Less(i, j int) bool {
- d1 := math.Abs(s.cds[i].GetScore() - s.threshold)
- d2 := math.Abs(s.cds[j].GetScore() - s.threshold)
- return d1 < d2
- }
- func (s *candidatesThresholdSort) CandidatesString() string {
- str := fmt.Sprintf("delta: %f\n", s.threshold)
- for _, c := range s.cds {
- str += fmt.Sprintf("%s: %f\n", c.GetName(), c.GetScore())
- }
- return str
- }
- func (s *candidatesThresholdSort) Debug(prefix string) {
- log.Infof("%s:\n%s", prefix, s.CandidatesString())
- }
- func sortCandidatesByThreshold(cds []ICandidate, th float64) []ICandidate {
- ss := newCandidatesThresholdSort(cds, th)
- // ss.Debug("Pre sort")
- sort.Sort(ss)
- // ss.Debug("After sort")
- return ss.cds
- }
- func findCandidates(src *SourceRule, cond ICondition) ([]ICandidate, error) {
- threshold := cond.GetThreshold()
- delta := cond.GetSourceThresholdDelta(threshold, src.Host)
- cds := sortCandidatesByThreshold(src.Candidates, threshold)
- return findFitCandidates(cds, delta)
- }
- func findFitTarget(settings *monitor.MigrationAlertSettings, c ICandidate, tr *TargetRule, targets iTargets, cond ICondition) (ITarget, error) {
- // sort targets
- sort.Sort(targets)
- var errs []error
- for i := range targets {
- target := targets[i]
- if err := cond.IsFitTarget(settings, target, c); err == nil {
- return target, nil
- } else {
- errs = append(errs, err)
- }
- }
- return nil, errors.NewAggregate(errs)
- }
- func pairMigratResult(
- settings *monitor.MigrationAlertSettings,
- src *SourceRule, target *TargetRule, cond ICondition, mustPair bool) (*result, error) {
- // all guests of source host to migrate
- gsts := src.Candidates
- pairs := make([]*resultPair, 0)
- hosts := target.Items
- errs := []error{}
- for _, gst := range gsts {
- host, err := findFitTarget(settings, gst, target, hosts, cond)
- if err != nil {
- err = errors.Wrapf(err, "not found target for guest %s on %s", gst.GetName(), gst.GetHostName())
- if mustPair {
- return nil, err
- } else {
- errs = append(errs, err)
- continue
- }
- }
- host.Selected(gst)
- pairs = append(pairs, &resultPair{
- source: gst,
- target: host,
- })
- }
- if len(gsts) != len(pairs) {
- if mustPair {
- return nil, errors.Errorf("%v: Paired: %d candidates != %d hosts", errors.NewAggregate(errs), len(gsts), len(pairs))
- }
- }
- if len(pairs) == 0 {
- return nil, errors.Errorf("%v: Not found any pairs, mustPair is %v", errors.NewAggregate(errs), mustPair)
- }
- if !mustPair && len(errs) != 0 {
- log.Warningf("some guest not paired: %v", errors.NewAggregate(errs))
- }
- return &result{
- pairs: pairs,
- }, nil
- }
- func doMigrate(ctx context.Context, s *mcclient.ClientSession, rules *Rules, rst *result, recorder IRecorder) error {
- // migrate must be executed on by one
- alert := rules.GetAlert()
- for _, pair := range rst.pairs {
- note, err := NewMigrateNote(pair, nil)
- if err != nil {
- return errors.Wrap(err, "NewMigrateNotes")
- }
- if obj, err := doMigrateByPair(s, pair); err != nil {
- err = errors.Wrapf(err, "doMigrateByPair %s to %s", pair.source.GetName(), pair.target.GetName())
- if rErr := recorder.RecordMigrateError(s.GetToken(), alert, note, err); rErr != nil {
- log.Errorf("RecordMigrate %s to %s error: %v", obj, pair.target.GetId(), rErr)
- }
- return err
- } else {
- if err := recorder.RecordMigrate(ctx, s, alert, note); err != nil {
- log.Errorf("RecordMigrate %s to %s error: %v", obj, pair.target.GetId(), err)
- }
- }
- }
- return nil
- }
- func doMigrateByPair(s *mcclient.ClientSession, pair *resultPair) (jsonutils.JSONObject, error) {
- gst := pair.source
- trueObj := true
- input := &compute_options.ServerLiveMigrateOptions{
- ID: gst.GetId(),
- PreferHost: pair.target.GetId(),
- SkipCpuCheck: &trueObj,
- SkipKernelCheck: &trueObj,
- }
- params, err := input.Params()
- if err != nil {
- return nil, errors.Wrapf(err, "live migrate input %#v", input)
- }
- obj, err := compute.Servers.PerformAction(s, input.GetId(), "live-migrate", params)
- if err != nil {
- return nil, errors.Wrapf(err, "live migrate with params: %s", params)
- }
- return obj, nil
- }
- type resource struct {
- id string
- name string
- obj jsonutils.JSONObject
- }
- func newResource(obj jsonutils.JSONObject) (IResource, error) {
- id, err := obj.GetString("id")
- if err != nil {
- return nil, errors.Wrap(err, "get id")
- }
- name, err := obj.GetString("name")
- if err != nil {
- return nil, errors.Wrap(err, "get name")
- }
- return &resource{
- id: id,
- name: name,
- obj: obj,
- }, nil
- }
- func (r *resource) GetId() string {
- return r.id
- }
- func (r *resource) GetName() string {
- return r.name
- }
- func (r *resource) GetObject() jsonutils.JSONObject {
- return r.obj
- }
- type guestResource struct {
- IResource
- hostName string
- guest jsonutils.JSONObject
- }
- func newGuestResource(gst jsonutils.JSONObject, hostName string) (*guestResource, error) {
- res, err := newResource(gst)
- if err != nil {
- return nil, errors.Wrap(err, "newResource")
- }
- return &guestResource{
- IResource: res,
- hostName: hostName,
- guest: gst,
- }, nil
- }
- func (s *guestResource) GetHostName() string {
- return s.hostName
- }
- type HostResource struct {
- IResource
- host jsonutils.JSONObject
- totalMemSize float64
- cpuCount int64
- }
- func newHostResource(host jsonutils.JSONObject) (*HostResource, error) {
- res, err := newResource(host)
- if err != nil {
- return nil, errors.Wrap(err, "newResource")
- }
- memSize, err := host.Int("mem_size")
- if err != nil {
- return nil, errors.Wrap(err, "get mem_size")
- }
- cpuCount, err := host.Int("cpu_count")
- if err != nil {
- return nil, errors.Wrap(err, "get cpu_count")
- }
- return &HostResource{
- IResource: res,
- host: host,
- totalMemSize: float64(memSize * 1024 * 1024),
- cpuCount: cpuCount,
- }, nil
- }
- func (h *HostResource) GetHostResource() *HostResource {
- return h
- }
|