pod.go 111 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
7327832793280328132823283328432853286328732883289329032913292329332943295329632973298329933003301330233033304330533063307330833093310331133123313331433153316331733183319332033213322332333243325332633273328332933303331333233333334333533363337333833393340334133423343334433453346334733483349
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package guestman
  15. import (
  16. "context"
  17. "encoding/base64"
  18. "encoding/json"
  19. "fmt"
  20. "io"
  21. "io/ioutil"
  22. "net/url"
  23. "os"
  24. "path"
  25. "path/filepath"
  26. "strconv"
  27. "strings"
  28. "sync"
  29. "time"
  30. "github.com/opencontainers/runtime-spec/specs-go"
  31. runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
  32. "yunion.io/x/jsonutils"
  33. "yunion.io/x/log"
  34. "yunion.io/x/pkg/errors"
  35. "yunion.io/x/pkg/util/sets"
  36. "yunion.io/x/pkg/utils"
  37. "yunion.io/x/onecloud/pkg/apis"
  38. computeapi "yunion.io/x/onecloud/pkg/apis/compute"
  39. hostapi "yunion.io/x/onecloud/pkg/apis/host"
  40. "yunion.io/x/onecloud/pkg/hostman/container/device"
  41. "yunion.io/x/onecloud/pkg/hostman/container/lifecycle"
  42. "yunion.io/x/onecloud/pkg/hostman/container/prober"
  43. proberesults "yunion.io/x/onecloud/pkg/hostman/container/prober/results"
  44. "yunion.io/x/onecloud/pkg/hostman/container/snapshot_service"
  45. "yunion.io/x/onecloud/pkg/hostman/container/status"
  46. "yunion.io/x/onecloud/pkg/hostman/container/volume_mount"
  47. "yunion.io/x/onecloud/pkg/hostman/container/volume_mount/disk"
  48. _ "yunion.io/x/onecloud/pkg/hostman/container/volume_mount/disk"
  49. "yunion.io/x/onecloud/pkg/hostman/guestman/desc"
  50. "yunion.io/x/onecloud/pkg/hostman/guestman/pod/runtime"
  51. "yunion.io/x/onecloud/pkg/hostman/guestman/pod/statusman"
  52. deployapi "yunion.io/x/onecloud/pkg/hostman/hostdeployer/apis"
  53. "yunion.io/x/onecloud/pkg/hostman/hostinfo"
  54. "yunion.io/x/onecloud/pkg/hostman/hostutils"
  55. "yunion.io/x/onecloud/pkg/hostman/isolated_device"
  56. _ "yunion.io/x/onecloud/pkg/hostman/isolated_device/container_device/cdi"
  57. "yunion.io/x/onecloud/pkg/hostman/options"
  58. "yunion.io/x/onecloud/pkg/hostman/storageman"
  59. "yunion.io/x/onecloud/pkg/httperrors"
  60. "yunion.io/x/onecloud/pkg/mcclient"
  61. "yunion.io/x/onecloud/pkg/mcclient/auth"
  62. computemod "yunion.io/x/onecloud/pkg/mcclient/modules/compute"
  63. imagemod "yunion.io/x/onecloud/pkg/mcclient/modules/image"
  64. "yunion.io/x/onecloud/pkg/util/cgrouputils/cpuset"
  65. "yunion.io/x/onecloud/pkg/util/fileutils2"
  66. "yunion.io/x/onecloud/pkg/util/mountutils"
  67. podutil "yunion.io/x/onecloud/pkg/util/pod"
  68. containerdutil "yunion.io/x/onecloud/pkg/util/pod/containerd"
  69. "yunion.io/x/onecloud/pkg/util/pod/logs"
  70. "yunion.io/x/onecloud/pkg/util/pod/nerdctl"
  71. "yunion.io/x/onecloud/pkg/util/procutils"
  72. )
  73. func (m *SGuestManager) startContainerProbeManager() {
  74. livenessManager := proberesults.NewManager()
  75. startupManager := proberesults.NewManager()
  76. man := prober.NewManager(status.NewManager(), livenessManager, startupManager, newContainerRunner(m))
  77. m.containerProbeManager = man
  78. man.Start()
  79. }
  80. func (m *SGuestManager) GetContainerProbeManager() prober.Manager {
  81. return m.containerProbeManager
  82. }
  83. func newContainerRunner(man *SGuestManager) *containerRunner {
  84. return &containerRunner{man}
  85. }
  86. type containerRunner struct {
  87. manager *SGuestManager
  88. }
  89. func (cr *containerRunner) RunInContainer(podId string, containerId string, cmd []string, timeout time.Duration) ([]byte, error) {
  90. srv, ok := cr.manager.GetServer(podId)
  91. if !ok {
  92. return nil, errors.Wrapf(httperrors.ErrNotFound, "server %s not found", podId)
  93. }
  94. s := srv.(*sPodGuestInstance)
  95. ctrCriId, err := s.getContainerCRIId(containerId)
  96. if err != nil {
  97. return nil, errors.Wrap(err, "get container cri id")
  98. }
  99. resp, err := s.getCRI().ExecSync(context.Background(), ctrCriId, cmd, int64(timeout.Seconds()))
  100. if err != nil {
  101. return nil, errors.Wrapf(err, "exec sync %#v to %s", cmd, ctrCriId)
  102. }
  103. return append(resp.Stdout, resp.Stderr...), nil
  104. }
// PodInstance is the guest-runtime interface implemented by pod-type
// servers (sPodGuestInstance). It extends GuestRuntimeInstance with
// container lifecycle, status-sync, image, exec and volume-mount
// operations keyed by the onecloud container id (ctrId) or the CRI
// container id (criId).
type PodInstance interface {
	GuestRuntimeInstance
	// GetCRIId returns the CRI sandbox id of the pod ("" if unknown).
	GetCRIId() string
	GetContainerById(ctrId string) *hostapi.ContainerDesc
	GetContainerByCRIId(criId string) (*hostapi.ContainerDesc, error)
	// Container lifecycle.
	CreateContainer(ctx context.Context, userCred mcclient.TokenCredential, id string, input *hostapi.ContainerCreateInput) (jsonutils.JSONObject, error)
	StartContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *hostapi.ContainerCreateInput) (jsonutils.JSONObject, error)
	StartLocalContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string) (jsonutils.JSONObject, error)
	DeleteContainer(ctx context.Context, cred mcclient.TokenCredential, id string) (jsonutils.JSONObject, error)
	// Status synchronization with the controller.
	SyncStatus(reason string, ctrId string)
	SyncContainerStatus(ctx context.Context, cred mcclient.TokenCredential, ctrId string) (jsonutils.JSONObject, error)
	StopContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *hostapi.ContainerStopInput) (jsonutils.JSONObject, error)
	GetContainerStatus(ctx context.Context, ctrId string) (string, *runtime.Status, error)
	IsPrimaryContainer(ctrId string) bool
	StopAll(ctx context.Context) error
	// Images.
	PullImage(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *hostapi.ContainerPullImageInput) (jsonutils.JSONObject, error)
	SaveVolumeMountToImage(ctx context.Context, userCred mcclient.TokenCredential, input *hostapi.ContainerSaveVolumeMountToImageInput, ctrId string) (jsonutils.JSONObject, error)
	// Exec.
	ExecContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.ContainerExecInput) (*url.URL, error)
	ContainerExecSync(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.ContainerExecSyncInput) (jsonutils.JSONObject, error)
	SetContainerResourceLimit(ctrId string, limit *apis.ContainerResources) (jsonutils.JSONObject, error)
	CommitContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *hostapi.ContainerCommitInput) (jsonutils.JSONObject, error)
	// Post-overlay volume mounts.
	AddContainerVolumeMountPostOverlay(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.ContainerVolumeMountAddPostOverlayInput) error
	RemoveContainerVolumeMountPostOverlay(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.ContainerVolumeMountRemovePostOverlayInput) error
	ReadLogs(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.PodLogOptions, stdout, stderr io.Writer) error
	// for monitoring
	GetVolumeMountUsages() (map[ContainerVolumeKey]*volume_mount.ContainerVolumeMountUsage, error)
	IsInternalStopped(ctrCriId string) (*ContainerExpectedStatus, bool)
	IsInternalRemoved(ctrCriId string) bool
	GetPodContainerCriIds() []string
	// For container log rotation: log dir, relative log path per container, and ctrId->criId map
	GetPodLogDir() string
	GetContainerLogPath(ctrId string) string
	ListContainerCriIds() map[string]string
}
// sContainer is the persisted per-container record kept by a pod
// instance, mapping the onecloud container id to its CRI id.
type sContainer struct {
	// Id is the onecloud container id.
	Id string `json:"id"`
	// Index is the container's ordinal within the pod.
	Index int `json:"index"`
	// CRIId is the container id assigned by the CRI runtime.
	CRIId string `json:"cri_id"`
}

// newContainer returns an sContainer with only the onecloud id set;
// Index and CRIId are filled in later.
func newContainer(id string) *sContainer {
	return &sContainer{
		Id: id,
	}
}
  149. type startStatHelper struct {
  150. podId string
  151. homeDir string
  152. }
  153. func newStartStatHelper(podId string, homeDir string) *startStatHelper {
  154. return &startStatHelper{
  155. podId: podId,
  156. homeDir: homeDir,
  157. }
  158. }
  159. func (h startStatHelper) getPodFile() string {
  160. return filepath.Join(h.homeDir, "pod-start.stat")
  161. }
  162. func (h startStatHelper) IsPodFileExists() bool {
  163. return fileutils2.Exists(h.getPodFile())
  164. }
  165. func (h startStatHelper) createStatFile(fp string) error {
  166. if fileutils2.Exists(fp) {
  167. return nil
  168. }
  169. if err := podutil.EnsureFile(fp, "", "755"); err != nil {
  170. return errors.Wrapf(err, "ensure file %s", fp)
  171. }
  172. return nil
  173. }
  174. func (h startStatHelper) removeStatFile(fp string) error {
  175. if !fileutils2.Exists(fp) {
  176. return nil
  177. }
  178. if err := os.Remove(fp); err != nil && !strings.Contains(err.Error(), "no such file or directory") {
  179. return errors.Wrapf(err, "remove file %s", fp)
  180. }
  181. return nil
  182. }
  183. func (h startStatHelper) CreatePodFile() error {
  184. return h.createStatFile(h.getPodFile())
  185. }
  186. func (h startStatHelper) RemovePodFile() error {
  187. return h.removeStatFile(h.getPodFile())
  188. }
  189. func (h startStatHelper) getContainerFile(ctrId string) string {
  190. return filepath.Join(h.homeDir, fmt.Sprintf("container-start-%s.stat", ctrId))
  191. }
  192. func (h startStatHelper) IsContainerFileExists(ctrId string) bool {
  193. return fileutils2.Exists(h.getContainerFile(ctrId))
  194. }
  195. func (h startStatHelper) CreateContainerFile(ctrId string) error {
  196. return h.createStatFile(h.getContainerFile(ctrId))
  197. }
  198. func (h startStatHelper) RemoveContainerFile(ctrId string) error {
  199. return h.removeStatFile(h.getContainerFile(ctrId))
  200. }
  201. type sPodGuestInstance struct {
  202. *sBaseGuestInstance
  203. containers map[string]*sContainer
  204. startStat *startStatHelper
  205. expectedStatus *PodExpectedStatus
  206. startPodLock sync.Mutex
  207. saveContainerLock sync.Mutex
  208. }
  209. func newPodGuestInstance(id string, man *SGuestManager) PodInstance {
  210. p := &sPodGuestInstance{
  211. sBaseGuestInstance: newBaseGuestInstance(id, man, computeapi.HYPERVISOR_POD),
  212. containers: make(map[string]*sContainer),
  213. startPodLock: sync.Mutex{},
  214. saveContainerLock: sync.Mutex{},
  215. }
  216. es, err := NewPodExpectedStatus(p.HomeDir(), computeapi.VM_UNKNOWN)
  217. if err != nil {
  218. log.Fatalf("NewPodExpectedStatus failed of %s: %s", p.HomeDir(), err)
  219. }
  220. p.expectedStatus = es
  221. p.startStat = newStartStatHelper(id, p.HomeDir())
  222. return p
  223. }
  224. func (s *sPodGuestInstance) CleanGuest(ctx context.Context, params interface{}) (jsonutils.JSONObject, error) {
  225. var err error
  226. criId := s.GetCRIId()
  227. if criId == "" {
  228. criId, err = s.getPodIdFromCRI()
  229. if err != nil && !IsContainerNotFoundError(err) {
  230. return nil, errors.Wrapf(err, "get cri pod id")
  231. }
  232. }
  233. if criId != "" {
  234. if err := s.getCRI().RemovePod(ctx, criId); err != nil {
  235. return nil, errors.Wrapf(err, "RemovePod with cri_id %q", criId)
  236. }
  237. }
  238. return nil, DeleteHomeDir(s)
  239. }
  240. func (s *sPodGuestInstance) getPodIdFromCRI() (string, error) {
  241. ids, err := runtime.GetSandboxIDByPodUID(s.getCRI(), s.GetInitialId(), nil)
  242. if err != nil {
  243. return "", errors.Wrapf(err, "get pod cri_id by uid %s", s.GetInitialId())
  244. }
  245. if len(ids) == 0 {
  246. return "", errors.Wrapf(errors.ErrNotFound, "not found cri pod by uid %s", s.GetInitialId())
  247. }
  248. return ids[0], nil
  249. }
// ExitCleanup is a no-op for pod instances; the clear flag is ignored.
func (s *sPodGuestInstance) ExitCleanup(clear bool) {
}

// CleanDirtyGuest removes a dirty (stale) pod by delegating to
// CleanGuest. NOTE(review): `false` is passed as the untyped `params`
// argument, which CleanGuest currently ignores.
func (s *sPodGuestInstance) CleanDirtyGuest(ctx context.Context) error {
	_, err := s.CleanGuest(ctx, false)
	return err
}
  256. func (s *sPodGuestInstance) ImportServer(pendingDelete bool) {
  257. // TODO: 参考SKVMGuestInstance,可以做更多的事,比如同步状态
  258. s.manager.SaveServer(s.Id, s)
  259. s.manager.RemoveCandidateServer(s)
  260. if s.IsDaemon() || s.IsDirtyShutdown() {
  261. ctx := context.Background()
  262. cred := hostutils.GetComputeSession(ctx).GetToken()
  263. if err := s.StartLocalDirtyPod(ctx, cred); err != nil {
  264. log.Errorf("start local pod err %s", err.Error())
  265. }
  266. } else {
  267. s.SyncStatus("sync status after host started", "")
  268. s.getProbeManager().AddPod(s)
  269. }
  270. }
  271. func (s *sPodGuestInstance) isPodDirtyShutdown() bool {
  272. if !s.IsRunning() && s.startStat.IsPodFileExists() {
  273. return true
  274. }
  275. return false
  276. }
  277. func (s *sPodGuestInstance) isContainerDirtyShutdown(ctrId string) bool {
  278. isRunning, err := s.IsContainerRunning(context.Background(), ctrId)
  279. if err != nil {
  280. log.Warningf("[isContainerDrityShutdown] IsContainerRunning(%s, %s): %v", s.GetId(), ctrId, err)
  281. }
  282. if !isRunning && s.startStat.IsContainerFileExists(ctrId) {
  283. return true
  284. }
  285. return false
  286. }
  287. func (s *sPodGuestInstance) IsDirtyShutdown() bool {
  288. if !s.manager.EnableDirtyRecoveryFeature() {
  289. return false
  290. }
  291. if s.isPodDirtyShutdown() {
  292. return true
  293. }
  294. for _, ctr := range s.GetContainers() {
  295. if s.isContainerDirtyShutdown(ctr.Id) {
  296. return true
  297. }
  298. }
  299. return false
  300. }
  301. func (s *sPodGuestInstance) getStatus(ctx context.Context, defaultStatus string) string {
  302. status := defaultStatus
  303. if status == "" {
  304. status = computeapi.VM_READY
  305. }
  306. if s.IsRunning() {
  307. status = computeapi.VM_RUNNING
  308. }
  309. for _, c := range s.GetContainers() {
  310. cStatus, cs, err := s.getContainerStatus(ctx, c.Id)
  311. if err != nil {
  312. log.Errorf("get container %s status of pod %s", c.Id, s.Id)
  313. continue
  314. }
  315. if cs != nil {
  316. status = GetPodStatusByContainerStatus(status, cStatus, s.IsPrimaryContainer(c.Id))
  317. }
  318. }
  319. return status
  320. }
  321. func (s *sPodGuestInstance) GetUploadStatus(ctx context.Context, reason string) (*computeapi.HostUploadGuestStatusInput, error) {
  322. // sync pod status
  323. var status = computeapi.VM_READY
  324. if s.IsRunning() {
  325. status = computeapi.VM_RUNNING
  326. }
  327. errs := make([]error, 0)
  328. if err := s.expectedStatus.SetStatus(status); err != nil {
  329. log.Warningf("set expected status to %s, reason: %s, err: %s", status, reason, err.Error())
  330. }
  331. /*if status == computeapi.VM_READY {
  332. // remove pod
  333. if err := s.stopPod(ctx, 5); err != nil {
  334. log.Warningf("stop cri pod when sync status: %s: %v", s.Id, err)
  335. }
  336. }*/
  337. // sync container's status
  338. cStatuss := make(map[string]*computeapi.ContainerPerformStatusInput)
  339. for _, c := range s.GetContainers() {
  340. cStatus, cs, err := s.getContainerStatus(ctx, c.Id)
  341. if err != nil {
  342. log.Errorf("get container %s status of pod %s", c.Id, s.Id)
  343. continue
  344. }
  345. /*if err := s.expectedStatus.SetContainerStatus(c.CRIId, c.Id, cStatus); err != nil {
  346. log.Warningf("expectedStatus.SetContainerStatus(%s, %s) to %s, error: %s", s.GetId(), c.Id, cStatus, err.Error())
  347. }*/
  348. ctrStatusInput := &computeapi.ContainerPerformStatusInput{
  349. PerformStatusInput: apis.PerformStatusInput{
  350. Status: cStatus,
  351. Reason: reason,
  352. HostId: hostinfo.Instance().HostId,
  353. },
  354. }
  355. if cs != nil {
  356. if computeapi.ContainerNoFailedRunningStatus.Has(cStatus) {
  357. ctrStatusInput.RestartCount = 0
  358. } else {
  359. ctrStatusInput.RestartCount = cs.RestartCount
  360. }
  361. if !cs.StartedAt.IsZero() {
  362. ctrStatusInput.StartedAt = &cs.StartedAt
  363. }
  364. if !cs.FinishedAt.IsZero() {
  365. ctrStatusInput.LastFinishedAt = &cs.FinishedAt
  366. }
  367. if ctr := s.GetContainerById(c.Id); ctr != nil {
  368. ctr.RestartCount = ctrStatusInput.RestartCount
  369. ctr.StartedAt = cs.StartedAt
  370. ctr.LastFinishedAt = cs.FinishedAt
  371. if err := s.SaveContainerDesc(ctr); err != nil {
  372. errs = append(errs, errors.Wrapf(err, "save container desc for %s/%s", ctr.Id, ctr.Name))
  373. }
  374. }
  375. }
  376. cStatuss[c.Id] = ctrStatusInput
  377. status = GetPodStatusByContainerStatus(status, cStatus, s.IsPrimaryContainer(c.Id))
  378. }
  379. if len(errs) > 0 {
  380. log.Errorf("get upload status error: %v", errors.NewAggregate(errs))
  381. }
  382. statusInput := &apis.PerformStatusInput{
  383. Status: status,
  384. Reason: reason,
  385. PowerStates: GetPowerStates(s),
  386. HostId: hostinfo.Instance().HostId,
  387. }
  388. return &computeapi.HostUploadGuestStatusInput{
  389. PerformStatusInput: *statusInput,
  390. Containers: cStatuss,
  391. }, nil
  392. }
  393. // UploadStatus uploads the status of the pod and the specified container to the server
  394. // If uploadCtrId is not empty, only the status of the specified container will be uploaded
  395. // If uploadCtrId is empty, all containers' status will be uploaded
  396. func (s *sPodGuestInstance) UploadStatus(ctx context.Context, reason string, uploadCtrId string) error {
  397. /*resp, err := s.GetUploadStatus(ctx, reason)
  398. if err != nil {
  399. return errors.Wrapf(err, "get upload status of pod: %s", reason)
  400. }
  401. errs := make([]error, 0)
  402. // sync container's status
  403. for id, ctrStatusInput := range resp.Containers {
  404. if _, err := hostutils.UpdateContainerStatus(ctx, id, ctrStatusInput); err != nil {
  405. errs = append(errs, errors.Wrapf(err, "failed update container %s status", id))
  406. }
  407. // 同步容器状态可能会出现 probing 状态,所以需要 mark 成 dirty,等待 probe manager 重新探测容器状态
  408. s.markContainerProbeDirty(ctrStatusInput.Status, id, reason)
  409. }
  410. if _, err := hostutils.UpdateServerStatus(ctx, s.Id, &resp.PerformStatusInput); err != nil {
  411. errs = append(errs, errors.Wrapf(err, "failed update guest status"))
  412. }*/
  413. // return errors.NewAggregate(errs)
  414. resp, err := s.GetUploadStatus(ctx, reason)
  415. if err != nil {
  416. return errors.Wrapf(err, "get upload status of pod: %s", reason)
  417. }
  418. containerStatuses := make(map[string]*statusman.ContainerStatus)
  419. for ctrId, cStatus := range resp.Containers {
  420. if uploadCtrId != "" && ctrId != uploadCtrId {
  421. continue
  422. }
  423. containerStatuses[ctrId] = &statusman.ContainerStatus{
  424. Status: cStatus.Status,
  425. RestartCount: cStatus.RestartCount,
  426. StartedAt: cStatus.StartedAt,
  427. LastFinishedAt: cStatus.LastFinishedAt,
  428. }
  429. }
  430. if err := statusman.GetManager().UpdateStatus(&statusman.PodStatusUpdateRequest{
  431. Id: s.Id,
  432. Pod: s,
  433. Status: resp.Status,
  434. ContainerStatuses: containerStatuses,
  435. Reason: reason,
  436. }); err != nil {
  437. return errors.Wrapf(err, "update status of pod: %s", reason)
  438. }
  439. return nil
  440. }
  441. func (s *sPodGuestInstance) PostUploadStatus(resp *computeapi.HostUploadGuestStatusInput, reason string) {
  442. for ctrId, cStatus := range resp.Containers {
  443. s.markContainerProbeDirty(cStatus.Status, ctrId, reason)
  444. }
  445. }
  446. func (s *sPodGuestInstance) SyncStatus(reason string, ctrId string) {
  447. if err := s.UploadStatus(context.Background(), reason, ctrId); err != nil {
  448. log.Warningf("upload status failed, reason: %s, err: %v", reason, err)
  449. }
  450. }
