task_history.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package manager
  15. import (
  16. "container/list"
  17. "fmt"
  18. "strings"
  19. "sync"
  20. "time"
  21. "yunion.io/x/log"
  22. "yunion.io/x/pkg/util/wait"
  23. u "yunion.io/x/pkg/utils"
  24. "yunion.io/x/onecloud/pkg/scheduler/api"
  25. "yunion.io/x/onecloud/pkg/scheduler/models"
  26. o "yunion.io/x/onecloud/pkg/scheduler/options"
  27. )
  28. type HistoryItem struct {
  29. Task *Task
  30. Time time.Time
  31. }
  32. func NewHistoryItem(task *Task) *HistoryItem {
  33. return &HistoryItem{
  34. Task: task,
  35. Time: time.Now(),
  36. }
  37. }
  38. func (h *HistoryItem) ToAPI() *api.HistoryItem {
  39. task := h.Task
  40. schedInfo := task.SchedInfo
  41. tenants := []string{}
  42. forGuests := []string{}
  43. countDict := make(map[string]int64)
  44. tenants = append(tenants, schedInfo.Project)
  45. for _, forGuest := range schedInfo.ForGuests {
  46. //forGuests = append(forGuests, fmt.Sprintf("%v(%v)", forGuest.ID, forGuest.Name))
  47. forGuests = append(forGuests, fmt.Sprintf("%v", forGuest))
  48. }
  49. guestType := schedInfo.Hypervisor
  50. if c, ok := countDict[guestType]; !ok {
  51. countDict[guestType] = int64(schedInfo.Count)
  52. } else {
  53. countDict[guestType] = c + int64(schedInfo.Count)
  54. }
  55. counts := []string{}
  56. for guestType, count := range countDict {
  57. s := ""
  58. if count > 1 {
  59. s = "s"
  60. }
  61. counts = append(counts, fmt.Sprintf("%v %v%v", count, guestType, s))
  62. }
  63. countStr := strings.Join(counts, ", ")
  64. return &api.HistoryItem{
  65. Time: h.Time.Local().Format("2006-01-02 15:04:05"),
  66. Consuming: fmt.Sprintf("%s", task.Consuming),
  67. SessionID: task.GetSessionID(),
  68. Status: task.GetStatus(),
  69. Tenants: u.Distinct(tenants),
  70. Guests: forGuests,
  71. Count: countStr,
  72. IsSuggestion: schedInfo.IsSuggestion,
  73. }
  74. }
  75. func (h *HistoryItem) ToMap() map[string]string {
  76. ret := make(map[string]string)
  77. ret["SessionID"] = h.Task.GetSessionID()
  78. return ret
  79. }
  80. func (h *HistoryItem) IsSuggestion() bool {
  81. return h.Task.SchedInfo.IsSuggestion
  82. }
  83. type HistoryManager struct {
  84. capacity int
  85. historyMap map[string]*HistoryItem
  86. historyList *list.List
  87. normalHistoryList *list.List // exclude scheduler-test
  88. lock sync.Mutex
  89. stopCh <-chan struct{}
  90. }
  91. func NewHistoryManager(stopCh <-chan struct{}) *HistoryManager {
  92. return &HistoryManager{
  93. capacity: o.Options.SchedulerHistoryLimit,
  94. historyMap: make(map[string]*HistoryItem),
  95. historyList: list.New(),
  96. normalHistoryList: list.New(),
  97. lock: sync.Mutex{},
  98. stopCh: stopCh,
  99. }
  100. }
  101. func (m *HistoryManager) NewHistoryItem(task *Task) *HistoryItem {
  102. m.lock.Lock()
  103. defer m.lock.Unlock()
  104. for _, ls := range []*list.List{m.historyList, m.normalHistoryList} {
  105. for ls.Len() > m.capacity {
  106. h := ls.Back()
  107. ls.Remove(h)
  108. }
  109. }
  110. historyItem := NewHistoryItem(task)
  111. m.historyList.PushFront(historyItem)
  112. if !historyItem.IsSuggestion() {
  113. m.normalHistoryList.PushFront(historyItem)
  114. }
  115. m.historyMap[task.GetSessionID()] = historyItem
  116. return historyItem
  117. }
  118. func (m *HistoryManager) cleanHistoryMap() {
  119. m.lock.Lock()
  120. defer m.lock.Unlock()
  121. if len(m.historyMap) <= m.capacity {
  122. return
  123. }
  124. oldHistoryMap := m.historyMap
  125. newHistoryMap := make(map[string]*HistoryItem)
  126. for _, ls := range []*list.List{m.historyList, m.normalHistoryList} {
  127. for element := ls.Front(); element != nil; element = element.Next() {
  128. sessionId := (element.Value.(*HistoryItem)).Task.GetSessionID()
  129. if h, ok := oldHistoryMap[sessionId]; ok {
  130. newHistoryMap[sessionId] = h
  131. }
  132. }
  133. }
  134. oldHistoryMap = nil
  135. m.historyMap = newHistoryMap
  136. }
  137. func (m *HistoryManager) Run() {
  138. go wait.Until(m.cleanHistoryMap, u.ToDuration(o.Options.SchedulerHistoryCleanPeriod), m.stopCh)
  139. }
  140. func (m *HistoryManager) GetHistoryList(offset int64, limit int64, all bool, isSuggestion bool) ([]*HistoryItem, int64) {
  141. m.lock.Lock()
  142. defer m.lock.Unlock()
  143. var hList *list.List
  144. if all || isSuggestion {
  145. hList = m.historyList
  146. } else {
  147. hList = m.normalHistoryList
  148. }
  149. historyItems := []*HistoryItem{}
  150. element := hList.Front()
  151. for idx := 0; idx < hList.Len(); idx++ {
  152. item := element.Value.(*HistoryItem)
  153. if isSuggestion {
  154. if !item.IsSuggestion() {
  155. element = element.Next()
  156. continue
  157. }
  158. }
  159. historyItems = append(historyItems, item)
  160. element = element.Next()
  161. }
  162. total := len(historyItems)
  163. ret := make([]*HistoryItem, 0)
  164. if offset <= int64(total) {
  165. historyItems = historyItems[offset:]
  166. } else {
  167. return ret, int64(total)
  168. }
  169. for index := 0; int64(index) < limit && index < len(historyItems); index++ {
  170. ret = append(ret, historyItems[index])
  171. }
  172. return historyItems, int64(total)
  173. }
  174. func (m *HistoryManager) GetHistory(sessionId string) *HistoryItem {
  175. m.lock.Lock()
  176. defer m.lock.Unlock()
  177. if historyItem, ok := m.historyMap[sessionId]; ok {
  178. return historyItem
  179. }
  180. return nil
  181. }
  182. func (m *HistoryManager) GetCancelUsage(sessionId string, hostId string) *models.SessionPendingUsage {
  183. item := m.GetHistory(sessionId)
  184. if item == nil {
  185. return nil
  186. }
  187. usage, _ := models.HostPendingUsageManager.GetSessionUsage(sessionId, hostId)
  188. return usage
  189. }
  190. func (m *HistoryManager) CancelCandidatesPendingUsage(hosts []*expireHost) {
  191. for _, h := range hosts {
  192. hostId := h.Id
  193. sid := h.SessionId
  194. if len(sid) == 0 {
  195. continue
  196. }
  197. cancelUsage := m.GetCancelUsage(sid, hostId)
  198. if cancelUsage == nil {
  199. log.Errorf("failed find pending usage for session: %s, host: %s", sid, hostId)
  200. continue
  201. }
  202. if err := models.HostPendingUsageManager.CancelPendingUsage(hostId, cancelUsage); err != nil {
  203. log.Errorf("Cancel host %s usage %#v: %v", hostId, cancelUsage, err)
  204. } else {
  205. cancelUsage.StopTimer()
  206. }
  207. }
  208. }