host_health.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package models
  15. import (
  16. "context"
  17. "fmt"
  18. "sync"
  19. "time"
  20. "yunion.io/x/log"
  21. "yunion.io/x/pkg/errors"
  22. api "yunion.io/x/onecloud/pkg/apis/compute"
  23. "yunion.io/x/onecloud/pkg/cloudcommon/db/lockman"
  24. "yunion.io/x/onecloud/pkg/cloudcommon/etcd"
  25. "yunion.io/x/onecloud/pkg/cloudmon/misc"
  26. "yunion.io/x/onecloud/pkg/mcclient/auth"
  27. )
  28. var hostHealthChecker *SHostHealthChecker
  29. type SHostHealthChecker struct {
  30. // etcd client
  31. cli *etcd.SEtcdClient
  32. // time of wait host reconnect
  33. timeout time.Duration
  34. // hosts chan
  35. hc *sync.Map
  36. }
  37. func hostKey(hostname string) string {
  38. return fmt.Sprintf("%s/%s", api.HOST_HEALTH_PREFIX, hostname)
  39. }
  40. func InitHostHealthChecker(cli *etcd.SEtcdClient, timeout int) *SHostHealthChecker {
  41. if hostHealthChecker != nil {
  42. return hostHealthChecker
  43. }
  44. hostHealthChecker = &SHostHealthChecker{
  45. cli: cli,
  46. timeout: time.Duration(timeout) * time.Second,
  47. hc: new(sync.Map),
  48. }
  49. return hostHealthChecker
  50. }
  51. func (h *SHostHealthChecker) StartHostsHealthCheck(ctx context.Context) error {
  52. log.Infof("Start host health check......")
  53. return h.startHealthCheck(ctx)
  54. }
  55. func (h *SHostHealthChecker) load(hostname string) chan struct{} {
  56. v, _ := h.hc.Load(hostname)
  57. return v.(chan struct{})
  58. }
  59. func (h *SHostHealthChecker) startHealthCheck(ctx context.Context) error {
  60. q := HostManager.Query().IsTrue("enabled").Equals("host_type", api.HOST_TYPE_HYPERVISOR)
  61. rows, err := q.Rows()
  62. if err != nil {
  63. log.Errorf("HostHealth check Query hosts %s", err)
  64. return err
  65. }
  66. defer rows.Close()
  67. for rows.Next() {
  68. host := new(SHost)
  69. err = q.Row2Struct(rows, host)
  70. if err != nil {
  71. return errors.Wrap(err, "q.Row2Struct")
  72. }
  73. host.SetModelManager(HostManager, host)
  74. err = h.startWatcher(ctx, host.GetHostnameByName())
  75. if err != nil {
  76. return errors.Wrap(err, "startWatcher")
  77. }
  78. }
  79. return nil
  80. }
  81. func (h *SHostHealthChecker) startWatcher(ctx context.Context, hostname string) error {
  82. log.Infof("Start watch host %s", hostname)
  83. var key = hostKey(hostname)
  84. if _, ok := h.hc.Load(hostname); !ok {
  85. h.hc.Store(hostname, make(chan struct{}))
  86. }
  87. if err := h.cli.Watch(
  88. ctx, key,
  89. h.onHostOnlineCreated(ctx, hostname),
  90. h.onHostOnlineModified(ctx, hostname),
  91. h.onHostOfflineDeleted(ctx, hostname),
  92. ); err != nil {
  93. return err
  94. }
  95. // watched key not found, wait 60s(default) and do onHostUnhealthy
  96. _, err := h.cli.Get(ctx, key)
  97. if err == etcd.ErrNoSuchKey {
  98. log.Warningf("No such key %s", hostname)
  99. go func() {
  100. select {
  101. case <-time.NewTimer(h.timeout).C:
  102. h.onHostUnhealthy(ctx, hostname)
  103. case <-h.load(hostname):
  104. if _err := h.startWatcher(ctx, hostname); _err != nil {
  105. log.Errorf("failed start watcher %s", _err)
  106. }
  107. case <-ctx.Done():
  108. log.Infof("exit watch host %s", hostname)
  109. }
  110. }()
  111. return nil
  112. }
  113. return err
  114. }
  115. func (h *SHostHealthChecker) onHostUnhealthy(ctx context.Context, hostname string) {
  116. lockman.LockRawObject(ctx, api.HOST_HEALTH_LOCK_PREFIX, hostname)
  117. defer lockman.ReleaseRawObject(ctx, api.HOST_HEALTH_LOCK_PREFIX, hostname)
  118. host := HostManager.FetchHostByHostname(hostname)
  119. if host != nil {
  120. pingRes, err := misc.Ping([]string{host.AccessIp}, 3, 10, false)
  121. if err != nil {
  122. log.Errorf("failed ping dest host %s", hostname)
  123. return
  124. }
  125. if ps := pingRes[host.AccessIp]; ps.Loss() < 100 {
  126. log.Infof("ping host %s access ip %s succeed %s, skip host down", hostname, host.AccessIp, ps)
  127. } else {
  128. log.Errorf("ping host %s access ip %s failed %s, host down", hostname, host.AccessIp, ps)
  129. host.OnHostDown(ctx, auth.AdminCredential())
  130. }
  131. }
  132. }
  133. func (h *SHostHealthChecker) onHostOnlineCreated(ctx context.Context, hostname string) etcd.TEtcdCreateEventFunc {
  134. return func(ctx context.Context, key, value []byte) {
  135. log.Infof("Got host online %s", hostname)
  136. if v, ok := h.hc.Load(hostname); ok {
  137. c := v.(chan struct{})
  138. c <- struct{}{}
  139. }
  140. }
  141. }
  142. func (h *SHostHealthChecker) processHostOffline(ctx context.Context, hostname string) {
  143. log.Warningf("host %s disconnect with etcd", hostname)
  144. go func() {
  145. select {
  146. case <-time.NewTimer(h.timeout).C:
  147. h.onHostUnhealthy(ctx, hostname)
  148. case <-h.load(hostname):
  149. if err := h.startWatcher(ctx, hostname); err != nil {
  150. log.Errorf("failed start watcher %s", err)
  151. }
  152. }
  153. }()
  154. }
  155. func (h *SHostHealthChecker) onHostOnlineModified(ctx context.Context, hostname string) etcd.TEtcdModifyEventFunc {
  156. return func(ctx context.Context, key, oldvalue, value []byte) {
  157. log.Infof("watch host key modified %s %s %s", key, oldvalue, value)
  158. h.onHostOnlineCreated(ctx, hostname)
  159. }
  160. }
  161. func (h *SHostHealthChecker) onHostOfflineDeleted(ctx context.Context, hostname string) etcd.TEtcdDeleteEventFunc {
  162. return func(ctx context.Context, key []byte) {
  163. log.Errorf("watch host key deleled %s", key)
  164. h.processHostOffline(ctx, hostname)
  165. }
  166. }
  167. func (h *SHostHealthChecker) WatchHost(ctx context.Context, hostname string) error {
  168. h.onHostOnlineCreated(ctx, hostname)
  169. h.cli.Unwatch(hostKey(hostname))
  170. return h.startWatcher(ctx, hostname)
  171. }
  172. func (h *SHostHealthChecker) UnwatchHost(ctx context.Context, hostname string) {
  173. log.Infof("Unwatch host %s", hostname)
  174. h.cli.Unwatch(hostKey(hostname))
  175. h.hc.Delete(hostname)
  176. }