builder.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package candidate
  15. import (
  16. "fmt"
  17. "strings"
  18. gosync "sync"
  19. "sync/atomic"
  20. "time"
  21. "yunion.io/x/log"
  22. "yunion.io/x/pkg/errors"
  23. "yunion.io/x/pkg/util/sets"
  24. "yunion.io/x/pkg/util/workqueue"
  25. "yunion.io/x/pkg/utils"
  26. "yunion.io/x/sqlchemy"
  27. computeapi "yunion.io/x/onecloud/pkg/apis/compute"
  28. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  29. computemodels "yunion.io/x/onecloud/pkg/compute/models"
  30. "yunion.io/x/onecloud/pkg/scheduler/data_manager/cloudaccount"
  31. "yunion.io/x/onecloud/pkg/scheduler/data_manager/cloudprovider"
  32. o "yunion.io/x/onecloud/pkg/scheduler/options"
  33. )
  34. type baseBuilder struct {
  35. resourceType string
  36. builder IResourceBuilder
  37. hosts []computemodels.SHost
  38. hostDict map[string]*computemodels.SHost
  39. isolatedDevicesDict map[string][]interface{}
  40. hostCloudproviers map[string]*computemodels.SCloudprovider
  41. hostCloudaccounts map[string]*computemodels.SCloudaccount
  42. }
  43. type InitFunc func(ids []computemodels.SHost, errChan chan error)
  44. type IResourceBuilder interface {
  45. FetchHosts(ids []string) ([]computemodels.SHost, error)
  46. InitFuncs() []InitFunc
  47. BuildOne(host *computemodels.SHost, getter *networkGetter, desc *BaseHostDesc) (interface{}, error)
  48. }
  49. func newBaseBuilder(resourceType string, builder IResourceBuilder) *baseBuilder {
  50. return &baseBuilder{
  51. resourceType: resourceType,
  52. builder: builder,
  53. }
  54. }
  55. func (b *baseBuilder) Type() string {
  56. return b.resourceType
  57. }
  58. func (b *baseBuilder) Do(ids []string) ([]interface{}, error) {
  59. err := b.init(ids)
  60. if err != nil {
  61. return nil, err
  62. }
  63. netGetter := newNetworkGetter()
  64. descs, err := b.build(netGetter)
  65. if err != nil {
  66. return nil, err
  67. }
  68. return descs, nil
  69. }
  70. func (b *baseBuilder) init(ids []string) error {
  71. if err := b.setHosts(ids); err != nil {
  72. return errors.Wrap(err, "set host objects")
  73. }
  74. wg := &WaitGroupWrapper{}
  75. errMessageChannel := make(chan error, 12)
  76. defer close(errMessageChannel)
  77. setFuncs := []func(){
  78. // func() { b.setHosts(ids, errMessageChannel) },
  79. func() {
  80. b.setIsolatedDevs(ids, errMessageChannel)
  81. },
  82. func() {
  83. b.setCloudproviderAccounts(b.hosts, errMessageChannel)
  84. },
  85. func() {
  86. for _, f := range b.builder.InitFuncs() {
  87. f(b.hosts, errMessageChannel)
  88. }
  89. },
  90. }
  91. for _, f := range setFuncs {
  92. wg.Wrap(f)
  93. }
  94. if ok := waitTimeOut(wg, time.Duration(20*time.Second)); !ok {
  95. log.Errorln("HostBuilder waitgroup timeout.")
  96. }
  97. if len(errMessageChannel) != 0 {
  98. errMessages := make([]string, 0)
  99. lengthChan := len(errMessageChannel)
  100. for ; lengthChan > 0; lengthChan-- {
  101. msg := fmt.Sprintf("%s", <-errMessageChannel)
  102. log.Errorf("Get error from chan: %s", msg)
  103. errMessages = append(errMessages, msg)
  104. }
  105. return fmt.Errorf("%s\n", strings.Join(errMessages, ";"))
  106. }
  107. return nil
  108. }
  109. func (b *baseBuilder) build(netGetter *networkGetter) ([]interface{}, error) {
  110. schedDescs := make([]interface{}, len(b.hosts))
  111. errs := []error{}
  112. var descResultLock gosync.Mutex
  113. var descedLen int32
  114. buildOne := func(i int) {
  115. if i >= len(b.hosts) {
  116. log.Errorf("invalid host index[%d] in b.hosts: %v", i, b.hosts)
  117. return
  118. }
  119. host := b.hosts[i]
  120. desc, err := b.buildOne(&host, netGetter)
  121. if err != nil {
  122. descResultLock.Lock()
  123. errs = append(errs, err)
  124. descResultLock.Unlock()
  125. return
  126. }
  127. descResultLock.Lock()
  128. schedDescs[atomic.AddInt32(&descedLen, 1)-1] = desc
  129. descResultLock.Unlock()
  130. }
  131. workqueue.Parallelize(o.Options.HostBuildParallelizeSize, len(b.hosts), buildOne)
  132. schedDescs = schedDescs[:descedLen]
  133. if len(errs) > 0 {
  134. //return nil, errors.NewAggregate(errs)
  135. err := errors.NewAggregate(errs)
  136. log.Errorf("Build schedule desc of %s error: %s", b.resourceType, err)
  137. }
  138. return schedDescs, nil
  139. }
  140. func (b *baseBuilder) buildOne(host *computemodels.SHost, netGetter *networkGetter) (interface{}, error) {
  141. baseDesc, err := newBaseHostDesc(b, host, netGetter)
  142. if err != nil {
  143. return nil, err
  144. }
  145. return b.builder.BuildOne(host, netGetter, baseDesc)
  146. }
  147. func (b *baseBuilder) setHosts(ids []string) error {
  148. hostObjs, err := b.builder.FetchHosts(ids)
  149. if err != nil {
  150. return errors.Wrap(err, "FetchHosts")
  151. }
  152. hostDict := ToDict(hostObjs)
  153. b.hosts = hostObjs
  154. b.hostDict = hostDict
  155. return nil
  156. }
  157. func (b *baseBuilder) getIsolatedDevices(hostID string) (devs []computemodels.SIsolatedDevice) {
  158. devObjs, ok := b.isolatedDevicesDict[hostID]
  159. devs = make([]computemodels.SIsolatedDevice, 0)
  160. if !ok {
  161. return
  162. }
  163. for _, obj := range devObjs {
  164. dev := obj.(computemodels.SIsolatedDevice)
  165. devs = append(devs, dev)
  166. }
  167. return
  168. }
  169. func (b *baseBuilder) setIsolatedDevs(ids []string, errMessageChannel chan error) {
  170. devs := computemodels.IsolatedDeviceManager.FindByHosts(ids)
  171. dict, err := utils.GroupBy(devs, func(obj interface{}) (string, error) {
  172. dev, ok := obj.(computemodels.SIsolatedDevice)
  173. if !ok {
  174. return "", utils.ConvertError(obj, "computemodels.SIsolatedDevice")
  175. }
  176. return dev.HostId, nil
  177. })
  178. if err != nil {
  179. errMessageChannel <- err
  180. return
  181. }
  182. b.isolatedDevicesDict = dict
  183. }
  184. func (b *baseBuilder) setCloudproviderAccounts(hosts []computemodels.SHost, errCh chan error) {
  185. providerSets := sets.NewString()
  186. for _, host := range hosts {
  187. mId := host.ManagerId
  188. if mId != "" {
  189. providerSets.Insert(mId)
  190. }
  191. }
  192. providerObjs := make([]computemodels.SCloudprovider, 0)
  193. for _, pId := range providerSets.List() {
  194. pObj, ok := cloudprovider.GetManager().GetResource(pId)
  195. if !ok {
  196. errCh <- errors.Errorf("Not found cloudprovider by id: %q", pId)
  197. return
  198. }
  199. providerObjs = append(providerObjs, pObj)
  200. }
  201. providerDict := ToDict(providerObjs)
  202. accountSets := sets.NewString()
  203. for _, provider := range providerObjs {
  204. accountSets.Insert(provider.CloudaccountId)
  205. }
  206. accountObjs := make([]computemodels.SCloudaccount, 0)
  207. for _, aId := range accountSets.List() {
  208. aObj, ok := cloudaccount.Manager.GetResource(aId)
  209. if !ok {
  210. errCh <- errors.Errorf("Not found cloudaccount by id: %q", aId)
  211. return
  212. }
  213. accountObjs = append(accountObjs, aObj)
  214. }
  215. accountDict := ToDict(accountObjs)
  216. b.hostCloudproviers = make(map[string]*computemodels.SCloudprovider, 0)
  217. b.hostCloudaccounts = make(map[string]*computemodels.SCloudaccount, 0)
  218. for _, host := range hosts {
  219. pId := host.ManagerId
  220. provider, ok := providerDict[pId]
  221. if !ok {
  222. continue
  223. }
  224. b.hostCloudproviers[host.GetId()] = provider
  225. aId := provider.CloudaccountId
  226. account, ok := accountDict[aId]
  227. if !ok {
  228. continue
  229. }
  230. b.hostCloudaccounts[host.GetId()] = account
  231. }
  232. }
  233. func FetchModelIds(q *sqlchemy.SQuery) ([]string, error) {
  234. rs, err := q.Rows()
  235. if err != nil {
  236. return nil, err
  237. }
  238. ret := []string{}
  239. defer rs.Close()
  240. for rs.Next() {
  241. var id string
  242. if err := rs.Scan(&id); err != nil {
  243. return nil, err
  244. }
  245. ret = append(ret, id)
  246. }
  247. return ret, nil
  248. }
  249. func FetchHostsByIds(ids []string) ([]computemodels.SHost, error) {
  250. hosts := computemodels.HostManager.Query()
  251. q := hosts.In("id", ids)
  252. hostObjs := make([]computemodels.SHost, 0)
  253. if err := db.FetchModelObjects(computemodels.HostManager, q, &hostObjs); err != nil {
  254. return nil, err
  255. }
  256. return hostObjs, nil
  257. }
  258. type UpdateStatus struct {
  259. Id string `json:"id"`
  260. UpdatedAt time.Time `json:"updated_at"`
  261. }
  262. func FetchModelUpdateStatus(man db.IStandaloneModelManager, cond sqlchemy.ICondition) ([]UpdateStatus, error) {
  263. ret := make([]UpdateStatus, 0)
  264. err := man.Query("id", "updated_at").Filter(cond).All(&ret)
  265. return ret, err
  266. }
  267. func FetchHostsUpdateStatus(isBaremetal bool) ([]UpdateStatus, error) {
  268. q := computemodels.HostManager.Query("id", "updated_at")
  269. if isBaremetal {
  270. q = q.Equals("host_type", computeapi.HOST_TYPE_BAREMETAL)
  271. } else {
  272. q = q.NotEquals("host_type", computeapi.HOST_TYPE_BAREMETAL)
  273. }
  274. ret := make([]UpdateStatus, 0)
  275. err := q.All(&ret)
  276. return ret, err
  277. }
  278. type ResidentTenant struct {
  279. HostId string `json:"host_id"`
  280. TenantId string `json:"tenant_id"`
  281. TenantCount int64 `json:"tenant_count"`
  282. }
  283. func (t ResidentTenant) First() string {
  284. return t.HostId
  285. }
  286. func (t ResidentTenant) Second() string {
  287. return t.TenantId
  288. }
  289. func (t ResidentTenant) Third() interface{} {
  290. return t.TenantCount
  291. }
  292. func FetchHostsResidentTenants(hostIds []string) ([]ResidentTenant, error) {
  293. guests := computemodels.GuestManager.Query().SubQuery()
  294. q := guests.Query(
  295. guests.Field("host_id"),
  296. guests.Field("tenant_id"),
  297. sqlchemy.COUNT("tenant_count", guests.Field("tenant_id")),
  298. ).In("host_id", hostIds).GroupBy("tenant_id", "host_id")
  299. ret := make([]ResidentTenant, 0)
  300. err := q.All(&ret)
  301. return ret, err
  302. }