llm_create_task.go

package llm

import (
	"context"

	"yunion.io/x/jsonutils"
	"yunion.io/x/pkg/errors"

	computeapi "yunion.io/x/onecloud/pkg/apis/compute"
	api "yunion.io/x/onecloud/pkg/apis/llm"
	"yunion.io/x/onecloud/pkg/cloudcommon/db"
	"yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
	"yunion.io/x/onecloud/pkg/llm/models"
	"yunion.io/x/onecloud/pkg/mcclient/auth"
	"yunion.io/x/onecloud/pkg/util/logclient"
)
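
// LLMCreateTask creates an LLM instance: it provisions the backing compute
// server, records the resulting disks and access ports locally, syncs the
// LLM container, and starts the LLM service when AutoStart is requested.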
type LLMCreateTask struct {
	taskman.STask
}

func init() {
	taskman.RegisterTask(LLMCreateTask{})
}
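
// taskFailed marks the LLM as create-failed, records the failure in the
// operations log and the action log, and fails the current task stage.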
func (task *LLMCreateTask) taskFailed(ctx context.Context, llm *models.SLLM, err error) {
	llm.SetStatus(ctx, task.UserCred, api.LLM_STATUS_CREATE_FAIL, err.Error())
	db.OpsLog.LogEvent(llm, db.ACT_CREATE, err, task.UserCred)
	logclient.AddActionLogWithStartable(task, llm, logclient.ACT_CREATE, err, task.UserCred, false)
	task.SetStageFailed(ctx, jsonutils.NewString(err.Error()))
}
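
// taskComplete records the final status on the LLM and completes the task stage.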
func (task *LLMCreateTask) taskComplete(ctx context.Context, llm *models.SLLM, status string) {
	llm.SetStatus(ctx, task.GetUserCred(), status, "create success")
	task.SetStageComplete(ctx, nil)
}
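
// OnInit is the entry stage of the task. It unmarshals the creation input,
// registers OnLLMRefreshStatusComplete as the next stage, and creates the
// backing compute server through the session's task callback.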
func (task *LLMCreateTask) OnInit(ctx context.Context, obj db.IStandaloneModel, body jsonutils.JSONObject) {
	llm := obj.(*models.SLLM)
	serverCreateInput := api.LLMCreateInput{}
	err := body.Unmarshal(&serverCreateInput)
	if err != nil {
		task.taskFailed(ctx, llm, err)
		return
	}
	serverCreateInput.Name = llm.Name
	task.SetStage("OnLLMRefreshStatusComplete", nil)
	s := auth.GetSession(ctx, task.GetUserCred(), "")
	err = s.WithTaskCallback(task.GetId(), func() error {
		serverId, err := llm.ServerCreate(ctx, task.UserCred, s, &serverCreateInput)
		if err != nil {
			task.taskFailed(ctx, llm, err)
			return err
		}
		// persist the compute-side server id on the LLM record
		_, err = db.Update(llm, func() error {
			llm.CmpId = serverId
			return nil
		})
		return err
	})
	if err != nil {
		task.OnLLMRefreshStatusCompleteFailed(ctx, llm, jsonutils.Marshal(err))
		return
	}
	// var expectStatus []string
	// if serverCreateInput.AutoStart {
	// 	expectStatus = []string{computeapi.VM_RUNNING}
	// } else {
	// 	expectStatus = []string{computeapi.VM_READY}
	// }
	// taskman.LocalTaskRun(task, func() (jsonutils.JSONObject, error) {
	// 	server, err := llm.WaitServerStatus(ctx, task.UserCred, expectStatus, 7200)
	// 	if err != nil {
	// 		return nil, errors.Wrap(err, "WaitServerStatus")
	// 	}
	// 	return jsonutils.Marshal(server), nil
	// })
}
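
// OnLLMRefreshStatusCompleteFailed is the failure handler for the
// refresh-status stage; it converts the error payload back into an
// error and fails the task.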
func (task *LLMCreateTask) OnLLMRefreshStatusCompleteFailed(ctx context.Context, llm *models.SLLM, err jsonutils.JSONObject) {
	task.taskFailed(ctx, llm, errors.Error(err.String()))
}
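
// OnLLMRefreshStatusComplete runs after the compute server has been created.
// It mirrors the server's disks and port mappings into local volume and
// access-info records, syncs the LLM container, and starts the LLM when
// AutoStart is set.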
func (task *LLMCreateTask) OnLLMRefreshStatusComplete(ctx context.Context, llm *models.SLLM, body jsonutils.JSONObject) {
	server, err := llm.GetServer(ctx)
	if err != nil {
		task.taskFailed(ctx, llm, errors.Wrap(err, "GetServer"))
		return
	}
	mountedModels, err := llm.FetchMountedModelFullName()
	if err != nil {
		task.taskFailed(ctx, llm, errors.Wrap(err, "FetchMountedModelFullName"))
		return
	}
	// create a volume record for each of the server's disks
	for _, disk := range server.DisksInfo {
		volume := models.SVolume{}
		volume.CmpId = disk.Id
		volume.LLMId = llm.Id
		volume.SizeMB = disk.SizeMb
		volume.Name = disk.Name
		volume.StorageType = disk.StorageType
		volume.Status = computeapi.DISK_READY
		volume.DomainId = llm.DomainId
		volume.ProjectId = llm.ProjectId
		volume.ProjectSrc = llm.ProjectSrc
		// if len(input.TemplateId) > 0 {
		volume.TemplateId = disk.ImageId
		// }
		volume.MountedModels = mountedModels
		err := models.GetVolumeManager().TableSpec().Insert(ctx, &volume)
		if err != nil {
			task.taskFailed(ctx, llm, errors.Wrap(err, "VolumeManager.TableSpec().Insert"))
			return
		}
	}
	// create access info and port mappings from the server's first NIC
	if len(server.Nics) > 0 {
		db.Update(llm, func() error {
			llm.LLMIp = server.Nics[0].IpAddr
			return nil
		})
		for _, portMapping := range server.Nics[0].PortMappings {
			access := models.SAccessInfo{}
			access.LLMId = llm.Id
			access.ListenPort = int(portMapping.Port)
			if portMapping.HostPort != nil {
				access.AccessPort = int(*portMapping.HostPort)
			}
			access.Protocol = string(portMapping.Protocol)
			access.RemoteIps = portMapping.RemoteIps
			envs := make([]api.PortMappingEnv, 0)
			for _, env := range portMapping.Envs {
				envs = append(envs, api.PortMappingEnv{
					Key:       env.Key,
					ValueFrom: string(env.ValueFrom),
				})
			}
			access.PortMappingEnvs = envs
			if err := models.GetAccessInfoManager().TableSpec().Insert(ctx, &access); err != nil {
				task.taskFailed(ctx, llm, errors.Wrap(err, "AccessInfoManager.TableSpec().Insert"))
				return
			}
		}
	}
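	// sync the LLM's container record from the created server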
	if _, err := llm.SyncLLMContainer(ctx, task.GetUserCred(), server); err != nil {
		task.taskFailed(ctx, llm, errors.Wrap(err, "SyncLLMContainer"))
		return
	}
	// When AutoStart is true, compute auto-starts the server, so LLMStartTask
	// never runs; the LLM must be started here instead.
	var createInput api.LLMCreateInput
	if task.GetParams() != nil && task.GetParams().Unmarshal(&createInput) == nil && createInput.AutoStart {
		_, err = llm.WaitServerStatus(ctx, task.GetUserCred(), []string{computeapi.VM_RUNNING}, 7200)
		if err != nil {
			task.taskFailed(ctx, llm, errors.Wrap(err, "WaitServerStatus VM_RUNNING"))
			return
		}
		_, err = llm.WaitContainerStatus(ctx, task.GetUserCred(), []string{computeapi.CONTAINER_STATUS_RUNNING}, 120)
		if err != nil {
			task.taskFailed(ctx, llm, errors.Wrap(err, "WaitContainerStatus"))
			return
		}
		err = llm.GetLLMContainerDriver().StartLLM(ctx, task.GetUserCred(), llm)
		if err != nil {
			task.taskFailed(ctx, llm, errors.Wrap(err, "StartLLM"))
			return
		}
		task.taskComplete(ctx, llm, api.LLM_STATUS_RUNNING)
		return
	}
	task.taskComplete(ctx, llm, server.Status)
}