// DeployFs is a no-op for pods: there is no guest filesystem deployment step.
// The commented-out code below used to re-sync port_mappings during deploy
// and is kept for reference.
func (s *sPodGuestInstance) DeployFs(ctx context.Context, userCred mcclient.TokenCredential, deployInfo *deployapi.DeployInfo) (jsonutils.JSONObject, error) {
	// update port_mappings
	/*podInput, err := s.getPodCreateParams()
	if err != nil {
		return nil, errors.Wrap(err, "getPodCreateParams")
	}
	if len(podInput.PortMappings) != 0 {
		pms, err := s.getPortMappings(podInput.PortMappings)
		if err != nil {
			return nil, errors.Wrap(err, "get port mappings")
		}
		if err := s.setPortMappings(ctx, userCred, s.convertToPodMetadataPortMappings(pms)); err != nil {
			return nil, errors.Wrap(err, "set port mappings")
		}
	}*/
	return nil, nil
}
// IsStopped is not implemented for pods and panics when called.
func (s *sPodGuestInstance) IsStopped() bool {
	//TODO implement me
	panic("implement me")
}
// IsSuspend always reports false: pods have no suspend state.
func (s *sPodGuestInstance) IsSuspend() bool {
	return false
}
// getCRI returns the manager's container-runtime-interface client.
func (s *sPodGuestInstance) getCRI() podutil.CRI {
	return s.manager.GetCRI()
}
// getProbeManager returns the manager's container probe manager.
func (s *sPodGuestInstance) getProbeManager() prober.Manager {
	return s.manager.GetContainerProbeManager()
}
// getHostCPUMap returns the host-wide container CPU allocation map.
func (s *sPodGuestInstance) getHostCPUMap() *podutil.HostContainerCPUMap {
	return s.manager.GetContainerCPUMap()
}
  484. func (s *sPodGuestInstance) getPod(ctx context.Context) (*runtimeapi.PodSandbox, error) {
  485. pods, err := s.getCRI().ListPods(ctx, podutil.ListPodOptions{})
  486. if err != nil {
  487. return nil, errors.Wrap(err, "ListPods")
  488. }
  489. for _, p := range pods {
  490. if p.Metadata.Uid == s.Id {
  491. return p, nil
  492. }
  493. }
  494. return nil, errors.Wrap(httperrors.ErrNotFound, "Not found pod from containerd")
  495. }
  496. func (s *sPodGuestInstance) IsRunning() bool {
  497. pod, err := s.getPod(context.Background())
  498. if err != nil {
  499. return false
  500. }
  501. if pod.GetState() == runtimeapi.PodSandboxState_SANDBOX_READY {
  502. return true
  503. }
  504. return false
  505. }
  506. func (s *sPodGuestInstance) IsContainerRunning(ctx context.Context, ctrId string) (bool, error) {
  507. status, _, err := s.getContainerStatus(ctx, ctrId)
  508. if err != nil {
  509. return false, errors.Wrapf(err, "get container %s status error", ctrId)
  510. }
  511. if computeapi.ContainerRunningStatus.Has(status) {
  512. return true, nil
  513. }
  514. return false, nil
  515. }
// probeGuestStatus refreshes resp.Status with the currently probed status.
func (s *sPodGuestInstance) probeGuestStatus(ctx context.Context, resp *computeapi.HostUploadGuestStatusInput) {
	resp.Status = s.getStatus(ctx, resp.Status)
}
  519. func (s *sPodGuestInstance) HandleGuestStatus(ctx context.Context, resp *computeapi.HostUploadGuestStatusInput, isBatch bool) *computeapi.HostUploadGuestStatusInput {
  520. //s.probeGuestStatus(ctx, resp)
  521. //hostutils.TaskComplete(ctx, jsonutils.Marshal(resp))
  522. ctrStatus, _ := s.GetUploadStatus(ctx, "")
  523. return ctrStatus
  524. }
  525. func (s *sPodGuestInstance) HandleGuestStart(ctx context.Context, userCred mcclient.TokenCredential, body jsonutils.JSONObject) (jsonutils.JSONObject, error) {
  526. hostutils.DelayTaskWithWorker(ctx, func(ctx context.Context, params interface{}) (jsonutils.JSONObject, error) {
  527. resp, err := s.startPod(ctx, userCred)
  528. if err != nil {
  529. return nil, errors.Wrap(err, "startPod")
  530. }
  531. resJ := jsonutils.Marshal(resp)
  532. res := resJ.(*jsonutils.JSONDict)
  533. if !s.manager.host.IsSchedulerNumaAllocateEnabled() {
  534. res.Set("cpu_numa_pin", jsonutils.Marshal(s.Desc.CpuNumaPin))
  535. }
  536. return res, nil
  537. }, nil, s.manager.GuestStartWorker)
  538. return nil, nil
  539. }
  540. func (s *sPodGuestInstance) HandleStop(ctx context.Context, timeout int64) error {
  541. hostutils.DelayTask(ctx, func(ctx context.Context, params interface{}) (jsonutils.JSONObject, error) {
  542. err := s.stopPod(ctx, timeout)
  543. if err != nil {
  544. return nil, errors.Wrap(err, "stopPod")
  545. }
  546. return nil, nil
  547. }, nil)
  548. return nil
  549. }
  550. func (s *sPodGuestInstance) getCreateParams() (jsonutils.JSONObject, error) {
  551. createParamsStr, ok := s.GetDesc().Metadata[computeapi.VM_METADATA_CREATE_PARAMS]
  552. if !ok {
  553. return nil, errors.Errorf("not found %s in metadata", computeapi.VM_METADATA_CREATE_PARAMS)
  554. }
  555. return jsonutils.ParseString(createParamsStr)
  556. }
  557. func (s *sPodGuestInstance) getPostStopCleanupConfig() (*computeapi.PodPostStopCleanupConfig, error) {
  558. data, ok := s.GetDesc().Metadata[computeapi.POD_METADATA_POST_STOP_CLEANUP_CONFIG]
  559. if !ok {
  560. return nil, nil
  561. }
  562. jsonData, err := jsonutils.ParseString(data)
  563. if err != nil {
  564. return nil, errors.Wrapf(err, "parse to json")
  565. }
  566. input := new(computeapi.PodPostStopCleanupConfig)
  567. if err := jsonData.Unmarshal(input); err != nil {
  568. return nil, errors.Wrapf(err, "unmarshal to pod post stop cleanup config")
  569. }
  570. return input, nil
  571. }
  572. func (s *sPodGuestInstance) getPodCreateParams() (*computeapi.PodCreateInput, error) {
  573. createParams, err := s.getCreateParams()
  574. if err != nil {
  575. return nil, errors.Wrapf(err, "getCreateParams")
  576. }
  577. input := new(computeapi.PodCreateInput)
  578. if err := createParams.Unmarshal(input, "pod"); err != nil {
  579. return nil, errors.Wrapf(err, "unmarshal to pod creation input")
  580. }
  581. return input, nil
  582. }
// getPodLogDir returns the pod's log directory under its home dir.
func (s *sPodGuestInstance) getPodLogDir() string {
	return filepath.Join(s.HomeDir(), "logs")
}
// GetPodLogDir is the exported accessor for the pod log directory.
func (s *sPodGuestInstance) GetPodLogDir() string {
	return s.getPodLogDir()
}
// getContainerLogPath returns the log file name for a container: "<ctrId>.log".
// NOTE(review): filepath.Join is called with a single element, so it only
// path-cleans the name; presumably callers join it onto the pod log dir —
// confirm before changing.
func (s *sPodGuestInstance) getContainerLogPath(ctrId string) string {
	return filepath.Join(fmt.Sprintf("%s.log", ctrId))
}
// GetContainerLogPath is the exported accessor for a container's log file name.
func (s *sPodGuestInstance) GetContainerLogPath(ctrId string) string {
	return s.getContainerLogPath(ctrId)
}
  595. func (s *sPodGuestInstance) ListContainerCriIds() map[string]string {
  596. out := make(map[string]string, len(s.containers))
  597. for ctrId, c := range s.containers {
  598. out[ctrId] = c.CRIId
  599. }
  600. return out
  601. }
// getShmDir returns the pod's shared-memory directory under its home dir.
func (s *sPodGuestInstance) getShmDir() string {
	return filepath.Join(s.HomeDir(), "shm")
}
// getContainerShmDir returns the per-container shm dir "<name>-shm" under the
// pod's shm directory.
func (s *sPodGuestInstance) getContainerShmDir(containerName string) string {
	return filepath.Join(s.getShmDir(), fmt.Sprintf("%s-shm", containerName))
}
// GetDisks returns the disks of the guest desc.
func (s *sPodGuestInstance) GetDisks() []*desc.SGuestDisk {
	return s.GetDesc().Disks
}
  611. func (s *sPodGuestInstance) ConvertRootFsToVolumeMount(rootFs *hostapi.ContainerRootfs) *hostapi.ContainerVolumeMount {
  612. return &hostapi.ContainerVolumeMount{
  613. Type: rootFs.Type,
  614. Disk: rootFs.Disk,
  615. }
  616. }
  617. func (s *sPodGuestInstance) mountRootFs(ctrId string, rootFs *hostapi.ContainerRootfs) error {
  618. drv := volume_mount.GetDriver(rootFs.Type)
  619. vol := s.ConvertRootFsToVolumeMount(rootFs)
  620. if err := drv.Mount(s, ctrId, vol); err != nil {
  621. return errors.Wrapf(err, "mount root fs %s, ctrId %s", jsonutils.Marshal(rootFs), ctrId)
  622. }
  623. return nil
  624. }
  625. func (s *sPodGuestInstance) clearContainerRootFs(ctrId string, rootFs *hostapi.ContainerRootfs) error {
  626. if rootFs == nil {
  627. return nil
  628. }
  629. drv := volume_mount.GetDriver(rootFs.Type)
  630. vol := s.ConvertRootFsToVolumeMount(rootFs)
  631. hostPath, err := drv.GetRuntimeMountHostPath(s, ctrId, vol)
  632. if err != nil {
  633. return errors.Wrapf(err, "get runtime mount path")
  634. }
  635. if !rootFs.Persistent {
  636. if err := volume_mount.RemoveDir(hostPath); err != nil {
  637. return errors.Wrapf(err, "remove %q", hostPath)
  638. }
  639. log.Infof("clear rootfs of container %s/%s: %q", s.GetName(), ctrId, hostPath)
  640. }
  641. return nil
  642. }
  643. func (s *sPodGuestInstance) umountRootFs(ctrId string, rootFs *hostapi.ContainerRootfs) error {
  644. if rootFs == nil {
  645. return nil
  646. }
  647. drv := volume_mount.GetDriver(rootFs.Type).(volume_mount.IConnectedVolumeMount)
  648. vol := s.ConvertRootFsToVolumeMount(rootFs)
  649. if err := s.clearContainerRootFs(ctrId, rootFs); err != nil {
  650. return errors.Wrapf(err, "clear rootfs of container %s", ctrId)
  651. }
  652. if err := drv.UnmountWithoutDisconnect(s, ctrId, vol); err != nil {
  653. return errors.Wrapf(err, "unmount root fs %s, ctrId %s", jsonutils.Marshal(rootFs), ctrId)
  654. }
  655. return nil
  656. }
  657. func (s *sPodGuestInstance) GetRootFsMountPath(ctrId string) (string, error) {
  658. ctr := s.GetContainerById(ctrId)
  659. if ctr == nil {
  660. return "", errors.Wrapf(httperrors.ErrNotFound, "not found container %s", ctrId)
  661. }
  662. rootFs := ctr.Spec.Rootfs
  663. if rootFs == nil {
  664. return "", errors.Wrapf(httperrors.ErrNotFound, "not found root fs %s", ctrId)
  665. }
  666. vol := s.ConvertRootFsToVolumeMount(rootFs)
  667. drv := volume_mount.GetDriver(vol.Type)
  668. hostPath, err := drv.GetRuntimeMountHostPath(s, ctrId, vol)
  669. if err != nil {
  670. return "", errors.Wrapf(err, "get runtime mount host path %s, ctrId %s", jsonutils.Marshal(vol), ctrId)
  671. }
  672. return hostPath, nil
  673. }
  674. func (s *sPodGuestInstance) mountPodVolumes() error {
  675. for _, ctr := range s.GetDesc().Containers {
  676. if ctr.Spec.Rootfs == nil {
  677. continue
  678. }
  679. if err := s.mountRootFs(ctr.Id, ctr.Spec.Rootfs); err != nil {
  680. return errors.Wrapf(err, "mount root fs %s, ctrId %s", jsonutils.Marshal(ctr.Spec.Rootfs), ctr.Id)
  681. }
  682. }
  683. for ctrId, vols := range s.getContainerVolumeMounts() {
  684. for _, vol := range vols {
  685. drv := volume_mount.GetDriver(vol.Type)
  686. if err := drv.Mount(s, ctrId, vol); err != nil {
  687. return errors.Wrapf(err, "mount volume %s, ctrId %s", jsonutils.Marshal(vol), ctrId)
  688. }
  689. if vol.FsUser != nil || vol.FsGroup != nil {
  690. // change mountpoint owner
  691. if err := volume_mount.ChangeDirOwner(s, drv, ctrId, vol); err != nil {
  692. return errors.Wrapf(err, "change dir owner")
  693. }
  694. }
  695. }
  696. }
  697. return nil
  698. }
// umountPodVolumes unmounts every container volume and rootfs.
// Ordering matters: connected volumes are first unmounted without
// disconnecting, the rootfs mounts are then released, and only afterwards are
// the deferred Disconnect calls run, so underlying devices stay attached
// until nothing references them anymore.
func (s *sPodGuestInstance) umountPodVolumes() error {
	disConnectFuncs := make([]func() error, 0)
	for ctrId, vols := range s.getContainerVolumeMounts() {
		for _, vol := range vols {
			drv := volume_mount.GetDriver(vol.Type)
			connectedDrv, ok := drv.(volume_mount.IConnectedVolumeMount)
			if !ok {
				// Plain driver: a single Unmount is enough.
				if err := drv.Unmount(s, ctrId, vol); err != nil {
					return errors.Wrapf(err, "Unmount volume %s, ctrId %s", jsonutils.Marshal(vol), ctrId)
				}
			} else {
				if err := connectedDrv.UnmountWithoutDisconnect(s, ctrId, vol); err != nil {
					return errors.Wrapf(err, "UnmountWithoutDisconnect volume %s, ctrId %s", jsonutils.Marshal(vol), ctrId)
				}
				// Capture per-iteration copies for the closure (pre-Go 1.22
				// loop-variable semantics).
				tmpCtrId := ctrId
				tmpVol := vol
				disConnectFuncs = append(disConnectFuncs, func() error {
					if err := connectedDrv.Disconnect(s, tmpCtrId, tmpVol); err != nil {
						return errors.Wrapf(err, "Disconnect volume %s, ctrId %s", jsonutils.Marshal(tmpVol), tmpCtrId)
					}
					return nil
				})
			}
		}
	}
	for _, ctr := range s.GetDesc().Containers {
		if ctr.Spec.Rootfs == nil {
			continue
		}
		if err := s.umountRootFs(ctr.Id, ctr.Spec.Rootfs); err != nil {
			return errors.Wrapf(err, "umount root fs %s, ctrId %s", jsonutils.Marshal(ctr.Spec.Rootfs), ctr.Id)
		}
	}
	// Finally drop the device connections collected above.
	for _, disConnectFunc := range disConnectFuncs {
		if err := disConnectFunc(); err != nil {
			return errors.Wrapf(err, "disconnect volume")
		}
	}
	return nil
}
// GetContainers returns the container descriptors from the guest desc.
func (s *sPodGuestInstance) GetContainers() []*hostapi.ContainerDesc {
	return s.GetDesc().Containers
}
  742. func (s *sPodGuestInstance) GetPodContainerCriIds() []string {
  743. criids := make([]string, 0)
  744. for i := range s.containers {
  745. criids = append(criids, s.containers[i].CRIId)
  746. }
  747. return criids
  748. }
  749. func (s *sPodGuestInstance) HasContainerNvidiaGpu() bool {
  750. for i := range s.Desc.IsolatedDevices {
  751. if utils.IsInStringArray(s.Desc.IsolatedDevices[i].DevType, computeapi.NVIDIA_GPU_TYPES) {
  752. return true
  753. }
  754. }
  755. return false
  756. }
  757. func (s *sPodGuestInstance) GetContainerById(ctrId string) *hostapi.ContainerDesc {
  758. ctrs := s.GetContainers()
  759. for i := range ctrs {
  760. ctr := ctrs[i]
  761. if ctr.Id == ctrId {
  762. return ctr
  763. }
  764. }
  765. return nil
  766. }
  767. func (s *sPodGuestInstance) SaveContainerDesc(ctr *hostapi.ContainerDesc) error {
  768. ctrs := s.GetContainers()
  769. for i := range ctrs {
  770. tmp := ctrs[i]
  771. if tmp.Id == ctr.Id {
  772. ctrs[i] = ctr
  773. }
  774. }
  775. s.GetDesc().Containers = ctrs
  776. return SaveDesc(s, s.GetDesc())
  777. }
  778. func (s *sPodGuestInstance) getContainerVolumeMounts() map[string][]*hostapi.ContainerVolumeMount {
  779. result := make(map[string][]*hostapi.ContainerVolumeMount, 0)
  780. for _, ctr := range s.GetDesc().Containers {
  781. mnts, ok := result[ctr.Id]
  782. if !ok {
  783. mnts = make([]*hostapi.ContainerVolumeMount, 0)
  784. }
  785. for _, vol := range ctr.Spec.VolumeMounts {
  786. tmp := vol
  787. mnts = append(mnts, tmp)
  788. }
  789. result[ctr.Id] = mnts
  790. }
  791. return result
  792. }
  793. func (s *sPodGuestInstance) getContainerVolumeMountsByDiskId(ctrId, diskId string) []*hostapi.ContainerVolumeMount {
  794. ctrVols := s.getContainerVolumeMounts()
  795. vols, ok := ctrVols[ctrId]
  796. if !ok {
  797. return nil
  798. }
  799. volList := make([]*hostapi.ContainerVolumeMount, 0)
  800. for _, vol := range vols {
  801. if vol.Disk != nil {
  802. if vol.Disk.Id == diskId {
  803. tmpVol := vol
  804. volList = append(volList, tmpVol)
  805. }
  806. }
  807. }
  808. return volList
  809. }
