storage_predicate.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package guest
  15. import (
  16. "context"
  17. "fmt"
  18. "strings"
  19. "yunion.io/x/pkg/errors"
  20. "yunion.io/x/pkg/tristate"
  21. "yunion.io/x/pkg/util/sets"
  22. "yunion.io/x/pkg/utils"
  23. "yunion.io/x/onecloud/pkg/apis/compute"
  24. "yunion.io/x/onecloud/pkg/scheduler/algorithm/predicates"
  25. "yunion.io/x/onecloud/pkg/scheduler/cache/candidate"
  26. "yunion.io/x/onecloud/pkg/scheduler/core"
  27. )
  28. // StoragePredicate used to filter whether the storage capacity of the
  29. // current candidate matches the type of the disk. If not matched, the
  30. // storage capacity will be set to 0.
  31. type StoragePredicate struct {
  32. predicates.BasePredicate
  33. }
  34. func (p *StoragePredicate) Name() string {
  35. return "host_storage"
  36. }
  37. func (p *StoragePredicate) Clone() core.FitPredicate {
  38. return &StoragePredicate{}
  39. }
  40. func (p *StoragePredicate) PreExecute(ctx context.Context, u *core.Unit, cs []core.Candidater) (bool, error) {
  41. driver := u.GetHypervisorDriver()
  42. if driver != nil && !driver.DoScheduleStorageFilter() {
  43. return false, nil
  44. }
  45. if u.SchedData().ResetCpuNumaPin {
  46. return false, nil
  47. }
  48. return true, nil
  49. }
  50. type diskSizeRequest struct {
  51. backend string
  52. max int64
  53. total int64
  54. disks []*compute.DiskConfig
  55. }
  56. func newDiskSizeRequest(backend string) *diskSizeRequest {
  57. return &diskSizeRequest{
  58. backend: backend,
  59. max: -1,
  60. total: 0,
  61. disks: make([]*compute.DiskConfig, 0),
  62. }
  63. }
  64. func (req *diskSizeRequest) GetMax() int64 {
  65. return req.max
  66. }
  67. func (req *diskSizeRequest) GetTotal() int64 {
  68. return req.total
  69. }
  70. func (req *diskSizeRequest) Add(disk *compute.DiskConfig) *diskSizeRequest {
  71. req.total += int64(disk.SizeMb)
  72. if req.max < int64(disk.SizeMb) {
  73. req.max = int64(disk.SizeMb)
  74. }
  75. req.disks = append(req.disks, disk)
  76. return req
  77. }
  78. func (req *diskSizeRequest) NewByMediumType(mt string) *diskSizeRequest {
  79. newReq := newDiskSizeRequest(req.backend)
  80. for _, d := range req.disks {
  81. if d.Medium == mt {
  82. newReq.Add(d)
  83. }
  84. }
  85. return newReq
  86. }
  87. type diskBackendSizeRequest struct {
  88. reqs map[string]*diskSizeRequest
  89. // beMdm is a map that use backend as key and medium types as value
  90. beMdm map[string]sets.String
  91. }
  92. func newDiskBackendSizeRequest() *diskBackendSizeRequest {
  93. return &diskBackendSizeRequest{
  94. reqs: make(map[string]*diskSizeRequest),
  95. beMdm: make(map[string]sets.String),
  96. }
  97. }
  98. func (ds *diskBackendSizeRequest) get(backend string) (*diskSizeRequest, bool) {
  99. req, ok := ds.reqs[backend]
  100. return req, ok
  101. }
  102. func (ds *diskBackendSizeRequest) set(backend string, req *diskSizeRequest) *diskBackendSizeRequest {
  103. ds.reqs[backend] = req
  104. return ds
  105. }
  106. func (ds *diskBackendSizeRequest) Add(disk *compute.DiskConfig) *diskBackendSizeRequest {
  107. backend := disk.Backend
  108. req, ok := ds.get(backend)
  109. if !ok {
  110. req = newDiskSizeRequest(backend)
  111. }
  112. req.Add(disk)
  113. ds.set(backend, req)
  114. mds, ok := ds.beMdm[backend]
  115. if !ok {
  116. mds = sets.NewString()
  117. }
  118. mds.Insert(disk.Medium)
  119. ds.beMdm[backend] = mds
  120. return ds
  121. }
  122. func (ds *diskBackendSizeRequest) GetBackendMediumMap() map[string]sets.String {
  123. return ds.beMdm
  124. }
  125. func (ds *diskBackendSizeRequest) Get(backend string, mediumType string) (*diskSizeRequest, error) {
  126. req, ok := ds.get(backend)
  127. if !ok {
  128. return nil, errors.Errorf("Not found diskBackendSizeRequest by backend %q", backend)
  129. }
  130. return req.NewByMediumType(mediumType), nil
  131. }
  132. func (p *StoragePredicate) Execute(ctx context.Context, u *core.Unit, c core.Candidater) (bool, []core.PredicateFailureReason, error) {
  133. h := predicates.NewPredicateHelper(p, u, c)
  134. d := u.SchedData()
  135. getter := c.Getter()
  136. storages := getter.Storages()
  137. isMigrate := func() bool {
  138. return len(d.HostId) > 0
  139. }
  140. isLocalhostBackend := func(backend string) bool {
  141. return utils.IsLocalStorage(backend)
  142. }
  143. isStorageAccessible := func(storage string) bool {
  144. for _, s := range storages {
  145. if storage == s.Id || storage == s.Name {
  146. return true
  147. }
  148. }
  149. return false
  150. }
  151. type storageCapacity struct {
  152. capacity int64
  153. free int64
  154. isActual bool
  155. }
  156. newStorageCapacity := func(capacity int64, free int64, isActual bool) *storageCapacity {
  157. return &storageCapacity{
  158. capacity: capacity,
  159. free: free,
  160. isActual: isActual,
  161. }
  162. }
  163. getStorageCapacity := func(backend string, mediumType string, reqMaxSize int64, reqTotalSize int64, useRsvd bool) (*storageCapacity, *storageCapacity, error) {
  164. totalFree, actualFree, err := getter.GetFreeStorageSizeOfType(backend, mediumType, useRsvd, reqMaxSize)
  165. if err != nil {
  166. return nil, nil, err
  167. }
  168. reqTotalSize = utils.Max(reqTotalSize, 1)
  169. capacity := totalFree / reqTotalSize
  170. actualCapacity := actualFree / reqTotalSize
  171. return newStorageCapacity(capacity, totalFree, false), newStorageCapacity(actualCapacity, actualFree, true), nil
  172. }
  173. getReqSizeStr := func(backend string) string {
  174. ss := make([]string, 0, len(d.Disks))
  175. for _, disk := range d.Disks {
  176. if disk.Backend == backend {
  177. ss = append(ss, fmt.Sprintf("%v", disk.SizeMb))
  178. }
  179. }
  180. return strings.Join(ss, "+")
  181. }
  182. getStorageFreeStr := func(backend string, mediumType string, useRsvd bool, isActual bool) string {
  183. ss := []string{}
  184. for _, s := range getter.Storages() {
  185. if candidate.IsStorageBackendMediumMatch(s, backend, mediumType) {
  186. if isActual {
  187. total := s.Capacity
  188. free := total - s.ActualCapacityUsed
  189. ss = append(ss, fmt.Sprintf("storage %q, actual_total:%d - actual_used:%d = free:%d", s.GetName(), total, s.ActualCapacityUsed, free))
  190. } else {
  191. total := int64(float32(s.Capacity) * s.Cmtbound)
  192. used := s.GetUsedCapacity(tristate.True)
  193. waste := s.GetUsedCapacity(tristate.False)
  194. free := total - int64(used) - int64(waste)
  195. ss = append(ss, fmt.Sprintf("storage %q, total:%d - used:%d - waste:%d = free:%d", s.GetName(), total, used, waste, free))
  196. }
  197. }
  198. }
  199. return strings.Join(ss, " + ")
  200. }
  201. sizeRequest := newDiskBackendSizeRequest()
  202. storeRequest := make(map[string]int64, 0)
  203. for _, disk := range d.Disks {
  204. if isMigrate() && !isLocalhostBackend(disk.Backend) {
  205. storeRequest[disk.Storage] = 1
  206. } else if len(disk.DiskId) > 0 && len(disk.Storage) > 0 {
  207. // server attach to an existing disk
  208. storeRequest[disk.Storage] = 1
  209. } else if !isMigrate() || (isMigrate() && isLocalhostBackend(disk.Backend)) {
  210. // if migrate, only local storage need check capacity constraint
  211. sizeRequest.Add(disk)
  212. }
  213. }
  214. for store := range storeRequest {
  215. if !isStorageAccessible(store) {
  216. h.Exclude(fmt.Sprintf("storage %v not accessible", store))
  217. return h.GetResult()
  218. }
  219. }
  220. useRsvd := h.UseReserved()
  221. minCapacity := int64(0xFFFFFFFF)
  222. appendFailMsg := func(backend string, mediumType string, req *diskSizeRequest, useRsvd bool, capacity *storageCapacity) {
  223. reqStr := fmt.Sprintf("no enough backend %q, mediumType %q storage, req=%v(%v)", backend, mediumType, req.GetTotal(), getReqSizeStr(backend))
  224. freePrex := "free"
  225. isActual := capacity.isActual
  226. if isActual {
  227. freePrex = "actual_free"
  228. }
  229. freeStr := fmt.Sprintf("%s=%v(%v)", freePrex, capacity.free, getStorageFreeStr(backend, mediumType, useRsvd, isActual))
  230. msg := reqStr + ", " + freeStr
  231. h.AppendPredicateFailMsg(msg)
  232. }
  233. for be, mds := range sizeRequest.GetBackendMediumMap() {
  234. for _, medium := range mds.List() {
  235. req, err := sizeRequest.Get(be, medium)
  236. if err != nil {
  237. h.Exclude(fmt.Sprintf("get request size by backend %q, medium %q: %v", be, medium, err))
  238. return h.GetResult()
  239. }
  240. capacity, actualCapacity, err := getStorageCapacity(be, medium, req.GetMax(), req.GetTotal(), useRsvd)
  241. if err != nil {
  242. h.Exclude(err.Error())
  243. return h.GetResult()
  244. }
  245. tmpCap := utils.Min(capacity.capacity, actualCapacity.capacity)
  246. if capacity.capacity <= 0 {
  247. appendFailMsg(be, medium, req, useRsvd, capacity)
  248. } else if actualCapacity.capacity <= 0 {
  249. appendFailMsg(be, medium, req, useRsvd, actualCapacity)
  250. }
  251. minCapacity = utils.Min(minCapacity, tmpCap)
  252. }
  253. }
  254. h.SetCapacity(minCapacity)
  255. return h.GetResult()
  256. }