resumable.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. // Copyright 2016 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gensupport
  5. import (
  6. "context"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "net/http"
  11. "strings"
  12. "sync"
  13. "time"
  14. "github.com/google/uuid"
  15. "google.golang.org/api/internal"
  16. )
  17. // ResumableUpload is used by the generated APIs to provide resumable uploads.
  18. // It is not used by developers directly.
  19. type ResumableUpload struct {
  20. Client *http.Client
  21. // URI is the resumable resource destination provided by the server after specifying "&uploadType=resumable".
  22. URI string
  23. UserAgent string // User-Agent for header of the request
  24. // Media is the object being uploaded.
  25. Media *MediaBuffer
  26. // MediaType defines the media type, e.g. "image/jpeg".
  27. MediaType string
  28. mu sync.Mutex // guards progress
  29. progress int64 // number of bytes uploaded so far
  30. // Callback is an optional function that will be periodically called with the cumulative number of bytes uploaded.
  31. Callback func(int64)
  32. // Retry optionally configures retries for requests made against the upload.
  33. Retry *RetryConfig
  34. // ChunkRetryDeadline configures the per-chunk deadline after which no further
  35. // retries should happen.
  36. ChunkRetryDeadline time.Duration
  37. // Track current request invocation ID and attempt count for retry metrics
  38. // and idempotency headers.
  39. invocationID string
  40. attempts int
  41. }
  42. // Progress returns the number of bytes uploaded at this point.
  43. func (rx *ResumableUpload) Progress() int64 {
  44. rx.mu.Lock()
  45. defer rx.mu.Unlock()
  46. return rx.progress
  47. }
  48. // doUploadRequest performs a single HTTP request to upload data.
  49. // off specifies the offset in rx.Media from which data is drawn.
  50. // size is the number of bytes in data.
  51. // final specifies whether data is the final chunk to be uploaded.
  52. func (rx *ResumableUpload) doUploadRequest(ctx context.Context, data io.Reader, off, size int64, final bool) (*http.Response, error) {
  53. req, err := http.NewRequest("POST", rx.URI, data)
  54. if err != nil {
  55. return nil, err
  56. }
  57. req.ContentLength = size
  58. var contentRange string
  59. if final {
  60. if size == 0 {
  61. contentRange = fmt.Sprintf("bytes */%v", off)
  62. } else {
  63. contentRange = fmt.Sprintf("bytes %v-%v/%v", off, off+size-1, off+size)
  64. }
  65. } else {
  66. contentRange = fmt.Sprintf("bytes %v-%v/*", off, off+size-1)
  67. }
  68. req.Header.Set("Content-Range", contentRange)
  69. req.Header.Set("Content-Type", rx.MediaType)
  70. req.Header.Set("User-Agent", rx.UserAgent)
  71. // TODO(b/274504690): Consider dropping gccl-invocation-id key since it
  72. // duplicates the X-Goog-Gcs-Idempotency-Token header (added in v0.115.0).
  73. baseXGoogHeader := "gl-go/" + GoVersion() + " gdcl/" + internal.Version
  74. invocationHeader := fmt.Sprintf("gccl-invocation-id/%s gccl-attempt-count/%d", rx.invocationID, rx.attempts)
  75. req.Header.Set("X-Goog-Api-Client", strings.Join([]string{baseXGoogHeader, invocationHeader}, " "))
  76. // Set idempotency token header which is used by GCS uploads.
  77. req.Header.Set("X-Goog-Gcs-Idempotency-Token", rx.invocationID)
  78. // Google's upload endpoint uses status code 308 for a
  79. // different purpose than the "308 Permanent Redirect"
  80. // since-standardized in RFC 7238. Because of the conflict in
  81. // semantics, Google added this new request header which
  82. // causes it to not use "308" and instead reply with 200 OK
  83. // and sets the upload-specific "X-HTTP-Status-Code-Override:
  84. // 308" response header.
  85. req.Header.Set("X-GUploader-No-308", "yes")
  86. return SendRequest(ctx, rx.Client, req)
  87. }
  88. func statusResumeIncomplete(resp *http.Response) bool {
  89. // This is how the server signals "status resume incomplete"
  90. // when X-GUploader-No-308 is set to "yes":
  91. return resp != nil && resp.Header.Get("X-Http-Status-Code-Override") == "308"
  92. }
  93. // reportProgress calls a user-supplied callback to report upload progress.
  94. // If old==updated, the callback is not called.
  95. func (rx *ResumableUpload) reportProgress(old, updated int64) {
  96. if updated-old == 0 {
  97. return
  98. }
  99. rx.mu.Lock()
  100. rx.progress = updated
  101. rx.mu.Unlock()
  102. if rx.Callback != nil {
  103. rx.Callback(updated)
  104. }
  105. }
  106. // transferChunk performs a single HTTP request to upload a single chunk from rx.Media.
  107. func (rx *ResumableUpload) transferChunk(ctx context.Context) (*http.Response, error) {
  108. chunk, off, size, err := rx.Media.Chunk()
  109. done := err == io.EOF
  110. if !done && err != nil {
  111. return nil, err
  112. }
  113. res, err := rx.doUploadRequest(ctx, chunk, off, int64(size), done)
  114. if err != nil {
  115. return res, err
  116. }
  117. // We sent "X-GUploader-No-308: yes" (see comment elsewhere in
  118. // this file), so we don't expect to get a 308.
  119. if res.StatusCode == 308 {
  120. return nil, errors.New("unexpected 308 response status code")
  121. }
  122. if res.StatusCode == http.StatusOK {
  123. rx.reportProgress(off, off+int64(size))
  124. }
  125. if statusResumeIncomplete(res) {
  126. rx.Media.Next()
  127. }
  128. return res, nil
  129. }
  130. // Upload starts the process of a resumable upload with a cancellable context.
  131. // It retries using the provided back off strategy until cancelled or the
  132. // strategy indicates to stop retrying.
  133. // It is called from the auto-generated API code and is not visible to the user.
  134. // Before sending an HTTP request, Upload calls any registered hook functions,
  135. // and calls the returned functions after the request returns (see send.go).
  136. // rx is private to the auto-generated API code.
  137. // Exactly one of resp or err will be nil. If resp is non-nil, the caller must call resp.Body.Close.
  138. func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err error) {
  139. // There are a couple of cases where it's possible for err and resp to both
  140. // be non-nil. However, we expose a simpler contract to our callers: exactly
  141. // one of resp and err will be non-nil. This means that any response body
  142. // must be closed here before returning a non-nil error.
  143. var prepareReturn = func(resp *http.Response, err error) (*http.Response, error) {
  144. if err != nil {
  145. if resp != nil && resp.Body != nil {
  146. resp.Body.Close()
  147. }
  148. return nil, err
  149. }
  150. // This case is very unlikely but possible only if rx.ChunkRetryDeadline is
  151. // set to a very small value, in which case no requests will be sent before
  152. // the deadline. Return an error to avoid causing a panic.
  153. if resp == nil {
  154. return nil, fmt.Errorf("upload request to %v not sent, choose larger value for ChunkRetryDealine", rx.URI)
  155. }
  156. return resp, nil
  157. }
  158. // Configure retryable error criteria.
  159. errorFunc := rx.Retry.errorFunc()
  160. // Configure per-chunk retry deadline.
  161. var retryDeadline time.Duration
  162. if rx.ChunkRetryDeadline != 0 {
  163. retryDeadline = rx.ChunkRetryDeadline
  164. } else {
  165. retryDeadline = defaultRetryDeadline
  166. }
  167. // Send all chunks.
  168. for {
  169. var pause time.Duration
  170. // Each chunk gets its own initialized-at-zero backoff and invocation ID.
  171. bo := rx.Retry.backoff()
  172. quitAfterTimer := time.NewTimer(retryDeadline)
  173. rx.attempts = 1
  174. rx.invocationID = uuid.New().String()
  175. // Retry loop for a single chunk.
  176. for {
  177. pauseTimer := time.NewTimer(pause)
  178. select {
  179. case <-ctx.Done():
  180. quitAfterTimer.Stop()
  181. pauseTimer.Stop()
  182. if err == nil {
  183. err = ctx.Err()
  184. }
  185. return prepareReturn(resp, err)
  186. case <-pauseTimer.C:
  187. case <-quitAfterTimer.C:
  188. pauseTimer.Stop()
  189. return prepareReturn(resp, err)
  190. }
  191. pauseTimer.Stop()
  192. // Check for context cancellation or timeout once more. If more than one
  193. // case in the select statement above was satisfied at the same time, Go
  194. // will choose one arbitrarily.
  195. // That can cause an operation to go through even if the context was
  196. // canceled before or the timeout was reached.
  197. select {
  198. case <-ctx.Done():
  199. quitAfterTimer.Stop()
  200. if err == nil {
  201. err = ctx.Err()
  202. }
  203. return prepareReturn(resp, err)
  204. case <-quitAfterTimer.C:
  205. return prepareReturn(resp, err)
  206. default:
  207. }
  208. resp, err = rx.transferChunk(ctx)
  209. var status int
  210. if resp != nil {
  211. status = resp.StatusCode
  212. }
  213. // Check if we should retry the request.
  214. if !errorFunc(status, err) {
  215. quitAfterTimer.Stop()
  216. break
  217. }
  218. rx.attempts++
  219. pause = bo.Pause()
  220. if resp != nil && resp.Body != nil {
  221. resp.Body.Close()
  222. }
  223. }
  224. // If the chunk was uploaded successfully, but there's still
  225. // more to go, upload the next chunk without any delay.
  226. if statusResumeIncomplete(resp) {
  227. resp.Body.Close()
  228. continue
  229. }
  230. return prepareReturn(resp, err)
  231. }
  232. }