default.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package candidate
  15. import (
  16. "fmt"
  17. "reflect"
  18. gosync "sync"
  19. "time"
  20. "yunion.io/x/log"
  21. u "yunion.io/x/pkg/utils"
  22. "yunion.io/x/onecloud/pkg/scheduler/cache"
  23. "yunion.io/x/onecloud/pkg/scheduler/options"
  24. )
  25. const (
  26. CacheKind = "CandidateCache"
  27. HostCandidateCache = "Hosts"
  28. BaremetalCandidateCache = "Baremetals"
  29. HostDescBuilder = HostCandidateCache
  30. BaremetalDescBuilder = BaremetalCandidateCache
  31. )
  32. func defaultCadidateItems() []cache.CachedItem {
  33. return []cache.CachedItem{
  34. newHostCache(),
  35. newBaremetalCache(),
  36. }
  37. }
  38. func uuidKey(obj interface{}) (string, error) {
  39. return obj.(descer).GetId(), nil
  40. }
  41. func generalUpdateFunc(act BuildActor, mutex *gosync.Mutex) cache.UpdateFunc {
  42. return func(ids []string) ([]interface{}, error) {
  43. mutex.Lock()
  44. defer mutex.Unlock()
  45. newAct := act.Clone()
  46. builder := NewDescBuilder(newAct)
  47. descs, err := builder.Build(ids)
  48. if err != nil {
  49. return nil, err
  50. }
  51. return descs, nil
  52. }
  53. }
  54. func generalLoadFunc(act BuildActor, mutex *gosync.Mutex) cache.LoadFunc {
  55. return func() ([]interface{}, error) {
  56. mutex.Lock()
  57. defer mutex.Unlock()
  58. newAct := act.Clone()
  59. builder := NewDescBuilder(newAct)
  60. ids, err := act.AllIDs()
  61. if err != nil {
  62. return nil, err
  63. }
  64. descs, err := builder.Build(ids)
  65. if err != nil {
  66. return nil, err
  67. }
  68. return descs, nil
  69. }
  70. }
  71. // generalGetUpdateFunc provides the ability to generate regularly updated data.
  72. func generalGetUpdateFunc(isBaremetal bool) cache.GetUpdateFunc {
  73. // The purpose of the counter is to update the data in full at regular intervals.
  74. fullUpdateCounter := 0
  75. return func(d []interface{}) ([]string, error) {
  76. // Full update every 10 minutes(30s * 20)
  77. if isBaremetal && fullUpdateCounter >= options.Options.BaremetalCandidateCacheReloadCount {
  78. fullUpdateCounter = 1
  79. log.Infof("FullUpdateCounter: %d, update all baremetals.", fullUpdateCounter)
  80. return nil, nil
  81. }
  82. if !isBaremetal && fullUpdateCounter >= options.Options.HostCandidateCacheReloadCount {
  83. fullUpdateCounter = 1
  84. log.Infof("FullUpdateCounter: %d, update all hosts.", fullUpdateCounter)
  85. return nil, nil
  86. }
  87. allStatus := make(map[string]time.Time, len(d))
  88. // This will reflect the key `ID` and `UpdatedAt`,maybe one day can optimize this part.
  89. for _, item := range d {
  90. r := reflect.ValueOf(item)
  91. f := reflect.Indirect(r)
  92. key := f.FieldByName("Id")
  93. if !key.IsValid() {
  94. key = f.FieldByName("ID")
  95. }
  96. value := f.FieldByName("UpdatedAt")
  97. if key.IsValid() && value.IsValid() {
  98. allStatus[key.String()] = value.Interface().(time.Time)
  99. } else {
  100. log.Errorf("get `ID` and `UpdatedAt` errror in host:%v\n", item)
  101. }
  102. }
  103. fullUpdateCounter++
  104. modified, err := FetchHostsUpdateStatus(isBaremetal)
  105. if err != nil {
  106. return nil, err
  107. }
  108. modifiedIds := make([]string, 0, len(modified))
  109. // Aggregate the updated hosts
  110. for _, status := range modified {
  111. // If host does not exist[ok=false] or has updated will be in update list.
  112. if t, ok := allStatus[status.Id]; !ok || !t.Equal(status.UpdatedAt) {
  113. modifiedIds = append(modifiedIds, status.Id)
  114. }
  115. }
  116. if len(modifiedIds) == 0 {
  117. return nil, fmt.Errorf("%s", "no need update all")
  118. }
  119. return modifiedIds, nil
  120. }
  121. }
  122. func newHostCache() cache.CachedItem {
  123. mutex := new(gosync.Mutex)
  124. update := generalUpdateFunc(newHostBuilder(), mutex)
  125. load := generalLoadFunc(newHostBuilder(), mutex)
  126. getUpdate := generalGetUpdateFunc(false)
  127. item := new(candidateItem)
  128. item.CachedItem = cache.NewCacheItem(
  129. HostCandidateCache,
  130. u.ToDuration(options.Options.HostCandidateCacheTTL),
  131. u.ToDuration(options.Options.HostCandidateCachePeriod),
  132. uuidKey,
  133. update,
  134. load,
  135. getUpdate,
  136. )
  137. return item
  138. }
  139. func newBaremetalCache() cache.CachedItem {
  140. // The mutex solves the possible dirty data asked lead to over-commit.
  141. mutex := new(gosync.Mutex)
  142. update := generalUpdateFunc(newBaremetalBuilder(), mutex)
  143. load := generalLoadFunc(newBaremetalBuilder(), mutex)
  144. getUpdate := generalGetUpdateFunc(true)
  145. item := new(candidateItem)
  146. item.CachedItem = cache.NewCacheItem(
  147. BaremetalCandidateCache,
  148. u.ToDuration(options.Options.BaremetalCandidateCacheTTL),
  149. u.ToDuration(options.Options.BaremetalCandidateCachePeriod),
  150. uuidKey,
  151. update,
  152. load,
  153. getUpdate,
  154. )
  155. return item
  156. }