// GetVolumesDir returns the pod's volumes directory under its home dir.
func (s *sPodGuestInstance) GetVolumesDir() string {
	return filepath.Join(s.HomeDir(), "volumes")
}
// GetVolumesOverlayDir returns the "_overlay_" subdirectory of the volumes dir.
func (s *sPodGuestInstance) GetVolumesOverlayDir() string {
	return filepath.Join(s.GetVolumesDir(), "_overlay_")
}
// GetDiskMountPoint returns the mount point for the given disk.
func (s *sPodGuestInstance) GetDiskMountPoint(disk storageman.IDisk) string {
	return s.GetDiskMountPointById(disk.GetId())
}
// GetDiskMountPointById returns the mount point "<volumesDir>/<diskId>".
func (s *sPodGuestInstance) GetDiskMountPointById(diskId string) string {
	return filepath.Join(s.GetVolumesDir(), diskId)
}
  822. func (s *sPodGuestInstance) getPodPrivilegedMode(input *computeapi.PodCreateInput) bool {
  823. for _, ctr := range input.Containers {
  824. if ctr.Privileged {
  825. return true
  826. }
  827. }
  828. return false
  829. }
  830. func (s *sPodGuestInstance) getOtherPods() []*sPodGuestInstance {
  831. man := s.manager
  832. otherPods := make([]*sPodGuestInstance, 0)
  833. man.Servers.Range(func(id, value any) bool {
  834. if id == s.Id {
  835. return true
  836. }
  837. ins := value.(GuestRuntimeInstance)
  838. pod, ok := ins.(*sPodGuestInstance)
  839. if !ok {
  840. return true
  841. }
  842. otherPods = append(otherPods, pod)
  843. return true
  844. })
  845. return otherPods
  846. }
  847. func PodCgroupParent() string {
  848. return "/cloudpods"
  849. }
// localDirtyPodStartTask restarts a pod and its dirty-shutdown containers
// locally; it is scheduled on the guest-start worker by StartLocalDirtyPod.
type localDirtyPodStartTask struct {
	ctx      context.Context
	userCred mcclient.TokenCredential
	pod      *sPodGuestInstance
}
  855. func newLocalDirtyPodStartTask(ctx context.Context, userCred mcclient.TokenCredential, pod *sPodGuestInstance) *localDirtyPodStartTask {
  856. return &localDirtyPodStartTask{
  857. ctx: ctx,
  858. userCred: userCred,
  859. pod: pod,
  860. }
  861. }
// Run starts the pod sandbox (if not running) and every container that was
// marked as dirty-shutdown, then syncs the resulting status upstream. The
// previous pod-level dirty check is kept below for reference.
func (t *localDirtyPodStartTask) Run() {
	/*if t.pod.isPodDirtyShutdown() {
		log.Infof("start dirty pod locally (%s/%s)", t.pod.Id, t.pod.GetName())
		if _, err := t.pod.startPod(t.ctx, t.userCred); err != nil {
			log.Errorf("start dirty pod(%s/%s) err: %s", t.pod.GetId(), t.pod.GetName(), err.Error())
		}
	}*/
	for _, ctr := range t.pod.GetContainers() {
		if t.pod.isContainerDirtyShutdown(ctr.Id) {
			// Make sure the sandbox itself is up before starting containers.
			if !t.pod.IsRunning() {
				log.Infof("start dirty pod locally (%s/%s)", t.pod.Id, t.pod.GetName())
				if _, err := t.pod.startPod(t.ctx, t.userCred); err != nil {
					log.Errorf("start dirty pod(%s/%s) err: %s", t.pod.GetId(), t.pod.GetName(), err.Error())
				}
			}
			log.Infof("start dirty container locally (%s/%s/%s/%s)", t.pod.Id, t.pod.GetName(), ctr.Id, ctr.Name)
			// Errors are logged, not returned: the remaining containers
			// should still get their chance to start.
			if _, err := t.pod.StartLocalContainer(t.ctx, t.userCred, ctr.Id); err != nil {
				log.Errorf("start dirty container %s err: %s", ctr.Id, err.Error())
			}
		}
	}
	t.pod.SyncStatus("sync status after dirty pod start locally", "")
}
// Dump describes this task for worker logging.
func (t *localDirtyPodStartTask) Dump() string {
	return fmt.Sprintf("pod start task %s/%s", t.pod.GetId(), t.pod.GetName())
}
// StartLocalDirtyPod schedules a localDirtyPodStartTask on the guest-start
// worker; it does not block and always returns nil.
func (s *sPodGuestInstance) StartLocalDirtyPod(ctx context.Context, userCred mcclient.TokenCredential) error {
	s.manager.GuestStartWorker.Run(newLocalDirtyPodStartTask(ctx, userCred, s), nil, nil)
	return nil
}
  892. func (s *sPodGuestInstance) ShouldRestartPodOnCrash() bool {
  893. if len(s.GetContainers()) <= 1 {
  894. return true
  895. }
  896. return false
  897. }
  898. func (s *sPodGuestInstance) IsPrimaryContainer(ctrId string) bool {
  899. ctr := s.GetContainerById(ctrId)
  900. if ctr == nil {
  901. return false
  902. }
  903. if len(s.containers) == 1 {
  904. return true
  905. }
  906. return ctr.Spec.Primary
  907. }
  908. func (s *sPodGuestInstance) startPod(ctx context.Context, userCred mcclient.TokenCredential) (*computeapi.PodStartResponse, error) {
  909. s.startPodLock.Lock()
  910. defer s.startPodLock.Unlock()
  911. retries := 3
  912. sec := 5 * time.Second
  913. var err error
  914. var resp *computeapi.PodStartResponse
  915. for i := 1; i <= retries; i++ {
  916. resp, err = s._startPod(ctx, userCred)
  917. if err == nil {
  918. return resp, nil
  919. }
  920. log.Errorf("start pod %s/%s error with %d times: %v", s.GetId(), s.GetName(), i, err)
  921. time.Sleep(sec * time.Duration(i))
  922. }
  923. return resp, err
  924. }
  925. func (s *sPodGuestInstance) namespacesForPod(input *computeapi.PodCreateInput) *runtimeapi.NamespaceOption {
  926. opt := &runtimeapi.NamespaceOption{
  927. Ipc: runtimeapi.NamespaceMode_POD,
  928. Network: runtimeapi.NamespaceMode_POD,
  929. Pid: runtimeapi.NamespaceMode_CONTAINER,
  930. }
  931. if input.HostIPC {
  932. opt.Ipc = runtimeapi.NamespaceMode_NODE
  933. }
  934. return opt
  935. }
  936. func (s *sPodGuestInstance) updateGuestDesc() error {
  937. s.Desc = new(desc.SGuestDesc)
  938. err := jsonutils.Marshal(s.SourceDesc).Unmarshal(s.Desc)
  939. if err != nil {
  940. return errors.Wrap(err, "unmarshal source desc")
  941. }
  942. return s.allocateCpuNumaPin()
  943. }
// _startPod performs one attempt at creating and starting the pod sandbox:
// refresh the guest desc, mount volumes, best-effort remove any stale
// sandbox, build the sandbox config (pod-wide security context with seccomp
// and apparmor unconfined, optional CNI port mappings), run it through the
// CRI, record the CRI id, apply pod-level cgroup limits, register with the
// probe manager and write the startup-stat file.
func (s *sPodGuestInstance) _startPod(ctx context.Context, userCred mcclient.TokenCredential) (*computeapi.PodStartResponse, error) {
	podInput, err := s.getPodCreateParams()
	if err != nil {
		return nil, errors.Wrap(err, "getPodCreateParams")
	}
	if err := s.updateGuestDesc(); err != nil {
		return nil, errors.Wrap(err, "updateGuestDesc")
	}
	if err := s.mountPodVolumes(); err != nil {
		return nil, errors.Wrap(err, "mountPodVolumes")
	}
	// Best effort: a leftover sandbox from a previous run must not block the
	// new one, so a removal failure is only logged.
	if err := s.ensurePodRemoved(ctx, 0); err != nil {
		log.Warningf("ensure pod removed before starting %s: %v", s.GetId(), err)
	}
	podCfg := &runtimeapi.PodSandboxConfig{
		Metadata: &runtimeapi.PodSandboxMetadata{
			Name:      s.GetDesc().Name,
			Uid:       s.GetId(),
			Namespace: s.GetDesc().TenantId,
			Attempt:   1,
		},
		Hostname:     s.GetDesc().Hostname,
		LogDirectory: s.getPodLogDir(),
		DnsConfig:    nil,
		PortMappings: nil,
		Labels: map[string]string{
			runtime.PodUIDLabel: s.GetId(),
		},
		Annotations: nil,
		Linux: &runtimeapi.LinuxPodSandboxConfig{
			CgroupParent: PodCgroupParent(),
			SecurityContext: &runtimeapi.LinuxSandboxSecurityContext{
				NamespaceOptions:   s.namespacesForPod(podInput),
				SelinuxOptions:     nil,
				RunAsUser:          nil,
				RunAsGroup:         nil,
				ReadonlyRootfs:     false,
				SupplementalGroups: nil,
				Privileged:         s.getPodPrivilegedMode(podInput),
				// Both seccomp and apparmor run unconfined for the sandbox.
				Seccomp: &runtimeapi.SecurityProfile{
					ProfileType: runtimeapi.SecurityProfile_Unconfined,
				},
				Apparmor: &runtimeapi.SecurityProfile{
					ProfileType: runtimeapi.SecurityProfile_Unconfined,
				},
				SeccompProfilePath: "",
			},
			Sysctls: nil,
		},
		Windows: nil,
	}
	// inject pod security context
	podSec := podInput.SecurityContext
	if podSec != nil {
		/*podCfg.Linux.Sysctls = map[string]string{
			"net.ipv4.ip_unprivileged_port_start": "80",
		}*/
		if podSec.RunAsUser != nil {
			podCfg.Linux.SecurityContext.RunAsUser = &runtimeapi.Int64Value{
				Value: *podSec.RunAsUser,
			}
		}
		if podSec.RunAsGroup != nil {
			podCfg.Linux.SecurityContext.RunAsGroup = &runtimeapi.Int64Value{
				Value: *podSec.RunAsGroup,
			}
		}
	}
	// Translate metadata port mappings into CRI port mappings when the CNI
	// portmap plugin is enabled.
	if options.HostOptions.EnableContainerCniPortmap {
		metaPms, err := s.GetPortMappings()
		if err != nil {
			return nil, errors.Wrap(err, "GetPortMappings")
		}
		if len(metaPms) != 0 {
			pms := make([]*runtimeapi.PortMapping, len(metaPms))
			for idx := range metaPms {
				pm := metaPms[idx]
				proto := runtimeapi.Protocol_TCP
				switch pm.Protocol {
				case computeapi.PodPortMappingProtocolTCP:
					proto = runtimeapi.Protocol_TCP
				case computeapi.PodPortMappingProtocolUDP:
					proto = runtimeapi.Protocol_UDP
				}
				// NOTE(review): dereferences pm.HostPort unconditionally —
				// assumes GetPortMappings always fills HostPort; confirm.
				pms[idx] = &runtimeapi.PortMapping{
					Protocol:      proto,
					ContainerPort: int32(pm.Port),
					HostPort:      int32(*pm.HostPort),
					HostIp:        pm.HostIp,
				}
			}
			podCfg.PortMappings = pms
		}
	}
	criId, err := s.getCRI().RunPod(ctx, podCfg, "")
	if err != nil {
		return nil, errors.Wrap(err, "cri.RunPod")
	}
	if err := s.setCRIInfo(ctx, userCred, criId, podCfg); err != nil {
		return nil, errors.Wrap(err, "setCRIId")
	}
	// set pod cgroup resources
	if err := s.setPodCgroupResources(criId, s.GetDesc().Mem, s.GetDesc().Cpu); err != nil {
		return nil, errors.Wrapf(err, "set pod %s cgroup memMB %d, cpu %d", criId, s.GetDesc().Mem, s.GetDesc().Cpu)
	}
	s.getProbeManager().AddPod(s)
	if err := s.startStat.CreatePodFile(); err != nil {
		return nil, errors.Wrap(err, "startStat.CreatePodFile")
	}
	return &computeapi.PodStartResponse{
		CRIId:     criId,
		IsRunning: false,
	}, nil
}
  1058. func (s *sPodGuestInstance) setPodCgroupResources(criId string, memMB int64, cpuCnt int64) error {
  1059. if err := s.getCGUtil().SetMemoryLimitBytes(criId, memMB*1024*1024); err != nil {
  1060. return errors.Wrap(err, "set cgroup memory limit")
  1061. }
  1062. if err := s.getCGUtil().SetCPUCfs(criId, cpuCnt*s.getDefaultCPUPeriod(), s.getDefaultCPUPeriod()); err != nil {
  1063. return errors.Wrap(err, "set cgroup cfs")
  1064. }
  1065. return nil
  1066. }
  1067. func (s *sPodGuestInstance) ensurePodRemoved(ctx context.Context, timeout int64) error {
  1068. if timeout == 0 {
  1069. timeout = 15
  1070. }
  1071. ctx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
  1072. defer cancel()
  1073. /*if err := s.getCRI().StopPod(ctx, &runtimeapi.StopPodSandboxRequest{
  1074. PodSandboxId: s.GetCRIId(),
  1075. }); err != nil {
  1076. return errors.Wrapf(err, "stop cri pod: %s", s.GetCRIId())
  1077. }*/
  1078. criId := s.GetCRIId()
  1079. if criId != "" {
  1080. if err := s.getCRI().RemovePod(ctx, s.GetCRIId()); err != nil {
  1081. return errors.Wrapf(err, "remove cri pod: %s", s.GetCRIId())
  1082. }
  1083. }
  1084. p, _ := s.getPod(ctx)
  1085. if p != nil {
  1086. if err := s.getCRI().RemovePod(ctx, p.GetId()); err != nil {
  1087. return errors.Wrapf(err, "remove cri pod: %s", p.GetId())
  1088. }
  1089. }
  1090. s.getProbeManager().RemovePod(s)
  1091. if err := s.startStat.RemovePodFile(); err != nil {
  1092. return errors.Wrap(err, "startStat.RemovePodFile")
  1093. }
  1094. return nil
  1095. }
// stopPod tears the pod down in order: unmount volumes, remove the sandbox
// from the CRI, release pinned cpusets, and finally run the configured
// post-stop cleanup. timeout (seconds) is forwarded to ensurePodRemoved; 0
// selects its 15s default.
func (s *sPodGuestInstance) stopPod(ctx context.Context, timeout int64) error {
	if err := s.umountPodVolumes(); err != nil {
		return errors.Wrapf(err, "umount pod volumes")
	}
	if timeout == 0 {
		timeout = 15
	}
	if err := s.ensurePodRemoved(ctx, timeout); err != nil {
		return err
	}
	ReleaseGuestCpuset(s.manager, s)
	if err := s.postStopCleanup(ctx); err != nil {
		return errors.Wrapf(err, "post stop cleanup")
	}
	return nil
}
  1112. func (s *sPodGuestInstance) postStopCleanup(ctx context.Context) error {
  1113. config, err := s.getPostStopCleanupConfig()
  1114. if err != nil {
  1115. return errors.Wrapf(err, "get post stop cleanup config")
  1116. }
  1117. if config == nil {
  1118. return nil
  1119. }
  1120. for _, dir := range config.Dirs {
  1121. if err := os.RemoveAll(dir); err != nil {
  1122. return errors.Wrapf(err, "remove dir %s", dir)
  1123. }
  1124. }
  1125. return nil
  1126. }
  1127. func (s *sPodGuestInstance) LoadDesc() error {
  1128. if err := LoadDesc(s); err != nil {
  1129. return errors.Wrap(err, "LoadDesc")
  1130. }
  1131. if err := s.loadContainers(); err != nil {
  1132. return errors.Wrap(err, "loadContainers")
  1133. }
  1134. return nil
  1135. }
  1136. func (s *sPodGuestInstance) loadContainers() error {
  1137. s.containers = make(map[string]*sContainer)
  1138. ctrFile := s.getContainersFilePath()
  1139. if !fileutils2.Exists(ctrFile) {
  1140. log.Warningf("pod %s containers file %s doesn't exist", s.Id, ctrFile)
  1141. return nil
  1142. }
  1143. ctrStr, err := ioutil.ReadFile(ctrFile)
  1144. if err != nil {
  1145. return errors.Wrapf(err, "read %s", ctrFile)
  1146. }
  1147. obj, err := jsonutils.Parse(ctrStr)
  1148. if err != nil {
  1149. return errors.Wrapf(err, "jsonutils.Parse %s", ctrStr)
  1150. }
  1151. ctrs := make(map[string]*sContainer)
  1152. if err := obj.Unmarshal(ctrs); err != nil {
  1153. return errors.Wrapf(err, "unmarshal %s to container map", obj.String())
  1154. }
  1155. s.containers = ctrs
  1156. return nil
  1157. }
// PostLoad restores this pod's cpuset allocation after the manager loads it.
func (s *sPodGuestInstance) PostLoad(m *SGuestManager) error {
	return LoadGuestCpuset(m, s)
}
// SyncConfig persists the new guest desc and refreshes the live desc while
// preserving fields that must not change here: cpu/mem hotplug values are
// owned by SGuestHotplugCpuMemTask, and the existing cpu numa pinning is
// carried over before the live desc is saved.
func (s *sPodGuestInstance) SyncConfig(ctx context.Context, guestDesc *desc.SGuestDesc, fwOnly, setUefiBootOrder bool) (jsonutils.JSONObject, error) {
	if err := SaveDesc(s, guestDesc); err != nil {
		return nil, errors.Wrap(err, "SaveDesc")
	}
	// update guest live desc, don't be here update cpu and mem
	// cpu and memory should update from SGuestHotplugCpuMemTask
	s.UpdateLiveDesc(guestDesc)
	// keep origin cpu numa pin
	cpuNumaPin := s.Desc.CpuNumaPin
	s.Desc.SGuestHardwareDesc = guestDesc.SGuestHardwareDesc
	s.Desc.SGuestContainerDesc = guestDesc.SGuestContainerDesc
	s.Desc.CpuNumaPin = cpuNumaPin
	if err := SaveLiveDesc(s, s.Desc); err != nil {
		return nil, errors.Wrap(err, "SaveLiveDesc")
	}
	return nil, nil
}
// getContainerMeta returns the locally tracked container record, or nil.
func (s *sPodGuestInstance) getContainerMeta(id string) *sContainer {
	return s.containers[id]
}
  1181. func (s *sPodGuestInstance) getContainerCRIId(ctrId string) (string, error) {
  1182. ctr := s.getContainerMeta(ctrId)
  1183. if ctr == nil {
  1184. return "", errors.Wrapf(errors.ErrNotFound, "Not found container %s", ctrId)
  1185. }
  1186. return ctr.CRIId, nil
  1187. }
  1188. func (s *sPodGuestInstance) GetContainerByCRIId(criId string) (*hostapi.ContainerDesc, error) {
  1189. for _, ctr := range s.containers {
  1190. if ctr.CRIId == criId {
  1191. desc := s.GetContainerById(ctr.Id)
  1192. if desc == nil {
  1193. return nil, errors.Wrapf(errors.ErrNotFound, "Not found container desc by CRIId %s", criId)
  1194. }
  1195. return desc, nil
  1196. }
  1197. }
  1198. return nil, errors.Wrapf(errors.ErrNotFound, "Not found container by CRIId %s", criId)
  1199. }
  1200. func (s *sPodGuestInstance) StartLocalContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string) (jsonutils.JSONObject, error) {
  1201. ctr := s.GetContainerById(ctrId)
  1202. if ctr == nil {
  1203. return nil, errors.Wrapf(errors.ErrNotFound, "Not found container %s", ctrId)
  1204. }
  1205. input := &hostapi.ContainerCreateInput{
  1206. Name: ctr.Name,
  1207. GuestId: s.GetId(),
  1208. Spec: ctr.Spec,
  1209. RestartCount: ctr.RestartCount + 1,
  1210. }
  1211. ret, err := s.StartContainer(ctx, userCred, ctrId, input)
  1212. if err != nil {
  1213. return nil, errors.Wrap(err, "start container")
  1214. }
  1215. return ret, nil
  1216. }
