podhandlers.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package podhandlers
  15. import (
  16. "context"
  17. "fmt"
  18. "net/http"
  19. "net/url"
  20. "time"
  21. "k8s.io/apimachinery/pkg/util/proxy"
  22. "yunion.io/x/jsonutils"
  23. "yunion.io/x/log"
  24. "yunion.io/x/pkg/errors"
  25. "yunion.io/x/onecloud/pkg/apis"
  26. "yunion.io/x/onecloud/pkg/apis/compute"
  27. hostapi "yunion.io/x/onecloud/pkg/apis/host"
  28. "yunion.io/x/onecloud/pkg/appsrv"
  29. "yunion.io/x/onecloud/pkg/hostman/guestman"
  30. "yunion.io/x/onecloud/pkg/hostman/hostutils"
  31. "yunion.io/x/onecloud/pkg/hostman/options"
  32. "yunion.io/x/onecloud/pkg/httperrors"
  33. "yunion.io/x/onecloud/pkg/mcclient"
  34. "yunion.io/x/onecloud/pkg/mcclient/auth"
  35. "yunion.io/x/onecloud/pkg/util/flushwriter"
  36. )
  // POD_ID is the URL path placeholder for the pod identifier. It is used both
  // when building route patterns and as the key for looking the value up in the
  // params map returned by appsrv.FetchEnv.
  const POD_ID = "<podId>"

  // CONTAINER_ID is the URL path placeholder for the container identifier,
  // used the same way as POD_ID.
  const CONTAINER_ID = "<containerId>"
  41. type containerActionFunc func(ctx context.Context, userCred mcclient.TokenCredential, pod guestman.PodInstance, containerId string, body jsonutils.JSONObject) (jsonutils.JSONObject, error)
  42. type containerDelayActionParams struct {
  43. pod guestman.PodInstance
  44. containerId string
  45. body jsonutils.JSONObject
  46. }
  47. func containerSyncActionHandler(cf containerActionFunc) appsrv.FilterHandler {
  48. return _containerActionHandler(cf, true, nil)
  49. }
  50. func containerActionHandler(cf containerActionFunc) appsrv.FilterHandler {
  51. return _containerActionHandler(cf, false, nil)
  52. }
  53. func containerActionHandlerWithWorker(cf containerActionFunc, workerMan *appsrv.SWorkerManager) appsrv.FilterHandler {
  54. return _containerActionHandler(cf, false, workerMan)
  55. }
  56. func _containerActionHandler(cf containerActionFunc, isSync bool, workerMan *appsrv.SWorkerManager) appsrv.FilterHandler {
  57. return auth.Authenticate(func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
  58. params, _, body := appsrv.FetchEnv(ctx, w, r)
  59. podId := params[POD_ID]
  60. ctrId := params[CONTAINER_ID]
  61. userCred := auth.FetchUserCredential(ctx, nil)
  62. if body == nil {
  63. body = jsonutils.NewDict()
  64. }
  65. podObj, ok := guestman.GetGuestManager().GetServer(podId)
  66. if !ok {
  67. hostutils.Response(ctx, w, httperrors.NewNotFoundError("Not found pod %s", podId))
  68. return
  69. }
  70. pod, ok := podObj.(guestman.PodInstance)
  71. if !ok {
  72. hostutils.Response(ctx, w, httperrors.NewBadRequestError("runtime instance is %#v", podObj))
  73. return
  74. }
  75. delayParams := &containerDelayActionParams{
  76. pod: pod,
  77. containerId: ctrId,
  78. body: body,
  79. }
  80. if isSync {
  81. data, err := cf(ctx, userCred, delayParams.pod, delayParams.containerId, delayParams.body)
  82. if err != nil {
  83. hostutils.Response(ctx, w, httperrors.NewBadRequestError("error: %v", err))
  84. return
  85. }
  86. hostutils.Response(ctx, w, data)
  87. return
  88. } else {
  89. delayFunc := func(ctx context.Context, params interface{}) (jsonutils.JSONObject, error) {
  90. dp := params.(*containerDelayActionParams)
  91. return cf(ctx, userCred, dp.pod, dp.containerId, dp.body)
  92. }
  93. if workerMan != nil {
  94. hostutils.DelayTaskWithWorker(ctx, delayFunc, delayParams, workerMan)
  95. } else {
  96. hostutils.DelayTask(ctx, delayFunc, delayParams)
  97. }
  98. hostutils.ResponseOk(ctx, w)
  99. }
  100. })
  101. }
  102. func AddPodHandlers(prefix string, app *appsrv.Application) {
  103. ctrHandlers := map[string]containerActionFunc{
  104. "create": createContainer,
  105. "delete": deleteContainer,
  106. "sync-status": syncContainerStatus,
  107. "pull-image": pullImage,
  108. "save-volume-mount-to-image": saveVolumeMountToImage,
  109. "commit": commitContainer,
  110. "add-volume-mount-post-overlay": containerAddVolumeMountPostOverlay,
  111. "remove-volume-mount-post-overlay": containerRemoveVolumeMountPostOverlay,
  112. }
  113. for action, f := range ctrHandlers {
  114. app.AddHandler("POST",
  115. fmt.Sprintf("%s/pods/%s/containers/%s/%s", prefix, POD_ID, CONTAINER_ID, action),
  116. containerActionHandler(f))
  117. }
  118. startWorker := appsrv.NewWorkerManager("container-start-worker", options.HostOptions.ContainerStartWorkerCount, appsrv.DEFAULT_BACKLOG, false)
  119. stopWorker := appsrv.NewWorkerManager("container-stop-worker", options.HostOptions.ContainerStopWorkerCount, appsrv.DEFAULT_BACKLOG, false)
  120. ctrWorkerHanders := map[string]struct {
  121. workerMan *appsrv.SWorkerManager
  122. f containerActionFunc
  123. }{
  124. "start": {startWorker, startContainer},
  125. "stop": {stopWorker, stopContainer},
  126. }
  127. for action, fw := range ctrWorkerHanders {
  128. app.AddHandler("POST",
  129. fmt.Sprintf("%s/pods/%s/containers/%s/%s", prefix, POD_ID, CONTAINER_ID, action),
  130. containerActionHandlerWithWorker(fw.f, fw.workerMan))
  131. }
  132. execWorker := appsrv.NewWorkerManager("container-exec-worker", 16, appsrv.DEFAULT_BACKLOG, false)
  133. app.AddHandler3(newContainerWorkerHandler("POST", fmt.Sprintf("%s/pods/%s/containers/%s/exec-sync", prefix, POD_ID, CONTAINER_ID), execWorker, containerSyncActionHandler(containerExecSync)))
  134. app.AddHandler3(newContainerWorkerHandler("POST", fmt.Sprintf("%s/pods/%s/containers/%s/exec", prefix, POD_ID, CONTAINER_ID), execWorker, execContainer()))
  135. logWorker := appsrv.NewWorkerManager("container-log-worker", 64, appsrv.DEFAULT_BACKLOG, false)
  136. app.AddHandler3(newContainerWorkerHandler("GET", fmt.Sprintf("%s/pods/%s/containers/%s/log", prefix, POD_ID, CONTAINER_ID), logWorker, logContainer()))
  137. syncWorker := appsrv.NewWorkerManager("container-sync-action-worker", 16, appsrv.DEFAULT_BACKLOG, false)
  138. app.AddHandler3(newContainerWorkerHandler("POST", fmt.Sprintf("%s/pods/%s/containers/%s/set-resources-limit", prefix, POD_ID, CONTAINER_ID), syncWorker, containerSyncActionHandler(containerSetResourcesLimit)))
  139. }
  140. func pullImage(ctx context.Context, userCred mcclient.TokenCredential, pod guestman.PodInstance, ctrId string, body jsonutils.JSONObject) (jsonutils.JSONObject, error) {
  141. input := new(hostapi.ContainerPullImageInput)
  142. if err := body.Unmarshal(input); err != nil {
  143. return nil, errors.Wrap(err, "unmarshal to ContainerPullImageInput")
  144. }
  145. return pod.PullImage(ctx, userCred, ctrId, input)
  146. }
  147. func createContainer(ctx context.Context, userCred mcclient.TokenCredential, pod guestman.PodInstance, id string, body jsonutils.JSONObject) (jsonutils.JSONObject, error) {
  148. input := new(hostapi.ContainerCreateInput)
  149. if err := body.Unmarshal(input); err != nil {
  150. return nil, errors.Wrap(err, "unmarshal to ContainerCreateInput")
  151. }
  152. return pod.CreateContainer(ctx, userCred, id, input)
  153. }
  154. func startContainer(ctx context.Context, userCred mcclient.TokenCredential, pod guestman.PodInstance, containerId string, body jsonutils.JSONObject) (jsonutils.JSONObject, error) {
  155. input := new(hostapi.ContainerCreateInput)
  156. if err := body.Unmarshal(input); err != nil {
  157. return nil, errors.Wrap(err, "unmarshal to ContainerCreateInput")
  158. }
  159. return pod.StartContainer(ctx, userCred, containerId, input)
  160. }
  161. func stopContainer(ctx context.Context, userCred mcclient.TokenCredential, pod guestman.PodInstance, ctrId string, body jsonutils.JSONObject) (jsonutils.JSONObject, error) {
  162. input := new(hostapi.ContainerStopInput)
  163. if err := body.Unmarshal(input); err != nil {
  164. return nil, errors.Wrapf(err, "unmarshal to ContainerStopInput: %s", body.String())
  165. }
  166. return pod.StopContainer(ctx, userCred, ctrId, input)
  167. }
  168. func deleteContainer(ctx context.Context, userCred mcclient.TokenCredential, pod guestman.PodInstance, containerId string, body jsonutils.JSONObject) (jsonutils.JSONObject, error) {
  169. return pod.DeleteContainer(ctx, userCred, containerId)
  170. }
  171. func syncContainerStatus(ctx context.Context, userCred mcclient.TokenCredential, pod guestman.PodInstance, id string, body jsonutils.JSONObject) (jsonutils.JSONObject, error) {
  172. return pod.SyncContainerStatus(ctx, userCred, id)
  173. }
  174. func saveVolumeMountToImage(ctx context.Context, userCred mcclient.TokenCredential, pod guestman.PodInstance, ctrId string, body jsonutils.JSONObject) (jsonutils.JSONObject, error) {
  175. input := new(hostapi.ContainerSaveVolumeMountToImageInput)
  176. if err := body.Unmarshal(input); err != nil {
  177. return nil, errors.Wrap(err, "unmarshal to input")
  178. }
  179. return pod.SaveVolumeMountToImage(ctx, userCred, input, ctrId)
  180. }
  181. func newContainerWorkerHandler(method, urlPath string, worker *appsrv.SWorkerManager, hander appsrv.FilterHandler) *appsrv.SHandlerInfo {
  182. hi := &appsrv.SHandlerInfo{}
  183. hi.SetMethod(method)
  184. hi.SetPath(urlPath)
  185. hi.SetHandler(hander)
  186. hi.SetProcessTimeout(1 * time.Hour)
  187. hi.SetWorkerManager(worker)
  188. return hi
  189. }
  190. type containerWorkerActionHander func(ctx context.Context, userCred mcclient.TokenCredential, pod guestman.PodInstance, ctrId string, query, body jsonutils.JSONObject, r *http.Request, w http.ResponseWriter)
  191. func containerWorkerAction(handler containerWorkerActionHander) appsrv.FilterHandler {
  192. return auth.Authenticate(func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
  193. params, query, body := appsrv.FetchEnv(ctx, w, r)
  194. podId := params[POD_ID]
  195. ctrId := params[CONTAINER_ID]
  196. userCred := auth.FetchUserCredential(ctx, nil)
  197. podObj, ok := guestman.GetGuestManager().GetServer(podId)
  198. if !ok {
  199. hostutils.Response(ctx, w, httperrors.NewNotFoundError("Not found pod %s", podId))
  200. return
  201. }
  202. pod, ok := podObj.(guestman.PodInstance)
  203. if !ok {
  204. hostutils.Response(ctx, w, httperrors.NewBadRequestError("runtime instance is %#v", podObj))
  205. return
  206. }
  207. handler(ctx, userCred, pod, ctrId, query, body, r, w)
  208. })
  209. }
  210. func logContainer() appsrv.FilterHandler {
  211. return containerWorkerAction(func(ctx context.Context, userCred mcclient.TokenCredential, pod guestman.PodInstance, ctrId string, query, body jsonutils.JSONObject, r *http.Request, w http.ResponseWriter) {
  212. input := new(compute.PodLogOptions)
  213. if err := query.Unmarshal(input); err != nil {
  214. hostutils.Response(ctx, w, errors.Wrap(err, "unmarshal to PodLogOptions"))
  215. return
  216. }
  217. if err := compute.ValidatePodLogOptions(input); err != nil {
  218. hostutils.Response(ctx, w, err)
  219. return
  220. }
  221. if _, ok := w.(http.Flusher); !ok {
  222. hostutils.Response(ctx, w, errors.Errorf("unable to convert to http.Flusher"))
  223. return
  224. }
  225. w.Header().Set("Transfer-Encoding", "chunked")
  226. fw := flushwriter.Wrap(w)
  227. ctx, cancel := context.WithCancel(ctx)
  228. go func() {
  229. for {
  230. // check whether client request is closed
  231. select {
  232. case <-r.Context().Done():
  233. log.Infof("client request is closed, end session")
  234. cancel()
  235. return
  236. }
  237. }
  238. }()
  239. if err := pod.ReadLogs(ctx, userCred, ctrId, input, fw, fw); err != nil {
  240. hostutils.Response(ctx, w, errors.Wrap(err, "Read logs"))
  241. return
  242. }
  243. })
  244. }
  245. func execContainer() appsrv.FilterHandler {
  246. return containerWorkerAction(func(ctx context.Context, userCred mcclient.TokenCredential, pod guestman.PodInstance, ctrId string, query, body jsonutils.JSONObject, r *http.Request, w http.ResponseWriter) {
  247. input := new(compute.ContainerExecInput)
  248. if err := query.Unmarshal(input); err != nil {
  249. hostutils.Response(ctx, w, errors.Wrap(err, "unmarshal to ContainerExecInput"))
  250. return
  251. }
  252. criUrl, err := pod.ExecContainer(ctx, userCred, ctrId, input)
  253. if err != nil {
  254. hostutils.Response(ctx, w, errors.Wrap(err, "get exec url"))
  255. return
  256. }
  257. proxyStream(w, r, criUrl)
  258. return
  259. })
  260. }
  // responder is the error responder handed to the upgrade-aware proxy in
  // proxyStream.
  type responder struct {
  	// errorMessage is not read anywhere in this file.
  	errorMessage string
  }

  // Error reports err to the client as a plain-text 500 Internal Server Error
  // whose body is err's message. The request is not inspected.
  func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) {
  	message := err.Error()
  	http.Error(w, message, http.StatusInternalServerError)
  }
  267. func proxyStream(w http.ResponseWriter, r *http.Request, url *url.URL) {
  268. handler := proxy.NewUpgradeAwareHandler(url, nil, false, true, &responder{})
  269. handler.ServeHTTP(w, r)
  270. }
  271. func containerExecSync(ctx context.Context, userCred mcclient.TokenCredential, pod guestman.PodInstance, containerId string, body jsonutils.JSONObject) (jsonutils.JSONObject, error) {
  272. input := new(compute.ContainerExecSyncInput)
  273. if err := body.Unmarshal(input); err != nil {
  274. return nil, errors.Wrap(err, "unmarshal to ContainerExecSyncInput")
  275. }
  276. return pod.ContainerExecSync(ctx, userCred, containerId, input)
  277. }
  278. func containerSetResourcesLimit(ctx context.Context, userCred mcclient.TokenCredential, pod guestman.PodInstance, containerId string, body jsonutils.JSONObject) (jsonutils.JSONObject, error) {
  279. input := new(apis.ContainerResources)
  280. if err := body.Unmarshal(input); err != nil {
  281. return nil, errors.Wrap(err, "unmarshal to ContainerExecSyncInput")
  282. }
  283. return pod.SetContainerResourceLimit(containerId, input)
  284. }
  285. func commitContainer(ctx context.Context, userCred mcclient.TokenCredential, pod guestman.PodInstance, ctrId string, body jsonutils.JSONObject) (jsonutils.JSONObject, error) {
  286. input := new(hostapi.ContainerCommitInput)
  287. if err := body.Unmarshal(input); err != nil {
  288. return nil, errors.Wrap(err, "unmarshal to ContainerCommitInput")
  289. }
  290. return pod.CommitContainer(ctx, userCred, ctrId, input)
  291. }
  292. func containerAddVolumeMountPostOverlay(ctx context.Context, userCred mcclient.TokenCredential, pod guestman.PodInstance, containerId string, body jsonutils.JSONObject) (jsonutils.JSONObject, error) {
  293. input := new(compute.ContainerVolumeMountAddPostOverlayInput)
  294. if err := body.Unmarshal(input); err != nil {
  295. return nil, errors.Wrap(err, "unmarshal to ContainerVolumeMountAddPostOverlayInput")
  296. }
  297. return nil, pod.AddContainerVolumeMountPostOverlay(ctx, userCred, containerId, input)
  298. }
  299. func containerRemoveVolumeMountPostOverlay(ctx context.Context, userCred mcclient.TokenCredential, pod guestman.PodInstance, containerId string, body jsonutils.JSONObject) (jsonutils.JSONObject, error) {
  300. input := new(compute.ContainerVolumeMountRemovePostOverlayInput)
  301. if err := body.Unmarshal(input); err != nil {
  302. return nil, errors.Wrap(err, "unmarshal to ContainerMountVolumeRemovePostOverlayInput")
  303. }
  304. return nil, pod.RemoveContainerVolumeMountPostOverlay(ctx, userCred, containerId, input)
  305. }