pending_usage.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package models
  15. import (
  16. "context"
  17. "fmt"
  18. "runtime/debug"
  19. "sync"
  20. "time"
  21. "yunion.io/x/log"
  22. "yunion.io/x/pkg/errors"
  23. schedapi "yunion.io/x/onecloud/pkg/apis/scheduler"
  24. "yunion.io/x/onecloud/pkg/cloudcommon/db/lockman"
  25. "yunion.io/x/onecloud/pkg/cloudcommon/db/quotas"
  26. computemodels "yunion.io/x/onecloud/pkg/compute/models"
  27. "yunion.io/x/onecloud/pkg/scheduler/api"
  28. "yunion.io/x/onecloud/pkg/scheduler/options"
  29. )
// HostPendingUsageManager is the package-level pending usage manager. It is
// assigned by this file's init function and is used by SessionPendingUsage to
// cancel and delete its own usage.
var HostPendingUsageManager *SHostPendingUsageManager
// SHostPendingUsageManager records resource usage that has been scheduled but
// is still pending, delegating storage to an in-memory store.
type SHostPendingUsageManager struct {
	// store keeps the per-host pending usage and the per-session usage.
	store *SHostMemoryPendingUsageStore
}
  34. func init() {
  35. pendingStore := NewHostMemoryPendingUsageStore()
  36. HostPendingUsageManager = &SHostPendingUsageManager{
  37. store: pendingStore,
  38. }
  39. }
  40. func (m *SHostPendingUsageManager) Keyword() string {
  41. return "pending_usage_manager"
  42. }
  43. func (m *SHostPendingUsageManager) newSessionUsage(req *api.SchedInfo, hostId string, candidate *schedapi.CandidateResource) *SessionPendingUsage {
  44. usage := NewPendingUsageBySchedInfo(hostId, req, candidate)
  45. su := NewSessionUsage(req.SessionId, hostId, usage)
  46. return su
  47. }
  48. func (m *SHostPendingUsageManager) GetPendingUsage(hostId string) (*SPendingUsage, error) {
  49. return m.getPendingUsage(hostId)
  50. }
  51. func (m *SHostPendingUsageManager) GetNetPendingUsage(netId string) int {
  52. return m.store.GetNetPendingUsage(netId)
  53. }
  54. func (m *SHostPendingUsageManager) getPendingUsage(hostId string) (*SPendingUsage, error) {
  55. pending, err := m.store.GetPendingUsage(hostId)
  56. if err != nil {
  57. return nil, err
  58. }
  59. return pending, nil
  60. }
  61. func (m *SHostPendingUsageManager) GetSessionUsage(sessionId, hostId string) (*SessionPendingUsage, error) {
  62. return m.store.GetSessionUsage(sessionId, hostId)
  63. }
  64. func (m *SHostPendingUsageManager) AddPendingUsage(guestId string, req *api.SchedInfo, candidate *schedapi.CandidateResource) {
  65. hostId := candidate.HostId
  66. sessionUsage, _ := m.GetSessionUsage(req.SessionId, hostId)
  67. if sessionUsage == nil {
  68. sessionUsage = m.newSessionUsage(req, hostId, candidate)
  69. sessionUsage.StartTimer()
  70. }
  71. m.addSessionUsage(candidate.HostId, guestId, sessionUsage)
  72. if candidate.BackupCandidate != nil {
  73. m.AddPendingUsage(guestId, req, candidate.BackupCandidate)
  74. }
  75. }
  76. // addSessionUsage add pending usage and session usage
  77. func (m *SHostPendingUsageManager) addSessionUsage(hostId, guestId string, usage *SessionPendingUsage) {
  78. ctx := context.Background()
  79. lockman.LockClass(ctx, m, hostId)
  80. defer lockman.ReleaseClass(ctx, m, hostId)
  81. pendingUsage, _ := m.getPendingUsage(hostId)
  82. if pendingUsage == nil {
  83. pendingUsage = NewPendingUsageBySchedInfo(hostId, nil, nil)
  84. }
  85. // add pending usage
  86. pendingUsage.Add(usage.Usage, guestId)
  87. usage.AddCount(guestId)
  88. m.store.SetSessionUsage(usage.SessionId, hostId, usage)
  89. m.store.SetPendingUsage(hostId, pendingUsage)
  90. }
  91. func (m *SHostPendingUsageManager) CancelPendingUsage(hostId string, su *SessionPendingUsage) error {
  92. ctx := context.Background()
  93. lockman.LockClass(ctx, m, hostId)
  94. defer lockman.ReleaseClass(ctx, m, hostId)
  95. pendingUsage, _ := m.getPendingUsage(hostId)
  96. if pendingUsage == nil {
  97. return nil
  98. }
  99. if su == nil {
  100. return nil
  101. }
  102. pendingUsage.Sub(su.Usage)
  103. m.store.SetPendingUsage(hostId, pendingUsage)
  104. su.SubCount()
  105. return nil
  106. }
  107. func (m *SHostPendingUsageManager) DeleteSessionUsage(usage *SessionPendingUsage) {
  108. m.store.DeleteSessionUsage(usage)
  109. }
