| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package hostutils
- import (
- "context"
- "fmt"
- "net/http"
- "os"
- "path"
- "yunion.io/x/jsonutils"
- "yunion.io/x/log"
- "yunion.io/x/pkg/appctx"
- "yunion.io/x/pkg/util/regutils"
- "yunion.io/x/onecloud/pkg/apis"
- computeapi "yunion.io/x/onecloud/pkg/apis/compute"
- hostapi "yunion.io/x/onecloud/pkg/apis/host"
- "yunion.io/x/onecloud/pkg/appsrv"
- "yunion.io/x/onecloud/pkg/cloudcommon/consts"
- "yunion.io/x/onecloud/pkg/cloudcommon/workmanager"
- "yunion.io/x/onecloud/pkg/hostman/hostinfo/hostbridge"
- "yunion.io/x/onecloud/pkg/hostman/hostutils/kubelet"
- "yunion.io/x/onecloud/pkg/hostman/isolated_device"
- "yunion.io/x/onecloud/pkg/hostman/options"
- "yunion.io/x/onecloud/pkg/httperrors"
- "yunion.io/x/onecloud/pkg/mcclient"
- "yunion.io/x/onecloud/pkg/mcclient/auth"
- "yunion.io/x/onecloud/pkg/mcclient/modulebase"
- modules "yunion.io/x/onecloud/pkg/mcclient/modules/compute"
- "yunion.io/x/onecloud/pkg/mcclient/modules/k8s"
- "yunion.io/x/onecloud/pkg/util/cgrouputils/cpuset"
- "yunion.io/x/onecloud/pkg/util/fileutils2"
- "yunion.io/x/onecloud/pkg/util/pod"
- )
// SContainerCpufreqSimulateConfig carries a set of cpuinfo_* and scaling_*
// values together with the JSON keys they are serialized under.
//
// NOTE(review): the key names look like the Linux cpufreq sysfs attribute
// names; how the values are consumed is not visible in this file.
type SContainerCpufreqSimulateConfig struct {
	// cpuinfo_* values.
	CpuinfoMaxFreq           int `json:"cpuinfo_max_freq"`
	CpuinfoMinFreq           int `json:"cpuinfo_min_freq"`
	CpuinfoCurFreq           int `json:"cpuinfo_cur_freq"`
	CpuinfoTransitionLatency int `json:"cpuinfo_transition_latency"`

	// scaling_* values.
	ScalingDriver string `json:"scaling_driver"`
	// ScalingGovernors is serialized under the singular key "scaling_governor".
	ScalingGovernors          string `json:"scaling_governor"`
	ScalingMaxFreq            int    `json:"scaling_max_freq"`
	ScalingMinFreq            int    `json:"scaling_min_freq"`
	ScalingCurFreq            int    `json:"scaling_cur_freq"`
	ScalingSetspeed           string `json:"scaling_setspeed"`
	ScalingAvailableGovernors string `json:"scaling_available_governors"`
}
// IGuestManager is the guest-manager surface referenced by IHost
// (see SetIGuestManager / GetIGuestManager).
type IGuestManager interface {
	// GetImageDeps returns a list of strings for the given storage type.
	// NOTE(review): presumably the image IDs that guests on that storage
	// depend on — confirm against the implementation.
	GetImageDeps(storageType string) []string
}
- type IHost interface {
- GetZoneId() string
- GetHostId() string
- GetMasterIp() string
- GetCpuArchitecture() string
- GetKernelVersion() string
- IsAarch64() bool
- IsX8664() bool
- IsRiscv64() bool
- GetHostTopology() *hostapi.HostTopology
- GetReservedCpusInfo() (*cpuset.CPUSet, *cpuset.CPUSet)
- GetReservedMemMb() int
- IsHugepagesEnabled() bool
- HugepageSizeKb() int
- IsSchedulerNumaAllocateEnabled() bool
- CpuCmtBound() float32
- MemCmtBound() float32
- IsKvmSupport() bool
- IsNestedVirtualization() bool
- PutHostOnline() error
- StartDHCPServer()
- GetBridgeDev(bridge string) hostbridge.IBridgeDriver
- GetIsolatedDeviceManager() isolated_device.IsolatedDeviceManager
- // SyncRootPartitionUsedCapacity() error
- GetKubeletConfig() kubelet.KubeletConfig
- // containerd related methods
- IsContainerHost() bool
- GetContainerRuntimeEndpoint() string
- GetCRI() pod.CRI
- GetContainerCPUMap() *pod.HostContainerCPUMap
- GetContainerCpufreqSimulateConfig() *jsonutils.JSONDict
- OnCatalogChanged(catalog mcclient.KeystoneServiceCatalogV3)
- OnHostFilesChanged(hostfiles []computeapi.SHostFile) error
- SetIGuestManager(guestman IGuestManager)
- GetIGuestManager() IGuestManager
- OnGuestLoadingComplete()
- }
- func GetComputeSession(ctx context.Context) *mcclient.ClientSession {
- return auth.GetAdminSession(ctx, consts.GetRegion())
- }
- func GetK8sSession(ctx context.Context) *mcclient.ClientSession {
- return auth.GetAdminSession(ctx, consts.GetRegion())
- }
- func GetImageSession(ctx context.Context) *mcclient.ClientSession {
- return auth.AdminSession(ctx, consts.GetRegion(), consts.GetZone(), "")
- }
- func TaskFailed(ctx context.Context, reason string) {
- if taskId := ctx.Value(appctx.APP_CONTEXT_KEY_TASK_ID); taskId != nil {
- modules.ComputeTasks.TaskFailed2(GetComputeSession(ctx), taskId.(string), reason)
- } else {
- log.Errorf("Reqeuest task failed missing task id, with reason(%s)", reason)
- }
- }
- func TaskFailed2(ctx context.Context, reason string, params *jsonutils.JSONDict) {
- if taskId := ctx.Value(appctx.APP_CONTEXT_KEY_TASK_ID); taskId != nil {
- modules.ComputeTasks.TaskFailed3(GetComputeSession(ctx), taskId.(string), reason, params)
- } else {
- log.Errorf("Reqeuest task failed missing task id, with reason(%s)", reason)
- }
- }
- func TaskComplete(ctx context.Context, params jsonutils.JSONObject) {
- if taskId := ctx.Value(appctx.APP_CONTEXT_KEY_TASK_ID); taskId != nil {
- modules.ComputeTasks.TaskComplete(GetComputeSession(ctx), taskId.(string), params)
- } else {
- log.Errorln("Reqeuest task complete missing task id")
- }
- }
- func K8sTaskFailed(ctx context.Context, reason string) {
- if taskId := ctx.Value(appctx.APP_CONTEXT_KEY_TASK_ID); taskId != nil {
- k8s.KubeTasks.TaskFailed2(GetK8sSession(ctx), taskId.(string), reason)
- } else {
- log.Errorf("Reqeuest k8s task failed missing task id, with reason(%s)", reason)
- }
- }
- func K8sTaskComplete(ctx context.Context, params jsonutils.JSONObject) {
- if taskId := ctx.Value(appctx.APP_CONTEXT_KEY_TASK_ID); taskId != nil {
- k8s.KubeTasks.TaskComplete(GetK8sSession(ctx), taskId.(string), params)
- } else {
- log.Errorln("Reqeuest k8s task complete missing task id")
- }
- }
- func GetWireOfIp(ctx context.Context, params jsonutils.JSONObject) (jsonutils.JSONObject, error) {
- res, err := modules.Networks.List(GetComputeSession(ctx), params)
- if err != nil {
- return nil, err
- }
- if len(res.Data) == 1 {
- wireId, _ := res.Data[0].GetString("wire_id")
- return GetWireInfo(ctx, wireId)
- } else {
- return nil, fmt.Errorf("Fail to get network info: no networks")
- }
- }
- func GetWireInfo(ctx context.Context, wireId string) (jsonutils.JSONObject, error) {
- return modules.Wires.Get(GetComputeSession(ctx), wireId, nil)
- }
- func RemoteStoragecacheCacheImage(ctx context.Context, storagecacheId, imageId, status, spath string) (jsonutils.JSONObject, error) {
- var query = jsonutils.NewDict()
- query.Set("auto_create", jsonutils.JSONTrue)
- var params = jsonutils.NewDict()
- params.Set("status", jsonutils.NewString(status))
- params.Set("path", jsonutils.NewString(spath))
- return modules.Storagecachedimages.Update(GetComputeSession(ctx),
- storagecacheId, imageId, query, params)
- }
- func UpdateResourceStatus(ctx context.Context, man modulebase.IResourceManager, id string, statusInput *apis.PerformStatusInput) (jsonutils.JSONObject, error) {
- return man.PerformAction(GetComputeSession(ctx), id, "status", jsonutils.Marshal(statusInput))
- }
- func UpdateContainerStatus(ctx context.Context, cid string, statusInput *computeapi.ContainerPerformStatusInput) (jsonutils.JSONObject, error) {
- return modules.Containers.PerformAction(GetComputeSession(ctx), cid, "status", jsonutils.Marshal(statusInput))
- }
- func UpdateServerStatus(ctx context.Context, sid string, statusInput *apis.PerformStatusInput) (jsonutils.JSONObject, error) {
- return UpdateResourceStatus(ctx, &modules.Servers, sid, statusInput)
- }
- func UpdateServerContainersStatus(ctx context.Context, sid string, input *computeapi.ServerPerformStatusInput) (jsonutils.JSONObject, error) {
- return modules.Servers.PerformAction(GetComputeSession(ctx), sid, "status", jsonutils.Marshal(input))
- }
- func UpdateServerProgress(ctx context.Context, sid string, progress, progressMbps float64) (jsonutils.JSONObject, error) {
- params := map[string]float64{
- "progress": progress,
- "progress_mbps": progressMbps,
- }
- return modules.Servers.Update(GetComputeSession(ctx), sid, jsonutils.Marshal(params))
- }
- func UploadGuestStatus(ctx context.Context, sid string, resp *computeapi.HostUploadGuestStatusInput) (jsonutils.JSONObject, error) {
- return modules.Servers.PerformAction(GetComputeSession(ctx), sid, "upload-status", jsonutils.Marshal(resp))
- }
- func UploadGuestsStatus(ctx context.Context, resp *computeapi.HostUploadGuestsStatusInput) (jsonutils.JSONObject, error) {
- return modules.Servers.PerformClassAction(GetComputeSession(ctx), "upload-status", jsonutils.Marshal(resp))
- }
- func IsGuestDir(f os.FileInfo, serversPath string) bool {
- if !regutils.MatchUUID(f.Name()) {
- return false
- }
- if !f.Mode().IsDir() && f.Mode()&os.ModeSymlink == 0 {
- return false
- }
- descFile := path.Join(serversPath, f.Name(), "desc")
- if !fileutils2.Exists(descFile) {
- return false
- }
- return true
- }
- func ResponseOk(ctx context.Context, w http.ResponseWriter) {
- Response(ctx, w, map[string]string{"result": "ok"})
- }
- func Response(ctx context.Context, w http.ResponseWriter, res interface{}) {
- if taskId := ctx.Value(appctx.APP_CONTEXT_KEY_TASK_ID); taskId != nil {
- w.Header().Set("X-Request-Id", taskId.(string))
- }
- switch res.(type) {
- case string:
- appsrv.Send(w, res.(string))
- case jsonutils.JSONObject:
- appsrv.SendJSON(w, res.(jsonutils.JSONObject))
- case error:
- httperrors.GeneralServerError(ctx, w, res.(error))
- default:
- appsrv.SendStruct(w, res)
- }
- }
- var (
- wm *workmanager.SWorkManager
- imageCacheW *workmanager.SWorkManager
- backupW *workmanager.SWorkManager
- k8sWm *workmanager.SWorkManager
- imagePreCacheW *workmanager.SWorkManager
- ParamsError = fmt.Errorf("Delay task parse params error")
- )
- func GetWorkManager() *workmanager.SWorkManager {
- return wm
- }
- func DelayTask(ctx context.Context, task workmanager.DelayTaskFunc, params interface{}) {
- wm.DelayTask(ctx, task, params)
- }
- func DelayImageCacheTask(ctx context.Context, task workmanager.DelayTaskFunc, params interface{}) {
- imageCacheW.DelayTask(ctx, task, params)
- }
- func DelayImagePreCacheTask(ctx context.Context, task workmanager.DelayTaskFunc, params interface{}) {
- imagePreCacheW.DelayTask(ctx, task, params)
- }
- func DelayBackupTask(ctx context.Context, task workmanager.DelayTaskFunc, params interface{}) {
- backupW.DelayTask(ctx, task, params)
- }
- func DelayKubeTask(ctx context.Context, task workmanager.DelayTaskFunc, params interface{}) {
- k8sWm.DelayTask(ctx, task, params)
- }
- func DelayTaskWithoutReqctx(ctx context.Context, task workmanager.DelayTaskFunc, params interface{}) {
- wm.DelayTaskWithoutReqctx(ctx, task, params)
- }
- func DelayTaskWithWorker(
- ctx context.Context, task workmanager.DelayTaskFunc,
- params interface{}, worker *appsrv.SWorkerManager,
- ) {
- wm.DelayTaskWithWorker(ctx, task, params, worker)
- }
- func InitWorkerManager() {
- InitWorkerManagerWithCount(options.HostOptions.DefaultRequestWorkerCount)
- initImageCacheWorkerManager()
- }
- func initImageCacheWorkerManager() {
- imageCacheW = workmanager.NewWorkManger("ImageCacheDelayTaskWorkers", TaskFailed, TaskComplete, options.HostOptions.ImageCacheWorkerCount)
- imagePreCacheW = workmanager.NewWorkManger("ImagePrefetchCacheDelayTaskWorkers", TaskFailed, TaskComplete, options.HostOptions.ImageCacheWorkerCount)
- }
- func initBackupWorkerManager() {
- backupW = workmanager.NewWorkManger("BackupDelayTaskWorkers", TaskFailed, TaskComplete, options.HostOptions.BackupTaskWorkerCount)
- }
- func InitWorkerManagerWithCount(count int) {
- wm = workmanager.NewWorkManger("GeneralDelayedTaskWorkers", TaskFailed, TaskComplete, count)
- }
- func InitK8sWorkerManager() {
- k8sWm = workmanager.NewWorkManger("K8sDelayedTaskWorkers", K8sTaskFailed, K8sTaskComplete, options.HostOptions.DefaultRequestWorkerCount)
- }
- func Init() {
- InitWorkerManager()
- InitK8sWorkerManager()
- initBackupWorkerManager()
- }
|