// StartContainer makes sure a startable container exists — recreating it when
// it is unknown to the CRI or has exited — then starts it through the CRI and
// applies post-start settings: cgroup device rules (cgroup v1 only),
// pids.max, post-start lifecycle hooks, the startup-stat file and resource
// limits. Returns an error when the container is in any state other than
// created/exited/missing.
func (s *sPodGuestInstance) StartContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *hostapi.ContainerCreateInput) (jsonutils.JSONObject, error) {
	_, hasCtr := s.containers[ctrId]
	needRecreate := false
	if hasCtr {
		status, _, err := s.getContainerStatus(ctx, ctrId)
		if err != nil {
			if IsContainerNotFoundError(err) {
				// Known locally but gone from the CRI: recreate it.
				needRecreate = true
			} else {
				return nil, errors.Wrap(err, "get container status")
			}
		} else {
			if computeapi.ContainerExitedStatus.Has(status) {
				needRecreate = true
			} else if status != computeapi.CONTAINER_STATUS_CREATED {
				// Only freshly created (or recreated) containers may start.
				return nil, errors.Errorf("can't start container when status is %s", status)
			}
		}
	}
	if err := s.clearContainerRootFs(ctrId, input.Spec.Rootfs); err != nil {
		return nil, errors.Wrapf(err, "clear container rootfs %s before starting", jsonutils.Marshal(input.Spec.Rootfs))
	}
	if !hasCtr || needRecreate {
		log.Infof("recreate container %s before starting. hasCtr: %v, needRecreate: %v", ctrId, hasCtr, needRecreate)
		// delete and recreate the container before starting
		if hasCtr {
			if _, err := s.DeleteContainer(ctx, userCred, ctrId); err != nil {
				return nil, errors.Wrap(err, "delete container before starting")
			}
		}
		if _, err := s.CreateContainer(ctx, userCred, ctrId, input); err != nil {
			return nil, errors.Wrap(err, "recreate container before starting")
		}
	}
	criId, err := s.getContainerCRIId(ctrId)
	if err != nil {
		return nil, errors.Wrap(err, "get container cri id")
	}
	// Record the expected status before starting; failing to record it is
	// logged but not fatal.
	if err := s.expectedStatus.SetContainerStatus(criId, ctrId, computeapi.CONTAINER_STATUS_RUNNING); err != nil {
		log.Warningf("set container %s(%s) expected status to running: %v", criId, ctrId, err)
	}
	if err := s.getCRI().StartContainer(ctx, criId); err != nil {
		return nil, errors.Wrap(err, "CRI.StartContainer")
	}
	// On cgroup v2 the device rules were already set through the containerd
	// API at CreateContainer time, so the eBPF path is skipped.
	// On cgroup v1, keep using the original eBPF mechanism.
	isV2, err := podutil.DetectCgroupVersion()
	if err != nil {
		log.Warningf("[StartContainer] Failed to detect cgroup version: %v, using eBPF device allow", err)
		isV2 = false
	}
	if !isV2 {
		// cgroup v1: use the original eBPF device-allow mechanism.
		if err := s.setContainerCgroupDevicesAllow(criId, input.Spec.CgroupDevicesAllow, input.Spec.Devices); err != nil {
			return nil, errors.Wrap(err, "set cgroup devices allow")
		}
	} else {
		log.Debugf("[StartContainer] cgroup v2 detected, skipping eBPF device allow (devices already set via containerd API)")
	}
	if input.Spec.CgroupPidsMax > 0 {
		if err := s.getCGUtil().SetPidsMax(criId, input.Spec.CgroupPidsMax); err != nil {
			return nil, errors.Wrap(err, "set cgroup pids.max")
		}
	}
	if err := s.doContainerStartPostLifecycle(ctx, criId, input); err != nil {
		return nil, errors.Wrap(err, "do container lifecycle")
	}
	if err := s.startStat.CreateContainerFile(ctrId); err != nil {
		return nil, errors.Wrapf(err, "create container startup stat file %s", ctrId)
	}
	if input.Spec.ResourcesLimit != nil {
		if err := s.setContainerResourcesLimit(criId, input.Spec.ResourcesLimit); err != nil {
			return nil, errors.Wrap(err, "set container resources limit")
		}
	}
	return nil, nil
}
  1294. func (s *sPodGuestInstance) allocateCpuNumaPin() error {
  1295. if len(s.Desc.CpuNumaPin) != 0 || len(s.Desc.VcpuPin) != 0 {
  1296. return nil
  1297. }
  1298. if scpuset, ok := s.Desc.Metadata[computeapi.VM_METADATA_CGROUP_CPUSET]; ok {
  1299. cpusetJson, err := jsonutils.ParseString(scpuset)
  1300. if err != nil {
  1301. log.Errorf("failed parse server %s cpuset %s: %s", s.Id, scpuset, err)
  1302. return errors.Errorf("failed parse server %s cpuset %s: %s", s.Id, scpuset, err)
  1303. }
  1304. input := new(computeapi.ServerCPUSetInput)
  1305. err = cpusetJson.Unmarshal(input)
  1306. if err != nil {
  1307. log.Errorf("failed unmarshal server %s cpuset %s", s.Id, err)
  1308. return errors.Errorf("failed unmarshal server %s cpuset %s", s.Id, err)
  1309. }
  1310. cpus := input.CPUS
  1311. s.Desc.VcpuPin = []desc.SCpuPin{
  1312. {
  1313. Vcpus: fmt.Sprintf("0-%d", s.Desc.Cpu-1),
  1314. Pcpus: cpuset.NewCPUSet(cpus...).String(),
  1315. },
  1316. }
  1317. return nil
  1318. }
  1319. var cpus = make([]int, 0)
  1320. var preferNumaNodes = make([]int8, 0)
  1321. for i := range s.Desc.IsolatedDevices {
  1322. if s.Desc.IsolatedDevices[i].NumaNode >= 0 {
  1323. preferNumaNodes = append(preferNumaNodes, s.Desc.IsolatedDevices[i].NumaNode)
  1324. break
  1325. }
  1326. }
  1327. log.Infof("guest %s prefer nodes %v", s.Id, preferNumaNodes)
  1328. nodeNumaCpus, err := s.manager.cpuSet.AllocCpuset(int(s.Desc.Cpu), s.Desc.Mem*1024, preferNumaNodes, s.GetId())
  1329. if err != nil {
  1330. return err
  1331. }
  1332. for _, numaCpus := range nodeNumaCpus {
  1333. cpus = append(cpus, numaCpus.Cpuset...)
  1334. }
  1335. if !s.manager.hostagentNumaAllocate {
  1336. s.Desc.VcpuPin = []desc.SCpuPin{
  1337. {
  1338. Vcpus: fmt.Sprintf("0-%d", s.Desc.Cpu-1),
  1339. Pcpus: cpuset.NewCPUSet(cpus...).String(),
  1340. },
  1341. }
  1342. } else {
  1343. var cpuNumaPin = make([]*desc.SCpuNumaPin, 0)
  1344. for nodeId, numaCpus := range nodeNumaCpus {
  1345. if s.manager.hostagentNumaAllocate {
  1346. unodeId := uint16(nodeId)
  1347. vcpuPin := make([]desc.SVCpuPin, len(numaCpus.Cpuset))
  1348. for i := range numaCpus.Cpuset {
  1349. vcpuPin[i].Pcpu = numaCpus.Cpuset[i]
  1350. if i < int(s.Desc.Cpu) {
  1351. vcpuPin[i].Vcpu = i
  1352. } else {
  1353. vcpuPin[i].Vcpu = -1
  1354. }
  1355. }
  1356. memPin := &desc.SCpuNumaPin{
  1357. SizeMB: numaCpus.MemSizeKB / 1024, // MB
  1358. NodeId: &unodeId,
  1359. VcpuPin: vcpuPin,
  1360. Unregular: numaCpus.Unregular,
  1361. }
  1362. cpuNumaPin = append(cpuNumaPin, memPin)
  1363. }
  1364. }
  1365. s.Desc.CpuNumaPin = cpuNumaPin
  1366. }
  1367. return SaveLiveDesc(s, s.Desc)
  1368. }
  1369. func (s *sPodGuestInstance) doContainerStartPostLifecycle(ctx context.Context, criId string, input *hostapi.ContainerCreateInput) error {
  1370. ls := input.Spec.Lifecyle
  1371. if ls == nil {
  1372. return nil
  1373. }
  1374. if ls.PostStart == nil {
  1375. return nil
  1376. }
  1377. drv := lifecycle.GetDriver(ls.PostStart.Type)
  1378. if err := drv.Run(ctx, ls.PostStart, s.getCRI(), criId); err != nil {
  1379. return errors.Wrapf(err, "run %s", ls.PostStart.Type)
  1380. }
  1381. return nil
  1382. }
  1383. func (s *sPodGuestInstance) StopAll(ctx context.Context) error {
  1384. ctrs := s.GetContainers()
  1385. userCred := hostutils.GetComputeSession(ctx).GetToken()
  1386. errs := []error{}
  1387. for _, ctr := range ctrs {
  1388. if _, err := s.StopContainer(ctx, userCred, ctr.Id, &hostapi.ContainerStopInput{}); err != nil {
  1389. errs = append(errs, errors.Wrapf(err, "failed to stop container %s", ctr.Id))
  1390. }
  1391. }
  1392. err := errors.NewAggregate(errs)
  1393. if err != nil {
  1394. return err
  1395. }
  1396. if err := s.stopPod(ctx, 5); err != nil {
  1397. return errors.Wrapf(err, "stop pod %s", s.GetName())
  1398. }
  1399. return nil
  1400. }
  1401. func (s *sPodGuestInstance) StopContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *hostapi.ContainerStopInput) (jsonutils.JSONObject, error) {
  1402. criId, err := s.getContainerCRIId(ctrId)
  1403. if err != nil {
  1404. if errors.Cause(err) == errors.ErrNotFound {
  1405. // cri id not found, should already stopped
  1406. return nil, nil
  1407. }
  1408. return nil, errors.Wrap(err, "get container cri id")
  1409. }
  1410. var timeout int64 = 0
  1411. s.expectedStatus.SetContainerStatus(criId, ctrId, computeapi.CONTAINER_STATUS_EXITED)
  1412. if input.Timeout != 0 {
  1413. timeout = input.Timeout
  1414. }
  1415. shmSizeMB := input.ShmSizeMB
  1416. if shmSizeMB > 64 {
  1417. name := input.ContainerName
  1418. if name == "" {
  1419. return nil, errors.Wrapf(errors.ErrNotFound, "not found container_name from input: %s", jsonutils.Marshal(input))
  1420. }
  1421. if err := s.unmountDevShm(name); err != nil {
  1422. return nil, errors.Wrapf(err, "unmount shm %s", name)
  1423. }
  1424. }
  1425. if err := s.getCRI().StopContainer(ctx, criId, timeout, true, input.Force); err != nil {
  1426. if !IsContainerNotFoundError(err) {
  1427. return nil, errors.Wrap(err, "CRI.StopContainer")
  1428. } else {
  1429. log.Warningf("CRI.StopContainer %s not found", criId)
  1430. }
  1431. }
  1432. if input.Force {
  1433. // FIXME: Sleep 2s 等待 pleg.PodLifecycleEventGenerator 刷新
  1434. // 后期可以添加主动通知刷新
  1435. time.Sleep(2 * time.Second)
  1436. }
  1437. if err := s.startStat.RemoveContainerFile(ctrId); err != nil {
  1438. return nil, errors.Wrap(err, "startStat.RemoveContainerFile")
  1439. }
  1440. return nil, nil
  1441. }
  1442. func (s *sPodGuestInstance) GetCRIId() string {
  1443. return s.GetSourceDesc().Metadata[computeapi.POD_METADATA_CRI_ID]
  1444. }
  1445. func (s *sPodGuestInstance) convertToPodMetadataPortMappings(pms []*runtimeapi.PortMapping) []*computeapi.PodMetadataPortMapping {
  1446. ret := make([]*computeapi.PodMetadataPortMapping, len(pms))
  1447. for idx := range pms {
  1448. pm := pms[idx]
  1449. var proto computeapi.PodPortMappingProtocol = computeapi.PodPortMappingProtocolTCP
  1450. if pm.Protocol == runtimeapi.Protocol_UDP {
  1451. proto = computeapi.PodPortMappingProtocolUDP
  1452. }
  1453. ret[idx] = &computeapi.PodMetadataPortMapping{
  1454. Protocol: proto,
  1455. ContainerPort: pm.ContainerPort,
  1456. HostPort: pm.HostPort,
  1457. HostIp: pm.HostIp,
  1458. }
  1459. }
  1460. return ret
  1461. }
  1462. func (s *sPodGuestInstance) setPortMappings(ctx context.Context, userCred mcclient.TokenCredential, pms []*computeapi.PodMetadataPortMapping) error {
  1463. pmStr := jsonutils.Marshal(pms).String()
  1464. s.Desc.Metadata[computeapi.POD_METADATA_PORT_MAPPINGS] = pmStr
  1465. session := auth.GetSession(ctx, userCred, options.HostOptions.Region)
  1466. if _, err := computemod.Servers.SetMetadata(session, s.GetId(), jsonutils.Marshal(map[string]string{
  1467. computeapi.POD_METADATA_PORT_MAPPINGS: pmStr,
  1468. })); err != nil {
  1469. return errors.Wrapf(err, "set cri_id of pod %s", s.GetId())
  1470. }
  1471. return SaveDesc(s, s.Desc)
  1472. }
  1473. func (s *sPodGuestInstance) setCRIInfo(ctx context.Context, userCred mcclient.TokenCredential, criId string, cfg *runtimeapi.PodSandboxConfig) error {
  1474. s.Desc.Metadata[computeapi.POD_METADATA_CRI_ID] = criId
  1475. cfgStr := jsonutils.Marshal(cfg).String()
  1476. s.Desc.Metadata[computeapi.POD_METADATA_CRI_CONFIG] = cfgStr
  1477. session := auth.GetSession(ctx, userCred, options.HostOptions.Region)
  1478. if _, err := computemod.Servers.SetMetadata(session, s.GetId(), jsonutils.Marshal(map[string]string{
  1479. computeapi.POD_METADATA_CRI_ID: criId,
  1480. computeapi.POD_METADATA_CRI_CONFIG: cfgStr,
  1481. })); err != nil {
  1482. return errors.Wrapf(err, "set cri_id of pod %s", s.GetId())
  1483. }
  1484. return SaveDesc(s, s.Desc)
  1485. }
  1486. func (s *sPodGuestInstance) setContainerCRIInfo(ctx context.Context, userCred mcclient.TokenCredential, ctrId, criId string) error {
  1487. session := auth.GetSession(ctx, userCred, options.HostOptions.Region)
  1488. if _, err := computemod.Containers.SetMetadata(session, ctrId, jsonutils.Marshal(map[string]string{
  1489. computeapi.CONTAINER_METADATA_CRI_ID: criId,
  1490. })); err != nil {
  1491. return errors.Wrapf(err, "set cri_id of container %s", ctrId)
  1492. }
  1493. return nil
  1494. }
  1495. func (s *sPodGuestInstance) getPodSandboxConfig() (*runtimeapi.PodSandboxConfig, error) {
  1496. cfgStr := s.GetSourceDesc().Metadata[computeapi.POD_METADATA_CRI_CONFIG]
  1497. obj, err := jsonutils.ParseString(cfgStr)
  1498. if err != nil {
  1499. return nil, errors.Wrapf(err, "ParseString to json object: %s", cfgStr)
  1500. }
  1501. podCfg := new(runtimeapi.PodSandboxConfig)
  1502. if err := obj.Unmarshal(podCfg); err != nil {
  1503. return nil, errors.Wrap(err, "Unmarshal to PodSandboxConfig")
  1504. }
  1505. return podCfg, nil
  1506. }
  1507. func (s *sPodGuestInstance) GetPortMappings() (computeapi.GuestPortMappings, error) {
  1508. srcDesc := s.GetSourceDesc()
  1509. nics := srcDesc.Nics
  1510. pms := make([]*computeapi.GuestPortMapping, 0)
  1511. for _, nic := range nics {
  1512. for _, pm := range nic.PortMappings {
  1513. tmpPm := pm
  1514. pms = append(pms, tmpPm)
  1515. }
  1516. }
  1517. return pms, nil
  1518. }
  1519. func (s *sPodGuestInstance) saveContainer(id string, criId string) error {
  1520. s.saveContainerLock.Lock()
  1521. defer s.saveContainerLock.Unlock()
  1522. _, ok := s.containers[id]
  1523. if ok {
  1524. return errors.Errorf("container %s already exists", criId)
  1525. }
  1526. ctr := newContainer(id)
  1527. ctr.CRIId = criId
  1528. s.containers[id] = ctr
  1529. if err := s.saveContainersFile(s.containers); err != nil {
  1530. return errors.Wrap(err, "saveContainersFile")
  1531. }
  1532. return nil
  1533. }
  1534. func (s *sPodGuestInstance) saveContainersFile(containers map[string]*sContainer) error {
  1535. content := jsonutils.Marshal(containers).String()
  1536. if err := fileutils2.FilePutContents(s.getContainersFilePath(), content, false); err != nil {
  1537. return errors.Wrapf(err, "put content %s to containers file", content)
  1538. }
  1539. return nil
  1540. }
  1541. func (s *sPodGuestInstance) getContainersFilePath() string {
  1542. return path.Join(s.HomeDir(), "containers")
  1543. }
