client.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. // Copyright 2017 Google Inc. All Rights Reserved.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package containerd
  15. import (
  16. "context"
  17. "errors"
  18. "fmt"
  19. "net"
  20. "sync"
  21. "time"
  22. ptypes "github.com/gogo/protobuf/types"
  23. "google.golang.org/grpc"
  24. "google.golang.org/grpc/backoff"
  25. "google.golang.org/grpc/credentials/insecure"
  26. "github.com/google/cadvisor/container/containerd/containers"
  27. "github.com/google/cadvisor/container/containerd/errdefs"
  28. "github.com/google/cadvisor/container/containerd/pkg/dialer"
  29. containersapi "github.com/google/cadvisor/third_party/containerd/api/services/containers/v1"
  30. tasksapi "github.com/google/cadvisor/third_party/containerd/api/services/tasks/v1"
  31. versionapi "github.com/google/cadvisor/third_party/containerd/api/services/version/v1"
  32. tasktypes "github.com/google/cadvisor/third_party/containerd/api/types/task"
  33. )
  34. type client struct {
  35. containerService containersapi.ContainersClient
  36. taskService tasksapi.TasksClient
  37. versionService versionapi.VersionClient
  38. }
  39. type ContainerdClient interface {
  40. LoadContainer(ctx context.Context, id string) (*containers.Container, error)
  41. TaskPid(ctx context.Context, id string) (uint32, error)
  42. Version(ctx context.Context) (string, error)
  43. }
  44. var (
  45. ErrTaskIsInUnknownState = errors.New("containerd task is in unknown state") // used when process reported in containerd task is in Unknown State
  46. )
  47. var once sync.Once
  48. var ctrdClient ContainerdClient = nil
  49. const (
  50. maxBackoffDelay = 3 * time.Second
  51. baseBackoffDelay = 100 * time.Millisecond
  52. connectionTimeout = 2 * time.Second
  53. )
  54. // Client creates a containerd client
  55. func Client(address, namespace string) (ContainerdClient, error) {
  56. var retErr error
  57. once.Do(func() {
  58. tryConn, err := net.DialTimeout("unix", address, connectionTimeout)
  59. if err != nil {
  60. retErr = fmt.Errorf("containerd: cannot unix dial containerd api service: %v", err)
  61. return
  62. }
  63. tryConn.Close()
  64. connParams := grpc.ConnectParams{
  65. Backoff: backoff.DefaultConfig,
  66. }
  67. connParams.Backoff.BaseDelay = baseBackoffDelay
  68. connParams.Backoff.MaxDelay = maxBackoffDelay
  69. gopts := []grpc.DialOption{
  70. grpc.WithTransportCredentials(insecure.NewCredentials()),
  71. grpc.WithContextDialer(dialer.ContextDialer),
  72. grpc.WithBlock(),
  73. grpc.WithConnectParams(connParams),
  74. }
  75. unary, stream := newNSInterceptors(namespace)
  76. gopts = append(gopts,
  77. grpc.WithUnaryInterceptor(unary),
  78. grpc.WithStreamInterceptor(stream),
  79. )
  80. ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
  81. defer cancel()
  82. conn, err := grpc.DialContext(ctx, dialer.DialAddress(address), gopts...)
  83. if err != nil {
  84. retErr = err
  85. return
  86. }
  87. ctrdClient = &client{
  88. containerService: containersapi.NewContainersClient(conn),
  89. taskService: tasksapi.NewTasksClient(conn),
  90. versionService: versionapi.NewVersionClient(conn),
  91. }
  92. })
  93. return ctrdClient, retErr
  94. }
  95. func (c *client) LoadContainer(ctx context.Context, id string) (*containers.Container, error) {
  96. r, err := c.containerService.Get(ctx, &containersapi.GetContainerRequest{
  97. ID: id,
  98. })
  99. if err != nil {
  100. return nil, errdefs.FromGRPC(err)
  101. }
  102. return containerFromProto(r.Container), nil
  103. }
  104. func (c *client) TaskPid(ctx context.Context, id string) (uint32, error) {
  105. response, err := c.taskService.Get(ctx, &tasksapi.GetRequest{
  106. ContainerID: id,
  107. })
  108. if err != nil {
  109. return 0, errdefs.FromGRPC(err)
  110. }
  111. if response.Process.Status == tasktypes.StatusUnknown {
  112. return 0, ErrTaskIsInUnknownState
  113. }
  114. return response.Process.Pid, nil
  115. }
  116. func (c *client) Version(ctx context.Context) (string, error) {
  117. response, err := c.versionService.Version(ctx, &ptypes.Empty{})
  118. if err != nil {
  119. return "", errdefs.FromGRPC(err)
  120. }
  121. return response.Version, nil
  122. }
  123. func containerFromProto(containerpb containersapi.Container) *containers.Container {
  124. var runtime containers.RuntimeInfo
  125. if containerpb.Runtime != nil {
  126. runtime = containers.RuntimeInfo{
  127. Name: containerpb.Runtime.Name,
  128. Options: containerpb.Runtime.Options,
  129. }
  130. }
  131. return &containers.Container{
  132. ID: containerpb.ID,
  133. Labels: containerpb.Labels,
  134. Image: containerpb.Image,
  135. Runtime: runtime,
  136. Spec: containerpb.Spec,
  137. Snapshotter: containerpb.Snapshotter,
  138. SnapshotKey: containerpb.SnapshotKey,
  139. Extensions: containerpb.Extensions,
  140. }
  141. }