client.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886
  1. /*
  2. Copyright The containerd Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package containerd
  14. import (
  15. "bytes"
  16. "context"
  17. "encoding/json"
  18. "fmt"
  19. "runtime"
  20. "strconv"
  21. "strings"
  22. "sync"
  23. "time"
  24. containersapi "github.com/containerd/containerd/api/services/containers/v1"
  25. contentapi "github.com/containerd/containerd/api/services/content/v1"
  26. diffapi "github.com/containerd/containerd/api/services/diff/v1"
  27. eventsapi "github.com/containerd/containerd/api/services/events/v1"
  28. imagesapi "github.com/containerd/containerd/api/services/images/v1"
  29. introspectionapi "github.com/containerd/containerd/api/services/introspection/v1"
  30. leasesapi "github.com/containerd/containerd/api/services/leases/v1"
  31. namespacesapi "github.com/containerd/containerd/api/services/namespaces/v1"
  32. sandboxsapi "github.com/containerd/containerd/api/services/sandbox/v1"
  33. snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1"
  34. "github.com/containerd/containerd/api/services/tasks/v1"
  35. versionservice "github.com/containerd/containerd/api/services/version/v1"
  36. apitypes "github.com/containerd/containerd/api/types"
  37. "github.com/containerd/containerd/containers"
  38. "github.com/containerd/containerd/content"
  39. contentproxy "github.com/containerd/containerd/content/proxy"
  40. "github.com/containerd/containerd/defaults"
  41. "github.com/containerd/containerd/errdefs"
  42. "github.com/containerd/containerd/events"
  43. "github.com/containerd/containerd/images"
  44. "github.com/containerd/containerd/leases"
  45. leasesproxy "github.com/containerd/containerd/leases/proxy"
  46. "github.com/containerd/containerd/namespaces"
  47. "github.com/containerd/containerd/pkg/dialer"
  48. "github.com/containerd/containerd/platforms"
  49. "github.com/containerd/containerd/plugin"
  50. ptypes "github.com/containerd/containerd/protobuf/types"
  51. "github.com/containerd/containerd/remotes"
  52. "github.com/containerd/containerd/remotes/docker"
  53. "github.com/containerd/containerd/sandbox"
  54. sandboxproxy "github.com/containerd/containerd/sandbox/proxy"
  55. "github.com/containerd/containerd/services/introspection"
  56. "github.com/containerd/containerd/snapshots"
  57. snproxy "github.com/containerd/containerd/snapshots/proxy"
  58. "github.com/containerd/typeurl/v2"
  59. ocispec "github.com/opencontainers/image-spec/specs-go/v1"
  60. "github.com/opencontainers/runtime-spec/specs-go"
  61. "golang.org/x/sync/semaphore"
  62. "google.golang.org/grpc"
  63. "google.golang.org/grpc/backoff"
  64. "google.golang.org/grpc/credentials/insecure"
  65. "google.golang.org/grpc/health/grpc_health_v1"
  66. )
  67. func init() {
  68. const prefix = "types.containerd.io"
  69. // register TypeUrls for commonly marshaled external types
  70. major := strconv.Itoa(specs.VersionMajor)
  71. typeurl.Register(&specs.Spec{}, prefix, "opencontainers/runtime-spec", major, "Spec")
  72. typeurl.Register(&specs.Process{}, prefix, "opencontainers/runtime-spec", major, "Process")
  73. typeurl.Register(&specs.LinuxResources{}, prefix, "opencontainers/runtime-spec", major, "LinuxResources")
  74. typeurl.Register(&specs.WindowsResources{}, prefix, "opencontainers/runtime-spec", major, "WindowsResources")
  75. }
  76. // New returns a new containerd client that is connected to the containerd
  77. // instance provided by address
  78. func New(address string, opts ...ClientOpt) (*Client, error) {
  79. var copts clientOpts
  80. for _, o := range opts {
  81. if err := o(&copts); err != nil {
  82. return nil, err
  83. }
  84. }
  85. if copts.timeout == 0 {
  86. copts.timeout = 10 * time.Second
  87. }
  88. c := &Client{
  89. defaultns: copts.defaultns,
  90. }
  91. if copts.defaultRuntime != "" {
  92. c.runtime = copts.defaultRuntime
  93. } else {
  94. c.runtime = defaults.DefaultRuntime
  95. }
  96. if copts.defaultPlatform != nil {
  97. c.platform = copts.defaultPlatform
  98. } else {
  99. c.platform = platforms.Default()
  100. }
  101. if copts.services != nil {
  102. c.services = *copts.services
  103. }
  104. if address != "" {
  105. backoffConfig := backoff.DefaultConfig
  106. backoffConfig.MaxDelay = 3 * time.Second
  107. connParams := grpc.ConnectParams{
  108. Backoff: backoffConfig,
  109. }
  110. gopts := []grpc.DialOption{
  111. grpc.WithBlock(),
  112. grpc.WithTransportCredentials(insecure.NewCredentials()),
  113. grpc.FailOnNonTempDialError(true),
  114. grpc.WithConnectParams(connParams),
  115. grpc.WithContextDialer(dialer.ContextDialer),
  116. grpc.WithReturnConnectionError(),
  117. }
  118. if len(copts.dialOptions) > 0 {
  119. gopts = copts.dialOptions
  120. }
  121. gopts = append(gopts, grpc.WithDefaultCallOptions(
  122. grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize),
  123. grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)))
  124. if len(copts.callOptions) > 0 {
  125. gopts = append(gopts, grpc.WithDefaultCallOptions(copts.callOptions...))
  126. }
  127. if copts.defaultns != "" {
  128. unary, stream := newNSInterceptors(copts.defaultns)
  129. gopts = append(gopts, grpc.WithChainUnaryInterceptor(unary))
  130. gopts = append(gopts, grpc.WithChainStreamInterceptor(stream))
  131. }
  132. connector := func() (*grpc.ClientConn, error) {
  133. ctx, cancel := context.WithTimeout(context.Background(), copts.timeout)
  134. defer cancel()
  135. conn, err := grpc.DialContext(ctx, dialer.DialAddress(address), gopts...)
  136. if err != nil {
  137. return nil, fmt.Errorf("failed to dial %q: %w", address, err)
  138. }
  139. return conn, nil
  140. }
  141. conn, err := connector()
  142. if err != nil {
  143. return nil, err
  144. }
  145. c.conn, c.connector = conn, connector
  146. }
  147. if copts.services == nil && c.conn == nil {
  148. return nil, fmt.Errorf("no grpc connection or services is available: %w", errdefs.ErrUnavailable)
  149. }
  150. // check namespace labels for default runtime
  151. if copts.defaultRuntime == "" && c.defaultns != "" {
  152. if label, err := c.GetLabel(context.Background(), defaults.DefaultRuntimeNSLabel); err != nil {
  153. return nil, err
  154. } else if label != "" {
  155. c.runtime = label
  156. }
  157. }
  158. return c, nil
  159. }
  160. // NewWithConn returns a new containerd client that is connected to the containerd
  161. // instance provided by the connection
  162. func NewWithConn(conn *grpc.ClientConn, opts ...ClientOpt) (*Client, error) {
  163. var copts clientOpts
  164. for _, o := range opts {
  165. if err := o(&copts); err != nil {
  166. return nil, err
  167. }
  168. }
  169. c := &Client{
  170. defaultns: copts.defaultns,
  171. conn: conn,
  172. runtime: fmt.Sprintf("%s.%s", plugin.RuntimePlugin, runtime.GOOS),
  173. }
  174. if copts.defaultPlatform != nil {
  175. c.platform = copts.defaultPlatform
  176. } else {
  177. c.platform = platforms.Default()
  178. }
  179. // check namespace labels for default runtime
  180. if copts.defaultRuntime == "" && c.defaultns != "" {
  181. if label, err := c.GetLabel(context.Background(), defaults.DefaultRuntimeNSLabel); err != nil {
  182. return nil, err
  183. } else if label != "" {
  184. c.runtime = label
  185. }
  186. }
  187. if copts.services != nil {
  188. c.services = *copts.services
  189. }
  190. return c, nil
  191. }
  192. // Client is the client to interact with containerd and its various services
  193. // using a uniform interface
  194. type Client struct {
  195. services
  196. connMu sync.Mutex
  197. conn *grpc.ClientConn
  198. runtime string
  199. defaultns string
  200. platform platforms.MatchComparer
  201. connector func() (*grpc.ClientConn, error)
  202. }
  203. // Reconnect re-establishes the GRPC connection to the containerd daemon
  204. func (c *Client) Reconnect() error {
  205. if c.connector == nil {
  206. return fmt.Errorf("unable to reconnect to containerd, no connector available: %w", errdefs.ErrUnavailable)
  207. }
  208. c.connMu.Lock()
  209. defer c.connMu.Unlock()
  210. c.conn.Close()
  211. conn, err := c.connector()
  212. if err != nil {
  213. return err
  214. }
  215. c.conn = conn
  216. return nil
  217. }
  218. // Runtime returns the name of the runtime being used
  219. func (c *Client) Runtime() string {
  220. return c.runtime
  221. }
  222. // IsServing returns true if the client can successfully connect to the
  223. // containerd daemon and the healthcheck service returns the SERVING
  224. // response.
  225. // This call will block if a transient error is encountered during
  226. // connection. A timeout can be set in the context to ensure it returns
  227. // early.
  228. func (c *Client) IsServing(ctx context.Context) (bool, error) {
  229. c.connMu.Lock()
  230. if c.conn == nil {
  231. c.connMu.Unlock()
  232. return false, fmt.Errorf("no grpc connection available: %w", errdefs.ErrUnavailable)
  233. }
  234. c.connMu.Unlock()
  235. r, err := c.HealthService().Check(ctx, &grpc_health_v1.HealthCheckRequest{}, grpc.WaitForReady(true))
  236. if err != nil {
  237. return false, err
  238. }
  239. return r.Status == grpc_health_v1.HealthCheckResponse_SERVING, nil
  240. }
  241. // Containers returns all containers created in containerd
  242. func (c *Client) Containers(ctx context.Context, filters ...string) ([]Container, error) {
  243. r, err := c.ContainerService().List(ctx, filters...)
  244. if err != nil {
  245. return nil, err
  246. }
  247. var out []Container
  248. for _, container := range r {
  249. out = append(out, containerFromRecord(c, container))
  250. }
  251. return out, nil
  252. }
  253. // NewContainer will create a new container with the provided id.
  254. // The id must be unique within the namespace.
  255. func (c *Client) NewContainer(ctx context.Context, id string, opts ...NewContainerOpts) (Container, error) {
  256. ctx, done, err := c.WithLease(ctx)
  257. if err != nil {
  258. return nil, err
  259. }
  260. defer done(ctx)
  261. container := containers.Container{
  262. ID: id,
  263. Runtime: containers.RuntimeInfo{
  264. Name: c.runtime,
  265. },
  266. }
  267. for _, o := range opts {
  268. if err := o(ctx, c, &container); err != nil {
  269. return nil, err
  270. }
  271. }
  272. r, err := c.ContainerService().Create(ctx, container)
  273. if err != nil {
  274. return nil, err
  275. }
  276. return containerFromRecord(c, r), nil
  277. }
  278. // LoadContainer loads an existing container from metadata
  279. func (c *Client) LoadContainer(ctx context.Context, id string) (Container, error) {
  280. r, err := c.ContainerService().Get(ctx, id)
  281. if err != nil {
  282. return nil, err
  283. }
  284. return containerFromRecord(c, r), nil
  285. }
  286. // RemoteContext is used to configure object resolutions and transfers with
  287. // remote content stores and image providers.
  288. type RemoteContext struct {
  289. // Resolver is used to resolve names to objects, fetchers, and pushers.
  290. // If no resolver is provided, defaults to Docker registry resolver.
  291. Resolver remotes.Resolver
  292. // PlatformMatcher is used to match the platforms for an image
  293. // operation and define the preference when a single match is required
  294. // from multiple platforms.
  295. PlatformMatcher platforms.MatchComparer
  296. // Unpack is done after an image is pulled to extract into a snapshotter.
  297. // It is done simultaneously for schema 2 images when they are pulled.
  298. // If an image is not unpacked on pull, it can be unpacked any time
  299. // afterwards. Unpacking is required to run an image.
  300. Unpack bool
  301. // UnpackOpts handles options to the unpack call.
  302. UnpackOpts []UnpackOpt
  303. // Snapshotter used for unpacking
  304. Snapshotter string
  305. // SnapshotterOpts are additional options to be passed to a snapshotter during pull
  306. SnapshotterOpts []snapshots.Opt
  307. // Labels to be applied to the created image
  308. Labels map[string]string
  309. // BaseHandlers are a set of handlers which get are called on dispatch.
  310. // These handlers always get called before any operation specific
  311. // handlers.
  312. BaseHandlers []images.Handler
  313. // HandlerWrapper wraps the handler which gets sent to dispatch.
  314. // Unlike BaseHandlers, this can run before and after the built
  315. // in handlers, allowing operations to run on the descriptor
  316. // after it has completed transferring.
  317. HandlerWrapper func(images.Handler) images.Handler
  318. // ConvertSchema1 is whether to convert Docker registry schema 1
  319. // manifests. If this option is false then any image which resolves
  320. // to schema 1 will return an error since schema 1 is not supported.
  321. //
  322. // Deprecated: use Schema 2 or OCI images.
  323. ConvertSchema1 bool
  324. // Platforms defines which platforms to handle when doing the image operation.
  325. // Platforms is ignored when a PlatformMatcher is set, otherwise the
  326. // platforms will be used to create a PlatformMatcher with no ordering
  327. // preference.
  328. Platforms []string
  329. // MaxConcurrentDownloads is the max concurrent content downloads for each pull.
  330. MaxConcurrentDownloads int
  331. // MaxConcurrentUploadedLayers is the max concurrent uploaded layers for each push.
  332. MaxConcurrentUploadedLayers int
  333. // AllMetadata downloads all manifests and known-configuration files
  334. AllMetadata bool
  335. // ChildLabelMap sets the labels used to reference child objects in the content
  336. // store. By default, all GC reference labels will be set for all fetched content.
  337. ChildLabelMap func(ocispec.Descriptor) []string
  338. }
  339. func defaultRemoteContext() *RemoteContext {
  340. return &RemoteContext{
  341. Resolver: docker.NewResolver(docker.ResolverOptions{}),
  342. }
  343. }
  344. // Fetch downloads the provided content into containerd's content store
  345. // and returns a non-platform specific image reference
  346. func (c *Client) Fetch(ctx context.Context, ref string, opts ...RemoteOpt) (images.Image, error) {
  347. fetchCtx := defaultRemoteContext()
  348. for _, o := range opts {
  349. if err := o(c, fetchCtx); err != nil {
  350. return images.Image{}, err
  351. }
  352. }
  353. if fetchCtx.Unpack {
  354. return images.Image{}, fmt.Errorf("unpack on fetch not supported, try pull: %w", errdefs.ErrNotImplemented)
  355. }
  356. if fetchCtx.PlatformMatcher == nil {
  357. if len(fetchCtx.Platforms) == 0 {
  358. fetchCtx.PlatformMatcher = platforms.All
  359. } else {
  360. var ps []ocispec.Platform
  361. for _, s := range fetchCtx.Platforms {
  362. p, err := platforms.Parse(s)
  363. if err != nil {
  364. return images.Image{}, fmt.Errorf("invalid platform %s: %w", s, err)
  365. }
  366. ps = append(ps, p)
  367. }
  368. fetchCtx.PlatformMatcher = platforms.Any(ps...)
  369. }
  370. }
  371. ctx, done, err := c.WithLease(ctx)
  372. if err != nil {
  373. return images.Image{}, err
  374. }
  375. defer done(ctx)
  376. img, err := c.fetch(ctx, fetchCtx, ref, 0)
  377. if err != nil {
  378. return images.Image{}, err
  379. }
  380. return c.createNewImage(ctx, img)
  381. }
  382. // Push uploads the provided content to a remote resource
  383. func (c *Client) Push(ctx context.Context, ref string, desc ocispec.Descriptor, opts ...RemoteOpt) error {
  384. pushCtx := defaultRemoteContext()
  385. for _, o := range opts {
  386. if err := o(c, pushCtx); err != nil {
  387. return err
  388. }
  389. }
  390. if pushCtx.PlatformMatcher == nil {
  391. if len(pushCtx.Platforms) > 0 {
  392. var ps []ocispec.Platform
  393. for _, platform := range pushCtx.Platforms {
  394. p, err := platforms.Parse(platform)
  395. if err != nil {
  396. return fmt.Errorf("invalid platform %s: %w", platform, err)
  397. }
  398. ps = append(ps, p)
  399. }
  400. pushCtx.PlatformMatcher = platforms.Any(ps...)
  401. } else {
  402. pushCtx.PlatformMatcher = platforms.All
  403. }
  404. }
  405. // Annotate ref with digest to push only push tag for single digest
  406. if !strings.Contains(ref, "@") {
  407. ref = ref + "@" + desc.Digest.String()
  408. }
  409. pusher, err := pushCtx.Resolver.Pusher(ctx, ref)
  410. if err != nil {
  411. return err
  412. }
  413. var wrapper func(images.Handler) images.Handler
  414. if len(pushCtx.BaseHandlers) > 0 {
  415. wrapper = func(h images.Handler) images.Handler {
  416. h = images.Handlers(append(pushCtx.BaseHandlers, h)...)
  417. if pushCtx.HandlerWrapper != nil {
  418. h = pushCtx.HandlerWrapper(h)
  419. }
  420. return h
  421. }
  422. } else if pushCtx.HandlerWrapper != nil {
  423. wrapper = pushCtx.HandlerWrapper
  424. }
  425. var limiter *semaphore.Weighted
  426. if pushCtx.MaxConcurrentUploadedLayers > 0 {
  427. limiter = semaphore.NewWeighted(int64(pushCtx.MaxConcurrentUploadedLayers))
  428. }
  429. return remotes.PushContent(ctx, pusher, desc, c.ContentStore(), limiter, pushCtx.PlatformMatcher, wrapper)
  430. }
  431. // GetImage returns an existing image
  432. func (c *Client) GetImage(ctx context.Context, ref string) (Image, error) {
  433. i, err := c.ImageService().Get(ctx, ref)
  434. if err != nil {
  435. return nil, err
  436. }
  437. return NewImage(c, i), nil
  438. }
  439. // ListImages returns all existing images
  440. func (c *Client) ListImages(ctx context.Context, filters ...string) ([]Image, error) {
  441. imgs, err := c.ImageService().List(ctx, filters...)
  442. if err != nil {
  443. return nil, err
  444. }
  445. images := make([]Image, len(imgs))
  446. for i, img := range imgs {
  447. images[i] = NewImage(c, img)
  448. }
  449. return images, nil
  450. }
  451. // Restore restores a container from a checkpoint
  452. func (c *Client) Restore(ctx context.Context, id string, checkpoint Image, opts ...RestoreOpts) (Container, error) {
  453. store := c.ContentStore()
  454. index, err := decodeIndex(ctx, store, checkpoint.Target())
  455. if err != nil {
  456. return nil, err
  457. }
  458. ctx, done, err := c.WithLease(ctx)
  459. if err != nil {
  460. return nil, err
  461. }
  462. defer done(ctx)
  463. copts := []NewContainerOpts{}
  464. for _, o := range opts {
  465. copts = append(copts, o(ctx, id, c, checkpoint, index))
  466. }
  467. ctr, err := c.NewContainer(ctx, id, copts...)
  468. if err != nil {
  469. return nil, err
  470. }
  471. return ctr, nil
  472. }
  473. func writeIndex(ctx context.Context, index *ocispec.Index, client *Client, ref string) (d ocispec.Descriptor, err error) {
  474. labels := map[string]string{}
  475. for i, m := range index.Manifests {
  476. labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = m.Digest.String()
  477. }
  478. data, err := json.Marshal(index)
  479. if err != nil {
  480. return ocispec.Descriptor{}, err
  481. }
  482. return writeContent(ctx, client.ContentStore(), ocispec.MediaTypeImageIndex, ref, bytes.NewReader(data), content.WithLabels(labels))
  483. }
  484. // GetLabel gets a label value from namespace store
  485. // If there is no default label, an empty string returned with nil error
  486. func (c *Client) GetLabel(ctx context.Context, label string) (string, error) {
  487. ns, err := namespaces.NamespaceRequired(ctx)
  488. if err != nil {
  489. if c.defaultns == "" {
  490. return "", err
  491. }
  492. ns = c.defaultns
  493. }
  494. srv := c.NamespaceService()
  495. labels, err := srv.Labels(ctx, ns)
  496. if err != nil {
  497. return "", err
  498. }
  499. value := labels[label]
  500. return value, nil
  501. }
  502. // Subscribe to events that match one or more of the provided filters.
  503. //
  504. // Callers should listen on both the envelope and errs channels. If the errs
  505. // channel returns nil or an error, the subscriber should terminate.
  506. //
  507. // The subscriber can stop receiving events by canceling the provided context.
  508. // The errs channel will be closed and return a nil error.
  509. func (c *Client) Subscribe(ctx context.Context, filters ...string) (ch <-chan *events.Envelope, errs <-chan error) {
  510. return c.EventService().Subscribe(ctx, filters...)
  511. }
  512. // Close closes the clients connection to containerd
  513. func (c *Client) Close() error {
  514. c.connMu.Lock()
  515. defer c.connMu.Unlock()
  516. if c.conn != nil {
  517. return c.conn.Close()
  518. }
  519. return nil
  520. }
  521. // NamespaceService returns the underlying Namespaces Store
  522. func (c *Client) NamespaceService() namespaces.Store {
  523. if c.namespaceStore != nil {
  524. return c.namespaceStore
  525. }
  526. c.connMu.Lock()
  527. defer c.connMu.Unlock()
  528. return NewNamespaceStoreFromClient(namespacesapi.NewNamespacesClient(c.conn))
  529. }
  530. // ContainerService returns the underlying container Store
  531. func (c *Client) ContainerService() containers.Store {
  532. if c.containerStore != nil {
  533. return c.containerStore
  534. }
  535. c.connMu.Lock()
  536. defer c.connMu.Unlock()
  537. return NewRemoteContainerStore(containersapi.NewContainersClient(c.conn))
  538. }
  539. // ContentStore returns the underlying content Store
  540. func (c *Client) ContentStore() content.Store {
  541. if c.contentStore != nil {
  542. return c.contentStore
  543. }
  544. c.connMu.Lock()
  545. defer c.connMu.Unlock()
  546. return contentproxy.NewContentStore(contentapi.NewContentClient(c.conn))
  547. }
  548. // SnapshotService returns the underlying snapshotter for the provided snapshotter name
  549. func (c *Client) SnapshotService(snapshotterName string) snapshots.Snapshotter {
  550. snapshotterName, err := c.resolveSnapshotterName(context.Background(), snapshotterName)
  551. if err != nil {
  552. snapshotterName = DefaultSnapshotter
  553. }
  554. if c.snapshotters != nil {
  555. return c.snapshotters[snapshotterName]
  556. }
  557. c.connMu.Lock()
  558. defer c.connMu.Unlock()
  559. return snproxy.NewSnapshotter(snapshotsapi.NewSnapshotsClient(c.conn), snapshotterName)
  560. }
  561. // DefaultNamespace return the default namespace
  562. func (c *Client) DefaultNamespace() string {
  563. return c.defaultns
  564. }
  565. // TaskService returns the underlying TasksClient
  566. func (c *Client) TaskService() tasks.TasksClient {
  567. if c.taskService != nil {
  568. return c.taskService
  569. }
  570. c.connMu.Lock()
  571. defer c.connMu.Unlock()
  572. return tasks.NewTasksClient(c.conn)
  573. }
  574. // ImageService returns the underlying image Store
  575. func (c *Client) ImageService() images.Store {
  576. if c.imageStore != nil {
  577. return c.imageStore
  578. }
  579. c.connMu.Lock()
  580. defer c.connMu.Unlock()
  581. return NewImageStoreFromClient(imagesapi.NewImagesClient(c.conn))
  582. }
  583. // DiffService returns the underlying Differ
  584. func (c *Client) DiffService() DiffService {
  585. if c.diffService != nil {
  586. return c.diffService
  587. }
  588. c.connMu.Lock()
  589. defer c.connMu.Unlock()
  590. return NewDiffServiceFromClient(diffapi.NewDiffClient(c.conn))
  591. }
  592. // IntrospectionService returns the underlying Introspection Client
  593. func (c *Client) IntrospectionService() introspection.Service {
  594. if c.introspectionService != nil {
  595. return c.introspectionService
  596. }
  597. c.connMu.Lock()
  598. defer c.connMu.Unlock()
  599. return introspection.NewIntrospectionServiceFromClient(introspectionapi.NewIntrospectionClient(c.conn))
  600. }
  601. // LeasesService returns the underlying Leases Client
  602. func (c *Client) LeasesService() leases.Manager {
  603. if c.leasesService != nil {
  604. return c.leasesService
  605. }
  606. c.connMu.Lock()
  607. defer c.connMu.Unlock()
  608. return leasesproxy.NewLeaseManager(leasesapi.NewLeasesClient(c.conn))
  609. }
  610. // HealthService returns the underlying GRPC HealthClient
  611. func (c *Client) HealthService() grpc_health_v1.HealthClient {
  612. c.connMu.Lock()
  613. defer c.connMu.Unlock()
  614. return grpc_health_v1.NewHealthClient(c.conn)
  615. }
  616. // EventService returns the underlying event service
  617. func (c *Client) EventService() EventService {
  618. if c.eventService != nil {
  619. return c.eventService
  620. }
  621. c.connMu.Lock()
  622. defer c.connMu.Unlock()
  623. return NewEventServiceFromClient(eventsapi.NewEventsClient(c.conn))
  624. }
  625. // SandboxStore returns the underlying sandbox store client
  626. func (c *Client) SandboxStore() sandbox.Store {
  627. if c.sandboxStore != nil {
  628. return c.sandboxStore
  629. }
  630. c.connMu.Lock()
  631. defer c.connMu.Unlock()
  632. return sandboxproxy.NewSandboxStore(sandboxsapi.NewStoreClient(c.conn))
  633. }
  634. // SandboxController returns the underlying sandbox controller client
  635. func (c *Client) SandboxController() sandbox.Controller {
  636. if c.sandboxController != nil {
  637. return c.sandboxController
  638. }
  639. c.connMu.Lock()
  640. defer c.connMu.Unlock()
  641. return sandboxproxy.NewSandboxController(sandboxsapi.NewControllerClient(c.conn))
  642. }
  643. // VersionService returns the underlying VersionClient
  644. func (c *Client) VersionService() versionservice.VersionClient {
  645. c.connMu.Lock()
  646. defer c.connMu.Unlock()
  647. return versionservice.NewVersionClient(c.conn)
  648. }
  649. // Conn returns the underlying GRPC connection object
  650. func (c *Client) Conn() *grpc.ClientConn {
  651. c.connMu.Lock()
  652. defer c.connMu.Unlock()
  653. return c.conn
  654. }
  655. // Version of containerd
  656. type Version struct {
  657. // Version number
  658. Version string
  659. // Revision from git that was built
  660. Revision string
  661. }
  662. // Version returns the version of containerd that the client is connected to
  663. func (c *Client) Version(ctx context.Context) (Version, error) {
  664. c.connMu.Lock()
  665. if c.conn == nil {
  666. c.connMu.Unlock()
  667. return Version{}, fmt.Errorf("no grpc connection available: %w", errdefs.ErrUnavailable)
  668. }
  669. c.connMu.Unlock()
  670. response, err := c.VersionService().Version(ctx, &ptypes.Empty{})
  671. if err != nil {
  672. return Version{}, err
  673. }
  674. return Version{
  675. Version: response.Version,
  676. Revision: response.Revision,
  677. }, nil
  678. }
  679. // ServerInfo represents the introspected server information
  680. type ServerInfo struct {
  681. UUID string
  682. }
  683. // Server returns server information from the introspection service
  684. func (c *Client) Server(ctx context.Context) (ServerInfo, error) {
  685. c.connMu.Lock()
  686. if c.conn == nil {
  687. c.connMu.Unlock()
  688. return ServerInfo{}, fmt.Errorf("no grpc connection available: %w", errdefs.ErrUnavailable)
  689. }
  690. c.connMu.Unlock()
  691. response, err := c.IntrospectionService().Server(ctx, &ptypes.Empty{})
  692. if err != nil {
  693. return ServerInfo{}, err
  694. }
  695. return ServerInfo{
  696. UUID: response.UUID,
  697. }, nil
  698. }
  699. func (c *Client) resolveSnapshotterName(ctx context.Context, name string) (string, error) {
  700. if name == "" {
  701. label, err := c.GetLabel(ctx, defaults.DefaultSnapshotterNSLabel)
  702. if err != nil {
  703. return "", err
  704. }
  705. if label != "" {
  706. name = label
  707. } else {
  708. name = DefaultSnapshotter
  709. }
  710. }
  711. return name, nil
  712. }
  713. func (c *Client) getSnapshotter(ctx context.Context, name string) (snapshots.Snapshotter, error) {
  714. name, err := c.resolveSnapshotterName(ctx, name)
  715. if err != nil {
  716. return nil, err
  717. }
  718. s := c.SnapshotService(name)
  719. if s == nil {
  720. return nil, fmt.Errorf("snapshotter %s was not found: %w", name, errdefs.ErrNotFound)
  721. }
  722. return s, nil
  723. }
  724. // CheckRuntime returns true if the current runtime matches the expected
  725. // runtime. Providing various parts of the runtime schema will match those
  726. // parts of the expected runtime
  727. func CheckRuntime(current, expected string) bool {
  728. cp := strings.Split(current, ".")
  729. l := len(cp)
  730. for i, p := range strings.Split(expected, ".") {
  731. if i > l {
  732. return false
  733. }
  734. if p != cp[i] {
  735. return false
  736. }
  737. }
  738. return true
  739. }
  740. // GetSnapshotterSupportedPlatforms returns a platform matchers which represents the
  741. // supported platforms for the given snapshotters
  742. func (c *Client) GetSnapshotterSupportedPlatforms(ctx context.Context, snapshotterName string) (platforms.MatchComparer, error) {
  743. filters := []string{fmt.Sprintf("type==%s, id==%s", plugin.SnapshotPlugin, snapshotterName)}
  744. in := c.IntrospectionService()
  745. resp, err := in.Plugins(ctx, filters)
  746. if err != nil {
  747. return nil, err
  748. }
  749. if len(resp.Plugins) <= 0 {
  750. return nil, fmt.Errorf("inspection service could not find snapshotter %s plugin", snapshotterName)
  751. }
  752. sn := resp.Plugins[0]
  753. snPlatforms := toPlatforms(sn.Platforms)
  754. return platforms.Any(snPlatforms...), nil
  755. }
  756. func toPlatforms(pt []*apitypes.Platform) []ocispec.Platform {
  757. platforms := make([]ocispec.Platform, len(pt))
  758. for i, p := range pt {
  759. platforms[i] = ocispec.Platform{
  760. Architecture: p.Architecture,
  761. OS: p.OS,
  762. Variant: p.Variant,
  763. }
  764. }
  765. return platforms
  766. }
  767. // GetSnapshotterCapabilities returns the capabilities of a snapshotter.
  768. func (c *Client) GetSnapshotterCapabilities(ctx context.Context, snapshotterName string) ([]string, error) {
  769. filters := []string{fmt.Sprintf("type==%s, id==%s", plugin.SnapshotPlugin, snapshotterName)}
  770. in := c.IntrospectionService()
  771. resp, err := in.Plugins(ctx, filters)
  772. if err != nil {
  773. return nil, err
  774. }
  775. if len(resp.Plugins) <= 0 {
  776. return nil, fmt.Errorf("inspection service could not find snapshotter %s plugin", snapshotterName)
  777. }
  778. sn := resp.Plugins[0]
  779. return sn.Capabilities, nil
  780. }