// CreateContainer pulls the container image (always, to validate it and honor
// the pull policy), creates the container through CRI, applies cgroup v2
// device rules via the containerd API where applicable, and records the
// resulting CRI id on the remote container resource.
//
// Returns (nil, nil) on success; the JSON return value is unused.
func (s *sPodGuestInstance) CreateContainer(ctx context.Context, userCred mcclient.TokenCredential, id string, input *hostapi.ContainerCreateInput) (jsonutils.JSONObject, error) {
	// always pull image for checking
	imgInput := &hostapi.ContainerPullImageInput{
		Image:      input.Spec.Image,
		PullPolicy: input.Spec.ImagePullPolicy,
	}
	if input.Spec.ImageCredentialToken != "" {
		// The credential token is a base64-encoded JSON auth config.
		tokenJson, err := base64.StdEncoding.DecodeString(input.Spec.ImageCredentialToken)
		if err != nil {
			return nil, errors.Wrapf(err, "base64 decode image credential token %s", input.Spec.ImageCredentialToken)
		}
		authObj, err := jsonutils.Parse(tokenJson)
		if err != nil {
			return nil, errors.Wrapf(err, "parse image credential token %s", input.Spec.ImageCredentialToken)
		}
		imgAuth := new(apis.ContainerPullImageAuthConfig)
		if err := authObj.Unmarshal(imgAuth); err != nil {
			return nil, errors.Wrapf(err, "unmarshal image credential token: %s", authObj)
		}
		imgInput.Auth = imgAuth
	}
	if _, err := s.PullImage(ctx, userCred, id, imgInput); err != nil {
		return nil, errors.Wrapf(err, "pull image %s", input.Spec.Image)
	}
	pod, err := s.getPod(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "get pod")
	}
	ctrCriId, err := s.createContainer(ctx, userCred, pod, id, input)
	if err != nil {
		return nil, errors.Wrap(err, "CRI.CreateContainer")
	}
	// On cgroup v2, update the devices of the container spec through the
	// containerd API (the v1 devices controller is unavailable there).
	isV2, err := podutil.DetectCgroupVersion()
	if err != nil {
		log.Warningf("[CreateContainer] Failed to detect cgroup version: %v, skipping containerd device update", err)
	} else if isV2 {
		// Convert the device rules into specs.LinuxDeviceCgroup entries.
		specDevices, err := podutil.ConvertDeviceRulesToSpecsDevices(input.Spec.CgroupDevicesAllow)
		if err != nil {
			log.Warningf("[CreateContainer] Failed to convert device rules to specs devices: %v, skipping containerd device update", err)
		} else if len(specDevices) > 0 {
			if err := s.updateContainerDevicesViaContainerd(ctx, ctrCriId, specDevices); err != nil {
				log.Warningf("[CreateContainer] Failed to update container devices via containerd API: %v (container created but devices may not be accessible)", err)
				// Do not return the error: the container was already created
				// successfully; device access is degraded, not fatal.
			}
		}
	}
	if err := s.setContainerCRIInfo(ctx, userCred, id, ctrCriId); err != nil {
		return nil, errors.Wrap(err, "setContainerCRIInfo")
	}
	return nil, nil
}
  1597. func (s *sPodGuestInstance) getLxcfsMounts() []*runtimeapi.Mount {
  1598. // lxcfsPath := "/var/lib/lxc/lxcfs"
  1599. lxcfsPath := options.HostOptions.LxcfsPath
  1600. const (
  1601. procCpuinfo = "/proc/cpuinfo"
  1602. procDiskstats = "/proc/diskstats"
  1603. procMeminfo = "/proc/meminfo"
  1604. procStat = "/proc/stat"
  1605. procSwaps = "/proc/swaps"
  1606. procUptime = "/proc/uptime"
  1607. )
  1608. newLxcfsMount := func(fp string) *runtimeapi.Mount {
  1609. return &runtimeapi.Mount{
  1610. ContainerPath: fp,
  1611. HostPath: fmt.Sprintf("%s%s", lxcfsPath, fp),
  1612. }
  1613. }
  1614. return []*runtimeapi.Mount{
  1615. newLxcfsMount(procUptime),
  1616. newLxcfsMount(procMeminfo),
  1617. newLxcfsMount(procStat),
  1618. newLxcfsMount(procCpuinfo),
  1619. newLxcfsMount(procSwaps),
  1620. newLxcfsMount(procDiskstats),
  1621. }
  1622. }
  1623. func (s *sPodGuestInstance) getContainerMounts(ctrId string, input *hostapi.ContainerCreateInput) ([]*runtimeapi.Mount, error) {
  1624. inputMounts := input.Spec.VolumeMounts
  1625. if len(inputMounts) == 0 {
  1626. return make([]*runtimeapi.Mount, 0), nil
  1627. }
  1628. mounts := make([]*runtimeapi.Mount, len(inputMounts))
  1629. for idx, im := range inputMounts {
  1630. mnt := &runtimeapi.Mount{
  1631. ContainerPath: im.MountPath,
  1632. Readonly: im.ReadOnly,
  1633. SelinuxRelabel: im.SelinuxRelabel,
  1634. Propagation: volume_mount.GetRuntimeVolumeMountPropagation(im.Propagation),
  1635. }
  1636. hostPath, err := volume_mount.GetDriver(im.Type).GetRuntimeMountHostPath(s, ctrId, im)
  1637. if err != nil {
  1638. return nil, errors.Wrapf(err, "get runtime host mount path of %s", jsonutils.Marshal(im))
  1639. }
  1640. mnt.HostPath = hostPath
  1641. mounts[idx] = mnt
  1642. }
  1643. return mounts, nil
  1644. }
  1645. func (s *sPodGuestInstance) getCGUtil() podutil.CgroupUtil {
  1646. cgUtil, err := podutil.NewPodCgroupUtil(PodCgroupParent())
  1647. if err != nil {
  1648. // 如果检测失败,记录错误并使用默认的 v1 实现
  1649. log.Errorf("failed to detect cgroup version, fallback to v1: %s", err)
  1650. return podutil.NewPodCgroupV1Util(PodCgroupParent())
  1651. }
  1652. return cgUtil
  1653. }
// updateContainerDevicesViaContainerd updates the devices of a container spec
// through the containerd API. Used in cgroup v2 scenarios, where device rules
// are not applied via the cgroup v1 devices controller.
func (s *sPodGuestInstance) updateContainerDevicesViaContainerd(ctx context.Context, criId string, devices []*specs.LinuxDeviceCgroup) error {
	addr, namespace := GetContainerdConnectionInfo()
	// Create a containerd client; closed before returning.
	client, err := containerdutil.NewClient(ctx, addr, namespace)
	if err != nil {
		return errors.Wrap(err, "create containerd client")
	}
	defer client.Close()
	// Update the container's device cgroup rules.
	return containerdutil.UpdateContainerDevices(ctx, client, criId, devices)
}
  1667. func (s *sPodGuestInstance) setContainerCgroupDevicesAllow(ctrId string, allowStrs []string, devices []*hostapi.ContainerDevice) error {
  1668. // 自动从容器设备配置中提取设备号并添加到允许列表
  1669. allowSet := make(map[string]bool)
  1670. for _, rule := range allowStrs {
  1671. allowSet[rule] = true
  1672. }
  1673. // 遍历容器设备,提取设备路径并生成设备规则
  1674. for _, dev := range devices {
  1675. var devicePath string
  1676. var permissions string = "rwm" // 默认权限
  1677. // 获取设备路径和权限
  1678. if dev.Host != nil && dev.Host.HostPath != "" {
  1679. devicePath = dev.Host.HostPath
  1680. if dev.Permissions != "" {
  1681. permissions = dev.Permissions
  1682. }
  1683. } else if dev.IsolatedDevice != nil {
  1684. // 对于 IsolatedDevice,优先使用 RenderPath,然后是 Path
  1685. if dev.IsolatedDevice.RenderPath != "" {
  1686. devicePath = dev.IsolatedDevice.RenderPath
  1687. } else if dev.IsolatedDevice.Path != "" {
  1688. devicePath = dev.IsolatedDevice.Path
  1689. } else if dev.IsolatedDevice.CardPath != "" {
  1690. devicePath = dev.IsolatedDevice.CardPath
  1691. }
  1692. if dev.Permissions != "" {
  1693. permissions = dev.Permissions
  1694. }
  1695. } else {
  1696. log.Debugf("[setContainerCgroupDevicesAllow] Skipping device %v (no Host or IsolatedDevice path)", dev)
  1697. continue
  1698. }
  1699. if devicePath == "" {
  1700. log.Debugf("[setContainerCgroupDevicesAllow] Skipping device %v (no device path found)", dev)
  1701. continue
  1702. }
  1703. // 从设备路径获取设备号并生成设备规则
  1704. rule, err := podutil.GetDeviceAllowRuleFromPath(devicePath, permissions)
  1705. if err != nil {
  1706. log.Warningf("[setContainerCgroupDevicesAllow] Failed to get device rule from path %s: %v (skipping)", devicePath, err)
  1707. continue
  1708. }
  1709. // 添加到允许列表(去重)
  1710. if !allowSet[rule] {
  1711. allowStrs = append(allowStrs, rule)
  1712. allowSet[rule] = true
  1713. log.Infof("[setContainerCgroupDevicesAllow] Auto-added device rule: %s (from device path: %s)", rule, devicePath)
  1714. }
  1715. }
  1716. return s.getCGUtil().SetDevicesAllow(ctrId, allowStrs)
  1717. }
  1718. func (s *sPodGuestInstance) SetContainerResourceLimit(ctrId string, limit *apis.ContainerResources) (jsonutils.JSONObject, error) {
  1719. criId, err := s.getContainerCRIId(ctrId)
  1720. if err != nil {
  1721. return nil, errors.Wrap(err, "get container cri id")
  1722. }
  1723. return nil, s.setContainerResourcesLimit(criId, limit)
  1724. }
  1725. func (s *sPodGuestInstance) setContainerResourcesLimit(ctrId string, limit *apis.ContainerResources) error {
  1726. cgUtil := s.getCGUtil()
  1727. /*if limit.MemoryLimitMB != nil {
  1728. if err := cgUtil.SetMemoryLimitBytes(ctrId, *limit.MemoryLimitMB*1024*1024); err != nil {
  1729. return errors.Wrapf(err, "set memory limit to %d MB", *limit.MemoryLimitMB)
  1730. }
  1731. }*/
  1732. if limit.CpuCfsQuota != nil {
  1733. cpuCfsQuotaUs := *limit.CpuCfsQuota * float64(s.getDefaultCPUPeriod())
  1734. if err := cgUtil.SetCPUCfs(ctrId, int64(cpuCfsQuotaUs), s.getDefaultCPUPeriod()); err != nil {
  1735. return errors.Wrapf(err, "set cpu cfs quota to %d", int64(cpuCfsQuotaUs))
  1736. }
  1737. }
  1738. if limit.PidsMax != nil {
  1739. if err := cgUtil.SetPidsMax(ctrId, *limit.PidsMax); err != nil {
  1740. return errors.Wrapf(err, "set pids.max to %d", *limit.PidsMax)
  1741. }
  1742. }
  1743. if len(limit.DevicesAllow) != 0 {
  1744. if err := cgUtil.SetDevicesAllow(ctrId, limit.DevicesAllow); err != nil {
  1745. return errors.Wrapf(err, "set devices.allow %v", limit.DevicesAllow)
  1746. }
  1747. }
  1748. if limit.CpusetCloneChildren {
  1749. if err := cgUtil.SetCpusetCloneChildren(ctrId); err != nil {
  1750. return errors.Wrapf(err, "set cpuset clone_children")
  1751. }
  1752. }
  1753. if limit.MemoryHighRatio != nil {
  1754. memHigh := int64(*limit.MemoryHighRatio * float64(s.GetDesc().Mem*1024*1024))
  1755. if err := cgUtil.SetCgroupKeyValue(ctrId, podutil.CgroupControllerMemory, "memory.high", fmt.Sprintf("%d", memHigh)); err != nil {
  1756. return errors.Wrapf(err, "set memory.high to %d", memHigh)
  1757. }
  1758. }
  1759. return nil
  1760. }
  1761. func (s *sPodGuestInstance) getDefaultCPUPeriod() int64 {
  1762. return 100000
  1763. }
// createContainer assembles the full CRI ContainerConfig for one container of
// the pod — mounts (volumes, simulated cpu, /dev/shm, etc files, lxcfs),
// cpuset pinning, resources, security context, environment variables
// (including secret-credential and port-mapping derived envs), devices and
// command — then creates the container via CRI and records the resulting CRI
// id in the pod's containers file.
//
// Returns the new container's CRI id.
func (s *sPodGuestInstance) createContainer(ctx context.Context, userCred mcclient.TokenCredential, sandbox *runtimeapi.PodSandbox, ctrId string, input *hostapi.ContainerCreateInput) (string, error) {
	podCfg, err := s.getPodSandboxConfig()
	if err != nil {
		return "", errors.Wrap(err, "getPodSandboxConfig")
	}
	spec := input.Spec
	mounts, err := s.getContainerMounts(ctrId, input)
	if err != nil {
		return "", errors.Wrap(err, "get container mounts")
	}
	// Simulated-cpu mounts are prepended so they take effect before the
	// regular volume mounts.
	if spec.SimulateCpu {
		systemCpuMounts, err := s.simulateContainerSystemCpu(ctx, ctrId)
		if err != nil {
			return "", errors.Wrapf(err, "simulate container system cpu")
		}
		newMounts := systemCpuMounts
		newMounts = append(newMounts, mounts...)
		mounts = newMounts
	}
	// process shm size: when /dev/shm is enlarged beyond the default 64MB,
	// mount a dedicated tmpfs prepared by mountDevShm.
	if spec.ShmSizeMB > 64 {
		// mount empty dir
		shmPath, err := s.mountDevShm(input, spec.ShmSizeMB)
		if err != nil {
			return "", errors.Wrapf(err, "mount dev shm")
		}
		mounts = append(mounts, &runtimeapi.Mount{
			ContainerPath: "/dev/shm",
			HostPath:      shmPath,
		})
	}
	// inject /etc/hosts to hide host storage
	if spec.Rootfs != nil {
		if etcFilesMount, err := s.getEtcFilesMount(ctrId); err != nil {
			return "", errors.Wrapf(err, "get etc hosts mount")
		} else {
			mounts = append(mounts, etcFilesMount...)
		}
	}
	// Derive cpuset.cpus / cpuset.mems (and extra cpu count) from the pod's
	// NUMA pinning; fall back to the flat vcpu pinning otherwise.
	var cpuSetCpus string
	var cpuSetMems string
	var extraCpuCount int
	{
		cpuSets := sets.NewString()
		cpuMemSets := sets.NewString()
		if len(s.Desc.CpuNumaPin) > 0 {
			for _, cpuNumaPin := range s.GetDesc().CpuNumaPin {
				for _, cpuPin := range cpuNumaPin.VcpuPin {
					cpuSets.Insert(fmt.Sprintf("%d", cpuPin.Pcpu))
				}
				if cpuNumaPin.NodeId != nil && cpuNumaPin.SizeMB > 0 {
					cpuMemSets.Insert(fmt.Sprintf("%d", int(*cpuNumaPin.NodeId)))
				}
				if cpuNumaPin.ExtraCpuCount > 0 {
					extraCpuCount += cpuNumaPin.ExtraCpuCount
				}
			}
			cpuSetCpus = strings.Join(cpuSets.List(), ",")
			cpuSetMems = strings.Join(cpuMemSets.List(), ",")
		} else if len(s.Desc.VcpuPin) > 0 {
			for _, vcpuPin := range s.Desc.VcpuPin {
				cpuSets.Insert(vcpuPin.Pcpus)
			}
			cpuSetCpus = strings.Join(cpuSets.List(), ",")
		}
	}
	procMountType := apis.ContainerDefaultProcMount
	if spec.SecurityContext != nil && spec.SecurityContext.ProcMount == apis.ContainerUnmaskedProcMount {
		procMountType = apis.ContainerUnmaskedProcMount
	}
	ctrCfg := &runtimeapi.ContainerConfig{
		Metadata: &runtimeapi.ContainerMetadata{
			Name:    input.Name,
			Attempt: uint32(input.RestartCount),
		},
		Labels: map[string]string{
			runtime.PodNameLabel:               s.GetDesc().Name,
			runtime.PodUIDLabel:                s.GetId(),
			runtime.ContainerNameLabel:         input.Name,
			runtime.ContainerRestartCountLabel: fmt.Sprintf("%d", input.RestartCount),
		},
		Annotations: map[string]string{
			runtime.ContainerRestartCountLabel: fmt.Sprintf("%d", input.RestartCount),
		},
		Image: &runtimeapi.ImageSpec{
			Image: spec.Image,
		},
		Linux: &runtimeapi.LinuxContainerConfig{
			Resources: &runtimeapi.LinuxContainerResources{
				// REF: https://docs.docker.com/config/containers/resource_constraints/#configure-the-default-cfs-scheduler
				CpuPeriod: s.getDefaultCPUPeriod(),
				CpuQuota:  (s.GetDesc().Cpu + int64(extraCpuCount)) * s.getDefaultCPUPeriod(),
				//CpuShares: defaultCPUPeriod,
				MemoryLimitInBytes:     s.GetDesc().Mem * 1024 * 1024,
				OomScoreAdj:            0,
				CpusetCpus:             cpuSetCpus,
				CpusetMems:             cpuSetMems,
				HugepageLimits:         nil,
				Unified:                nil,
				MemorySwapLimitInBytes: 0,
			},
			SecurityContext: &runtimeapi.LinuxContainerSecurityContext{
				Capabilities:       &runtimeapi.Capability{},
				Privileged:         spec.Privileged,
				NamespaceOptions:   podCfg.Linux.SecurityContext.GetNamespaceOptions(),
				SelinuxOptions:     nil,
				RunAsUser:          nil,
				RunAsGroup:         nil,
				RunAsUsername:      "",
				ReadonlyRootfs:     false,
				SupplementalGroups: nil,
				NoNewPrivs:         !spec.DisableNoNewPrivs,
				MaskedPaths:        podutil.GetDefaultMaskedPaths(procMountType),
				ReadonlyPaths:      podutil.GetReadonlyPaths(procMountType),
				Seccomp: &runtimeapi.SecurityProfile{
					ProfileType: runtimeapi.SecurityProfile_Unconfined,
				},
				Apparmor: &runtimeapi.SecurityProfile{
					ProfileType: runtimeapi.SecurityProfile_Unconfined,
				},
				ApparmorProfile:    "",
				SeccompProfilePath: "",
			},
		},
		LogPath: s.getContainerLogPath(ctrId),
		Envs:    make([]*runtimeapi.KeyValue, 0),
		Devices: []*runtimeapi.Device{},
		Mounts:  mounts,
	}
	// set container namespace options to target
	/*if ctrCfg.Linux.SecurityContext.NamespaceOptions.Pid == runtimeapi.NamespaceMode_CONTAINER {
		ctrCfg.Linux.SecurityContext.NamespaceOptions.Pid = runtimeapi.NamespaceMode_TARGET
		ctrCfg.Linux.SecurityContext.NamespaceOptions.TargetId = s.GetCRIId()
	}*/
	if spec.Rootfs != nil {
		ctrCfg.Labels[snapshot_service.LabelServerId] = s.GetId()
		ctrCfg.Labels[snapshot_service.LabelContainerId] = ctrId
	}
	// inherit security context
	if spec.SecurityContext != nil {
		secInput := spec.SecurityContext
		if secInput.RunAsUser != nil {
			ctrCfg.Linux.SecurityContext.RunAsUser = &runtimeapi.Int64Value{
				Value: *secInput.RunAsUser,
			}
		}
		if secInput.RunAsGroup != nil {
			ctrCfg.Linux.SecurityContext.RunAsGroup = &runtimeapi.Int64Value{
				Value: *secInput.RunAsGroup,
			}
		}
		if secInput.ApparmorProfile != "" {
			ctrCfg.Linux.SecurityContext.Apparmor = &runtimeapi.SecurityProfile{
				ProfileType:  runtimeapi.SecurityProfile_Localhost,
				LocalhostRef: secInput.ApparmorProfile,
			}
		}
	}
	if spec.EnableLxcfs {
		ctrCfg.Mounts = append(ctrCfg.Mounts, s.getLxcfsMounts()...)
	}
	if spec.Capabilities != nil {
		ctrCfg.Linux.SecurityContext.Capabilities.AddCapabilities = spec.Capabilities.Add
		ctrCfg.Linux.SecurityContext.Capabilities.DropCapabilities = spec.Capabilities.Drop
	}
	// Decode base64-encoded JSON secret credentials, then resolve env values
	// that reference them via ValueFrom.Credential.
	envSecs := make(map[string]map[string]string, 0)
	if len(spec.SecretCredentials) > 0 {
		for credId, credData := range spec.SecretCredentials {
			obj := map[string]string{}
			credKey, err := base64.StdEncoding.DecodeString(credData)
			if err != nil {
				return "", errors.Wrapf(err, "decode secret credential %s", credId)
			}
			if err := json.Unmarshal(credKey, &obj); err != nil {
				return "", errors.Wrapf(err, "unmarshal secret credential %s", credId)
			}
			envSecs[credId] = obj
		}
		for i := range spec.Envs {
			env := spec.Envs[i]
			if env.ValueFrom == nil {
				continue
			}
			if env.ValueFrom.Credential != nil {
				credId := env.ValueFrom.Credential.Id
				if _, ok := envSecs[credId]; !ok {
					return "", errors.Wrapf(errors.ErrNotFound, "secret credential %s not found", credId)
				}
				credKey := env.ValueFrom.Credential.Key
				if _, ok := envSecs[credId][credKey]; !ok {
					return "", errors.Wrapf(errors.ErrNotFound, "secret credential %s key %s not found", credId, credKey)
				}
				env.Value = envSecs[credId][credKey]
			}
		}
	}
	for _, env := range spec.Envs {
		ctrCfg.Envs = append(ctrCfg.Envs, &runtimeapi.KeyValue{
			Key:   env.Key,
			Value: env.Value,
		})
	}
	// Expose port mappings as CLOUDPODS_<PROTO>_PORT_<PORT> envs, plus any
	// user-declared per-mapping envs.
	pms, err := s.GetPortMappings()
	if err != nil {
		return "", errors.Wrapf(err, "get pod port mappings")
	}
	if len(pms) != 0 {
		for _, pm := range pms {
			envKey := fmt.Sprintf("CLOUDPODS_%s_PORT_%d", strings.ToUpper(string(pm.Protocol)), pm.Port)
			envVal := fmt.Sprintf("%d", *pm.HostPort)
			ctrCfg.Envs = append(ctrCfg.Envs, &runtimeapi.KeyValue{
				Key:   envKey,
				Value: envVal,
			})
			for _, pEnv := range pm.Envs {
				pEnvVal := ""
				switch pEnv.ValueFrom {
				case computeapi.GuestPortMappingEnvValueFromHostPort:
					pEnvVal = fmt.Sprintf("%d", *pm.HostPort)
				case computeapi.GuestPortMappingEnvValueFromPort:
					pEnvVal = fmt.Sprintf("%d", pm.Port)
				default:
					return "", httperrors.NewInputParameterError("invalid value from %s", pEnv.ValueFrom)
				}
				ctrCfg.Envs = append(ctrCfg.Envs, &runtimeapi.KeyValue{
					Key:   pEnv.Key,
					Value: pEnvVal,
				})
			}
		}
	}
	if s.GetDesc().HostAccessIp != "" {
		ctrCfg.Envs = append(ctrCfg.Envs, &runtimeapi.KeyValue{
			Key:   "CLOUDPODS_HOST_ACCESS_IP",
			Value: s.GetDesc().HostAccessIp,
		})
	}
	if s.GetDesc().HostEIP != "" {
		ctrCfg.Envs = append(ctrCfg.Envs, &runtimeapi.KeyValue{
			Key:   "CLOUDPODS_HOST_EIP",
			Value: s.GetDesc().HostEIP,
		})
	}
	// Translate spec devices into CRI devices, grouped by device type so each
	// driver handles its own group; otherwise hide host NVIDIA GPUs.
	if len(spec.Devices) != 0 {
		devsByType := map[apis.ContainerDeviceType][]*hostapi.ContainerDevice{}
		for i := range spec.Devices {
			if devs, ok := devsByType[spec.Devices[i].Type]; ok {
				devsByType[spec.Devices[i].Type] = append(devs, spec.Devices[i])
			} else {
				devsByType[spec.Devices[i].Type] = []*hostapi.ContainerDevice{spec.Devices[i]}
			}
		}
		for cdType, devs := range devsByType {
			ctrDevs, err := device.GetDriver(cdType).GetRuntimeDevices(input, devs)
			if err != nil {
				return "", errors.Wrapf(err, "GetRuntimeDevices of %s", jsonutils.Marshal(devs))
			}
			ctrCfg.Devices = append(ctrCfg.Devices, ctrDevs...)
		}
		if err := s.getIsolatedDeviceExtraConfig(spec, ctrCfg); err != nil {
			return "", err
		}
	} else {
		if hostinfo.Instance().HasContainerNvidiaGpu() {
			// No devices requested: make NVIDIA GPUs invisible to the container.
			ctrCfg.Envs = append(ctrCfg.Envs, &runtimeapi.KeyValue{Key: "NVIDIA_VISIBLE_DEVICES", Value: "void"})
		}
	}
	if len(spec.Command) != 0 {
		ctrCfg.Command = spec.Command
	}
	if len(spec.Args) != 0 {
		ctrCfg.Args = spec.Args
	}
	criId, err := s.getCRI().CreateContainer(ctx, sandbox.GetId(), podCfg, ctrCfg, false)
	if err != nil {
		return "", errors.Wrapf(err, "cri.CreateContainer of pod: %q", sandbox.GetId())
	}
	if err := s.saveContainer(ctrId, criId); err != nil {
		return "", errors.Wrap(err, "saveContainer")
	}
	return criId, nil
}
  2046. // copyEtcFile 复制主机上的 etc 文件到容器根文件系统
  2047. func (s *sPodGuestInstance) copyEtcFile(hostPath, etcFilePath string) (*runtimeapi.Mount, error) {
  2048. hostEtcFilePath := filepath.Join(hostPath, etcFilePath)
  2049. // 确保目录存在
  2050. if err := volume_mount.EnsureDir(filepath.Dir(hostEtcFilePath)); err != nil {
  2051. return nil, errors.Wrapf(err, "ensure dir %s", filepath.Dir(hostEtcFilePath))
  2052. }
  2053. // 复制文件
  2054. if err := volume_mount.CopyFile(etcFilePath, hostEtcFilePath); err != nil {
  2055. return nil, errors.Wrapf(err, "copy file %s to %s", etcFilePath, hostEtcFilePath)
  2056. }
  2057. // 创建挂载点
  2058. return &runtimeapi.Mount{
  2059. ContainerPath: etcFilePath,
  2060. HostPath: hostEtcFilePath,
  2061. }, nil
  2062. }
  2063. // generateEtcFile 生成 etc 文件内容到容器根文件系统
  2064. func (s *sPodGuestInstance) generateEtcFile(hostPath, etcFilePath, content string) (*runtimeapi.Mount, error) {
  2065. hostEtcFilePath := filepath.Join(hostPath, etcFilePath)
  2066. // 确保目录存在
  2067. if err := volume_mount.EnsureDir(filepath.Dir(hostEtcFilePath)); err != nil {
  2068. return nil, errors.Wrapf(err, "ensure dir %s", filepath.Dir(hostEtcFilePath))
  2069. }
  2070. // 生成文件内容
  2071. if err := fileutils2.FilePutContents(hostEtcFilePath, content, false); err != nil {
  2072. return nil, errors.Wrapf(err, "put file %s to %s", etcFilePath, hostEtcFilePath)
  2073. }
  2074. // 创建挂载点
  2075. return &runtimeapi.Mount{
  2076. ContainerPath: etcFilePath,
  2077. HostPath: hostEtcFilePath,
  2078. }, nil
  2079. }
  2080. func (s *sPodGuestInstance) getEtcFilesMount(ctrId string) ([]*runtimeapi.Mount, error) {
  2081. hostPath, err := s.GetRootFsMountPath(ctrId)
  2082. if err != nil {
  2083. return nil, errors.Wrapf(err, "get container root fs path of %s", ctrId)
  2084. }
  2085. // 复制 /etc/hosts 文件
  2086. etcHostsMount, err := s.copyEtcFile(hostPath, "/etc/hosts")
  2087. if err != nil {
  2088. return nil, errors.Wrap(err, "copy /etc/hosts")
  2089. }
  2090. // 生成 /etc/hostname 文件
  2091. etcHostnameMount, err := s.generateEtcFile(hostPath, "/etc/hostname", s.GetDesc().Hostname)
  2092. if err != nil {
  2093. return nil, errors.Wrap(err, "generate /etc/hostname")
  2094. }
  2095. // 复制 /etc/resolv.conf 文件
  2096. etcResolvConfMount, err := s.copyEtcFile(hostPath, "/etc/resolv.conf")
  2097. if err != nil {
  2098. return nil, errors.Wrap(err, "copy /etc/resolv.conf")
  2099. }
  2100. return []*runtimeapi.Mount{etcHostsMount, etcHostnameMount, etcResolvConfMount}, nil
  2101. }
