| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package candidate
- import (
- "fmt"
- "strings"
- gosync "sync"
- "sync/atomic"
- "time"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/util/sets"
- "yunion.io/x/pkg/util/workqueue"
- "yunion.io/x/pkg/utils"
- "yunion.io/x/sqlchemy"
- computeapi "yunion.io/x/onecloud/pkg/apis/compute"
- "yunion.io/x/onecloud/pkg/cloudcommon/db"
- computemodels "yunion.io/x/onecloud/pkg/compute/models"
- "yunion.io/x/onecloud/pkg/scheduler/data_manager/cloudaccount"
- "yunion.io/x/onecloud/pkg/scheduler/data_manager/cloudprovider"
- o "yunion.io/x/onecloud/pkg/scheduler/options"
- )
- type baseBuilder struct {
- resourceType string
- builder IResourceBuilder
- hosts []computemodels.SHost
- hostDict map[string]*computemodels.SHost
- isolatedDevicesDict map[string][]interface{}
- hostCloudproviers map[string]*computemodels.SCloudprovider
- hostCloudaccounts map[string]*computemodels.SCloudaccount
- }
- type InitFunc func(ids []computemodels.SHost, errChan chan error)
- type IResourceBuilder interface {
- FetchHosts(ids []string) ([]computemodels.SHost, error)
- InitFuncs() []InitFunc
- BuildOne(host *computemodels.SHost, getter *networkGetter, desc *BaseHostDesc) (interface{}, error)
- }
- func newBaseBuilder(resourceType string, builder IResourceBuilder) *baseBuilder {
- return &baseBuilder{
- resourceType: resourceType,
- builder: builder,
- }
- }
- func (b *baseBuilder) Type() string {
- return b.resourceType
- }
- func (b *baseBuilder) Do(ids []string) ([]interface{}, error) {
- err := b.init(ids)
- if err != nil {
- return nil, err
- }
- netGetter := newNetworkGetter()
- descs, err := b.build(netGetter)
- if err != nil {
- return nil, err
- }
- return descs, nil
- }
- func (b *baseBuilder) init(ids []string) error {
- if err := b.setHosts(ids); err != nil {
- return errors.Wrap(err, "set host objects")
- }
- wg := &WaitGroupWrapper{}
- errMessageChannel := make(chan error, 12)
- defer close(errMessageChannel)
- setFuncs := []func(){
- // func() { b.setHosts(ids, errMessageChannel) },
- func() {
- b.setIsolatedDevs(ids, errMessageChannel)
- },
- func() {
- b.setCloudproviderAccounts(b.hosts, errMessageChannel)
- },
- func() {
- for _, f := range b.builder.InitFuncs() {
- f(b.hosts, errMessageChannel)
- }
- },
- }
- for _, f := range setFuncs {
- wg.Wrap(f)
- }
- if ok := waitTimeOut(wg, time.Duration(20*time.Second)); !ok {
- log.Errorln("HostBuilder waitgroup timeout.")
- }
- if len(errMessageChannel) != 0 {
- errMessages := make([]string, 0)
- lengthChan := len(errMessageChannel)
- for ; lengthChan > 0; lengthChan-- {
- msg := fmt.Sprintf("%s", <-errMessageChannel)
- log.Errorf("Get error from chan: %s", msg)
- errMessages = append(errMessages, msg)
- }
- return fmt.Errorf("%s\n", strings.Join(errMessages, ";"))
- }
- return nil
- }
- func (b *baseBuilder) build(netGetter *networkGetter) ([]interface{}, error) {
- schedDescs := make([]interface{}, len(b.hosts))
- errs := []error{}
- var descResultLock gosync.Mutex
- var descedLen int32
- buildOne := func(i int) {
- if i >= len(b.hosts) {
- log.Errorf("invalid host index[%d] in b.hosts: %v", i, b.hosts)
- return
- }
- host := b.hosts[i]
- desc, err := b.buildOne(&host, netGetter)
- if err != nil {
- descResultLock.Lock()
- errs = append(errs, err)
- descResultLock.Unlock()
- return
- }
- descResultLock.Lock()
- schedDescs[atomic.AddInt32(&descedLen, 1)-1] = desc
- descResultLock.Unlock()
- }
- workqueue.Parallelize(o.Options.HostBuildParallelizeSize, len(b.hosts), buildOne)
- schedDescs = schedDescs[:descedLen]
- if len(errs) > 0 {
- //return nil, errors.NewAggregate(errs)
- err := errors.NewAggregate(errs)
- log.Errorf("Build schedule desc of %s error: %s", b.resourceType, err)
- }
- return schedDescs, nil
- }
- func (b *baseBuilder) buildOne(host *computemodels.SHost, netGetter *networkGetter) (interface{}, error) {
- baseDesc, err := newBaseHostDesc(b, host, netGetter)
- if err != nil {
- return nil, err
- }
- return b.builder.BuildOne(host, netGetter, baseDesc)
- }
- func (b *baseBuilder) setHosts(ids []string) error {
- hostObjs, err := b.builder.FetchHosts(ids)
- if err != nil {
- return errors.Wrap(err, "FetchHosts")
- }
- hostDict := ToDict(hostObjs)
- b.hosts = hostObjs
- b.hostDict = hostDict
- return nil
- }
- func (b *baseBuilder) getIsolatedDevices(hostID string) (devs []computemodels.SIsolatedDevice) {
- devObjs, ok := b.isolatedDevicesDict[hostID]
- devs = make([]computemodels.SIsolatedDevice, 0)
- if !ok {
- return
- }
- for _, obj := range devObjs {
- dev := obj.(computemodels.SIsolatedDevice)
- devs = append(devs, dev)
- }
- return
- }
- func (b *baseBuilder) setIsolatedDevs(ids []string, errMessageChannel chan error) {
- devs := computemodels.IsolatedDeviceManager.FindByHosts(ids)
- dict, err := utils.GroupBy(devs, func(obj interface{}) (string, error) {
- dev, ok := obj.(computemodels.SIsolatedDevice)
- if !ok {
- return "", utils.ConvertError(obj, "computemodels.SIsolatedDevice")
- }
- return dev.HostId, nil
- })
- if err != nil {
- errMessageChannel <- err
- return
- }
- b.isolatedDevicesDict = dict
- }
- func (b *baseBuilder) setCloudproviderAccounts(hosts []computemodels.SHost, errCh chan error) {
- providerSets := sets.NewString()
- for _, host := range hosts {
- mId := host.ManagerId
- if mId != "" {
- providerSets.Insert(mId)
- }
- }
- providerObjs := make([]computemodels.SCloudprovider, 0)
- for _, pId := range providerSets.List() {
- pObj, ok := cloudprovider.GetManager().GetResource(pId)
- if !ok {
- errCh <- errors.Errorf("Not found cloudprovider by id: %q", pId)
- return
- }
- providerObjs = append(providerObjs, pObj)
- }
- providerDict := ToDict(providerObjs)
- accountSets := sets.NewString()
- for _, provider := range providerObjs {
- accountSets.Insert(provider.CloudaccountId)
- }
- accountObjs := make([]computemodels.SCloudaccount, 0)
- for _, aId := range accountSets.List() {
- aObj, ok := cloudaccount.Manager.GetResource(aId)
- if !ok {
- errCh <- errors.Errorf("Not found cloudaccount by id: %q", aId)
- return
- }
- accountObjs = append(accountObjs, aObj)
- }
- accountDict := ToDict(accountObjs)
- b.hostCloudproviers = make(map[string]*computemodels.SCloudprovider, 0)
- b.hostCloudaccounts = make(map[string]*computemodels.SCloudaccount, 0)
- for _, host := range hosts {
- pId := host.ManagerId
- provider, ok := providerDict[pId]
- if !ok {
- continue
- }
- b.hostCloudproviers[host.GetId()] = provider
- aId := provider.CloudaccountId
- account, ok := accountDict[aId]
- if !ok {
- continue
- }
- b.hostCloudaccounts[host.GetId()] = account
- }
- }
- func FetchModelIds(q *sqlchemy.SQuery) ([]string, error) {
- rs, err := q.Rows()
- if err != nil {
- return nil, err
- }
- ret := []string{}
- defer rs.Close()
- for rs.Next() {
- var id string
- if err := rs.Scan(&id); err != nil {
- return nil, err
- }
- ret = append(ret, id)
- }
- return ret, nil
- }
- func FetchHostsByIds(ids []string) ([]computemodels.SHost, error) {
- hosts := computemodels.HostManager.Query()
- q := hosts.In("id", ids)
- hostObjs := make([]computemodels.SHost, 0)
- if err := db.FetchModelObjects(computemodels.HostManager, q, &hostObjs); err != nil {
- return nil, err
- }
- return hostObjs, nil
- }
- type UpdateStatus struct {
- Id string `json:"id"`
- UpdatedAt time.Time `json:"updated_at"`
- }
- func FetchModelUpdateStatus(man db.IStandaloneModelManager, cond sqlchemy.ICondition) ([]UpdateStatus, error) {
- ret := make([]UpdateStatus, 0)
- err := man.Query("id", "updated_at").Filter(cond).All(&ret)
- return ret, err
- }
- func FetchHostsUpdateStatus(isBaremetal bool) ([]UpdateStatus, error) {
- q := computemodels.HostManager.Query("id", "updated_at")
- if isBaremetal {
- q = q.Equals("host_type", computeapi.HOST_TYPE_BAREMETAL)
- } else {
- q = q.NotEquals("host_type", computeapi.HOST_TYPE_BAREMETAL)
- }
- ret := make([]UpdateStatus, 0)
- err := q.All(&ret)
- return ret, err
- }
- type ResidentTenant struct {
- HostId string `json:"host_id"`
- TenantId string `json:"tenant_id"`
- TenantCount int64 `json:"tenant_count"`
- }
- func (t ResidentTenant) First() string {
- return t.HostId
- }
- func (t ResidentTenant) Second() string {
- return t.TenantId
- }
- func (t ResidentTenant) Third() interface{} {
- return t.TenantCount
- }
- func FetchHostsResidentTenants(hostIds []string) ([]ResidentTenant, error) {
- guests := computemodels.GuestManager.Query().SubQuery()
- q := guests.Query(
- guests.Field("host_id"),
- guests.Field("tenant_id"),
- sqlchemy.COUNT("tenant_count", guests.Field("tenant_id")),
- ).In("host_id", hostIds).GroupBy("tenant_id", "host_id")
- ret := make([]ResidentTenant, 0)
- err := q.All(&ret)
- return ret, err
- }
|