// SHostMemoryPendingUsageStore is an in-memory store built on one sync.Map.
// The same map holds *SPendingUsage values keyed by host id and
// *SessionPendingUsage values keyed by sessionUsageKey(sessionId, hostId).
type SHostMemoryPendingUsageStore struct {
	store *sync.Map
}
  113. func NewHostMemoryPendingUsageStore() *SHostMemoryPendingUsageStore {
  114. return &SHostMemoryPendingUsageStore{
  115. store: new(sync.Map),
  116. }
  117. }
  118. func (self *SHostMemoryPendingUsageStore) sessionUsageKey(sid, hostId string) string {
  119. return fmt.Sprintf("%s-%s", sid, hostId)
  120. }
  121. func (self *SHostMemoryPendingUsageStore) GetSessionUsage(sessionId string, hostId string) (*SessionPendingUsage, error) {
  122. key := self.sessionUsageKey(sessionId, hostId)
  123. ret, ok := self.store.Load(key)
  124. if !ok {
  125. return nil, errors.Errorf("Not fond session pending usage by %s", key)
  126. }
  127. return ret.(*SessionPendingUsage), nil
  128. }
  129. func (self *SHostMemoryPendingUsageStore) SetSessionUsage(sessionId, hostId string, usage *SessionPendingUsage) {
  130. key := self.sessionUsageKey(sessionId, hostId)
  131. self.store.Store(key, usage)
  132. }
  133. func (self *SHostMemoryPendingUsageStore) GetPendingUsage(hostId string) (*SPendingUsage, error) {
  134. ret, ok := self.store.Load(hostId)
  135. if !ok {
  136. return nil, errors.Errorf("Not fond pending usage by %s", hostId)
  137. }
  138. usage := ret.(*SPendingUsage)
  139. return usage, nil
  140. }
  141. func (self *SHostMemoryPendingUsageStore) SetPendingUsage(hostId string, usage *SPendingUsage) {
  142. if usage.IsEmpty() {
  143. self.store.Delete(hostId)
  144. return
  145. }
  146. self.store.Store(hostId, usage)
  147. }
  148. func (self *SHostMemoryPendingUsageStore) DeleteSessionUsage(usage *SessionPendingUsage) {
  149. self.store.Delete(self.sessionUsageKey(usage.SessionId, usage.Usage.HostId))
  150. }
  151. func (self *SHostMemoryPendingUsageStore) GetNetPendingUsage(id string) int {
  152. total := 0
  153. self.store.Range(func(hostId, usageObj interface{}) bool {
  154. usage, ok := usageObj.(*SPendingUsage)
  155. if ok {
  156. total += usage.NetUsage.Get(id)
  157. }
  158. return true
  159. })
  160. return total
  161. }
// SessionPendingUsage is the pending usage that one scheduling session holds
// on one host.
type SessionPendingUsage struct {
	HostId    string
	SessionId string
	// Usage is the usage of a single request of this session.
	Usage *SPendingUsage

	// countLock is held by AddCount and SubCount while they update count and
	// Usage.PendingGuestIds.
	countLock *sync.Mutex
	// count is incremented by AddCount and decremented by SubCount.
	count int
	// cancelCh carries the session id from StopTimer to the goroutine started
	// by StartTimer. NewSessionUsage creates it unbuffered.
	cancelCh chan string
}
  170. func NewSessionUsage(sid, hostId string, usage *SPendingUsage) *SessionPendingUsage {
  171. su := &SessionPendingUsage{
  172. HostId: hostId,
  173. SessionId: sid,
  174. Usage: usage,
  175. count: 0,
  176. countLock: new(sync.Mutex),
  177. cancelCh: make(chan string),
  178. }
  179. return su
  180. }
  181. func (su *SessionPendingUsage) GetHostId() string {
  182. return su.Usage.HostId
  183. }
  184. func (su *SessionPendingUsage) AddCount(guestId string) {
  185. su.countLock.Lock()
  186. defer su.countLock.Unlock()
  187. su.count++
  188. su.Usage.PendingGuestIds[guestId] = struct{}{}
  189. }
  190. func (su *SessionPendingUsage) SubCount() {
  191. su.countLock.Lock()
  192. defer su.countLock.Unlock()
  193. su.count--
  194. for guestId, _ := range su.Usage.PendingGuestIds {
  195. delete(su.Usage.PendingGuestIds, guestId)
  196. break
  197. }
  198. }