// FilteredContainerDevices groups container isolated devices by how their
// configuration is injected into the container (see
// filterContainerIsolatedDevices).
type FilteredContainerDevices struct {
	// EnvDevs are devices configured purely via environment variables
	// (IsolatedDevice.OnlyEnv is non-empty).
	EnvDevs []*hostapi.ContainerDevice
	// CDIDevs are devices whose IsolatedDevice reports IsCDIUsed().
	CDIDevs []*hostapi.ContainerDevice
	// RestDevs are the remaining devices, handled by their device manager.
	RestDevs []*hostapi.ContainerDevice
}
  2107. func filterContainerIsolatedDevices(devs []*hostapi.ContainerDevice, devTypes sets.String) FilteredContainerDevices {
  2108. envDevs := []*hostapi.ContainerDevice{}
  2109. restDevs := []*hostapi.ContainerDevice{}
  2110. cdiDevs := []*hostapi.ContainerDevice{}
  2111. for i := range devs {
  2112. dev := devs[i]
  2113. if dev.IsolatedDevice != nil {
  2114. devType := dev.IsolatedDevice.DeviceType
  2115. if !devTypes.Has(devType) {
  2116. continue
  2117. }
  2118. if dev.IsolatedDevice.IsCDIUsed() {
  2119. cdiDevs = append(cdiDevs, dev)
  2120. } else if len(dev.IsolatedDevice.OnlyEnv) > 0 {
  2121. envDevs = append(envDevs, dev)
  2122. } else {
  2123. restDevs = append(restDevs, dev)
  2124. }
  2125. }
  2126. }
  2127. return FilteredContainerDevices{
  2128. EnvDevs: envDevs,
  2129. CDIDevs: cdiDevs,
  2130. RestDevs: restDevs,
  2131. }
  2132. }
  2133. func getEnvsFromDevices(devs []*hostapi.ContainerDevice) []*runtimeapi.KeyValue {
  2134. retEnvs := []*runtimeapi.KeyValue{}
  2135. for _, dev := range devs {
  2136. if dev.IsolatedDevice == nil {
  2137. continue
  2138. }
  2139. if len(dev.IsolatedDevice.OnlyEnv) == 0 {
  2140. continue
  2141. }
  2142. for _, oe := range dev.IsolatedDevice.OnlyEnv {
  2143. var tmpEnv *runtimeapi.KeyValue
  2144. if oe.FromRenderPath {
  2145. tmpEnv = &runtimeapi.KeyValue{
  2146. Key: oe.Key,
  2147. Value: dev.IsolatedDevice.RenderPath,
  2148. }
  2149. }
  2150. if oe.FromIndex {
  2151. tmpEnv = &runtimeapi.KeyValue{
  2152. Key: oe.Key,
  2153. Value: fmt.Sprintf("%d", dev.IsolatedDevice.Index),
  2154. }
  2155. }
  2156. if oe.FromDeviceMinor {
  2157. tmpEnv = &runtimeapi.KeyValue{
  2158. Key: oe.Key,
  2159. Value: fmt.Sprintf("%d", dev.IsolatedDevice.DeviceMinor),
  2160. }
  2161. }
  2162. if tmpEnv != nil {
  2163. retEnvs = append(retEnvs, tmpEnv)
  2164. }
  2165. }
  2166. }
  2167. return retEnvs
  2168. }
  2169. func (s *sPodGuestInstance) getIsolatedDeviceExtraConfig(spec *hostapi.ContainerSpec, ctrCfg *runtimeapi.ContainerConfig) error {
  2170. devTypes := sets.NewString(
  2171. string(isolated_device.ContainerDeviceTypeNvidiaGpu),
  2172. string(isolated_device.ContainerDeviceTypeNvidiaMps),
  2173. string(isolated_device.ContainerDeviceTypeNvidiaGpuShare),
  2174. string(isolated_device.ContainerDeviceTypeAscendNpu),
  2175. )
  2176. fDevs := filterContainerIsolatedDevices(spec.Devices, devTypes)
  2177. if len(fDevs.EnvDevs) != 0 {
  2178. ctrCfg.Envs = append(ctrCfg.Envs, getEnvsFromDevices(fDevs.EnvDevs)...)
  2179. }
  2180. for i := range fDevs.RestDevs {
  2181. dev := fDevs.RestDevs[i]
  2182. devMan, err := isolated_device.GetContainerDeviceManager(isolated_device.ContainerDeviceType(dev.IsolatedDevice.DeviceType))
  2183. if err != nil {
  2184. return errors.Wrapf(err, "GetContainerDeviceManager by type %q", dev.IsolatedDevice.DeviceType)
  2185. }
  2186. envs, mounts := devMan.GetContainerExtraConfigures([]*hostapi.ContainerDevice{dev})
  2187. if len(envs) > 0 {
  2188. ctrCfg.Envs = append(ctrCfg.Envs, envs...)
  2189. }
  2190. if len(mounts) > 0 {
  2191. ctrCfg.Mounts = append(ctrCfg.Mounts, mounts...)
  2192. }
  2193. }
  2194. if len(fDevs.CDIDevs) > 0 {
  2195. cdiDevs, err := isolated_device.GetContainerCDIDevices(fDevs.CDIDevs)
  2196. if err != nil {
  2197. return errors.Wrap(err, "GetContainerCDIDevices")
  2198. }
  2199. ctrCfg.CDIDevices = append(ctrCfg.CDIDevices, cdiDevs...)
  2200. }
  2201. return nil
  2202. }
  2203. func (s *sPodGuestInstance) getContainerSystemCpusDir(ctrId string) string {
  2204. rootFsPath, _ := s.GetRootFsMountPath(ctrId)
  2205. if rootFsPath != "" {
  2206. return filepath.Join(rootFsPath, "cpus", ctrId)
  2207. }
  2208. return filepath.Join(s.HomeDir(), "cpus", ctrId)
  2209. }
  2210. func (s *sPodGuestInstance) ensureContainerSystemCpuDir(cpuDir string, cpuCnt int64) error {
  2211. // create cpu dir like /var/lib/docker/cpus/$ctr_name
  2212. if err := podutil.EnsureContainerSystemCpuDir(cpuDir, cpuCnt); err != nil {
  2213. return errors.Wrap(err, "ensure container system cpu dir")
  2214. }
  2215. return nil
  2216. }
  2217. func (s *sPodGuestInstance) findHostCpuPath(ctrId string, cpuIndex int) (int, error) {
  2218. return s.getHostCPUMap().Get(ctrId, cpuIndex)
  2219. }
  2220. func (s *sPodGuestInstance) simulateContainerSystemCpuSetScalingCurFreq(ctrId string, scalingCurFreq int64) error {
  2221. cpuDir := s.getContainerSystemCpusDir(ctrId)
  2222. cpuCnt := s.GetDesc().Cpu
  2223. for i := 0; i < int(cpuCnt); i++ {
  2224. cpufreqPolicyCurFreqFile := path.Join(cpuDir, "cpufreq", fmt.Sprintf("policy%d", i), "scaling_cur_freq")
  2225. cpufreqPolicySetSpeedFile := path.Join(cpuDir, "cpufreq", fmt.Sprintf("policy%d", i), "scaling_setspeed")
  2226. scalingCurFreqStr := fmt.Sprintf("%d\n", scalingCurFreq)
  2227. if err := fileutils2.FilePutContents(cpufreqPolicyCurFreqFile, scalingCurFreqStr, false); err != nil {
  2228. return errors.Wrapf(err, "failed write %s", cpufreqPolicyCurFreqFile)
  2229. }
  2230. if err := fileutils2.FilePutContents(cpufreqPolicySetSpeedFile, scalingCurFreqStr, false); err != nil {
  2231. return errors.Wrapf(err, "failed write %s", cpufreqPolicySetSpeedFile)
  2232. }
  2233. }
  2234. return nil
  2235. }
