task.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720
  1. /*
  2. Copyright The containerd Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package containerd
  14. import (
  15. "bytes"
  16. "context"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "io"
  21. goruntime "runtime"
  22. "strings"
  23. "syscall"
  24. "time"
  25. "github.com/containerd/containerd/api/services/tasks/v1"
  26. "github.com/containerd/containerd/api/types"
  27. "github.com/containerd/containerd/cio"
  28. "github.com/containerd/containerd/content"
  29. "github.com/containerd/containerd/diff"
  30. "github.com/containerd/containerd/errdefs"
  31. "github.com/containerd/containerd/images"
  32. "github.com/containerd/containerd/mount"
  33. "github.com/containerd/containerd/oci"
  34. "github.com/containerd/containerd/plugin"
  35. "github.com/containerd/containerd/protobuf"
  36. google_protobuf "github.com/containerd/containerd/protobuf/types"
  37. "github.com/containerd/containerd/rootfs"
  38. "github.com/containerd/containerd/runtime/linux/runctypes"
  39. "github.com/containerd/containerd/runtime/v2/runc/options"
  40. "github.com/containerd/typeurl/v2"
  41. digest "github.com/opencontainers/go-digest"
  42. is "github.com/opencontainers/image-spec/specs-go"
  43. v1 "github.com/opencontainers/image-spec/specs-go/v1"
  44. specs "github.com/opencontainers/runtime-spec/specs-go"
  45. )
  46. // UnknownExitStatus is returned when containerd is unable to
  47. // determine the exit status of a process. This can happen if the process never starts
  48. // or if an error was encountered when obtaining the exit status, it is set to 255.
  49. const UnknownExitStatus = 255
  50. const (
  51. checkpointDateFormat = "01-02-2006-15:04:05"
  52. checkpointNameFormat = "containerd.io/checkpoint/%s:%s"
  53. )
  54. // Status returns process status and exit information
  55. type Status struct {
  56. // Status of the process
  57. Status ProcessStatus
  58. // ExitStatus returned by the process
  59. ExitStatus uint32
  60. // ExitedTime is the time at which the process died
  61. ExitTime time.Time
  62. }
  63. // ProcessInfo provides platform specific process information
  64. type ProcessInfo struct {
  65. // Pid is the process ID
  66. Pid uint32
  67. // Info includes additional process information
  68. // Info varies by platform
  69. Info *google_protobuf.Any
  70. }
  71. // ProcessStatus returns a human readable status for the Process representing its current status
  72. type ProcessStatus string
  73. const (
  74. // Running indicates the process is currently executing
  75. Running ProcessStatus = "running"
  76. // Created indicates the process has been created within containerd but the
  77. // user's defined process has not started
  78. Created ProcessStatus = "created"
  79. // Stopped indicates that the process has ran and exited
  80. Stopped ProcessStatus = "stopped"
  81. // Paused indicates that the process is currently paused
  82. Paused ProcessStatus = "paused"
  83. // Pausing indicates that the process is currently switching from a
  84. // running state into a paused state
  85. Pausing ProcessStatus = "pausing"
  86. // Unknown indicates that we could not determine the status from the runtime
  87. Unknown ProcessStatus = "unknown"
  88. )
  89. // IOCloseInfo allows specific io pipes to be closed on a process
  90. type IOCloseInfo struct {
  91. Stdin bool
  92. }
  93. // IOCloserOpts allows the caller to set specific pipes as closed on a process
  94. type IOCloserOpts func(*IOCloseInfo)
  95. // WithStdinCloser closes the stdin of a process
  96. func WithStdinCloser(r *IOCloseInfo) {
  97. r.Stdin = true
  98. }
  99. // CheckpointTaskInfo allows specific checkpoint information to be set for the task
  100. type CheckpointTaskInfo struct {
  101. Name string
  102. // ParentCheckpoint is the digest of a parent checkpoint
  103. ParentCheckpoint digest.Digest
  104. // Options hold runtime specific settings for checkpointing a task
  105. Options interface{}
  106. runtime string
  107. }
  108. // Runtime name for the container
  109. func (i *CheckpointTaskInfo) Runtime() string {
  110. return i.runtime
  111. }
  112. // CheckpointTaskOpts allows the caller to set checkpoint options
  113. type CheckpointTaskOpts func(*CheckpointTaskInfo) error
  114. // TaskInfo sets options for task creation
  115. type TaskInfo struct {
  116. // Checkpoint is the Descriptor for an existing checkpoint that can be used
  117. // to restore a task's runtime and memory state
  118. Checkpoint *types.Descriptor
  119. // RootFS is a list of mounts to use as the task's root filesystem
  120. RootFS []mount.Mount
  121. // Options hold runtime specific settings for task creation
  122. Options interface{}
  123. // RuntimePath is an absolute path that can be used to overwrite path
  124. // to a shim runtime binary.
  125. RuntimePath string
  126. // runtime is the runtime name for the container, and cannot be changed.
  127. runtime string
  128. }
  129. // Runtime name for the container
  130. func (i *TaskInfo) Runtime() string {
  131. return i.runtime
  132. }
  133. // Task is the executable object within containerd
  134. type Task interface {
  135. Process
  136. // Pause suspends the execution of the task
  137. Pause(context.Context) error
  138. // Resume the execution of the task
  139. Resume(context.Context) error
  140. // Exec creates a new process inside the task
  141. Exec(context.Context, string, *specs.Process, cio.Creator) (Process, error)
  142. // Pids returns a list of system specific process ids inside the task
  143. Pids(context.Context) ([]ProcessInfo, error)
  144. // Checkpoint serializes the runtime and memory information of a task into an
  145. // OCI Index that can be pushed and pulled from a remote resource.
  146. //
  147. // Additional software like CRIU maybe required to checkpoint and restore tasks
  148. // NOTE: Checkpoint supports to dump task information to a directory, in this way,
  149. // an empty OCI Index will be returned.
  150. Checkpoint(context.Context, ...CheckpointTaskOpts) (Image, error)
  151. // Update modifies executing tasks with updated settings
  152. Update(context.Context, ...UpdateTaskOpts) error
  153. // LoadProcess loads a previously created exec'd process
  154. LoadProcess(context.Context, string, cio.Attach) (Process, error)
  155. // Metrics returns task metrics for runtime specific metrics
  156. //
  157. // The metric types are generic to containerd and change depending on the runtime
  158. // For the built in Linux runtime, github.com/containerd/cgroups.Metrics
  159. // are returned in protobuf format
  160. Metrics(context.Context) (*types.Metric, error)
  161. // Spec returns the current OCI specification for the task
  162. Spec(context.Context) (*oci.Spec, error)
  163. }
  164. var _ = (Task)(&task{})
  165. type task struct {
  166. client *Client
  167. c Container
  168. io cio.IO
  169. id string
  170. pid uint32
  171. }
  172. // Spec returns the current OCI specification for the task
  173. func (t *task) Spec(ctx context.Context) (*oci.Spec, error) {
  174. return t.c.Spec(ctx)
  175. }
  176. // ID of the task
  177. func (t *task) ID() string {
  178. return t.id
  179. }
  180. // Pid returns the pid or process id for the task
  181. func (t *task) Pid() uint32 {
  182. return t.pid
  183. }
  184. func (t *task) Start(ctx context.Context) error {
  185. r, err := t.client.TaskService().Start(ctx, &tasks.StartRequest{
  186. ContainerID: t.id,
  187. })
  188. if err != nil {
  189. if t.io != nil {
  190. t.io.Cancel()
  191. t.io.Close()
  192. }
  193. return errdefs.FromGRPC(err)
  194. }
  195. t.pid = r.Pid
  196. return nil
  197. }
  198. func (t *task) Kill(ctx context.Context, s syscall.Signal, opts ...KillOpts) error {
  199. var i KillInfo
  200. for _, o := range opts {
  201. if err := o(ctx, &i); err != nil {
  202. return err
  203. }
  204. }
  205. _, err := t.client.TaskService().Kill(ctx, &tasks.KillRequest{
  206. Signal: uint32(s),
  207. ContainerID: t.id,
  208. ExecID: i.ExecID,
  209. All: i.All,
  210. })
  211. if err != nil {
  212. return errdefs.FromGRPC(err)
  213. }
  214. return nil
  215. }
  216. func (t *task) Pause(ctx context.Context) error {
  217. _, err := t.client.TaskService().Pause(ctx, &tasks.PauseTaskRequest{
  218. ContainerID: t.id,
  219. })
  220. return errdefs.FromGRPC(err)
  221. }
  222. func (t *task) Resume(ctx context.Context) error {
  223. _, err := t.client.TaskService().Resume(ctx, &tasks.ResumeTaskRequest{
  224. ContainerID: t.id,
  225. })
  226. return errdefs.FromGRPC(err)
  227. }
  228. func (t *task) Status(ctx context.Context) (Status, error) {
  229. r, err := t.client.TaskService().Get(ctx, &tasks.GetRequest{
  230. ContainerID: t.id,
  231. })
  232. if err != nil {
  233. return Status{}, errdefs.FromGRPC(err)
  234. }
  235. return Status{
  236. Status: ProcessStatus(strings.ToLower(r.Process.Status.String())),
  237. ExitStatus: r.Process.ExitStatus,
  238. ExitTime: protobuf.FromTimestamp(r.Process.ExitedAt),
  239. }, nil
  240. }
  241. func (t *task) Wait(ctx context.Context) (<-chan ExitStatus, error) {
  242. c := make(chan ExitStatus, 1)
  243. go func() {
  244. defer close(c)
  245. r, err := t.client.TaskService().Wait(ctx, &tasks.WaitRequest{
  246. ContainerID: t.id,
  247. })
  248. if err != nil {
  249. c <- ExitStatus{
  250. code: UnknownExitStatus,
  251. err: err,
  252. }
  253. return
  254. }
  255. c <- ExitStatus{
  256. code: r.ExitStatus,
  257. exitedAt: protobuf.FromTimestamp(r.ExitedAt),
  258. }
  259. }()
  260. return c, nil
  261. }
  262. // Delete deletes the task and its runtime state
  263. // it returns the exit status of the task and any errors that were encountered
  264. // during cleanup
  265. func (t *task) Delete(ctx context.Context, opts ...ProcessDeleteOpts) (*ExitStatus, error) {
  266. for _, o := range opts {
  267. if err := o(ctx, t); err != nil {
  268. return nil, err
  269. }
  270. }
  271. status, err := t.Status(ctx)
  272. if err != nil && errdefs.IsNotFound(err) {
  273. return nil, err
  274. }
  275. switch status.Status {
  276. case Stopped, Unknown, "":
  277. case Created:
  278. if t.client.runtime == fmt.Sprintf("%s.%s", plugin.RuntimePlugin, "windows") {
  279. // On windows Created is akin to Stopped
  280. break
  281. }
  282. if t.pid == 0 {
  283. // allow for deletion of created tasks with PID 0
  284. // https://github.com/containerd/containerd/issues/7357
  285. break
  286. }
  287. fallthrough
  288. default:
  289. return nil, fmt.Errorf("task must be stopped before deletion: %s: %w", status.Status, errdefs.ErrFailedPrecondition)
  290. }
  291. if t.io != nil {
  292. // io.Wait locks for restored tasks on Windows unless we call
  293. // io.Close first (https://github.com/containerd/containerd/issues/5621)
  294. // in other cases, preserve the contract and let IO finish before closing
  295. if t.client.runtime == fmt.Sprintf("%s.%s", plugin.RuntimePlugin, "windows") {
  296. t.io.Close()
  297. }
  298. // io.Cancel is used to cancel the io goroutine while it is in
  299. // fifo-opening state. It does not stop the pipes since these
  300. // should be closed on the shim's side, otherwise we might lose
  301. // data from the container!
  302. t.io.Cancel()
  303. t.io.Wait()
  304. }
  305. r, err := t.client.TaskService().Delete(ctx, &tasks.DeleteTaskRequest{
  306. ContainerID: t.id,
  307. })
  308. if err != nil {
  309. return nil, errdefs.FromGRPC(err)
  310. }
  311. // Only cleanup the IO after a successful Delete
  312. if t.io != nil {
  313. t.io.Close()
  314. }
  315. return &ExitStatus{code: r.ExitStatus, exitedAt: protobuf.FromTimestamp(r.ExitedAt)}, nil
  316. }
  317. func (t *task) Exec(ctx context.Context, id string, spec *specs.Process, ioCreate cio.Creator) (_ Process, err error) {
  318. if id == "" {
  319. return nil, fmt.Errorf("exec id must not be empty: %w", errdefs.ErrInvalidArgument)
  320. }
  321. i, err := ioCreate(id)
  322. if err != nil {
  323. return nil, err
  324. }
  325. defer func() {
  326. if err != nil && i != nil {
  327. i.Cancel()
  328. i.Close()
  329. }
  330. }()
  331. any, err := protobuf.MarshalAnyToProto(spec)
  332. if err != nil {
  333. return nil, err
  334. }
  335. cfg := i.Config()
  336. request := &tasks.ExecProcessRequest{
  337. ContainerID: t.id,
  338. ExecID: id,
  339. Terminal: cfg.Terminal,
  340. Stdin: cfg.Stdin,
  341. Stdout: cfg.Stdout,
  342. Stderr: cfg.Stderr,
  343. Spec: any,
  344. }
  345. if _, err := t.client.TaskService().Exec(ctx, request); err != nil {
  346. i.Cancel()
  347. i.Wait()
  348. i.Close()
  349. return nil, errdefs.FromGRPC(err)
  350. }
  351. return &process{
  352. id: id,
  353. task: t,
  354. io: i,
  355. }, nil
  356. }
  357. func (t *task) Pids(ctx context.Context) ([]ProcessInfo, error) {
  358. response, err := t.client.TaskService().ListPids(ctx, &tasks.ListPidsRequest{
  359. ContainerID: t.id,
  360. })
  361. if err != nil {
  362. return nil, errdefs.FromGRPC(err)
  363. }
  364. var processList []ProcessInfo
  365. for _, p := range response.Processes {
  366. processList = append(processList, ProcessInfo{
  367. Pid: p.Pid,
  368. Info: p.Info,
  369. })
  370. }
  371. return processList, nil
  372. }
  373. func (t *task) CloseIO(ctx context.Context, opts ...IOCloserOpts) error {
  374. r := &tasks.CloseIORequest{
  375. ContainerID: t.id,
  376. }
  377. var i IOCloseInfo
  378. for _, o := range opts {
  379. o(&i)
  380. }
  381. r.Stdin = i.Stdin
  382. _, err := t.client.TaskService().CloseIO(ctx, r)
  383. return errdefs.FromGRPC(err)
  384. }
  385. func (t *task) IO() cio.IO {
  386. return t.io
  387. }
  388. func (t *task) Resize(ctx context.Context, w, h uint32) error {
  389. _, err := t.client.TaskService().ResizePty(ctx, &tasks.ResizePtyRequest{
  390. ContainerID: t.id,
  391. Width: w,
  392. Height: h,
  393. })
  394. return errdefs.FromGRPC(err)
  395. }
  396. // NOTE: Checkpoint supports to dump task information to a directory, in this way, an empty
  397. // OCI Index will be returned.
  398. func (t *task) Checkpoint(ctx context.Context, opts ...CheckpointTaskOpts) (Image, error) {
  399. ctx, done, err := t.client.WithLease(ctx)
  400. if err != nil {
  401. return nil, err
  402. }
  403. defer done(ctx)
  404. cr, err := t.client.ContainerService().Get(ctx, t.id)
  405. if err != nil {
  406. return nil, err
  407. }
  408. request := &tasks.CheckpointTaskRequest{
  409. ContainerID: t.id,
  410. }
  411. i := CheckpointTaskInfo{
  412. runtime: cr.Runtime.Name,
  413. }
  414. for _, o := range opts {
  415. if err := o(&i); err != nil {
  416. return nil, err
  417. }
  418. }
  419. // set a default name
  420. if i.Name == "" {
  421. i.Name = fmt.Sprintf(checkpointNameFormat, t.id, time.Now().Format(checkpointDateFormat))
  422. }
  423. request.ParentCheckpoint = i.ParentCheckpoint.String()
  424. if i.Options != nil {
  425. any, err := protobuf.MarshalAnyToProto(i.Options)
  426. if err != nil {
  427. return nil, err
  428. }
  429. request.Options = any
  430. }
  431. status, err := t.Status(ctx)
  432. if err != nil {
  433. return nil, err
  434. }
  435. if status.Status != Paused {
  436. // make sure we pause it and resume after all other filesystem operations are completed
  437. if err := t.Pause(ctx); err != nil {
  438. return nil, err
  439. }
  440. defer t.Resume(ctx)
  441. }
  442. index := v1.Index{
  443. Versioned: is.Versioned{
  444. SchemaVersion: 2,
  445. },
  446. Annotations: make(map[string]string),
  447. }
  448. if err := t.checkpointTask(ctx, &index, request); err != nil {
  449. return nil, err
  450. }
  451. // if checkpoint image path passed, jump checkpoint image,
  452. // return an empty image
  453. if isCheckpointPathExist(cr.Runtime.Name, i.Options) {
  454. return NewImage(t.client, images.Image{}), nil
  455. }
  456. if cr.Image != "" {
  457. if err := t.checkpointImage(ctx, &index, cr.Image); err != nil {
  458. return nil, err
  459. }
  460. index.Annotations["image.name"] = cr.Image
  461. }
  462. if cr.SnapshotKey != "" {
  463. if err := t.checkpointRWSnapshot(ctx, &index, cr.Snapshotter, cr.SnapshotKey); err != nil {
  464. return nil, err
  465. }
  466. }
  467. desc, err := t.writeIndex(ctx, &index)
  468. if err != nil {
  469. return nil, err
  470. }
  471. im := images.Image{
  472. Name: i.Name,
  473. Target: desc,
  474. Labels: map[string]string{
  475. "containerd.io/checkpoint": "true",
  476. },
  477. }
  478. if im, err = t.client.ImageService().Create(ctx, im); err != nil {
  479. return nil, err
  480. }
  481. return NewImage(t.client, im), nil
  482. }
  483. // UpdateTaskInfo allows updated specific settings to be changed on a task
  484. type UpdateTaskInfo struct {
  485. // Resources updates a tasks resource constraints
  486. Resources interface{}
  487. // Annotations allows arbitrary and/or experimental resource constraints for task update
  488. Annotations map[string]string
  489. }
  490. // UpdateTaskOpts allows a caller to update task settings
  491. type UpdateTaskOpts func(context.Context, *Client, *UpdateTaskInfo) error
  492. func (t *task) Update(ctx context.Context, opts ...UpdateTaskOpts) error {
  493. request := &tasks.UpdateTaskRequest{
  494. ContainerID: t.id,
  495. }
  496. var i UpdateTaskInfo
  497. for _, o := range opts {
  498. if err := o(ctx, t.client, &i); err != nil {
  499. return err
  500. }
  501. }
  502. if i.Resources != nil {
  503. any, err := typeurl.MarshalAny(i.Resources)
  504. if err != nil {
  505. return err
  506. }
  507. request.Resources = protobuf.FromAny(any)
  508. }
  509. if i.Annotations != nil {
  510. request.Annotations = i.Annotations
  511. }
  512. _, err := t.client.TaskService().Update(ctx, request)
  513. return errdefs.FromGRPC(err)
  514. }
  515. func (t *task) LoadProcess(ctx context.Context, id string, ioAttach cio.Attach) (Process, error) {
  516. if id == t.id && ioAttach == nil {
  517. return t, nil
  518. }
  519. response, err := t.client.TaskService().Get(ctx, &tasks.GetRequest{
  520. ContainerID: t.id,
  521. ExecID: id,
  522. })
  523. if err != nil {
  524. err = errdefs.FromGRPC(err)
  525. if errdefs.IsNotFound(err) {
  526. return nil, fmt.Errorf("no running process found: %w", err)
  527. }
  528. return nil, err
  529. }
  530. var i cio.IO
  531. if ioAttach != nil {
  532. if i, err = attachExistingIO(response, ioAttach); err != nil {
  533. return nil, err
  534. }
  535. }
  536. return &process{
  537. id: id,
  538. task: t,
  539. io: i,
  540. }, nil
  541. }
  542. func (t *task) Metrics(ctx context.Context) (*types.Metric, error) {
  543. response, err := t.client.TaskService().Metrics(ctx, &tasks.MetricsRequest{
  544. Filters: []string{
  545. "id==" + t.id,
  546. },
  547. })
  548. if err != nil {
  549. return nil, errdefs.FromGRPC(err)
  550. }
  551. if response.Metrics == nil {
  552. _, err := t.Status(ctx)
  553. if err != nil && errdefs.IsNotFound(err) {
  554. return nil, err
  555. }
  556. return nil, errors.New("no metrics received")
  557. }
  558. return response.Metrics[0], nil
  559. }
  560. func (t *task) checkpointTask(ctx context.Context, index *v1.Index, request *tasks.CheckpointTaskRequest) error {
  561. response, err := t.client.TaskService().Checkpoint(ctx, request)
  562. if err != nil {
  563. return errdefs.FromGRPC(err)
  564. }
  565. // NOTE: response.Descriptors can be an empty slice if checkpoint image is jumped
  566. // add the checkpoint descriptors to the index
  567. for _, d := range response.Descriptors {
  568. index.Manifests = append(index.Manifests, v1.Descriptor{
  569. MediaType: d.MediaType,
  570. Size: d.Size,
  571. Digest: digest.Digest(d.Digest),
  572. Platform: &v1.Platform{
  573. OS: goruntime.GOOS,
  574. Architecture: goruntime.GOARCH,
  575. },
  576. Annotations: d.Annotations,
  577. })
  578. }
  579. return nil
  580. }
  581. func (t *task) checkpointRWSnapshot(ctx context.Context, index *v1.Index, snapshotterName string, id string) error {
  582. opts := []diff.Opt{
  583. diff.WithReference(fmt.Sprintf("checkpoint-rw-%s", id)),
  584. }
  585. rw, err := rootfs.CreateDiff(ctx, id, t.client.SnapshotService(snapshotterName), t.client.DiffService(), opts...)
  586. if err != nil {
  587. return err
  588. }
  589. rw.Platform = &v1.Platform{
  590. OS: goruntime.GOOS,
  591. Architecture: goruntime.GOARCH,
  592. }
  593. index.Manifests = append(index.Manifests, rw)
  594. return nil
  595. }
  596. func (t *task) checkpointImage(ctx context.Context, index *v1.Index, image string) error {
  597. if image == "" {
  598. return fmt.Errorf("cannot checkpoint image with empty name")
  599. }
  600. ir, err := t.client.ImageService().Get(ctx, image)
  601. if err != nil {
  602. return err
  603. }
  604. index.Manifests = append(index.Manifests, ir.Target)
  605. return nil
  606. }
  607. func (t *task) writeIndex(ctx context.Context, index *v1.Index) (d v1.Descriptor, err error) {
  608. labels := map[string]string{}
  609. for i, m := range index.Manifests {
  610. labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = m.Digest.String()
  611. }
  612. buf := bytes.NewBuffer(nil)
  613. if err := json.NewEncoder(buf).Encode(index); err != nil {
  614. return v1.Descriptor{}, err
  615. }
  616. return writeContent(ctx, t.client.ContentStore(), v1.MediaTypeImageIndex, t.id, buf, content.WithLabels(labels))
  617. }
  618. func writeContent(ctx context.Context, store content.Ingester, mediaType, ref string, r io.Reader, opts ...content.Opt) (d v1.Descriptor, err error) {
  619. writer, err := store.Writer(ctx, content.WithRef(ref))
  620. if err != nil {
  621. return d, err
  622. }
  623. defer writer.Close()
  624. size, err := io.Copy(writer, r)
  625. if err != nil {
  626. return d, err
  627. }
  628. if err := writer.Commit(ctx, size, "", opts...); err != nil {
  629. if !errdefs.IsAlreadyExists(err) {
  630. return d, err
  631. }
  632. }
  633. return v1.Descriptor{
  634. MediaType: mediaType,
  635. Digest: writer.Digest(),
  636. Size: size,
  637. }, nil
  638. }
  639. // isCheckpointPathExist only suitable for runc runtime now
  640. func isCheckpointPathExist(runtime string, v interface{}) bool {
  641. if v == nil {
  642. return false
  643. }
  644. switch runtime {
  645. case plugin.RuntimeRuncV1, plugin.RuntimeRuncV2:
  646. if opts, ok := v.(*options.CheckpointOptions); ok && opts.ImagePath != "" {
  647. return true
  648. }
  649. case plugin.RuntimeLinuxV1:
  650. if opts, ok := v.(*runctypes.CheckpointOptions); ok && opts.ImagePath != "" {
  651. return true
  652. }
  653. }
  654. return false
  655. }