// SResourcePendingUsage maps string keys to int amounts on top of a sync.Map.
// In this file it tracks disk usage per backend and network usage per
// network id.
type SResourcePendingUsage struct {
	// store holds string keys and int values; accessors type-assert both.
	store *sync.Map
}
  202. func NewResourcePendingUsage(vals map[string]int) *SResourcePendingUsage {
  203. u := &SResourcePendingUsage{
  204. store: new(sync.Map),
  205. }
  206. for key, val := range vals {
  207. u.Set(key, val)
  208. }
  209. return u
  210. }
  211. func (u *SResourcePendingUsage) ToMap() map[string]int {
  212. ret := make(map[string]int)
  213. u.Range(func(key string, val int) bool {
  214. ret[key] = val
  215. return true
  216. })
  217. return ret
  218. }
  219. func (u *SResourcePendingUsage) Get(key string) int {
  220. val, ok := u.store.Load(key)
  221. if !ok {
  222. return 0
  223. }
  224. return val.(int)
  225. }
  226. func (u *SResourcePendingUsage) Set(key string, size int) {
  227. u.store.Store(key, size)
  228. }
  229. func (u *SResourcePendingUsage) Range(f func(key string, size int) bool) {
  230. u.store.Range(func(key, val interface{}) bool {
  231. return f(key.(string), val.(int))
  232. })
  233. }
  234. func (u *SResourcePendingUsage) Add(su *SResourcePendingUsage) {
  235. u.Range(func(key string, size int) bool {
  236. size2 := su.Get(key)
  237. u.Set(key, size+size2)
  238. return true
  239. })
  240. su.Range(func(key string, size int) bool {
  241. if _, ok := u.store.Load(key); !ok {
  242. u.Set(key, size)
  243. }
  244. return true
  245. })
  246. }
  247. func (u *SResourcePendingUsage) Sub(su *SResourcePendingUsage) {
  248. u.Range(func(key string, size int) bool {
  249. size2 := su.Get(key)
  250. u.Set(key, quotas.NonNegative(size-size2))
  251. return true
  252. })
  253. }
  254. func (u *SResourcePendingUsage) IsEmpty() bool {
  255. empty := true
  256. u.Range(func(_ string, size int) bool {
  257. if size != 0 {
  258. empty = false
  259. return false
  260. }
  261. return true
  262. })
  263. return empty
  264. }