// simulateContainerSystemCpu builds a simulated /sys/devices/system/cpu tree
// for the container so in-container tools see only its own vCPUs, and returns
// the mounts that expose that tree inside the container.
func (s *sPodGuestInstance) simulateContainerSystemCpu(ctx context.Context, ctrId string) ([]*runtimeapi.Mount, error) {
	cpuDir := s.getContainerSystemCpusDir(ctrId)
	cpuCnt := s.GetDesc().Cpu
	if err := s.ensureContainerSystemCpuDir(cpuDir, cpuCnt); err != nil {
		return nil, err
	}
	// When cpufreq simulation is configured, prepare the cpufreq directory
	// that will hold the synthesized per-policy files.
	cpufreqConfig := s.manager.host.GetContainerCpufreqSimulateConfig()
	if cpufreqConfig != nil {
		cpufreqDir := path.Join(cpuDir, "cpufreq")
		out, err := procutils.NewRemoteCommandAsFarAsPossible("mkdir", "-p", cpufreqDir).Output()
		if err != nil {
			return nil, errors.Wrapf(err, "mkdir %s: %s", cpufreqDir, out)
		}
	}
	sysCpuPath := "/sys/devices/system/cpu"
	// The simulated directory is mounted over the whole sysfs cpu tree.
	ret := []*runtimeapi.Mount{
		{
			ContainerPath: sysCpuPath,
			HostPath:      cpuDir,
		},
	}
	for i := 0; i < int(cpuCnt); i++ {
		// Map the container's vCPU index to its pinned host cpu.
		hostCpuIdx, err := s.findHostCpuPath(ctrId, i)
		if err != nil {
			return nil, errors.Wrapf(err, "find host cpu by container %s with index %d", ctrId, i)
		}
		hostCpuPath := filepath.Join(sysCpuPath, fmt.Sprintf("cpu%d", hostCpuIdx))
		if cpufreqConfig != nil {
			// With cpufreq simulation the host cpu dir is copied and its
			// cpufreq entry synthesized, so no per-cpu bind mount is added.
			if err := s.ensureContainerSystemCpufreqHostDir(cpuDir, hostCpuPath, i, cpufreqConfig); err != nil {
				return nil, errors.Wrap(err, "ensureContainerSystemCpufreqHostDir")
			}
		} else {
			// Without simulation, expose the host cpuN dir as the container's cpuI dir.
			ret = append(ret, &runtimeapi.Mount{
				ContainerPath: filepath.Join(sysCpuPath, fmt.Sprintf("cpu%d", i)),
				HostPath:      hostCpuPath,
			})
		}
	}
	// Bind selected host-wide cpu config entries read-only into the container.
	pathMap := func(baseName string) *runtimeapi.Mount {
		p := filepath.Join(sysCpuPath, baseName)
		return &runtimeapi.Mount{
			ContainerPath: p,
			HostPath:      p,
			Readonly:      true,
		}
	}
	cpuConfigs := []string{"modalias", "power", "cpuidle", "hotplug", "isolated", "uevent"}
	if cpufreqConfig == nil {
		// Without simulation the host cpufreq dir is exposed directly.
		cpuConfigs = append(cpuConfigs, "cpufreq")
	}
	for _, baseName := range cpuConfigs {
		ret = append(ret, pathMap(baseName))
	}
	return ret, nil
}
// ensureContainerSystemCpufreqHostDir builds the simulated cpufreq policy
// directory for one vCPU: it copies the host cpu sysfs dir as cpu<idx>,
// replaces its cpufreq entry with a symlink to a synthesized policy dir, and
// fills the policy files from cpufreqConfig (affected_cpus/related_cpus use
// the container-visible vCPU index instead).
func (s *sPodGuestInstance) ensureContainerSystemCpufreqHostDir(cpuDir, hostCpuPath string, cpuIdx int, cpufreqConfig *jsonutils.JSONDict) error {
	cpufreqPolicyDir := path.Join(cpuDir, "cpufreq", fmt.Sprintf("policy%d", cpuIdx))
	out, err := procutils.NewRemoteCommandAsFarAsPossible("mkdir", "-p", cpufreqPolicyDir).Output()
	if err != nil {
		return errors.Wrapf(err, "mkdir %s: %s", cpufreqPolicyDir, out)
	}
	// Copy the host cpu dir as the base of the simulated cpu<idx> dir.
	// Copy failures are tolerated (only logged), since some sysfs entries
	// may not be copyable.
	cpuiDir := path.Join(cpuDir, fmt.Sprintf("cpu%d", cpuIdx))
	out, err = procutils.NewRemoteCommandAsFarAsPossible("cp", "-rf", hostCpuPath, cpuiDir).Output()
	if err != nil {
		log.Warningf("cp %s to %s: %s %s", hostCpuPath, cpuiDir, out, err)
	}
	// Remove the copied host cpufreq entry; it is replaced by a symlink below.
	cpufreqDir := path.Join(cpuiDir, "cpufreq")
	out, err = procutils.NewRemoteCommandAsFarAsPossible("rm", "-f", cpufreqDir).Output()
	if err != nil {
		return errors.Wrapf(err, "rm -f %s: %s", cpufreqDir, out)
	}
	for _, fname := range []string{
		"affected_cpus",
		"cpuinfo_max_freq",
		"cpuinfo_min_freq",
		"cpuinfo_transition_latency",
		"related_cpus",
		"scaling_available_governors",
		"scaling_cur_freq",
		"scaling_driver",
		"scaling_governor",
		"scaling_max_freq",
		"scaling_min_freq",
		"scaling_setspeed",
	} {
		switch fname {
		case "affected_cpus", "related_cpus":
			// These always reflect the container-visible cpu index.
			val := strconv.Itoa(cpuIdx)
			cpath := path.Join(cpufreqPolicyDir, fname)
			if err := fileutils2.FilePutContents(cpath, val+"\n", false); err != nil {
				return errors.Wrapf(err, "failed write %s", cpath)
			}
		default:
			// All other values come from the simulate config; missing keys
			// are skipped with a warning.
			val, err := cpufreqConfig.GetString(fname)
			if err != nil {
				log.Warningf("simulate cpufreq no %s", fname)
				continue
			}
			cpath := path.Join(cpufreqPolicyDir, fname)
			if err := fileutils2.FilePutContents(cpath, val+"\n", false); err != nil {
				return errors.Wrapf(err, "failed write %s", cpath)
			}
		}
	}
	// Link cpu<idx>/cpufreq -> ../cpufreq/policy<idx> so tools reading the
	// per-cpu cpufreq dir see the synthesized policy files.
	out, err = procutils.NewRemoteCommandAsFarAsPossible("ln", "-s", fmt.Sprintf("../cpufreq/policy%d", cpuIdx), cpufreqDir).Output()
	if err != nil {
		return errors.Wrapf(err, "ln -s ../cpufreq/policy%d %s: %s", cpuIdx, cpufreqDir, out)
	}
	return nil
}
  2346. func (s *sPodGuestInstance) DeleteContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string) (jsonutils.JSONObject, error) {
  2347. criId, err := s.getContainerCRIId(ctrId)
  2348. if err != nil && errors.Cause(err) != errors.ErrNotFound {
  2349. return nil, errors.Wrap(err, "getContainerCRIId")
  2350. }
  2351. if criId != "" {
  2352. s.expectedStatus.RemoveContainer(criId)
  2353. if err := s.getCRI().RemoveContainer(ctx, criId); err != nil && !IsContainerNotFoundError(err) {
  2354. return nil, errors.Wrap(err, "cri.RemoveContainer")
  2355. }
  2356. }
  2357. // refresh local containers file
  2358. delete(s.containers, ctrId)
  2359. if err := s.saveContainersFile(s.containers); err != nil {
  2360. return nil, errors.Wrap(err, "saveContainersFile")
  2361. }
  2362. if err := s.getHostCPUMap().Delete(ctrId); err != nil {
  2363. log.Warningf("delete container %s cpu map: %v", ctrId, err)
  2364. }
  2365. return nil, nil
  2366. }
  2367. func (s *sPodGuestInstance) GetContainerStatus(ctx context.Context, ctrId string) (string, *runtime.Status, error) {
  2368. return s.getContainerStatus(ctx, ctrId)
  2369. }
  2370. func (s *sPodGuestInstance) getContainerStatus(ctx context.Context, ctrId string) (string, *runtime.Status, error) {
  2371. criId, err := s.getContainerCRIId(ctrId)
  2372. if err != nil {
  2373. if errors.Cause(err) == errors.ErrNotFound {
  2374. // not found, already stopped
  2375. return computeapi.CONTAINER_STATUS_EXITED, nil, nil
  2376. }
  2377. return "", nil, errors.Wrapf(err, "get container cri_id by %s", ctrId)
  2378. }
  2379. resp, err := s.getCRI().ContainerStatus(ctx, criId)
  2380. if err != nil {
  2381. if IsContainerNotFoundError(err) {
  2382. return computeapi.CONTAINER_STATUS_EXITED, nil, nil
  2383. }
  2384. return "", nil, errors.Wrap(err, "cri.ContainerStatus")
  2385. }
  2386. cs := runtime.ToContainerStatus(resp.Status, "containerd")
  2387. status := computeapi.CONTAINER_STATUS_UNKNOWN
  2388. switch resp.Status.State {
  2389. case runtimeapi.ContainerState_CONTAINER_CREATED:
  2390. status = computeapi.CONTAINER_STATUS_CREATED
  2391. case runtimeapi.ContainerState_CONTAINER_RUNNING:
  2392. status = computeapi.CONTAINER_STATUS_RUNNING
  2393. case runtimeapi.ContainerState_CONTAINER_EXITED:
  2394. status = computeapi.CONTAINER_STATUS_EXITED
  2395. case runtimeapi.ContainerState_CONTAINER_UNKNOWN:
  2396. status = computeapi.CONTAINER_STATUS_UNKNOWN
  2397. }
  2398. if status == computeapi.CONTAINER_STATUS_RUNNING {
  2399. ctr := s.GetContainerById(ctrId)
  2400. if ctr == nil {
  2401. return "", cs, errors.Wrapf(httperrors.ErrNotFound, "not found container by id %s", ctrId)
  2402. }
  2403. if ctr.Spec.NeedProbe() {
  2404. status = computeapi.CONTAINER_STATUS_PROBING
  2405. }
  2406. }
  2407. if status == computeapi.CONTAINER_STATUS_EXITED && resp.Status.ExitCode != 0 {
  2408. if _, isInternalStopped := s.IsInternalStopped(criId); !isInternalStopped {
  2409. status = computeapi.CONTAINER_STATUS_CRASH_LOOP_BACK_OFF
  2410. }
  2411. }
  2412. return status, cs, nil
  2413. }
  2414. func (s *sPodGuestInstance) MarkContainerProbeDirty(ctrStatus string, ctrId string, reason string) {
  2415. s.markContainerProbeDirty(ctrStatus, ctrId, reason)
  2416. }
  2417. func (s *sPodGuestInstance) markContainerProbeDirty(status, ctrId string, reason string) {
  2418. if status == computeapi.CONTAINER_STATUS_PROBING {
  2419. reason = fmt.Sprintf("status is probing: %s", reason)
  2420. s.getProbeManager().SetDirtyContainer(ctrId, reason)
  2421. s.getProbeManager().AddPod(s)
  2422. }
  2423. }
  2424. func (s *sPodGuestInstance) SyncContainerStatus(ctx context.Context, userCred mcclient.TokenCredential, ctrId string) (jsonutils.JSONObject, error) {
  2425. status, cs, err := s.getContainerStatus(ctx, ctrId)
  2426. if err != nil {
  2427. return nil, errors.Wrap(err, "get container status")
  2428. }
  2429. s.markContainerProbeDirty(status, ctrId, "after syncing status")
  2430. resp := computeapi.ContainerSyncStatusResponse{
  2431. Status: status,
  2432. }
  2433. if cs != nil {
  2434. resp.StartedAt = cs.StartedAt
  2435. resp.RestartCount = cs.RestartCount
  2436. }
  2437. return jsonutils.Marshal(resp), nil
  2438. }
  2439. func (s *sPodGuestInstance) PullImage(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *hostapi.ContainerPullImageInput) (jsonutils.JSONObject, error) {
  2440. policy := input.PullPolicy
  2441. if policy == apis.ImagePullPolicyIfNotPresent || policy == "" {
  2442. // check if image is presented
  2443. img, err := s.getCRI().ImageStatus(ctx, &runtimeapi.ImageStatusRequest{
  2444. Image: &runtimeapi.ImageSpec{
  2445. Image: input.Image,
  2446. },
  2447. })
  2448. if err != nil {
  2449. return nil, errors.Wrapf(err, "cri.ImageStatus %s", input.Image)
  2450. }
  2451. if img.Image != nil {
  2452. log.Infof("image %s already exists, skipping pulling it when policy is %s", input.Image, policy)
  2453. return jsonutils.Marshal(&runtimeapi.PullImageResponse{
  2454. ImageRef: img.Image.Id,
  2455. }), nil
  2456. }
  2457. }
  2458. return s.pullImageByCtrCmd(ctx, userCred, ctrId, input)
  2459. // return s.pullImageByCRI(ctx, userCred, ctrId, input)
  2460. }
  2461. func (s *sPodGuestInstance) pullImageByCtrCmd(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *hostapi.ContainerPullImageInput) (jsonutils.JSONObject, error) {
  2462. if err := PullContainerdImage(input); err != nil {
  2463. return nil, errors.Wrap(err, "PullContainerdImage with https and http")
  2464. }
  2465. return jsonutils.Marshal(&runtimeapi.PullImageResponse{
  2466. ImageRef: input.Image,
  2467. }), nil
  2468. }
  2469. func (s *sPodGuestInstance) pullImageByCRI(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *hostapi.ContainerPullImageInput) (jsonutils.JSONObject, error) {
  2470. /*podCfg, err := s.getPodSandboxConfig()
  2471. if err != nil {
  2472. return nil, errors.Wrap(err, "get pod sandbox config")
  2473. }*/
  2474. req := &runtimeapi.PullImageRequest{
  2475. Image: &runtimeapi.ImageSpec{
  2476. Image: input.Image,
  2477. },
  2478. // SandboxConfig: podCfg,
  2479. }
  2480. if input.Auth != nil {
  2481. authCfg := &runtimeapi.AuthConfig{
  2482. Username: input.Auth.Username,
  2483. Password: input.Auth.Password,
  2484. Auth: input.Auth.Auth,
  2485. ServerAddress: input.Auth.ServerAddress,
  2486. IdentityToken: input.Auth.IdentityToken,
  2487. RegistryToken: input.Auth.RegistryToken,
  2488. }
  2489. req.Auth = authCfg
  2490. }
  2491. resp, err := s.getCRI().PullImage(ctx, req)
  2492. if err != nil {
  2493. return nil, errors.Wrapf(err, "cri.PullImage %s", input.Image)
  2494. }
  2495. return jsonutils.Marshal(resp), nil
  2496. }
  2497. func (s *sPodGuestInstance) SaveVolumeMountToImage(ctx context.Context, userCred mcclient.TokenCredential, input *hostapi.ContainerSaveVolumeMountToImageInput, ctrId string) (jsonutils.JSONObject, error) {
  2498. vol := input.VolumeMount
  2499. drv := volume_mount.GetDriver(vol.Type)
  2500. if err := drv.Mount(s, ctrId, vol); err != nil {
  2501. return nil, errors.Wrapf(err, "mount volume %s, ctrId %s", jsonutils.Marshal(vol), ctrId)
  2502. }
  2503. defer func() {
  2504. if err := drv.Unmount(s, ctrId, vol); err != nil {
  2505. log.Warningf("unmount volume %s: %v", jsonutils.Marshal(vol), err)
  2506. }
  2507. }()
  2508. hostPath, err := drv.GetRuntimeMountHostPath(s, ctrId, vol)
  2509. if err != nil {
  2510. return nil, errors.Wrapf(err, "get runtime host mount path of %s", jsonutils.Marshal(vol))
  2511. }
  2512. // 1. tar hostPath to tgz
  2513. imgPath, originalSizeBytes, err := s.tarGzDir(input, ctrId, hostPath)
  2514. if err != nil {
  2515. return nil, errors.Wrapf(err, "tar and zip directory %s", hostPath)
  2516. }
  2517. defer func() {
  2518. out, err := procutils.NewRemoteCommandAsFarAsPossible("rm", "-f", imgPath).Output()
  2519. if err != nil {
  2520. log.Warningf("rm -f %s: %s", imgPath, out)
  2521. }
  2522. }()
  2523. // 2. upload target tgz to glance
  2524. if err := s.saveTarGzToGlance(ctx, input, imgPath, originalSizeBytes); err != nil {
  2525. return nil, errors.Wrapf(err, "saveTarGzToGlance: %s", imgPath)
  2526. }
  2527. return nil, nil
  2528. }
  2529. // shellQuote wraps s in single quotes, escaping any embedded single quotes
  2530. // so the result is safe to embed in a sh -c command string.
  2531. func shellQuote(s string) string {
  2532. return "'" + strings.ReplaceAll(s, "'", "'\\''") + "'"
  2533. }
