runtime_manager.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package runtime
  15. import (
  16. "context"
  17. "encoding/json"
  18. "net"
  19. "sort"
  20. "time"
  21. runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
  22. "yunion.io/x/log"
  23. "yunion.io/x/pkg/errors"
  24. "yunion.io/x/pkg/utils"
  25. "yunion.io/x/onecloud/pkg/util/pod"
  26. )
  // runtimeAPIVersion is the version string placed in the CRI VersionRequest
  // when the runtime is asked for its version.
  const runtimeAPIVersion = "0.1.0"
  30. type runtimeManager struct {
  31. runtimeName string
  32. // gRPC service clients.
  33. cri pod.CRI
  34. }
  35. func NewRuntimeManager(cri pod.CRI) (Runtime, error) {
  36. man := &runtimeManager{
  37. cri: cri,
  38. }
  39. typedVersion, err := man.getTypedVersion()
  40. if err != nil {
  41. return nil, errors.Wrap(err, "getTypedVersion")
  42. }
  43. man.runtimeName = typedVersion.RuntimeName
  44. log.Infof("Container runtime %s initialized, version: %s, apiVersion: %s", typedVersion.RuntimeName, typedVersion.RuntimeVersion, typedVersion.RuntimeApiVersion)
  45. return man, nil
  46. }
  47. func (m *runtimeManager) getTypedVersion() (*runtimeapi.VersionResponse, error) {
  48. resp, err := m.cri.GetRuntimeClient().Version(context.Background(), &runtimeapi.VersionRequest{Version: runtimeAPIVersion})
  49. if err != nil {
  50. return nil, errors.Wrap(err, "get remote runtime typed version")
  51. }
  52. return resp, nil
  53. }
  54. // Type returns the type of the container runtime.
  55. func (m *runtimeManager) Type() string {
  56. return m.runtimeName
  57. }
  58. // getSandboxes lists all (or just the running) sandboxes.
  59. func (m *runtimeManager) getSandboxes(all bool) ([]*runtimeapi.PodSandbox, error) {
  60. var filter *runtimeapi.PodSandboxFilter
  61. if !all {
  62. readyState := runtimeapi.PodSandboxState_SANDBOX_READY
  63. filter = &runtimeapi.PodSandboxFilter{
  64. State: &runtimeapi.PodSandboxStateValue{
  65. State: readyState,
  66. },
  67. }
  68. }
  69. resp, err := m.cri.GetRuntimeClient().ListPodSandbox(context.Background(), &runtimeapi.ListPodSandboxRequest{
  70. Filter: filter,
  71. })
  72. if err != nil {
  73. log.Errorf("ListPodSandbox failed: %v", err)
  74. return nil, err
  75. }
  76. return resp.Items, nil
  77. }
  // ContainerRuntimeSpec is the portion of a container runtime spec that is
  // decoded from JSON here; only the annotations are kept.
  type ContainerRuntimeSpec struct {
  	// Annotations is read from and written to the "annotations" JSON key.
  	Annotations map[string]string `json:"annotations"`
  }
  81. type ContainerExtraInfo struct {
  82. SandboxID string `json:"sandbox_id"`
  83. Pid int `json:"pid"`
  84. RuntimeSpec ContainerRuntimeSpec `json:"runtimeSpec"`
  85. }
  86. // GetPods returns a list of containers grouped by pods. The boolean parameter
  87. // specifies whether the runtime returns all containers including those already
  88. // exited and dead containers (used for garbage collection).
  89. func (m *runtimeManager) GetPods(all bool) ([]*Pod, error) {
  90. pods := make(map[string]*Pod)
  91. sandboxes, err := m.getSandboxes(all)
  92. if err != nil {
  93. return nil, err
  94. }
  95. for i := range sandboxes {
  96. s := sandboxes[i]
  97. if s.Metadata == nil {
  98. log.Infof("Sandbox does not have metadata: %#v", s)
  99. continue
  100. }
  101. podUid := s.Metadata.Uid
  102. if _, ok := pods[podUid]; !ok {
  103. pods[podUid] = &Pod{
  104. Id: podUid,
  105. CRIId: s.Id,
  106. Name: s.Metadata.Name,
  107. Namespace: s.Metadata.Namespace,
  108. }
  109. }
  110. p := pods[podUid]
  111. converted, err := m.sandboxToContainer(s)
  112. if err != nil {
  113. log.Infof("Convert %q sandbox %v of pod %q failed: %v", m.runtimeName, s, podUid, err)
  114. continue
  115. }
  116. p.Sandboxes = append(p.Sandboxes, converted)
  117. }
  118. containers, err := m.getContainers(all)
  119. if err != nil {
  120. return nil, err
  121. }
  122. for i := range containers {
  123. c := containers[i]
  124. if c.Metadata == nil {
  125. log.Infof("Container does not have metadata: %+v", c)
  126. continue
  127. }
  128. labelledInfo := getContainerInfoFromLabels(c.Labels, c.Annotations)
  129. if labelledInfo.PodUid == "" {
  130. // 旧的容器没设置 labels 标签,需要从 status.info.runtimeSpec.annotations 里面找 pod 关联信息
  131. resp, err := m.cri.GetRuntimeClient().ContainerStatus(context.Background(), &runtimeapi.ContainerStatusRequest{
  132. ContainerId: c.Id,
  133. Verbose: true,
  134. })
  135. if err != nil {
  136. log.Infof("get container %s status failed: %v", c.GetId(), err)
  137. continue
  138. }
  139. infoStr, ok := resp.GetInfo()["info"]
  140. if !ok {
  141. log.Infof("not found container %s info", c.GetId())
  142. continue
  143. }
  144. info := new(ContainerExtraInfo)
  145. if err := json.Unmarshal([]byte(infoStr), info); err != nil {
  146. log.Infof("unmarshal container %s info failed: %v", c.GetId(), err)
  147. continue
  148. }
  149. labelledInfo = getContainerInfoFromLabels(nil, info.RuntimeSpec.Annotations)
  150. }
  151. pod, found := pods[labelledInfo.PodUid]
  152. if !found {
  153. pod = &Pod{
  154. Id: labelledInfo.PodUid,
  155. Name: labelledInfo.PodName,
  156. Namespace: labelledInfo.PodNamespace,
  157. }
  158. pods[labelledInfo.PodUid] = pod
  159. }
  160. converted, err := m.toContainer(c)
  161. if err != nil {
  162. log.Warningf("Convert %s container %v of pod %q failed: %v", m.runtimeName, c, labelledInfo.PodUid, err)
  163. continue
  164. }
  165. pod.Containers = append(pod.Containers, converted)
  166. }
  167. // convert map to list.
  168. var result []*Pod
  169. for i := range pods {
  170. result = append(result, pods[i])
  171. }
  172. return result, nil
  173. }
  174. func (m *runtimeManager) getContainers(allContainers bool) ([]*runtimeapi.Container, error) {
  175. filter := &runtimeapi.ContainerFilter{}
  176. if !allContainers {
  177. filter.State = &runtimeapi.ContainerStateValue{
  178. State: runtimeapi.ContainerState_CONTAINER_RUNNING,
  179. }
  180. }
  181. containers, err := m.cri.GetRuntimeClient().ListContainers(context.Background(), &runtimeapi.ListContainersRequest{Filter: filter})
  182. if err != nil {
  183. return nil, errors.Wrap(err, "ListContainers failed")
  184. }
  185. return containers.Containers, nil
  186. }
  187. func (m *runtimeManager) GetPodStatus(uid, name, namespace string) (*PodStatus, error) {
  188. // Now we retain restart count of container as a container label. Each time a container
  189. // restarts, pod will read the restart count from the registered dead container, increment
  190. // it to get the new restart count, and then add a label with the new restart count on
  191. // the newly started container.
  192. // However, there are some limitations of this method:
  193. // 1. When all dead containers were garbage collected, the container status could
  194. // not get the historical value and would be *inaccurate*. Fortunately, the chance
  195. // is really slim.
  196. // 2. When working with old version containers which have no restart count label,
  197. // we can only assume their restart count is 0.
  198. // Anyhow, we only promised "best-effort" restart count reporting, we can just ignore
  199. // these limitations now.
  200. podSandboxIDs, err := m.getSandboxIDByPodUID(uid, nil)
  201. if err != nil {
  202. return nil, err
  203. }
  204. podFullName := BuildPodFullName(name, namespace)
  205. log.Debugf("getSandboxIDByPodUID got sandbox IDs %q for pod %q", podSandboxIDs, podFullName)
  206. sandboxStatuses := make([]*runtimeapi.PodSandboxStatus, len(podSandboxIDs))
  207. podIPs := []string{}
  208. for idx, podSandboxID := range podSandboxIDs {
  209. req := &runtimeapi.PodSandboxStatusRequest{
  210. PodSandboxId: podSandboxID,
  211. }
  212. resp, err := m.cri.GetRuntimeClient().PodSandboxStatus(context.Background(), req)
  213. if err != nil {
  214. log.Errorf("PodSandboxStatus of sandbox %q for pod %q error: %v", podSandboxID, podFullName, err)
  215. return nil, err
  216. }
  217. podSandboxStatus := resp.Status
  218. sandboxStatuses[idx] = podSandboxStatus
  219. // Only get pod IP from latest sandbox
  220. if idx == 0 && podSandboxStatus.State == runtimeapi.PodSandboxState_SANDBOX_READY {
  221. podIPs = m.determinePodSandboxIPs(podSandboxStatus)
  222. }
  223. }
  224. // Get statuses of all containers visible in the pod.
  225. containerStatuses, err := m.getPodContainerStatuses(uid, podSandboxIDs)
  226. if err != nil {
  227. log.Errorf("getPodContainerStatuses for pod %q failed: %v", podFullName, err)
  228. return nil, err
  229. }
  230. return &PodStatus{
  231. ID: uid,
  232. Name: name,
  233. Namespace: namespace,
  234. IPs: podIPs,
  235. SandboxStatuses: sandboxStatuses,
  236. ContainerStatuses: containerStatuses,
  237. }, nil
  238. }
  239. func GetSandboxIDByPodUID(cri pod.CRI, podUID string, state *runtimeapi.PodSandboxState) ([]string, error) {
  240. filter := &runtimeapi.PodSandboxFilter{
  241. LabelSelector: map[string]string{
  242. PodUIDLabel: podUID,
  243. },
  244. }
  245. if state != nil {
  246. filter.State = &runtimeapi.PodSandboxStateValue{
  247. State: *state,
  248. }
  249. }
  250. resp, err := cri.GetRuntimeClient().ListPodSandbox(context.Background(), &runtimeapi.ListPodSandboxRequest{Filter: filter})
  251. if err != nil {
  252. return nil, errors.Wrap(err, "ListPodSandbox failed")
  253. }
  254. sandboxes := resp.Items
  255. if len(sandboxes) == 0 {
  256. // 兼容旧版没有打标签的 pods
  257. pods, err := cri.ListPods(context.Background(), pod.ListPodOptions{})
  258. if err != nil {
  259. return nil, errors.Wrap(err, "List all pods failed")
  260. }
  261. for i := range pods {
  262. item := pods[i]
  263. if item.Metadata.Uid == podUID {
  264. sandboxes = append(sandboxes, item)
  265. }
  266. }
  267. }
  268. // Sort with newest first.
  269. sandboxIDs := make([]string, len(sandboxes))
  270. for i, s := range sandboxes {
  271. sandboxIDs[i] = s.Id
  272. }
  273. return sandboxIDs, nil
  274. }
  275. func (m *runtimeManager) getSandboxIDByPodUID(podUID string, state *runtimeapi.PodSandboxState) ([]string, error) {
  276. return GetSandboxIDByPodUID(m.cri, podUID, state)
  277. }
  278. // determinePodSandboxIP determines the IP addresses of the given pod sandbox.
  279. func (m *runtimeManager) determinePodSandboxIPs(podSandbox *runtimeapi.PodSandboxStatus) []string {
  280. podIPs := make([]string, 0)
  281. if podSandbox.Network == nil {
  282. log.Warningf("Pod Sandbox status doesn't have network information, cannot report IPs")
  283. return podIPs
  284. }
  285. // ip could be an empty string if runtime is not responsible for the
  286. // IP (e.g., host networking).
  287. // pick primary IP
  288. if len(podSandbox.Network.Ip) != 0 {
  289. if net.ParseIP(podSandbox.Network.Ip) == nil {
  290. log.Warningf("Pod Sandbox reported an unparseable IP (Primary) %v", podSandbox.Network.Ip)
  291. return nil
  292. }
  293. podIPs = append(podIPs, podSandbox.Network.Ip)
  294. }
  295. // pick additional ips, if cri reported them
  296. for _, podIP := range podSandbox.Network.AdditionalIps {
  297. if nil == net.ParseIP(podIP.Ip) {
  298. log.Warningf("Pod Sandbox reported an unparseable IP (additional) %v", podIP.Ip)
  299. return nil
  300. }
  301. podIPs = append(podIPs, podIP.Ip)
  302. }
  303. return podIPs
  304. }
  305. func (m *runtimeManager) getPodContainerStatuses(uid string, criId []string) ([]*Status, error) {
  306. resp, err := m.cri.GetRuntimeClient().ListContainers(context.Background(), &runtimeapi.ListContainersRequest{Filter: &runtimeapi.ContainerFilter{
  307. LabelSelector: map[string]string{PodUIDLabel: uid},
  308. }})
  309. if err != nil {
  310. return nil, errors.Wrap(err, "ListContainers with label selector failed")
  311. }
  312. containers := resp.Containers
  313. if len(containers) == 0 {
  314. // 兼容旧版没有打标签的容器
  315. allContainers, err := m.cri.ListContainers(context.Background(), pod.ListContainerOptions{})
  316. if err != nil {
  317. return nil, errors.Wrapf(err, "ListContainers by pod uid: %s", uid)
  318. }
  319. for i := range allContainers {
  320. container := allContainers[i]
  321. if utils.IsInStringArray(container.PodSandboxId, criId) {
  322. containers = append(containers, container)
  323. }
  324. }
  325. }
  326. statuses := make([]*Status, len(containers))
  327. for i, c := range containers {
  328. sResp, err := m.cri.ContainerStatus(context.Background(), c.Id)
  329. if err != nil {
  330. return nil, errors.Wrapf(err, "ContainerStatus by container id: %s", c.Id)
  331. }
  332. status := sResp.Status
  333. cStatus := ToContainerStatus(status, m.runtimeName)
  334. cStatus.PodSandboxID = c.PodSandboxId
  335. statuses[i] = cStatus
  336. }
  337. sort.Sort(containerStatusByCreated(statuses))
  338. return statuses, nil
  339. }
  340. func ToContainerStatus(status *runtimeapi.ContainerStatus, runtimeName string) *Status {
  341. annotatedInfo := getContainerInfoFromAnnotations(status.Annotations)
  342. labeledInfo := getContainerInfoFromLabels(status.Labels, status.Annotations)
  343. cStatus := &Status{
  344. ID: ContainerID{
  345. Type: runtimeName,
  346. ID: status.Id,
  347. },
  348. Name: labeledInfo.ContainerName,
  349. Image: status.Image.Image,
  350. ImageID: status.ImageRef,
  351. State: toContainerState(status.State),
  352. CreatedAt: time.Unix(0, status.CreatedAt),
  353. }
  354. if annotatedInfo != nil {
  355. // cStatus.Hash = annotatedInfo.Hash
  356. cStatus.RestartCount = annotatedInfo.RestartCount
  357. }
  358. if status.State != runtimeapi.ContainerState_CONTAINER_CREATED {
  359. // If container is not in the created state, we have tried and
  360. // started the container. Set the StartedAt time.
  361. cStatus.StartedAt = time.Unix(0, status.StartedAt)
  362. }
  363. if status.State == runtimeapi.ContainerState_CONTAINER_EXITED {
  364. cStatus.Reason = status.Reason
  365. cStatus.Message = status.Message
  366. cStatus.ExitCode = int(status.ExitCode)
  367. cStatus.FinishedAt = time.Unix(0, status.FinishedAt)
  368. }
  369. return cStatus
  370. }