// SPendingUsage is the resource usage pending on one host.
type SPendingUsage struct {
	HostId string
	Cpu    int
	// CpuPin counts, per pinned CPU id, how many times that id is pinned.
	CpuPin map[int]int
	Memory int
	// PendingGuestIds is the set of guest ids this usage accounts for.
	PendingGuestIds map[string]struct{}
	// NumaMemPin maps a NUMA node id to the memory size in MB pinned on it.
	NumaMemPin map[int]int

	// IsolatedDevice is a number of isolated devices.
	IsolatedDevice int
	// DiskUsage is keyed by disk backend; amounts are sizes in MB.
	DiskUsage *SResourcePendingUsage
	// NetUsage is keyed by network id; amounts are counts.
	NetUsage *SResourcePendingUsage

	// InstanceGroupUsage is keyed by instance group id.
	// No lock is needed here.
	InstanceGroupUsage map[string]*api.CandidateGroup
}
  279. func NewPendingUsageBySchedInfo(hostId string, req *api.SchedInfo, candidate *schedapi.CandidateResource) *SPendingUsage {
  280. u := &SPendingUsage{
  281. HostId: hostId,
  282. DiskUsage: NewResourcePendingUsage(nil),
  283. NetUsage: NewResourcePendingUsage(nil),
  284. PendingGuestIds: make(map[string]struct{}),
  285. }
  286. // group init
  287. u.InstanceGroupUsage = make(map[string]*api.CandidateGroup)
  288. u.CpuPin = make(map[int]int)
  289. u.NumaMemPin = make(map[int]int)
  290. if req == nil {
  291. return u
  292. }
  293. u.Cpu = req.Ncpu
  294. u.Memory = req.Memory
  295. u.IsolatedDevice = len(req.IsolatedDevices)
  296. if candidate != nil && len(candidate.CpuNumaPin) > 0 {
  297. for _, cpuNumaPin := range candidate.CpuNumaPin {
  298. if cpuNumaPin.MemSizeMB != nil {
  299. if v, ok := u.NumaMemPin[cpuNumaPin.NodeId]; ok {
  300. u.NumaMemPin[cpuNumaPin.NodeId] = v + *cpuNumaPin.MemSizeMB
  301. } else {
  302. u.NumaMemPin[cpuNumaPin.NodeId] = *cpuNumaPin.MemSizeMB
  303. }
  304. }
  305. for i := range cpuNumaPin.CpuPin {
  306. if v, ok := u.CpuPin[cpuNumaPin.CpuPin[i]]; ok {
  307. u.CpuPin[cpuNumaPin.CpuPin[i]] = v + 1
  308. } else {
  309. u.CpuPin[cpuNumaPin.CpuPin[i]] = 1
  310. }
  311. }
  312. }
  313. }
  314. for _, disk := range req.Disks {
  315. backend := disk.Backend
  316. size := disk.SizeMb
  317. osize := u.DiskUsage.Get(backend)
  318. u.DiskUsage.Set(backend, osize+size)
  319. }
  320. if candidate != nil && len(candidate.Nets) > 0 {
  321. for _, net := range candidate.Nets {
  322. // 只对建议 network_id 为1个的时候设置 pending_usage
  323. // 多个的情况下只有交给 region 那边自己判断
  324. // 这里只是尽让调度器提前判断出子网是否空闲 ip
  325. if len(net.NetworkIds) != 1 {
  326. continue
  327. }
  328. id := net.NetworkIds[0]
  329. ocount := u.NetUsage.Get(id)
  330. u.NetUsage.Set(id, ocount+1)
  331. }
  332. } else {
  333. for _, net := range req.Networks {
  334. id := net.Network
  335. if id == "" {
  336. continue
  337. }
  338. ocount := u.NetUsage.Get(id)
  339. u.NetUsage.Set(id, ocount+1)
  340. }
  341. }
  342. // group add
  343. for _, groupId := range req.InstanceGroupIds {
  344. // For now, info about instancegroup in api.SchedInfo is only "ID",
  345. // but in the future, info may increase
  346. group := &computemodels.SGroup{}
  347. group.Id = groupId
  348. u.InstanceGroupUsage[groupId] = &api.CandidateGroup{
  349. SGroup: group,
  350. ReferCount: 1,
  351. }
  352. }
  353. return u
  354. }
  355. func (self *SPendingUsage) ToMap() map[string]interface{} {
  356. return map[string]interface{}{
  357. "cpu": self.Cpu,
  358. "memory": self.Memory,
  359. "isolated_device": self.IsolatedDevice,
  360. "disk": self.DiskUsage.ToMap(),
  361. "net": self.NetUsage.ToMap(),
  362. "instance_groups": self.InstanceGroupUsage,
  363. }
  364. }
  365. func (self *SPendingUsage) Add(sUsage *SPendingUsage, addGuestId string) {
  366. self.Cpu = self.Cpu + sUsage.Cpu
  367. for k, v1 := range sUsage.CpuPin {
  368. if v2, ok := self.CpuPin[k]; ok {
  369. self.CpuPin[k] = v1 + v2
  370. } else {
  371. self.CpuPin[k] = v1
  372. }
  373. }
  374. for guestId := range sUsage.PendingGuestIds {
  375. if _, ok := self.PendingGuestIds[guestId]; !ok {
  376. log.Infof("add guest %s in pending usage", guestId)
  377. self.PendingGuestIds[guestId] = struct{}{}
  378. }
  379. }
  380. if addGuestId != "" {
  381. log.Infof("add guest %s in pending usage", addGuestId)
  382. self.PendingGuestIds[addGuestId] = struct{}{}
  383. }
  384. self.Memory = self.Memory + sUsage.Memory
  385. for k, v1 := range sUsage.NumaMemPin {
  386. if v2, ok := self.NumaMemPin[k]; ok {
  387. self.NumaMemPin[k] = v1 + v2
  388. } else {
  389. self.NumaMemPin[k] = v1
  390. }
  391. }
  392. self.IsolatedDevice = self.IsolatedDevice + sUsage.IsolatedDevice
  393. self.DiskUsage.Add(sUsage.DiskUsage)
  394. self.NetUsage.Add(sUsage.NetUsage)
  395. for id, cg := range sUsage.InstanceGroupUsage {
  396. if scg, ok := self.InstanceGroupUsage[id]; ok {
  397. scg.ReferCount += cg.ReferCount
  398. continue
  399. }
  400. self.InstanceGroupUsage[id] = cg
  401. }
  402. }
  403. func (self *SPendingUsage) Sub(sUsage *SPendingUsage) {
  404. self.Cpu = quotas.NonNegative(self.Cpu - sUsage.Cpu)
  405. for k, v1 := range sUsage.CpuPin {
  406. if v2, ok := self.CpuPin[k]; ok {
  407. self.CpuPin[k] = quotas.NonNegative(v2 - v1)
  408. }
  409. }
  410. for guestId := range sUsage.PendingGuestIds {
  411. log.Infof("delete guest %s in pending usage", guestId)
  412. delete(self.PendingGuestIds, guestId)
  413. }
  414. self.Memory = quotas.NonNegative(self.Memory - sUsage.Memory)
  415. for k, v1 := range sUsage.NumaMemPin {
  416. if v2, ok := self.NumaMemPin[k]; ok {
  417. self.NumaMemPin[k] = quotas.NonNegative(v2 - v1)
  418. }
  419. }
  420. self.IsolatedDevice = quotas.NonNegative(self.IsolatedDevice - sUsage.IsolatedDevice)
  421. self.DiskUsage.Sub(sUsage.DiskUsage)
  422. self.NetUsage.Sub(sUsage.NetUsage)
  423. for id, cg := range sUsage.InstanceGroupUsage {
  424. if scg, ok := self.InstanceGroupUsage[id]; ok {
  425. count := scg.ReferCount - cg.ReferCount
  426. if count <= 0 {
  427. delete(self.InstanceGroupUsage, id)
  428. continue
  429. }
  430. scg.ReferCount = count
  431. }
  432. }
  433. }
  434. func (self *SPendingUsage) IsEmpty() bool {
  435. if self.Cpu > 0 {
  436. return false
  437. }
  438. if self.Memory > 0 {
  439. return false
  440. }
  441. if self.IsolatedDevice > 0 {
  442. return false
  443. }
  444. if !self.DiskUsage.IsEmpty() {
  445. return false
  446. }
  447. if !self.NetUsage.IsEmpty() {
  448. return false
  449. }
  450. if len(self.InstanceGroupUsage) != 0 {
  451. return false
  452. }
  453. return true
  454. }
  455. func (self *SessionPendingUsage) cancelSelf() {
  456. hostId := self.Usage.HostId
  457. count := self.count
  458. for i := 0; i <= count; i++ {
  459. HostPendingUsageManager.CancelPendingUsage(hostId, self)
  460. }
  461. }
  462. func (self *SessionPendingUsage) StartTimer() {
  463. timeout := time.Duration(options.Options.ExpireSessionUsageTimeout) * time.Second
  464. go func() {
  465. for {
  466. select {
  467. case <-time.After(timeout):
  468. log.Infof("timeout cancel session usage %#v", self)
  469. self.cancelSelf()
  470. goto ForEnd
  471. case sid := <-self.cancelCh:
  472. log.Infof("Cancel session %s usage, count: %d", sid, self.count)
  473. if self.count <= 0 {
  474. goto ForEnd
  475. } else {
  476. log.Infof("continue waiting next cancel...")
  477. }
  478. }
  479. }
  480. ForEnd:
  481. log.Infof("delete session usage %#v", self)
  482. HostPendingUsageManager.DeleteSessionUsage(self)
  483. }()
  484. }
  485. func (self *SessionPendingUsage) StopTimer() {
  486. defer func() {
  487. if r := recover(); r != nil {
  488. log.Errorf("SessionPendingUsage %#v stop timer: %v", self, r)
  489. debug.PrintStack()
  490. }
  491. }()
  492. self.cancelCh <- self.SessionId
  493. }