// tarGzDir archives hostPath (or the requested sub-directories of it) into a
// tar.gz under the volumes dir and returns the archive path together with the
// uncompressed size in bytes (sum of the included dirs minus ExcludePaths).
// NOTE(review): VolumeMountPrefix is interpolated unquoted into the tar
// --transform sed expression — assumes it contains no shell/sed
// metacharacters; verify against callers.
func (s *sPodGuestInstance) tarGzDir(input *hostapi.ContainerSaveVolumeMountToImageInput, ctrId string, hostPath string) (string, int64, error) {
	fp := fmt.Sprintf("volimg-%s-ctr-%s-%d.tar.gz", input.ImageId, ctrId, input.VolumeMountIndex)
	outputFp := filepath.Join(s.GetVolumesDir(), fp)
	// "." archives all of hostPath; otherwise build a quoted list of the
	// requested sub-directories.
	dirPath := "."
	if len(input.VolumeMountDirs) != 0 {
		dirPath = ""
		for _, vd := range input.VolumeMountDirs {
			dirPath = fmt.Sprintf("%s %s", dirPath, shellQuote(vd))
		}
	}
	// Compute the total byte size, supporting multiple directories/files.
	var totalSize int64
	if len(input.VolumeMountDirs) == 0 {
		sizeCmd := fmt.Sprintf("du -sb %s | awk '{print $1}'", shellQuote(hostPath))
		out, err := procutils.NewRemoteCommandAsFarAsPossible("sh", "-c", sizeCmd).Output()
		if err != nil {
			return "", 0, errors.Wrapf(err, "calculate total size: %s", out)
		}
		outStr := strings.TrimSpace(string(out))
		totalSize, err = strconv.ParseInt(outStr, 10, 64)
		if err != nil {
			return "", 0, errors.Wrapf(err, "parse total size: %s", outStr)
		}
	} else {
		for _, d := range input.VolumeMountDirs {
			// Quote to cope with spaces in directory or file names.
			sizeCmd := fmt.Sprintf("du -sb %s | awk '{print $1}'", shellQuote(d))
			cmd := fmt.Sprintf("cd %s && %s", shellQuote(hostPath), sizeCmd)
			out, err := procutils.NewRemoteCommandAsFarAsPossible("sh", "-c", cmd).Output()
			if err != nil {
				return "", 0, errors.Wrapf(err, "calculate total size for %s: %s", d, out)
			}
			outStr := strings.TrimSpace(string(out))
			sz, err := strconv.ParseInt(outStr, 10, 64)
			if err != nil {
				return "", 0, errors.Wrapf(err, "parse total size for %s: %s", d, outStr)
			}
			totalSize += sz
		}
	}
	// Subtract the size of each ExcludePaths entry from the total.
	for _, exclude := range input.ExcludePaths {
		// exclude paths are relative to hostPath.
		sizeCmd := fmt.Sprintf("du -sb %s 2>/dev/null | awk '{print $1}'", shellQuote(exclude))
		cmd := fmt.Sprintf("cd %s && %s", shellQuote(hostPath), sizeCmd)
		out, err := procutils.NewRemoteCommandAsFarAsPossible("sh", "-c", cmd).Output()
		if err != nil {
			// Ignore missing exclude paths; continue with the next one.
			continue
		}
		outStr := strings.TrimSpace(string(out))
		if outStr == "" {
			continue
		}
		excludeSize, err := strconv.ParseInt(outStr, 10, 64)
		if err != nil {
			// Ignore parse failures; continue with the next one.
			continue
		}
		totalSize -= excludeSize
		// Keep totalSize non-negative.
		if totalSize < 0 {
			totalSize = 0
		}
	}
	baseCmd := "tar -czf"
	// Append a --exclude option per excluded path.
	for _, exclude := range input.ExcludePaths {
		baseCmd = fmt.Sprintf("%s --exclude=%s", baseCmd, shellQuote(exclude))
	}
	cmd := fmt.Sprintf("%s %s -C %s %s", baseCmd, shellQuote(outputFp), shellQuote(hostPath), dirPath)
	if input.VolumeMountPrefix != "" {
		// Prefix every archived member path with VolumeMountPrefix/.
		cmd += fmt.Sprintf(" --transform 's,^,%s/,'", input.VolumeMountPrefix)
	}
	if out, err := procutils.NewRemoteCommandAsFarAsPossible("sh", "-c", cmd).Output(); err != nil {
		return "", 0, errors.Wrapf(err, "%s: %s", cmd, out)
	}
	return outputFp, totalSize, nil
}
  2613. func (s *sPodGuestInstance) saveTarGzToGlance(ctx context.Context, input *hostapi.ContainerSaveVolumeMountToImageInput, imgPath string, originalSizeBytes int64) error {
  2614. f, err := os.Open(imgPath)
  2615. if err != nil {
  2616. return err
  2617. }
  2618. defer f.Close()
  2619. finfo, err := f.Stat()
  2620. if err != nil {
  2621. return err
  2622. }
  2623. size := finfo.Size()
  2624. // 转换为 MB
  2625. originalSizeMB := originalSizeBytes / (1024 * 1024)
  2626. var params = jsonutils.NewDict()
  2627. params.Set("image_id", jsonutils.NewString(input.ImageId))
  2628. params.Set("min_disk", jsonutils.NewInt(originalSizeMB))
  2629. if _, err := imagemod.Images.Upload(hostutils.GetImageSession(ctx), params, f, size); err != nil {
  2630. return errors.Wrap(err, "upload image")
  2631. }
  2632. return err
  2633. }
  2634. func (s *sPodGuestInstance) ExecContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.ContainerExecInput) (*url.URL, error) {
  2635. rCli := s.getCRI().GetRuntimeClient()
  2636. criId, err := s.getContainerCRIId(ctrId)
  2637. if err != nil {
  2638. return nil, errors.Wrap(err, "get container cri id")
  2639. }
  2640. stderr := true
  2641. if input.Tty {
  2642. stderr = false
  2643. }
  2644. req := &runtimeapi.ExecRequest{
  2645. ContainerId: criId,
  2646. Cmd: input.Command,
  2647. Tty: input.Tty,
  2648. Stdin: true,
  2649. Stdout: true,
  2650. Stderr: stderr,
  2651. }
  2652. if input.SetIO {
  2653. req.Stdin = input.Stdin
  2654. req.Stdout = input.Stdout
  2655. }
  2656. resp, err := rCli.Exec(ctx, req)
  2657. if err != nil {
  2658. return nil, errors.Wrap(err, "exec")
  2659. }
  2660. return url.Parse(resp.Url)
  2661. }
  2662. func (s *sPodGuestInstance) mountDevShm(input *hostapi.ContainerCreateInput, mb int) (string, error) {
  2663. shmPath := s.getContainerShmDir(input.Name)
  2664. if !fileutils2.Exists(shmPath) {
  2665. out, err := procutils.NewRemoteCommandAsFarAsPossible("mkdir", "-p", shmPath).Output()
  2666. if err != nil {
  2667. return "", errors.Wrapf(err, "mkdir -p %s: %s", shmPath, out)
  2668. }
  2669. }
  2670. if err := procutils.NewRemoteCommandAsFarAsPossible("mountpoint", shmPath).Run(); err == nil {
  2671. log.Warningf("mountpoint %s is already mounted", shmPath)
  2672. return "", nil
  2673. }
  2674. out, err := procutils.NewRemoteCommandAsFarAsPossible("mount", "-t", "tmpfs", "-o", fmt.Sprintf("size=%dM", mb), "tmpfs", shmPath).Output()
  2675. if err != nil {
  2676. return "", errors.Wrapf(err, "mount tmpfs %s: %s", shmPath, out)
  2677. }
  2678. return shmPath, nil
  2679. }
  2680. func (s *sPodGuestInstance) unmountDevShm(containerName string) error {
  2681. shmPath := s.getContainerShmDir(containerName)
  2682. if err := procutils.NewRemoteCommandAsFarAsPossible("mountpoint", shmPath).Run(); err != nil {
  2683. return nil
  2684. }
  2685. out, err := procutils.NewRemoteCommandAsFarAsPossible("umount", shmPath).Output()
  2686. if err != nil {
  2687. return errors.Wrapf(err, "mount tmpfs %s: %s", shmPath, out)
  2688. }
  2689. return nil
  2690. }
  2691. func (s *sPodGuestInstance) DoSnapshot(ctx context.Context, params *SDiskSnapshot) (jsonutils.JSONObject, error) {
  2692. input := params.BackupDiskConfig.BackupAsTar
  2693. if input.ContainerId == "" {
  2694. return nil, httperrors.NewMissingParameterError("missing backup_disk_config.backup_as_tar.container_id")
  2695. }
  2696. isCtrRunning, err := s.IsContainerRunning(ctx, input.ContainerId)
  2697. if err != nil {
  2698. return nil, errors.Wrapf(err, "check container %s running status", input.ContainerId)
  2699. }
  2700. if params.BackupDiskConfig == nil {
  2701. return nil, httperrors.NewMissingParameterError("missing backup_disk_config")
  2702. }
  2703. if params.BackupDiskConfig.BackupAsTar == nil {
  2704. return nil, httperrors.NewMissingParameterError("missing backup_disk_config.backup_as_tar")
  2705. }
  2706. vols := s.getContainerVolumeMountsByDiskId(input.ContainerId, params.Disk.GetId())
  2707. if len(vols) == 0 {
  2708. return nil, httperrors.NewNotFoundError("not found container volume_mount by container_id %s and disk_id %s", input.ContainerId, params.Disk.GetId())
  2709. }
  2710. tmpBackRootDir, err := storageman.EnsureBackupDir()
  2711. if err != nil {
  2712. return nil, errors.Wrap(err, "EnsureBackupDir")
  2713. }
  2714. defer storageman.CleanupDirOrFile(tmpBackRootDir)
  2715. povTmpBackRootDir := []string{}
  2716. for _, vol := range vols {
  2717. drv := volume_mount.GetDriver(vol.Type)
  2718. if err := drv.Mount(s, input.ContainerId, vol); err != nil {
  2719. return nil, errors.Wrapf(err, "mount %s to %s", input.ContainerId, jsonutils.Marshal(vol))
  2720. }
  2721. mntPath, err := drv.GetRuntimeMountHostPath(s, input.ContainerId, vol)
  2722. if err != nil {
  2723. return nil, errors.Wrapf(err, "GetRuntimeMountHostPath containerId: %s, vol: %s", input.ContainerId, jsonutils.Marshal(vol))
  2724. }
  2725. isMntPathFile := false
  2726. targetBindMntPath := tmpBackRootDir
  2727. if vol.Disk.SubDirectory != "" {
  2728. // mkdir tmpBackRootDir/subdirectory
  2729. targetBindMntPath = filepath.Join(tmpBackRootDir, vol.Disk.SubDirectory)
  2730. }
  2731. if vol.Disk.StorageSizeFile != "" {
  2732. targetBindMntPath = filepath.Join(tmpBackRootDir, vol.Disk.StorageSizeFile)
  2733. isMntPathFile = true
  2734. }
  2735. if isMntPathFile {
  2736. if out, err := procutils.NewRemoteCommandAsFarAsPossible("touch", targetBindMntPath).Output(); err != nil {
  2737. return nil, errors.Wrapf(err, "touch %s: %s", targetBindMntPath, out)
  2738. }
  2739. } else {
  2740. if err := volume_mount.EnsureDir(targetBindMntPath); err != nil {
  2741. return nil, errors.Wrap(err, "ensure dir")
  2742. }
  2743. }
  2744. // do bind mount
  2745. if err := mountutils.MountBind(mntPath, targetBindMntPath); err != nil {
  2746. return nil, errors.Wrapf(err, "bind mount %s to %s", mntPath, targetBindMntPath)
  2747. }
  2748. // process post overlay
  2749. diskDrv := drv.(disk.IVolumeMountDisk)
  2750. for _, pov := range vol.Disk.PostOverlay {
  2751. // bind mount post overlay dirs to tmpBackRootDir
  2752. upperDir, err := diskDrv.GetPostOverlayRootUpperDir(s, vol, input.ContainerId, pov)
  2753. if err != nil {
  2754. return nil, errors.Wrapf(err, "get post overlay root upper dir: %s", jsonutils.Marshal(pov))
  2755. }
  2756. workDir, err := diskDrv.GetPostOverlayRootWorkDir(s, vol, input.ContainerId)
  2757. if err != nil {
  2758. return nil, errors.Wrapf(err, "get post overlay root upper dir: %s", jsonutils.Marshal(pov))
  2759. }
  2760. hostDiskRootPath, _ := diskDrv.GetHostDiskRootPath(s, vol)
  2761. for _, srcDir := range []string{upperDir, workDir} {
  2762. targetSubDir := strings.TrimPrefix(srcDir, hostDiskRootPath)
  2763. targetPovBindMntPath := filepath.Join(tmpBackRootDir, targetSubDir)
  2764. if err := mountutils.MountBind(srcDir, targetPovBindMntPath); err != nil {
  2765. return nil, errors.Wrap(err, "bind mount post overlay dir")
  2766. }
  2767. povTmpBackRootDir = append(povTmpBackRootDir, targetPovBindMntPath)
  2768. }
  2769. }
  2770. }
  2771. deferUmount := func() error {
  2772. for _, vol := range vols {
  2773. // unbind mount
  2774. for _, povPath := range povTmpBackRootDir {
  2775. if err := mountutils.Unmount(povPath, false); err != nil {
  2776. return errors.Wrapf(err, "umount bind point %s", povTmpBackRootDir)
  2777. }
  2778. }
  2779. targetBindMntPath := filepath.Join(tmpBackRootDir, vol.Disk.SubDirectory)
  2780. if vol.Disk.StorageSizeFile != "" {
  2781. targetBindMntPath = filepath.Join(tmpBackRootDir, vol.Disk.StorageSizeFile)
  2782. }
  2783. if err := mountutils.Unmount(targetBindMntPath, false); err != nil {
  2784. return errors.Wrapf(err, "umount bind point %s", targetBindMntPath)
  2785. }
  2786. }
  2787. if !isCtrRunning && !s.IsRunning() {
  2788. for _, vol := range vols {
  2789. drv := volume_mount.GetDriver(vol.Type)
  2790. if err := drv.Unmount(s, input.ContainerId, vol); err != nil {
  2791. return errors.Wrapf(err, "unmount %s to %s", input.ContainerId, jsonutils.Marshal(vol))
  2792. }
  2793. }
  2794. } else {
  2795. log.Infof("container %s/%s is running, so skipping unmount volumes", s.GetId(), input.ContainerId)
  2796. }
  2797. return nil
  2798. }
  2799. defer func() {
  2800. if err := deferUmount(); err != nil {
  2801. log.Errorf("deferUmount after snapshot error: %s", err)
  2802. } else {
  2803. log.Infof("defer umount success")
  2804. }
  2805. }()
  2806. snapshotPath, err := s.createSnapshot(params, tmpBackRootDir)
  2807. if err != nil {
  2808. return nil, errors.Wrap(err, "create snapshot")
  2809. }
  2810. res := jsonutils.NewDict()
  2811. res.Set("location", jsonutils.NewString(snapshotPath))
  2812. return res, nil
  2813. }
  2814. func (s *sPodGuestInstance) createSnapshot(params *SDiskSnapshot, hostPath string) (string, error) {
  2815. d := params.Disk
  2816. snapshotDir := d.GetSnapshotDir()
  2817. log.Infof("snapshotDir of LocalDisk %s: %s", d.GetId(), snapshotDir)
  2818. if !fileutils2.Exists(snapshotDir) {
  2819. output, err := procutils.NewCommand("mkdir", "-p", snapshotDir).Output()
  2820. if err != nil {
  2821. log.Errorf("mkdir %s failed: %s", snapshotDir, output)
  2822. return "", errors.Wrapf(err, "mkdir %s failed: %s", snapshotDir, output)
  2823. }
  2824. }
  2825. snapshotPath := s.getSnapshotPath(d, params.SnapshotId)
  2826. // tar hostPath to snapshotPath
  2827. input := params.BackupDiskConfig.BackupAsTar
  2828. if err := s.tarHostDir(hostPath, snapshotPath, input.IncludeFiles, input.ExcludeFiles, input.IgnoreNotExistFile, input.IncludePatterns); err != nil {
  2829. return "", errors.Wrapf(err, "tar host dir %s to %s", hostPath, snapshotPath)
  2830. }
  2831. return snapshotPath, nil
  2832. }
  2833. func (s *sPodGuestInstance) tarHostDir(srcDir, targetPath string,
  2834. includeFiles, excludeFiles []string,
  2835. ignoreNotExistFile bool,
  2836. includePatterns []string) error {
  2837. baseCmd := "tar"
  2838. filterNotExistFiles := func(files []string) []string {
  2839. result := []string{}
  2840. for i := range files {
  2841. if fileutils2.Exists(filepath.Join(srcDir, files[i])) {
  2842. result = append(result, files[i])
  2843. } else {
  2844. log.Warningf("tar path doesn't exist: %q", filepath.Join(srcDir, files[i]))
  2845. }
  2846. }
  2847. return result
  2848. }
  2849. if ignoreNotExistFile {
  2850. includeFiles = filterNotExistFiles(includeFiles)
  2851. excludeFiles = filterNotExistFiles(excludeFiles)
  2852. }
  2853. // 如果有 includePatterns,使用 find -name 找出匹配的路径,添加到 includeFiles 中
  2854. if len(includePatterns) > 0 {
  2855. findPatterns := []string{}
  2856. for _, pattern := range includePatterns {
  2857. // 转义特殊字符,但保留 glob 通配符
  2858. findPatterns = append(findPatterns, fmt.Sprintf("-name %s", shellQuote(pattern)))
  2859. }
  2860. // 构建 find 命令来查找匹配的路径
  2861. findCmd := fmt.Sprintf("find . \\( %s \\)", strings.Join(findPatterns, " -o "))
  2862. cmd := fmt.Sprintf("cd %s && %s", shellQuote(srcDir), findCmd)
  2863. log.Infof("[%s] find cmd: %s", s.GetName(), cmd)
  2864. out, err := procutils.NewRemoteCommandAsFarAsPossible("sh", "-c", cmd).Output()
  2865. if err != nil {
  2866. return errors.Wrapf(err, "find matching paths: %s", out)
  2867. }
  2868. // 解析 find 的输出,将匹配的路径添加到 includeFiles 中
  2869. // find 输出的路径格式为 ./path,需要去掉开头的 ./
  2870. lines := strings.Split(strings.TrimSpace(string(out)), "\n")
  2871. for _, line := range lines {
  2872. line = strings.TrimSpace(line)
  2873. if line == "" {
  2874. continue
  2875. }
  2876. // 去掉开头的 ./
  2877. if strings.HasPrefix(line, "./") {
  2878. line = line[2:]
  2879. }
  2880. // 添加到 includeFiles 中
  2881. includeFiles = append(includeFiles, line)
  2882. }
  2883. }
  2884. // 没有 includePatterns 时,使用原来的逻辑
  2885. // 添加 --exclude 选项
  2886. for _, exclude := range excludeFiles {
  2887. baseCmd = fmt.Sprintf("%s --exclude=%s", baseCmd, shellQuote(exclude))
  2888. }
  2889. includeStr := "."
  2890. if len(includeFiles) > 0 {
  2891. for i := range includeFiles {
  2892. includeFiles[i] = shellQuote(includeFiles[i])
  2893. }
  2894. includeStr = strings.Join(includeFiles, " ")
  2895. }
  2896. cmd := fmt.Sprintf("%s --ignore-failed-read -cf %s -C %s %s", baseCmd, shellQuote(targetPath), shellQuote(srcDir), includeStr)
  2897. log.Infof("[%s] tar cmd: %s", s.GetName(), cmd)
  2898. if out, err := procutils.NewRemoteCommandAsFarAsPossible("sh", "-c", cmd).Output(); err != nil {
  2899. outErr := errors.Wrapf(err, "%s: %s", cmd, out)
  2900. outMsg := strings.ToLower(outErr.Error())
  2901. // ref: https://stackoverflow.com/questions/20318852/tar-file-changed-as-we-read-it
  2902. const exitStatus1 = "exit status 1"
  2903. const fileChangedMsg = "file changed as we read it"
  2904. const fileRemovedMsg = "file removed before we read it"
  2905. const socketIgnoredMsg = "socket ignored"
  2906. if strings.Contains(outMsg, exitStatus1) {
  2907. for _, warningMsg := range []string{fileChangedMsg, fileRemovedMsg, socketIgnoredMsg} {
  2908. if strings.Contains(outMsg, warningMsg) {
  2909. log.Warningf("[%s] got some warning message when tar: %s", s.GetName(), outMsg)
  2910. return nil
  2911. }
  2912. }
  2913. }
  2914. return outErr
  2915. }
  2916. return nil
  2917. }
  2918. func (s *sPodGuestInstance) getSnapshotPath(d storageman.IDisk, snapshotId string) string {
  2919. snapshotDir := d.GetSnapshotDir()
  2920. snapshotPath := path.Join(snapshotDir, fmt.Sprintf("%s.tar", snapshotId))
  2921. return snapshotPath
  2922. }
  2923. func (s *sPodGuestInstance) DeleteSnapshot(ctx context.Context, params *SDeleteDiskSnapshot) (jsonutils.JSONObject, error) {
  2924. snapshotPath := s.getSnapshotPath(params.Disk, params.DeleteSnapshot)
  2925. out, err := procutils.NewCommand("rm", "-f", snapshotPath).Output()
  2926. if err != nil {
  2927. return nil, errors.Wrapf(err, "rm -f %s: %s", snapshotPath, out)
  2928. }
  2929. res := jsonutils.NewDict()
  2930. res.Set("deleted", jsonutils.JSONTrue)
  2931. return res, nil
  2932. }
  2933. func (s *sPodGuestInstance) doOnlineResizeDisk(ctx context.Context, disk storageman.IDisk, sizeMB int64) {
  2934. drv, err := disk.GetContainerStorageDriver()
  2935. if err != nil {
  2936. hostutils.TaskFailed(ctx, fmt.Sprintf("get disk storage driver %s", err))
  2937. return
  2938. }
  2939. partDev, found, err := drv.CheckConnect(disk.GetPath())
  2940. if err != nil {
  2941. hostutils.TaskFailed(ctx, fmt.Sprintf("disk check connect %s", err))
  2942. return
  2943. }
  2944. if !found {
  2945. hostutils.TaskFailed(ctx, fmt.Sprintf("online resize but loop device not connected"))
  2946. return
  2947. }
  2948. if err := disk.PreResize(ctx, sizeMB); err != nil {
  2949. hostutils.TaskFailed(ctx, fmt.Sprintf("PreResize failed %s", err))
  2950. return
  2951. }
  2952. diskInfo := jsonutils.NewDict()
  2953. diskInfo.Set("size", jsonutils.NewInt(sizeMB))
  2954. diskInfo.Set("loop_part_dev", jsonutils.NewString(partDev))
  2955. resizeDiskInfo := &storageman.SDiskResizeInput{
  2956. DiskInfo: diskInfo,
  2957. }
  2958. res, err := disk.Resize(ctx, resizeDiskInfo)
  2959. if err != nil {
  2960. hostutils.TaskFailed(ctx, fmt.Sprintf("PreResize failed %s", err))
  2961. return
  2962. }
  2963. hostutils.TaskComplete(ctx, res)
  2964. }
// OnlineResizeDisk resizes the disk asynchronously; completion or failure is
// reported via the task callbacks inside doOnlineResizeDisk, not through a
// return value.
// NOTE(review): the goroutine is fire-and-forget — there is no handle to
// cancel it or wait for it beyond ctx; confirm that is intended.
func (s *sPodGuestInstance) OnlineResizeDisk(ctx context.Context, disk storageman.IDisk, sizeMB int64) {
	go s.doOnlineResizeDisk(ctx, disk, sizeMB)
}
  2968. func (s *sPodGuestInstance) ContainerExecSync(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.ContainerExecSyncInput) (jsonutils.JSONObject, error) {
  2969. ctrCriId, err := s.getContainerCRIId(ctrId)
  2970. if err != nil {
  2971. return nil, errors.Wrap(err, "get container cri id")
  2972. }
  2973. cli := s.getCRI().GetRuntimeClient()
  2974. resp, err := cli.ExecSync(ctx, &runtimeapi.ExecSyncRequest{
  2975. ContainerId: ctrCriId,
  2976. Cmd: input.Command,
  2977. Timeout: input.Timeout,
  2978. })
  2979. if err != nil {
  2980. return nil, errors.Wrapf(err, "exec sync %#v to %s", input.Command, ctrCriId)
  2981. }
  2982. return jsonutils.Marshal(&computeapi.ContainerExecSyncResponse{
  2983. Stdout: string(resp.Stdout),
  2984. Stderr: string(resp.Stderr),
  2985. ExitCode: resp.ExitCode,
  2986. }), nil
  2987. }
  2988. func (s *sPodGuestInstance) ReadLogs(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.PodLogOptions, stdout, stderr io.Writer) error {
  2989. // Do a zero-byte write to stdout before handing off to the container runtime.
  2990. // This ensures at least one Write call is made to the writer when copying starts,
  2991. // even if we then block waiting for log output from the container.
  2992. if _, err := stdout.Write([]byte{}); err != nil {
  2993. return err
  2994. }
  2995. ctrCriId, err := s.getContainerCRIId(ctrId)
  2996. if err != nil {
  2997. return errors.Wrapf(err, "get container cri id %s", ctrId)
  2998. }
  2999. resp, err := s.getCRI().ContainerStatus(ctx, ctrCriId)
  3000. if err != nil {
  3001. return errors.Wrapf(err, "get container status %s", ctrCriId)
  3002. }
  3003. logPath := resp.GetStatus().GetLogPath()
  3004. opts := logs.NewLogOptions(input, time.Now())
  3005. return logs.ReadLogs(ctx, logPath, ctrCriId, opts, s.getCRI().GetRuntimeClient(), stdout, stderr)
  3006. }
  3007. func (s *sPodGuestInstance) CommitContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *hostapi.ContainerCommitInput) (jsonutils.JSONObject, error) {
  3008. criId, err := s.getContainerCRIId(ctrId)
  3009. if err != nil {
  3010. return nil, errors.Wrap(err, "get container cri id")
  3011. }
  3012. // 1. commit
  3013. tool := NewContainerdNerdctl()
  3014. imgRepo, err := tool.Commit(criId, &nerdctl.CommitOptions{Repository: input.Repository})
  3015. if err != nil {
  3016. return nil, errors.Wrapf(err, "commit container %s image", ctrId)
  3017. }
  3018. log.Infof("container %s was commited to %s", ctrId, imgRepo)
  3019. // 2. push to repository
  3020. if err := PushContainerdImage(&hostapi.ContainerPushImageInput{
  3021. Image: imgRepo,
  3022. Auth: input.Auth,
  3023. }); err != nil {
  3024. return nil, errors.Wrapf(err, "push container %s image", ctrId)
  3025. }
  3026. log.Infof("container %s was pushed to %s", ctrId, imgRepo)
  3027. return jsonutils.Marshal(map[string]interface{}{
  3028. "image_repository": imgRepo,
  3029. }), nil
  3030. }
  3031. func (s *sPodGuestInstance) AddContainerVolumeMountPostOverlay(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.ContainerVolumeMountAddPostOverlayInput) error {
  3032. isRunning, err := s.IsContainerRunning(ctx, ctrId)
  3033. if err != nil {
  3034. return errors.Wrap(err, "check container is running")
  3035. }
  3036. if !isRunning {
  3037. return nil
  3038. }
  3039. ctrSpec := s.GetContainerById(ctrId)
  3040. vol := ctrSpec.Spec.VolumeMounts[input.Index]
  3041. drv := volume_mount.GetDriver(vol.Type)
  3042. diskDrv, ok := drv.(disk.IVolumeMountDisk)
  3043. if !ok {
  3044. return errors.Errorf("invalid disk volume driver of %s", vol.Type)
  3045. }
  3046. return diskDrv.MountPostOverlays(s, ctrId, vol, input.PostOverlay)
  3047. }
  3048. func (s *sPodGuestInstance) RemoveContainerVolumeMountPostOverlay(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.ContainerVolumeMountRemovePostOverlayInput) error {
  3049. ctrSpec := s.GetContainerById(ctrId)
  3050. vol := ctrSpec.Spec.VolumeMounts[input.Index]
  3051. drv := volume_mount.GetDriver(vol.Type)
  3052. diskDrv, ok := drv.(disk.IVolumeMountDisk)
  3053. if !ok {
  3054. return errors.Errorf("invalid disk volume driver of %s", vol.Type)
  3055. }
  3056. // drv.Mount 不会重复挂载,支持重复调用
  3057. if err := drv.Mount(s, ctrId, vol); err != nil {
  3058. return errors.Wrapf(err, "mount volume %s, ctrId %s", jsonutils.Marshal(vol), ctrId)
  3059. }
  3060. return diskDrv.UnmountPostOverlays(s, ctrId, vol, input.PostOverlay, input.UseLazy, input.ClearLayers)
  3061. }
// SetNicDown is a deliberate no-op for pod instances: pod NIC link state is
// not managed through this guest-instance interface.
func (s *sPodGuestInstance) SetNicDown(mac string) error {
	// null operation for pod, QIUJIAN, 20260203
	return nil
}
// SetNicUp is a deliberate no-op for pod instances: pod NIC link state is
// not managed through this guest-instance interface.
func (s *sPodGuestInstance) SetNicUp(nic *desc.SGuestNetwork) error {
	// null operation for pod, QIUJIAN, 20260203
	return nil
}