llm_base.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400
  1. package models
  2. import (
  3. "context"
  4. "database/sql"
  5. "fmt"
  6. "time"
  7. "yunion.io/x/jsonutils"
  8. "yunion.io/x/pkg/errors"
  9. "yunion.io/x/sqlchemy"
  10. "yunion.io/x/onecloud/pkg/apis"
  11. computeapi "yunion.io/x/onecloud/pkg/apis/compute"
  12. api "yunion.io/x/onecloud/pkg/apis/llm"
  13. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  14. "yunion.io/x/onecloud/pkg/httperrors"
  15. "yunion.io/x/onecloud/pkg/llm/options"
  16. cloudutil "yunion.io/x/onecloud/pkg/llm/utils"
  17. "yunion.io/x/onecloud/pkg/mcclient"
  18. "yunion.io/x/onecloud/pkg/mcclient/auth"
  19. "yunion.io/x/onecloud/pkg/mcclient/modules/compute"
  20. computeoptions "yunion.io/x/onecloud/pkg/mcclient/options/compute"
  21. "yunion.io/x/onecloud/pkg/util/stringutils2"
  22. )
  23. func NewSLLMBaseManager(dt interface{}, tableName string, keyword string, keywordPlural string) SLLMBaseManager {
  24. return SLLMBaseManager{
  25. SVirtualResourceBaseManager: db.NewVirtualResourceBaseManager(
  26. dt,
  27. tableName,
  28. keyword,
  29. keywordPlural,
  30. ),
  31. }
  32. }
  33. type SLLMBaseManager struct {
  34. db.SVirtualResourceBaseManager
  35. db.SEnabledResourceBaseManager
  36. }
  37. type SLLMBase struct {
  38. db.SVirtualResourceBase
  39. db.SEnabledResourceBase
  40. CmpId string `width:"128" charset:"ascii" nullable:"true" list:"user"`
  41. LLMIp string `width:"20" charset:"ascii" nullable:"true" list:"user"`
  42. // Hypervisor string `width:"128" charset:"ascii" nullable:"true" list:"user"`
  43. Priority int `nullable:"false" default:"100" list:"user"`
  44. BandwidthMb int `nullable:"true" list:"user" create:"admin_optional"`
  45. LastInstantModelProbe time.Time `nullable:"true" list:"user" create:"admin_optional"`
  46. // 是否请求同步更新镜像
  47. SyncImageRequest bool `default:"false" nullable:"false" list:"user" update:"user"`
  48. VolumeUsedMb int `nullable:"true" list:"user"`
  49. VolumeUsedAt time.Time `nullable:"true" list:"user"`
  50. // 秒装应用配额(可安装的总容量限制)
  51. // InstantAppQuotaGb int `list:"user" update:"user" create:"optional" default:"0" nullable:"false"`
  52. DebugMode bool `default:"false" nullable:"false" list:"user" update:"user"`
  53. RootfsUnlimit bool `default:"false" nullable:"false" list:"user" update:"user"`
  54. NetworkType string `charset:"utf8" list:"user" update:"user" create:"optional"`
  55. NetworkId string `charset:"utf8" nullable:"true" list:"user" update:"user" create:"optional"`
  56. }
  57. func (man *SLLMBaseManager) ValidateCreateData(ctx context.Context, userCred mcclient.TokenCredential, ownerId mcclient.IIdentityProvider, query jsonutils.JSONObject, input api.LLMBaseCreateInput) (api.LLMBaseCreateInput, error) {
  58. var err error
  59. input.VirtualResourceCreateInput, err = man.SVirtualResourceBaseManager.ValidateCreateData(ctx, userCred, ownerId, query, input.VirtualResourceCreateInput)
  60. if err != nil {
  61. return input, errors.Wrap(err, "validate VirtualResourceCreateInput")
  62. }
  63. /*
  64. if len(input.PreferHost) > 0 {
  65. s := auth.GetSession(ctx, userCred, "")
  66. hostJson, err := compute.Hosts.Get(s, input.PreferHost, nil)
  67. if err != nil {
  68. return input, errors.Wrap(err, "get host")
  69. }
  70. hostDetails := computeapi.HostDetails{}
  71. if err := hostJson.Unmarshal(&hostDetails); err != nil {
  72. return input, errors.Wrap(err, "unmarshal hostDetails")
  73. }
  74. if hostDetails.Enabled == nil || !*hostDetails.Enabled {
  75. return input, errors.Wrap(errors.ErrInvalidStatus, "not enabled")
  76. }
  77. if hostDetails.HostStatus != computeapi.HOST_ONLINE {
  78. return input, errors.Wrap(errors.ErrInvalidStatus, "not online")
  79. }
  80. if hostDetails.HostType != computeapi.HOST_TYPE_CONTAINER {
  81. return input, errors.Wrapf(httperrors.ErrNotAcceptable, "host_type %s not supported", hostDetails.HostType)
  82. }
  83. input.PreferHost = hostDetails.Id
  84. }
  85. */
  86. // 处理网络配置
  87. var firstNet *computeapi.NetworkConfig
  88. if len(input.Nets) > 0 {
  89. firstNet = input.Nets[0]
  90. firstNet.Index = 0
  91. if len(string(firstNet.NetType)) > 0 && !api.IsLLMSkuBaseNetworkType(string(firstNet.NetType)) {
  92. return input, errors.Wrapf(httperrors.ErrInputParameter, "invalid network type %s", firstNet.NetType)
  93. }
  94. if len(firstNet.Network) > 0 {
  95. s := auth.GetSession(ctx, userCred, "")
  96. netObj, err := compute.Networks.Get(s, firstNet.Network, nil)
  97. if err != nil {
  98. return input, errors.Wrapf(httperrors.ErrInputParameter, "invalid network_id %s", firstNet.Network)
  99. }
  100. netId, _ := netObj.GetString("id")
  101. netType, _ := netObj.GetString("server_type")
  102. firstNet.Network = netId
  103. if len(string(firstNet.NetType)) == 0 {
  104. firstNet.NetType = computeapi.TNetworkType(netType)
  105. }
  106. }
  107. } else {
  108. return input, errors.Wrap(httperrors.ErrInputParameter, "nets cannot be empty")
  109. }
  110. return input, nil
  111. }
  112. func (llmBase *SLLMBase) CustomizeCreate(ctx context.Context, userCred mcclient.TokenCredential, ownerId mcclient.IIdentityProvider, query jsonutils.JSONObject, data jsonutils.JSONObject) error {
  113. err := llmBase.SVirtualResourceBase.CustomizeCreate(ctx, userCred, ownerId, query, data)
  114. if err != nil {
  115. return errors.Wrap(err, "SVirtualResourceBase.CustomizeCreate")
  116. }
  117. var input api.LLMBaseCreateInput
  118. if err := data.Unmarshal(&input); err != nil {
  119. return errors.Wrap(err, "unmarshal LLMBaseCreateInput")
  120. }
  121. if len(input.Nets) > 0 {
  122. firstNet := input.Nets[0]
  123. if len(string(firstNet.NetType)) > 0 {
  124. llmBase.NetworkType = string(firstNet.NetType)
  125. }
  126. if len(firstNet.Network) > 0 {
  127. llmBase.NetworkId = firstNet.Network
  128. }
  129. }
  130. return nil
  131. }
  132. func GetServerIdsByHost(ctx context.Context, userCred mcclient.TokenCredential, hostId string) ([]string, error) {
  133. s := auth.GetSession(ctx, userCred, options.Options.Region)
  134. params := computeoptions.ServerListOptions{}
  135. params.Scope = "maxallowed"
  136. params.Host = hostId
  137. params.Field = []string{"id"}
  138. limit := 1024
  139. params.Limit = &limit
  140. offset := 0
  141. total := -1
  142. idList := stringutils2.NewSortedStrings(nil)
  143. for total < 0 || offset < total {
  144. params.Offset = &offset
  145. results, err := compute.Servers.List(s, jsonutils.Marshal(params))
  146. if err != nil {
  147. return nil, errors.Wrap(err, "Servers.List")
  148. }
  149. total = results.Total
  150. for i := range results.Data {
  151. idStr, _ := results.Data[i].GetString("id")
  152. if len(idStr) > 0 {
  153. idList = idList.Append(idStr)
  154. }
  155. }
  156. offset += len(results.Data)
  157. }
  158. return idList, nil
  159. }
  160. func (man *SLLMBaseManager) ListItemFilter(ctx context.Context, q *sqlchemy.SQuery, userCred mcclient.TokenCredential, input api.LLMBaseListInput) (*sqlchemy.SQuery, error) {
  161. q, err := man.SVirtualResourceBaseManager.ListItemFilter(ctx, q, userCred, input.VirtualResourceListInput)
  162. if err != nil {
  163. return q, errors.Wrap(err, "VirtualResourceBaseManager.ListItemFilter")
  164. }
  165. q, err = man.SEnabledResourceBaseManager.ListItemFilter(ctx, q, userCred, input.EnabledResourceBaseListInput)
  166. if err != nil {
  167. return q, errors.Wrap(err, "SEnabledResourceBaseManager.ListItemFilter")
  168. }
  169. if len(input.NetworkType) > 0 {
  170. q = q.Equals("network_type", input.NetworkType)
  171. }
  172. if len(input.NetworkId) > 0 {
  173. s := auth.GetSession(ctx, userCred, "")
  174. netObj, err := compute.Networks.Get(s, input.NetworkId, nil)
  175. if err != nil {
  176. if errors.Cause(err) == sql.ErrNoRows {
  177. return nil, errors.Wrapf(httperrors.ErrResourceNotFound, "network %s not found", input.NetworkId)
  178. }
  179. return nil, errors.Wrap(err, "Networks.Get")
  180. }
  181. netId, _ := netObj.GetString("id")
  182. q = q.Equals("network_id", netId)
  183. }
  184. if len(input.Host) > 0 {
  185. serverIds, err := GetServerIdsByHost(ctx, userCred, input.Host)
  186. if err != nil {
  187. return nil, errors.Wrap(err, "GetServerIdsByHost")
  188. }
  189. q = q.In("svr_id", serverIds)
  190. }
  191. if len(input.Status) > 0 {
  192. s := auth.GetSession(ctx, userCred, options.Options.Region)
  193. params := computeoptions.ServerListOptions{}
  194. params.Scope = "maxallowed"
  195. params.Status = input.Status
  196. params.Field = []string{"guest_id"}
  197. limit := 1024
  198. params.Limit = &limit
  199. offset := 0
  200. total := -1
  201. idList := stringutils2.NewSortedStrings(nil)
  202. for total < 0 || offset < total {
  203. params.Offset = &offset
  204. results, err := compute.Containers.List(s, jsonutils.Marshal(params))
  205. if err != nil {
  206. return nil, errors.Wrap(err, "Containers.List")
  207. }
  208. total = results.Total
  209. for i := range results.Data {
  210. idStr, _ := results.Data[i].GetString("guest_id")
  211. if len(idStr) > 0 {
  212. idList = idList.Append(idStr)
  213. }
  214. }
  215. offset += len(results.Data)
  216. }
  217. q = q.In("svr_id", idList)
  218. }
  219. if input.NoVolume != nil {
  220. volumeQ := GetVolumeManager().Query("llm_id").SubQuery()
  221. q = q.LeftJoin(volumeQ, sqlchemy.Equals(q.Field("id"), volumeQ.Field("llm_id")))
  222. if *input.NoVolume {
  223. q = q.Filter(sqlchemy.IsNull(volumeQ.Field("llm_id")))
  224. } else {
  225. q = q.Filter(sqlchemy.IsNotNull(volumeQ.Field("llm_id")))
  226. }
  227. }
  228. if len(input.VolumeId) > 0 {
  229. volumeObj, err := GetVolumeManager().FetchByIdOrName(ctx, userCred, input.VolumeId)
  230. if err != nil {
  231. return nil, errors.Wrap(err, "VolumeManager.FetchByIdOrName")
  232. }
  233. vq := GetVolumeManager().Query().SubQuery()
  234. q = q.Join(vq, sqlchemy.Equals(q.Field("id"), vq.Field("llm_id")))
  235. q = q.Filter(sqlchemy.Equals(vq.Field("id"), volumeObj.GetId()))
  236. }
  237. accessQ := GetAccessInfoManager().Query().SubQuery()
  238. if input.ListenPort > 0 {
  239. q = q.Join(accessQ, sqlchemy.Equals(q.Field("id"), accessQ.Field("llm_id")))
  240. q = q.Filter(sqlchemy.Equals(accessQ.Field("listen_port"), input.ListenPort))
  241. }
  242. if len(input.PublicIp) > 0 {
  243. s := auth.GetSession(ctx, userCred, "")
  244. hostInput := computeapi.HostListInput{
  245. PublicIp: []string{input.PublicIp},
  246. }
  247. hostInput.Field = []string{"id"}
  248. hosts, err := compute.Hosts.List(s, jsonutils.Marshal(hostInput))
  249. if err != nil {
  250. return nil, errors.Wrap(err, "Hosts.List")
  251. }
  252. if len(hosts.Data) == 0 {
  253. return nil, httperrors.NewNotFoundError("Not found host by public_ip %s", input.PublicIp)
  254. }
  255. hostIds := []string{}
  256. for i := range hosts.Data {
  257. idStr, _ := hosts.Data[i].GetString("id")
  258. if len(idStr) > 0 {
  259. hostIds = append(hostIds, idStr)
  260. }
  261. }
  262. if len(hostIds) > 0 {
  263. serverIds, err := GetServerIdsByHost(ctx, userCred, hostIds[0])
  264. if err != nil {
  265. return nil, errors.Wrap(err, "GetServerIdsByHost")
  266. }
  267. q = q.In("svr_id", serverIds)
  268. }
  269. }
  270. return q, nil
  271. }
  272. func (llm *SLLMBase) GetServer(ctx context.Context) (*computeapi.ServerDetails, error) {
  273. return cloudutil.GetServer(ctx, llm.CmpId)
  274. }
  275. func (llm *SLLMBase) GetVolume() (*SVolume, error) {
  276. volume := &SVolume{}
  277. err := GetVolumeManager().Query().Equals("llm_id", llm.Id).First(volume)
  278. if err != nil {
  279. if errors.Cause(err) == sql.ErrNoRows {
  280. return nil, errors.Wrap(errors.ErrNotFound, "query volume")
  281. }
  282. return nil, errors.Wrap(err, "FetchVolume")
  283. }
  284. volume.SetModelManager(GetVolumeManager(), volume)
  285. return volume, nil
  286. }
  287. func GetDiskVolumeMounts(vols *api.Volumes, containerIndex int, postOverlays []*apis.ContainerVolumeMountDiskPostOverlay) []*apis.ContainerVolumeMount {
  288. if vols == nil {
  289. return nil
  290. }
  291. mounts := make([]*apis.ContainerVolumeMount, 0)
  292. for idx, vol := range *vols {
  293. volRelation := vol.GetVolumeByContainer(containerIndex)
  294. if volRelation == nil {
  295. continue
  296. }
  297. mounts = append(mounts, &apis.ContainerVolumeMount{
  298. UniqueName: fmt.Sprintf("volume-%d-%d-%s", idx, containerIndex, volRelation.MountPath),
  299. Type: apis.CONTAINER_VOLUME_MOUNT_TYPE_DISK,
  300. MountPath: volRelation.MountPath,
  301. Propagation: apis.MOUNTPROPAGATION_PROPAGATION_HOST_TO_CONTAINER,
  302. Disk: &apis.ContainerVolumeMountDisk{
  303. Index: &idx,
  304. SubDirectory: volRelation.SubDirectory,
  305. Overlay: volRelation.Overlay,
  306. PostOverlay: postOverlays,
  307. },
  308. FsUser: volRelation.FsUser,
  309. FsGroup: volRelation.FsGroup,
  310. })
  311. }
  312. return mounts
  313. }
  314. // 取消自动删除
  315. func (llm *SLLMBase) Delete(ctx context.Context, userCred mcclient.TokenCredential) error {
  316. return nil
  317. }
  318. func (llm *SLLMBase) RealDelete(ctx context.Context, userCred mcclient.TokenCredential) error {
  319. return llm.SVirtualResourceBase.Delete(ctx, userCred)
  320. }
  321. func (llm *SLLMBase) ServerDelete(ctx context.Context, userCred mcclient.TokenCredential, s *mcclient.ClientSession) error {
  322. if len(llm.CmpId) == 0 {
  323. return nil
  324. }
  325. server, err := llm.GetServer(ctx)
  326. if err != nil {
  327. if errors.Cause(err) == errors.ErrNotFound {
  328. return nil
  329. } else {
  330. return errors.Wrap(err, "GetServer")
  331. }
  332. }
  333. if server.DisableDelete != nil && *server.DisableDelete {
  334. // update to allow delete
  335. s2 := auth.GetSession(ctx, userCred, "")
  336. _, err = compute.Servers.Update(s2, llm.CmpId, jsonutils.Marshal(map[string]interface{}{"disable_delete": false}))
  337. if err != nil {
  338. return errors.Wrap(err, "update server to delete")
  339. }
  340. }
  341. _, err = compute.Servers.DeleteWithParam(s, llm.CmpId, jsonutils.Marshal(map[string]interface{}{
  342. "override_pending_delete": true,
  343. }), nil)
  344. if err != nil {
  345. return errors.Wrap(err, "delete server err:")
  346. }
  347. return nil
  348. }
  349. func (llm *SLLMBase) WaitDelete(ctx context.Context, userCred mcclient.TokenCredential, timeoutSecs int) error {
  350. return cloudutil.WaitDelete[computeapi.ServerDetails](ctx, &compute.Servers, llm.CmpId, timeoutSecs)
  351. }
  352. func (llm *SLLMBase) getImage(imageId string) (*SLLMImage, error) {
  353. image, err := GetLLMImageManager().FetchById(imageId)
  354. if err != nil {
  355. return nil, errors.Wrap(err, "fetch LLMImage")
  356. }
  357. return image.(*SLLMImage), nil
  358. }
  359. func (llm *SLLMBase) WaitServerStatus(ctx context.Context, userCred mcclient.TokenCredential, targetStatus []string, timeoutSecs int) (*computeapi.ServerDetails, error) {
  360. return cloudutil.WaitServerStatus(ctx, llm.CmpId, targetStatus, timeoutSecs)
  361. }