| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package models
- import (
- "context"
- "fmt"
- "sync"
- "time"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- api "yunion.io/x/onecloud/pkg/apis/compute"
- "yunion.io/x/onecloud/pkg/cloudcommon/db/lockman"
- "yunion.io/x/onecloud/pkg/cloudcommon/etcd"
- "yunion.io/x/onecloud/pkg/cloudmon/misc"
- "yunion.io/x/onecloud/pkg/mcclient/auth"
- )
- var hostHealthChecker *SHostHealthChecker
- type SHostHealthChecker struct {
- // etcd client
- cli *etcd.SEtcdClient
- // time of wait host reconnect
- timeout time.Duration
- // hosts chan
- hc *sync.Map
- }
- func hostKey(hostname string) string {
- return fmt.Sprintf("%s/%s", api.HOST_HEALTH_PREFIX, hostname)
- }
// InitHostHealthChecker creates the package-level health checker on its first
// call and returns it. Any later call returns the already-created instance,
// ignoring its cli and timeout arguments.
//
// timeout is the reconnect grace period, in seconds.
//
// NOTE(review): the nil check is not synchronized; this assumes the function
// is called from a single goroutine during startup — confirm with callers.
func InitHostHealthChecker(cli *etcd.SEtcdClient, timeout int) *SHostHealthChecker {
	if hostHealthChecker == nil {
		hostHealthChecker = &SHostHealthChecker{
			cli:     cli,
			timeout: time.Second * time.Duration(timeout),
			hc:      &sync.Map{},
		}
	}
	return hostHealthChecker
}
// StartHostsHealthCheck logs that health checking is starting, then delegates
// to startHealthCheck and returns whatever error it reports.
func (h *SHostHealthChecker) StartHostsHealthCheck(ctx context.Context) error {
	log.Infof("Start host health check......")
	err := h.startHealthCheck(ctx)
	return err
}
- func (h *SHostHealthChecker) load(hostname string) chan struct{} {
- v, _ := h.hc.Load(hostname)
- return v.(chan struct{})
- }
// startHealthCheck queries every enabled host whose host_type is
// api.HOST_TYPE_HYPERVISOR and starts an etcd watcher for each one.
// It stops at, and returns, the first failure: the query itself, decoding a
// row, starting a watcher, or an error reported while iterating the rows.
func (h *SHostHealthChecker) startHealthCheck(ctx context.Context) error {
	q := HostManager.Query().IsTrue("enabled").Equals("host_type", api.HOST_TYPE_HYPERVISOR)
	rows, err := q.Rows()
	if err != nil {
		log.Errorf("HostHealth check Query hosts %s", err)
		return err
	}
	defer rows.Close()
	for rows.Next() {
		host := new(SHost)
		if err := q.Row2Struct(rows, host); err != nil {
			return errors.Wrap(err, "q.Row2Struct")
		}
		host.SetModelManager(HostManager, host)
		if err := h.startWatcher(ctx, host.GetHostnameByName()); err != nil {
			return errors.Wrap(err, "startWatcher")
		}
	}
	// rows.Next returns false both at the end of the result set and when
	// iteration fails; only rows.Err tells the two apart.
	if err := rows.Err(); err != nil {
		return errors.Wrap(err, "rows.Err")
	}
	return nil
}
- func (h *SHostHealthChecker) startWatcher(ctx context.Context, hostname string) error {
- log.Infof("Start watch host %s", hostname)
- var key = hostKey(hostname)
- if _, ok := h.hc.Load(hostname); !ok {
- h.hc.Store(hostname, make(chan struct{}))
- }
- if err := h.cli.Watch(
- ctx, key,
- h.onHostOnlineCreated(ctx, hostname),
- h.onHostOnlineModified(ctx, hostname),
- h.onHostOfflineDeleted(ctx, hostname),
- ); err != nil {
- return err
- }
- // watched key not found, wait 60s(default) and do onHostUnhealthy
- _, err := h.cli.Get(ctx, key)
- if err == etcd.ErrNoSuchKey {
- log.Warningf("No such key %s", hostname)
- go func() {
- select {
- case <-time.NewTimer(h.timeout).C:
- h.onHostUnhealthy(ctx, hostname)
- case <-h.load(hostname):
- if _err := h.startWatcher(ctx, hostname); _err != nil {
- log.Errorf("failed start watcher %s", _err)
- }
- case <-ctx.Done():
- log.Infof("exit watch host %s", hostname)
- }
- }()
- return nil
- }
- return err
- }
// onHostUnhealthy decides whether hostname is really down. It runs while
// holding the raw-object lock (api.HOST_HEALTH_LOCK_PREFIX, hostname).
//
// The host is looked up by hostname and its AccessIp is pinged. The host is
// only marked down (host.OnHostDown with the admin credential) when
// ps.Loss() reaches 100 — presumably 100% packet loss; confirm in misc.
// The function does nothing further when the host is unknown or when the
// ping cannot be performed.
func (h *SHostHealthChecker) onHostUnhealthy(ctx context.Context, hostname string) {
	lockman.LockRawObject(ctx, api.HOST_HEALTH_LOCK_PREFIX, hostname)
	defer lockman.ReleaseRawObject(ctx, api.HOST_HEALTH_LOCK_PREFIX, hostname)

	host := HostManager.FetchHostByHostname(hostname)
	if host == nil {
		return
	}

	// NOTE(review): the meaning of the (3, 10, false) arguments is defined in
	// misc.Ping (presumably count / timeout / privileged) — confirm there.
	pingRes, err := misc.Ping([]string{host.AccessIp}, 3, 10, false)
	if err != nil {
		// Include the cause; previously the error value was dropped.
		log.Errorf("failed ping dest host %s: %s", hostname, err)
		return
	}

	ps := pingRes[host.AccessIp]
	if ps.Loss() < 100 {
		log.Infof("ping host %s access ip %s succeed %s, skip host down", hostname, host.AccessIp, ps)
		return
	}
	log.Errorf("ping host %s access ip %s failed %s, host down", hostname, host.AccessIp, ps)
	host.OnHostDown(ctx, auth.AdminCredential())
}
// onHostOnlineCreated returns the etcd create-event callback for hostname.
// The callback wakes a goroutine that is waiting for this host to reconnect
// (the waiters started by startWatcher and processHostOffline).
//
// The send is non-blocking. The channel is unbuffered, and once a waiter has
// timed out or exited nobody will ever receive from it again. A blocking send
// would therefore hang the etcd watch callback forever. If no waiter is ready,
// the signal is dropped. A waiter that misses it will time out and reach
// onHostUnhealthy, which pings the host before marking it down.
func (h *SHostHealthChecker) onHostOnlineCreated(ctx context.Context, hostname string) etcd.TEtcdCreateEventFunc {
	return func(ctx context.Context, key, value []byte) {
		log.Infof("Got host online %s", hostname)
		v, ok := h.hc.Load(hostname)
		if !ok {
			return
		}
		select {
		case v.(chan struct{}) <- struct{}{}:
		default:
			// No goroutine is currently waiting for this host.
		}
	}
}
// processHostOffline reacts to hostname disconnecting from etcd. It starts a
// goroutine that waits for one of three events:
//   - h.timeout elapses: onHostUnhealthy is called;
//   - the host signals it came back online: the watcher is restarted;
//   - ctx is cancelled: the goroutine exits without touching the host.
//
// The cancellation case matches the equivalent wait in startWatcher, so the
// goroutine does not outlive its context.
func (h *SHostHealthChecker) processHostOffline(ctx context.Context, hostname string) {
	log.Warningf("host %s disconnect with etcd", hostname)
	go func() {
		timer := time.NewTimer(h.timeout)
		defer timer.Stop()
		select {
		case <-timer.C:
			h.onHostUnhealthy(ctx, hostname)
		case <-h.load(hostname):
			if err := h.startWatcher(ctx, hostname); err != nil {
				log.Errorf("failed start watcher %s", err)
			}
		case <-ctx.Done():
			log.Infof("exit watch host %s", hostname)
		}
	}()
}
// onHostOnlineModified returns the etcd modify-event callback for hostname.
// A modified key is treated like a created one: the host is writing to etcd,
// so any goroutine waiting for it to reconnect is woken.
//
// The signal is sent without blocking. The channel is unbuffered, and when no
// waiter exists a blocking send would hang the etcd watch callback.
func (h *SHostHealthChecker) onHostOnlineModified(ctx context.Context, hostname string) etcd.TEtcdModifyEventFunc {
	return func(ctx context.Context, key, oldvalue, value []byte) {
		log.Infof("watch host key modified %s %s %s", key, oldvalue, value)
		v, ok := h.hc.Load(hostname)
		if !ok {
			return
		}
		select {
		case v.(chan struct{}) <- struct{}{}:
		default:
			// No goroutine is currently waiting for this host.
		}
	}
}
// onHostOfflineDeleted returns the etcd delete-event callback for hostname.
// Removal of the host's key is treated as the host disconnecting from etcd.
// The callback logs the event and hands off to processHostOffline, which
// waits for a reconnect before declaring the host unhealthy.
func (h *SHostHealthChecker) onHostOfflineDeleted(ctx context.Context, hostname string) etcd.TEtcdDeleteEventFunc {
	return func(ctx context.Context, key []byte) {
		log.Errorf("watch host key deleted %s", key)
		h.processHostOffline(ctx, hostname)
	}
}
// WatchHost (re)starts health watching for hostname. Any existing watch on the
// host's etcd key is removed, a fresh one is installed via startWatcher, and
// that call's error is returned.
//
// NOTE(review): a goroutine already waiting for this host to reconnect is not
// signalled here. The removed statement h.onHostOnlineCreated(ctx, hostname)
// only built a callback and discarded it, so it never signalled anything.
// Actually signalling would make that goroutine call startWatcher concurrently
// with the call below.
func (h *SHostHealthChecker) WatchHost(ctx context.Context, hostname string) error {
	h.cli.Unwatch(hostKey(hostname))
	return h.startWatcher(ctx, hostname)
}
// UnwatchHost stops watching hostname's etcd key and drops the host's
// "host online" signal channel from the registry.
func (h *SHostHealthChecker) UnwatchHost(ctx context.Context, hostname string) {
	log.Infof("Unwatch host %s", hostname)
	key := hostKey(hostname)
	h.cli.Unwatch(key)
	h.hc.Delete(hostname)
}
|