schedpolicies.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package models
  15. import (
  16. "context"
  17. "yunion.io/x/jsonutils"
  18. "yunion.io/x/log"
  19. "yunion.io/x/pkg/errors"
  20. "yunion.io/x/pkg/tristate"
  21. "yunion.io/x/pkg/utils"
  22. "yunion.io/x/sqlchemy"
  23. "yunion.io/x/onecloud/pkg/apis"
  24. api "yunion.io/x/onecloud/pkg/apis/compute"
  25. schedapi "yunion.io/x/onecloud/pkg/apis/scheduler"
  26. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  27. "yunion.io/x/onecloud/pkg/httperrors"
  28. "yunion.io/x/onecloud/pkg/mcclient"
  29. "yunion.io/x/onecloud/pkg/util/conditionparser"
  30. "yunion.io/x/onecloud/pkg/util/stringutils2"
  31. )
  32. type SSchedpolicyManager struct {
  33. db.SStandaloneResourceBaseManager
  34. SSchedtagResourceBaseManager
  35. }
  36. var SchedpolicyManager *SSchedpolicyManager
  37. func init() {
  38. SchedpolicyManager = &SSchedpolicyManager{
  39. SStandaloneResourceBaseManager: db.NewStandaloneResourceBaseManager(
  40. SSchedpolicy{},
  41. "schedpolicies_tbl",
  42. "schedpolicy",
  43. "schedpolicies",
  44. ),
  45. }
  46. SchedpolicyManager.SetVirtualObject(SchedpolicyManager)
  47. }
  48. // sched policy is called before calling scheduler, add additional preferences for schedtags
  49. type SSchedpolicy struct {
  50. db.SStandaloneResourceBase
  51. SSchedtagResourceBase
  52. Condition string `width:"1024" charset:"ascii" nullable:"false" list:"user" create:"required" update:"user"`
  53. Strategy string `width:"32" charset:"ascii" nullable:"false" list:"user" create:"required" update:"user"`
  54. Enabled tristate.TriState `default:"true" create:"optional" list:"user" update:"user"`
  55. }
  56. func validateSchedpolicyInputData(ctx context.Context, data *jsonutils.JSONDict, create bool) error {
  57. err := validateDynamicSchedtagInputData(ctx, data, create)
  58. if err != nil {
  59. return err
  60. }
  61. strategyStr := jsonutils.GetAnyString(data, []string{"strategy"})
  62. if len(strategyStr) == 0 && create {
  63. return httperrors.NewMissingParameterError("strategy")
  64. }
  65. if len(strategyStr) > 0 && !utils.IsInStringArray(strategyStr, STRATEGY_LIST) {
  66. return httperrors.NewInputParameterError("invalid strategy %s", strategyStr)
  67. }
  68. return nil
  69. }
  70. func (manager *SSchedpolicyManager) ValidateCreateData(ctx context.Context, userCred mcclient.TokenCredential, ownerId mcclient.IIdentityProvider, query jsonutils.JSONObject, data *jsonutils.JSONDict) (*jsonutils.JSONDict, error) {
  71. err := validateSchedpolicyInputData(ctx, data, true)
  72. if err != nil {
  73. return nil, err
  74. }
  75. input := apis.StandaloneResourceCreateInput{}
  76. err = data.Unmarshal(&input)
  77. if err != nil {
  78. return nil, httperrors.NewInternalServerError("unmarshal StandaloneResourceCreateInput fail %s", err)
  79. }
  80. input, err = manager.SStandaloneResourceBaseManager.ValidateCreateData(ctx, userCred, ownerId, query, input)
  81. if err != nil {
  82. return nil, err
  83. }
  84. data.Update(jsonutils.Marshal(input))
  85. return data, nil
  86. }
  87. func (self *SSchedpolicy) ValidateUpdateData(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data *jsonutils.JSONDict) (*jsonutils.JSONDict, error) {
  88. err := validateSchedpolicyInputData(ctx, data, false)
  89. if err != nil {
  90. return nil, err
  91. }
  92. input := apis.StandaloneResourceBaseUpdateInput{}
  93. err = data.Unmarshal(&input)
  94. if err != nil {
  95. return nil, errors.Wrap(err, "Unmarshal")
  96. }
  97. input, err = self.SStandaloneResourceBase.ValidateUpdateData(ctx, userCred, query, input)
  98. if err != nil {
  99. return nil, errors.Wrap(err, "SStandaloneResourceBase.ValidateUpdateData")
  100. }
  101. data.Update(jsonutils.Marshal(input))
  102. return data, nil
  103. }
  104. func (self *SSchedpolicy) getSchedtag() *SSchedtag {
  105. obj, err := SchedtagManager.FetchById(self.SchedtagId)
  106. if err != nil {
  107. log.Errorf("fail to fetch sched tag by id %s", err)
  108. return nil
  109. }
  110. return obj.(*SSchedtag)
  111. }
  112. func (manager *SSchedpolicyManager) FetchCustomizeColumns(
  113. ctx context.Context,
  114. userCred mcclient.TokenCredential,
  115. query jsonutils.JSONObject,
  116. objs []interface{},
  117. fields stringutils2.SSortedStrings,
  118. isList bool,
  119. ) []api.SchedpolicyDetails {
  120. rows := make([]api.SchedpolicyDetails, len(objs))
  121. stdRows := manager.SStandaloneResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
  122. tagRows := manager.SSchedtagResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
  123. for i := range rows {
  124. rows[i] = api.SchedpolicyDetails{
  125. StandaloneResourceDetails: stdRows[i],
  126. SchedtagResourceInfo: tagRows[i],
  127. }
  128. }
  129. return rows
  130. }
  131. func (manager *SSchedpolicyManager) getAllEnabledPoliciesByResource(resType string) []SSchedpolicy {
  132. policies := make([]SSchedpolicy, 0)
  133. q := SchedpolicyManager.Query().IsTrue("enabled")
  134. schedtags := SchedtagManager.Query().SubQuery()
  135. q = q.Join(schedtags, sqlchemy.AND(
  136. sqlchemy.Equals(q.Field("schedtag_id"), schedtags.Field("id")),
  137. sqlchemy.Equals(schedtags.Field("resource_type"), resType)))
  138. err := db.FetchModelObjects(manager, q, &policies)
  139. if err != nil {
  140. log.Errorf("getAllEnabledPolicies fail %s", err)
  141. return nil
  142. }
  143. return policies
  144. }
  145. func (manager *SSchedpolicyManager) getHostEnabledPolicies() []SSchedpolicy {
  146. return manager.getAllEnabledPoliciesByResource(HostManager.KeywordPlural())
  147. }
  148. func (manager *SSchedpolicyManager) getStorageEnabledPolicies() []SSchedpolicy {
  149. return manager.getAllEnabledPoliciesByResource(StorageManager.KeywordPlural())
  150. }
  151. func (manager *SSchedpolicyManager) getNetworkEnabledPolicies() []SSchedpolicy {
  152. return manager.getAllEnabledPoliciesByResource(NetworkManager.KeywordPlural())
  153. }
  154. func (self *SSchedpolicy) PerformEvaluate(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) (jsonutils.JSONObject, error) {
  155. objectId := jsonutils.GetAnyString(data, []string{"object", "object_id"})
  156. resType := jsonutils.GetAnyString(data, []string{"resource_type"})
  157. resMan := DynamicschedtagManager.VirtualResourcesManager[resType]
  158. if resMan == nil {
  159. return nil, httperrors.NewNotAcceptableError("ResourceType %q not support", resType)
  160. }
  161. obj, err := FetchDynamicResourceObject(ctx, resMan, userCred, objectId)
  162. if err != nil {
  163. return nil, err
  164. }
  165. desc := obj.GetDynamicConditionInput()
  166. params := jsonutils.NewDict()
  167. params.Add(desc, obj.Keyword())
  168. log.V(10).Debugf("Schedpolicy evaluate input: %s", params.PrettyString())
  169. meet, err := conditionparser.EvalBool(self.Condition, params)
  170. if err != nil {
  171. return nil, err
  172. }
  173. result := jsonutils.NewDict()
  174. result.Add(desc, obj.Keyword())
  175. if meet {
  176. result.Add(jsonutils.JSONTrue, "result")
  177. } else {
  178. result.Add(jsonutils.JSONFalse, "result")
  179. }
  180. return result, nil
  181. }
  182. func matchResourceSchedPolicy(
  183. policy SSchedpolicy,
  184. input *jsonutils.JSONDict,
  185. ) bool {
  186. meet, err := conditionparser.EvalBool(policy.Condition, input)
  187. if err != nil {
  188. log.Errorf("Eval Condition %s error: %v", policy.Condition, err)
  189. return false
  190. }
  191. return meet
  192. }
  193. func applyResourceSchedPolicy(
  194. policies []SSchedpolicy,
  195. oldTags []*api.SchedtagConfig,
  196. input *jsonutils.JSONDict,
  197. setTags func([]*api.SchedtagConfig),
  198. ) {
  199. schedtags := make(map[string]*api.SchedtagConfig)
  200. for _, tag := range oldTags {
  201. schedtags[tag.Id] = tag
  202. }
  203. log.Infof("original schedtag %#v", schedtags)
  204. for i := 0; i < len(policies); i += 1 {
  205. policy := policies[i]
  206. st := policy.getSchedtag()
  207. if matchResourceSchedPolicy(policy, input) {
  208. if conf, idOk := schedtags[st.GetId()]; idOk {
  209. conf.Id = st.GetId()
  210. conf.Strategy = policy.Strategy
  211. schedtags[st.GetId()] = conf
  212. } else if conf, nameOk := schedtags[st.GetName()]; nameOk {
  213. conf.Id = st.GetId()
  214. conf.Strategy = policy.Strategy
  215. schedtags[st.GetId()] = conf
  216. delete(schedtags, st.GetName())
  217. } else {
  218. schedtags[st.GetId()] = &api.SchedtagConfig{
  219. Id: st.GetId(),
  220. Strategy: policy.Strategy,
  221. ResourceType: st.ResourceType,
  222. }
  223. }
  224. }
  225. }
  226. log.Infof("updated sched tag %#v", schedtags)
  227. newSchedtags := make([]*api.SchedtagConfig, 0)
  228. for _, tag := range schedtags {
  229. newSchedtags = append(newSchedtags, tag)
  230. }
  231. setTags(newSchedtags)
  232. }
  233. func GetDynamicConditionInput(man IDynamicResourceManager, input *jsonutils.JSONDict) *jsonutils.JSONDict {
  234. ret := jsonutils.NewDict()
  235. ret.Add(input, man.Keyword())
  236. return ret
  237. }
  238. func applyServerSchedtags(policies []SSchedpolicy, input *schedapi.ScheduleInput) {
  239. inputCond := GetDynamicConditionInput(GuestManager, input.ToConditionInput())
  240. setFunc := func(tags []*api.SchedtagConfig) {
  241. input.Schedtags = tags
  242. }
  243. applyResourceSchedPolicy(policies, input.Schedtags, inputCond, setFunc)
  244. }
  245. func applyDiskSchedtags(policies []SSchedpolicy, input *api.DiskConfig) {
  246. inputCond := GetDynamicConditionInput(DiskManager, jsonutils.Marshal(input).(*jsonutils.JSONDict))
  247. setFunc := func(tags []*api.SchedtagConfig) {
  248. input.Schedtags = tags
  249. }
  250. applyResourceSchedPolicy(policies, input.Schedtags, inputCond, setFunc)
  251. }
  252. func applyNetworkSchedtags(policies []SSchedpolicy, input *api.NetworkConfig) {
  253. inputCond := GetDynamicConditionInput(NetworkManager, jsonutils.Marshal(input).(*jsonutils.JSONDict))
  254. setFunc := func(tags []*api.SchedtagConfig) {
  255. input.Schedtags = tags
  256. }
  257. applyResourceSchedPolicy(policies, input.Schedtags, inputCond, setFunc)
  258. }
  259. func ApplySchedPolicies(input *schedapi.ScheduleInput) *schedapi.ScheduleInput {
  260. // TODO: refactor this duplicate code
  261. hostPolicies := SchedpolicyManager.getHostEnabledPolicies()
  262. storagePolicies := SchedpolicyManager.getStorageEnabledPolicies()
  263. networkPolicies := SchedpolicyManager.getNetworkEnabledPolicies()
  264. config := input.ServerConfigs
  265. applyServerSchedtags(hostPolicies, input)
  266. for _, disk := range config.Disks {
  267. applyDiskSchedtags(storagePolicies, disk)
  268. }
  269. for _, net := range config.Networks {
  270. applyNetworkSchedtags(networkPolicies, net)
  271. }
  272. input.ServerConfig.ServerConfigs = config
  273. return input
  274. }
  275. // 动态调度策略列表
  276. func (manager *SSchedpolicyManager) ListItemFilter(
  277. ctx context.Context,
  278. q *sqlchemy.SQuery,
  279. userCred mcclient.TokenCredential,
  280. input api.SchedpolicyListInput,
  281. ) (*sqlchemy.SQuery, error) {
  282. var err error
  283. q, err = manager.SStandaloneResourceBaseManager.ListItemFilter(ctx, q, userCred, input.StandaloneResourceListInput)
  284. if err != nil {
  285. return nil, errors.Wrap(err, "SStandaloneResourceBaseManager.ListItemFilter")
  286. }
  287. q, err = manager.SSchedtagResourceBaseManager.ListItemFilter(ctx, q, userCred, input.SchedtagFilterListInput)
  288. if err != nil {
  289. return nil, errors.Wrap(err, "SSchedtagResourceBaseManager.ListItemFilter")
  290. }
  291. if len(input.Strategy) > 0 {
  292. q = q.In("strategy", input.Strategy)
  293. }
  294. if input.Enabled != nil {
  295. if *input.Enabled {
  296. q = q.IsTrue("enabled")
  297. } else {
  298. q = q.IsFalse("enabled")
  299. }
  300. }
  301. return q, nil
  302. }
  303. func (manager *SSchedpolicyManager) OrderByExtraFields(
  304. ctx context.Context,
  305. q *sqlchemy.SQuery,
  306. userCred mcclient.TokenCredential,
  307. input api.SchedpolicyListInput,
  308. ) (*sqlchemy.SQuery, error) {
  309. var err error
  310. q, err = manager.SStandaloneResourceBaseManager.OrderByExtraFields(ctx, q, userCred, input.StandaloneResourceListInput)
  311. if err != nil {
  312. return nil, errors.Wrap(err, "SStandaloneResourceBaseManager.OrderByExtraFields")
  313. }
  314. q, err = manager.SSchedtagResourceBaseManager.OrderByExtraFields(ctx, q, userCred, input.SchedtagFilterListInput)
  315. if err != nil {
  316. return nil, errors.Wrap(err, "SSchedtagResourceBaseManager.OrderByExtraFields")
  317. }
  318. return q, nil
  319. }
  320. func (manager *SSchedpolicyManager) QueryDistinctExtraField(q *sqlchemy.SQuery, field string) (*sqlchemy.SQuery, error) {
  321. var err error
  322. q, err = manager.SStandaloneResourceBaseManager.QueryDistinctExtraField(q, field)
  323. if err == nil {
  324. return q, nil
  325. }
  326. q, err = manager.SSchedtagResourceBaseManager.QueryDistinctExtraField(q, field)
  327. if err == nil {
  328. return q, nil
  329. }
  330. return q, httperrors.ErrNotFound
  331. }