http_client.go 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445
  1. // Copyright 2022 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package storage
  15. import (
  16. "context"
  17. "encoding/base64"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "io/ioutil"
  22. "net/http"
  23. "net/url"
  24. "os"
  25. "reflect"
  26. "strconv"
  27. "strings"
  28. "time"
  29. "cloud.google.com/go/iam/apiv1/iampb"
  30. "cloud.google.com/go/internal/optional"
  31. "cloud.google.com/go/internal/trace"
  32. "github.com/googleapis/gax-go/v2/callctx"
  33. "golang.org/x/oauth2/google"
  34. "google.golang.org/api/googleapi"
  35. "google.golang.org/api/iterator"
  36. "google.golang.org/api/option"
  37. "google.golang.org/api/option/internaloption"
  38. raw "google.golang.org/api/storage/v1"
  39. "google.golang.org/api/transport"
  40. htransport "google.golang.org/api/transport/http"
  41. )
// httpStorageClient is the HTTP-JSON API implementation of the transport-agnostic
// storageClient interface.
type httpStorageClient struct {
	// creds holds the credentials resolved by newHTTPStorageClient. It stays nil
	// when STORAGE_EMULATOR_HOST is set or when credential lookup fails.
	creds *google.Credentials
	// hc is the HTTP client built by newHTTPStorageClient; the raw service is
	// constructed on top of this same client.
	hc *http.Client
	// xmlHost is the host component of the endpoint chosen at construction time.
	xmlHost string
	// raw is the generated JSON API service used to build individual calls.
	raw *raw.Service
	// scheme is the URL scheme of the endpoint chosen at construction time.
	scheme string
	// settings are the client-level settings; per-call settings are derived
	// from them via callSettings.
	settings *settings
	// config is the storage configuration computed from the client options.
	config *storageConfig
}
  53. // newHTTPStorageClient initializes a new storageClient that uses the HTTP-JSON
  54. // Storage API.
  55. func newHTTPStorageClient(ctx context.Context, opts ...storageOption) (storageClient, error) {
  56. s := initSettings(opts...)
  57. o := s.clientOption
  58. config := newStorageConfig(o...)
  59. var creds *google.Credentials
  60. // In general, it is recommended to use raw.NewService instead of htransport.NewClient
  61. // since raw.NewService configures the correct default endpoints when initializing the
  62. // internal http client. However, in our case, "NewRangeReader" in reader.go needs to
  63. // access the http client directly to make requests, so we create the client manually
  64. // here so it can be re-used by both reader.go and raw.NewService. This means we need to
  65. // manually configure the default endpoint options on the http client. Furthermore, we
  66. // need to account for STORAGE_EMULATOR_HOST override when setting the default endpoints.
  67. if host := os.Getenv("STORAGE_EMULATOR_HOST"); host == "" {
  68. // Prepend default options to avoid overriding options passed by the user.
  69. o = append([]option.ClientOption{option.WithScopes(ScopeFullControl, "https://www.googleapis.com/auth/cloud-platform"), option.WithUserAgent(userAgent)}, o...)
  70. o = append(o, internaloption.WithDefaultEndpointTemplate("https://storage.UNIVERSE_DOMAIN/storage/v1/"),
  71. internaloption.WithDefaultMTLSEndpoint("https://storage.mtls.googleapis.com/storage/v1/"),
  72. internaloption.WithDefaultUniverseDomain("googleapis.com"),
  73. )
  74. // Don't error out here. The user may have passed in their own HTTP
  75. // client which does not auth with ADC or other common conventions.
  76. c, err := transport.Creds(ctx, o...)
  77. if err == nil {
  78. creds = c
  79. o = append(o, internaloption.WithCredentials(creds))
  80. }
  81. } else {
  82. var hostURL *url.URL
  83. if strings.Contains(host, "://") {
  84. h, err := url.Parse(host)
  85. if err != nil {
  86. return nil, err
  87. }
  88. hostURL = h
  89. } else {
  90. // Add scheme for user if not supplied in STORAGE_EMULATOR_HOST
  91. // URL is only parsed correctly if it has a scheme, so we build it ourselves
  92. hostURL = &url.URL{Scheme: "http", Host: host}
  93. }
  94. hostURL.Path = "storage/v1/"
  95. endpoint := hostURL.String()
  96. // Append the emulator host as default endpoint for the user
  97. o = append([]option.ClientOption{option.WithoutAuthentication()}, o...)
  98. o = append(o, internaloption.WithDefaultEndpoint(endpoint))
  99. o = append(o, internaloption.WithDefaultMTLSEndpoint(endpoint))
  100. }
  101. s.clientOption = o
  102. // htransport selects the correct endpoint among WithEndpoint (user override), WithDefaultEndpoint, and WithDefaultMTLSEndpoint.
  103. hc, ep, err := htransport.NewClient(ctx, s.clientOption...)
  104. if err != nil {
  105. return nil, fmt.Errorf("dialing: %w", err)
  106. }
  107. // RawService should be created with the chosen endpoint to take account of user override.
  108. rawService, err := raw.NewService(ctx, option.WithEndpoint(ep), option.WithHTTPClient(hc))
  109. if err != nil {
  110. return nil, fmt.Errorf("storage client: %w", err)
  111. }
  112. // Update xmlHost and scheme with the chosen endpoint.
  113. u, err := url.Parse(ep)
  114. if err != nil {
  115. return nil, fmt.Errorf("supplied endpoint %q is not valid: %w", ep, err)
  116. }
  117. return &httpStorageClient{
  118. creds: creds,
  119. hc: hc,
  120. xmlHost: u.Host,
  121. raw: rawService,
  122. scheme: u.Scheme,
  123. settings: s,
  124. config: &config,
  125. }, nil
  126. }
  127. func (c *httpStorageClient) Close() error {
  128. c.hc.CloseIdleConnections()
  129. return nil
  130. }
  131. // Top-level methods.
  132. func (c *httpStorageClient) GetServiceAccount(ctx context.Context, project string, opts ...storageOption) (string, error) {
  133. s := callSettings(c.settings, opts...)
  134. call := c.raw.Projects.ServiceAccount.Get(project)
  135. var res *raw.ServiceAccount
  136. err := run(ctx, func(ctx context.Context) error {
  137. var err error
  138. res, err = call.Context(ctx).Do()
  139. return err
  140. }, s.retry, s.idempotent)
  141. if err != nil {
  142. return "", err
  143. }
  144. return res.EmailAddress, nil
  145. }
  146. func (c *httpStorageClient) CreateBucket(ctx context.Context, project, bucket string, attrs *BucketAttrs, enableObjectRetention *bool, opts ...storageOption) (*BucketAttrs, error) {
  147. s := callSettings(c.settings, opts...)
  148. var bkt *raw.Bucket
  149. if attrs != nil {
  150. bkt = attrs.toRawBucket()
  151. } else {
  152. bkt = &raw.Bucket{}
  153. }
  154. bkt.Name = bucket
  155. // If there is lifecycle information but no location, explicitly set
  156. // the location. This is a GCS quirk/bug.
  157. if bkt.Location == "" && bkt.Lifecycle != nil {
  158. bkt.Location = "US"
  159. }
  160. req := c.raw.Buckets.Insert(project, bkt)
  161. setClientHeader(req.Header())
  162. if attrs != nil && attrs.PredefinedACL != "" {
  163. req.PredefinedAcl(attrs.PredefinedACL)
  164. }
  165. if attrs != nil && attrs.PredefinedDefaultObjectACL != "" {
  166. req.PredefinedDefaultObjectAcl(attrs.PredefinedDefaultObjectACL)
  167. }
  168. if enableObjectRetention != nil {
  169. req.EnableObjectRetention(*enableObjectRetention)
  170. }
  171. var battrs *BucketAttrs
  172. err := run(ctx, func(ctx context.Context) error {
  173. b, err := req.Context(ctx).Do()
  174. if err != nil {
  175. return err
  176. }
  177. battrs, err = newBucket(b)
  178. return err
  179. }, s.retry, s.idempotent)
  180. return battrs, err
  181. }
  182. func (c *httpStorageClient) ListBuckets(ctx context.Context, project string, opts ...storageOption) *BucketIterator {
  183. s := callSettings(c.settings, opts...)
  184. it := &BucketIterator{
  185. ctx: ctx,
  186. projectID: project,
  187. }
  188. fetch := func(pageSize int, pageToken string) (token string, err error) {
  189. req := c.raw.Buckets.List(it.projectID)
  190. setClientHeader(req.Header())
  191. req.Projection("full")
  192. req.Prefix(it.Prefix)
  193. req.PageToken(pageToken)
  194. if pageSize > 0 {
  195. req.MaxResults(int64(pageSize))
  196. }
  197. var resp *raw.Buckets
  198. err = run(it.ctx, func(ctx context.Context) error {
  199. resp, err = req.Context(ctx).Do()
  200. return err
  201. }, s.retry, s.idempotent)
  202. if err != nil {
  203. return "", err
  204. }
  205. for _, item := range resp.Items {
  206. b, err := newBucket(item)
  207. if err != nil {
  208. return "", err
  209. }
  210. it.buckets = append(it.buckets, b)
  211. }
  212. return resp.NextPageToken, nil
  213. }
  214. it.pageInfo, it.nextFunc = iterator.NewPageInfo(
  215. fetch,
  216. func() int { return len(it.buckets) },
  217. func() interface{} { b := it.buckets; it.buckets = nil; return b })
  218. return it
  219. }
  220. // Bucket methods.
  221. func (c *httpStorageClient) DeleteBucket(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) error {
  222. s := callSettings(c.settings, opts...)
  223. req := c.raw.Buckets.Delete(bucket)
  224. setClientHeader(req.Header())
  225. if err := applyBucketConds("httpStorageClient.DeleteBucket", conds, req); err != nil {
  226. return err
  227. }
  228. if s.userProject != "" {
  229. req.UserProject(s.userProject)
  230. }
  231. return run(ctx, func(ctx context.Context) error { return req.Context(ctx).Do() }, s.retry, s.idempotent)
  232. }
  233. func (c *httpStorageClient) GetBucket(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) (*BucketAttrs, error) {
  234. s := callSettings(c.settings, opts...)
  235. req := c.raw.Buckets.Get(bucket).Projection("full")
  236. setClientHeader(req.Header())
  237. err := applyBucketConds("httpStorageClient.GetBucket", conds, req)
  238. if err != nil {
  239. return nil, err
  240. }
  241. if s.userProject != "" {
  242. req.UserProject(s.userProject)
  243. }
  244. var resp *raw.Bucket
  245. err = run(ctx, func(ctx context.Context) error {
  246. resp, err = req.Context(ctx).Do()
  247. return err
  248. }, s.retry, s.idempotent)
  249. var e *googleapi.Error
  250. if ok := errors.As(err, &e); ok && e.Code == http.StatusNotFound {
  251. return nil, ErrBucketNotExist
  252. }
  253. if err != nil {
  254. return nil, err
  255. }
  256. return newBucket(resp)
  257. }
  258. func (c *httpStorageClient) UpdateBucket(ctx context.Context, bucket string, uattrs *BucketAttrsToUpdate, conds *BucketConditions, opts ...storageOption) (*BucketAttrs, error) {
  259. s := callSettings(c.settings, opts...)
  260. rb := uattrs.toRawBucket()
  261. req := c.raw.Buckets.Patch(bucket, rb).Projection("full")
  262. setClientHeader(req.Header())
  263. err := applyBucketConds("httpStorageClient.UpdateBucket", conds, req)
  264. if err != nil {
  265. return nil, err
  266. }
  267. if s.userProject != "" {
  268. req.UserProject(s.userProject)
  269. }
  270. if uattrs != nil && uattrs.PredefinedACL != "" {
  271. req.PredefinedAcl(uattrs.PredefinedACL)
  272. }
  273. if uattrs != nil && uattrs.PredefinedDefaultObjectACL != "" {
  274. req.PredefinedDefaultObjectAcl(uattrs.PredefinedDefaultObjectACL)
  275. }
  276. var rawBucket *raw.Bucket
  277. err = run(ctx, func(ctx context.Context) error {
  278. rawBucket, err = req.Context(ctx).Do()
  279. return err
  280. }, s.retry, s.idempotent)
  281. if err != nil {
  282. return nil, err
  283. }
  284. return newBucket(rawBucket)
  285. }
  286. func (c *httpStorageClient) LockBucketRetentionPolicy(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) error {
  287. s := callSettings(c.settings, opts...)
  288. var metageneration int64
  289. if conds != nil {
  290. metageneration = conds.MetagenerationMatch
  291. }
  292. req := c.raw.Buckets.LockRetentionPolicy(bucket, metageneration)
  293. return run(ctx, func(ctx context.Context) error {
  294. _, err := req.Context(ctx).Do()
  295. return err
  296. }, s.retry, s.idempotent)
  297. }
  298. func (c *httpStorageClient) ListObjects(ctx context.Context, bucket string, q *Query, opts ...storageOption) *ObjectIterator {
  299. s := callSettings(c.settings, opts...)
  300. it := &ObjectIterator{
  301. ctx: ctx,
  302. }
  303. if q != nil {
  304. it.query = *q
  305. }
  306. fetch := func(pageSize int, pageToken string) (string, error) {
  307. req := c.raw.Objects.List(bucket)
  308. setClientHeader(req.Header())
  309. projection := it.query.Projection
  310. if projection == ProjectionDefault {
  311. projection = ProjectionFull
  312. }
  313. req.Projection(projection.String())
  314. req.Delimiter(it.query.Delimiter)
  315. req.Prefix(it.query.Prefix)
  316. req.StartOffset(it.query.StartOffset)
  317. req.EndOffset(it.query.EndOffset)
  318. req.Versions(it.query.Versions)
  319. req.IncludeTrailingDelimiter(it.query.IncludeTrailingDelimiter)
  320. req.MatchGlob(it.query.MatchGlob)
  321. req.IncludeFoldersAsPrefixes(it.query.IncludeFoldersAsPrefixes)
  322. if selection := it.query.toFieldSelection(); selection != "" {
  323. req.Fields("nextPageToken", googleapi.Field(selection))
  324. }
  325. req.PageToken(pageToken)
  326. if s.userProject != "" {
  327. req.UserProject(s.userProject)
  328. }
  329. if pageSize > 0 {
  330. req.MaxResults(int64(pageSize))
  331. }
  332. var resp *raw.Objects
  333. var err error
  334. err = run(it.ctx, func(ctx context.Context) error {
  335. resp, err = req.Context(ctx).Do()
  336. return err
  337. }, s.retry, s.idempotent)
  338. if err != nil {
  339. var e *googleapi.Error
  340. if ok := errors.As(err, &e); ok && e.Code == http.StatusNotFound {
  341. err = ErrBucketNotExist
  342. }
  343. return "", err
  344. }
  345. for _, item := range resp.Items {
  346. it.items = append(it.items, newObject(item))
  347. }
  348. for _, prefix := range resp.Prefixes {
  349. it.items = append(it.items, &ObjectAttrs{Prefix: prefix})
  350. }
  351. return resp.NextPageToken, nil
  352. }
  353. it.pageInfo, it.nextFunc = iterator.NewPageInfo(
  354. fetch,
  355. func() int { return len(it.items) },
  356. func() interface{} { b := it.items; it.items = nil; return b })
  357. return it
  358. }
  359. // Object metadata methods.
  360. func (c *httpStorageClient) DeleteObject(ctx context.Context, bucket, object string, gen int64, conds *Conditions, opts ...storageOption) error {
  361. s := callSettings(c.settings, opts...)
  362. req := c.raw.Objects.Delete(bucket, object).Context(ctx)
  363. if err := applyConds("Delete", gen, conds, req); err != nil {
  364. return err
  365. }
  366. if s.userProject != "" {
  367. req.UserProject(s.userProject)
  368. }
  369. err := run(ctx, func(ctx context.Context) error { return req.Context(ctx).Do() }, s.retry, s.idempotent)
  370. var e *googleapi.Error
  371. if ok := errors.As(err, &e); ok && e.Code == http.StatusNotFound {
  372. return ErrObjectNotExist
  373. }
  374. return err
  375. }
  376. func (c *httpStorageClient) GetObject(ctx context.Context, bucket, object string, gen int64, encryptionKey []byte, conds *Conditions, opts ...storageOption) (*ObjectAttrs, error) {
  377. s := callSettings(c.settings, opts...)
  378. req := c.raw.Objects.Get(bucket, object).Projection("full").Context(ctx)
  379. if err := applyConds("Attrs", gen, conds, req); err != nil {
  380. return nil, err
  381. }
  382. if s.userProject != "" {
  383. req.UserProject(s.userProject)
  384. }
  385. if err := setEncryptionHeaders(req.Header(), encryptionKey, false); err != nil {
  386. return nil, err
  387. }
  388. var obj *raw.Object
  389. var err error
  390. err = run(ctx, func(ctx context.Context) error {
  391. obj, err = req.Context(ctx).Do()
  392. return err
  393. }, s.retry, s.idempotent)
  394. var e *googleapi.Error
  395. if ok := errors.As(err, &e); ok && e.Code == http.StatusNotFound {
  396. return nil, ErrObjectNotExist
  397. }
  398. if err != nil {
  399. return nil, err
  400. }
  401. return newObject(obj), nil
  402. }
  403. func (c *httpStorageClient) UpdateObject(ctx context.Context, params *updateObjectParams, opts ...storageOption) (*ObjectAttrs, error) {
  404. uattrs := params.uattrs
  405. s := callSettings(c.settings, opts...)
  406. var attrs ObjectAttrs
  407. // Lists of fields to send, and set to null, in the JSON.
  408. var forceSendFields, nullFields []string
  409. if uattrs.ContentType != nil {
  410. attrs.ContentType = optional.ToString(uattrs.ContentType)
  411. // For ContentType, sending the empty string is a no-op.
  412. // Instead we send a null.
  413. if attrs.ContentType == "" {
  414. nullFields = append(nullFields, "ContentType")
  415. } else {
  416. forceSendFields = append(forceSendFields, "ContentType")
  417. }
  418. }
  419. if uattrs.ContentLanguage != nil {
  420. attrs.ContentLanguage = optional.ToString(uattrs.ContentLanguage)
  421. // For ContentLanguage it's an error to send the empty string.
  422. // Instead we send a null.
  423. if attrs.ContentLanguage == "" {
  424. nullFields = append(nullFields, "ContentLanguage")
  425. } else {
  426. forceSendFields = append(forceSendFields, "ContentLanguage")
  427. }
  428. }
  429. if uattrs.ContentEncoding != nil {
  430. attrs.ContentEncoding = optional.ToString(uattrs.ContentEncoding)
  431. forceSendFields = append(forceSendFields, "ContentEncoding")
  432. }
  433. if uattrs.ContentDisposition != nil {
  434. attrs.ContentDisposition = optional.ToString(uattrs.ContentDisposition)
  435. forceSendFields = append(forceSendFields, "ContentDisposition")
  436. }
  437. if uattrs.CacheControl != nil {
  438. attrs.CacheControl = optional.ToString(uattrs.CacheControl)
  439. forceSendFields = append(forceSendFields, "CacheControl")
  440. }
  441. if uattrs.EventBasedHold != nil {
  442. attrs.EventBasedHold = optional.ToBool(uattrs.EventBasedHold)
  443. forceSendFields = append(forceSendFields, "EventBasedHold")
  444. }
  445. if uattrs.TemporaryHold != nil {
  446. attrs.TemporaryHold = optional.ToBool(uattrs.TemporaryHold)
  447. forceSendFields = append(forceSendFields, "TemporaryHold")
  448. }
  449. if !uattrs.CustomTime.IsZero() {
  450. attrs.CustomTime = uattrs.CustomTime
  451. forceSendFields = append(forceSendFields, "CustomTime")
  452. }
  453. if uattrs.Metadata != nil {
  454. attrs.Metadata = uattrs.Metadata
  455. if len(attrs.Metadata) == 0 {
  456. // Sending the empty map is a no-op. We send null instead.
  457. nullFields = append(nullFields, "Metadata")
  458. } else {
  459. forceSendFields = append(forceSendFields, "Metadata")
  460. }
  461. }
  462. if uattrs.ACL != nil {
  463. attrs.ACL = uattrs.ACL
  464. // It's an error to attempt to delete the ACL, so
  465. // we don't append to nullFields here.
  466. forceSendFields = append(forceSendFields, "Acl")
  467. }
  468. if uattrs.Retention != nil {
  469. // For ObjectRetention it's an error to send empty fields.
  470. // Instead we send a null as the user's intention is to remove.
  471. if uattrs.Retention.Mode == "" && uattrs.Retention.RetainUntil.IsZero() {
  472. nullFields = append(nullFields, "Retention")
  473. } else {
  474. attrs.Retention = uattrs.Retention
  475. forceSendFields = append(forceSendFields, "Retention")
  476. }
  477. }
  478. rawObj := attrs.toRawObject(params.bucket)
  479. rawObj.ForceSendFields = forceSendFields
  480. rawObj.NullFields = nullFields
  481. call := c.raw.Objects.Patch(params.bucket, params.object, rawObj).Projection("full")
  482. if err := applyConds("Update", params.gen, params.conds, call); err != nil {
  483. return nil, err
  484. }
  485. if s.userProject != "" {
  486. call.UserProject(s.userProject)
  487. }
  488. if uattrs.PredefinedACL != "" {
  489. call.PredefinedAcl(uattrs.PredefinedACL)
  490. }
  491. if err := setEncryptionHeaders(call.Header(), params.encryptionKey, false); err != nil {
  492. return nil, err
  493. }
  494. if params.overrideRetention != nil {
  495. call.OverrideUnlockedRetention(*params.overrideRetention)
  496. }
  497. var obj *raw.Object
  498. var err error
  499. err = run(ctx, func(ctx context.Context) error { obj, err = call.Context(ctx).Do(); return err }, s.retry, s.idempotent)
  500. var e *googleapi.Error
  501. if errors.As(err, &e) && e.Code == http.StatusNotFound {
  502. return nil, ErrObjectNotExist
  503. }
  504. if err != nil {
  505. return nil, err
  506. }
  507. return newObject(obj), nil
  508. }
  509. // Default Object ACL methods.
  510. func (c *httpStorageClient) DeleteDefaultObjectACL(ctx context.Context, bucket string, entity ACLEntity, opts ...storageOption) error {
  511. s := callSettings(c.settings, opts...)
  512. req := c.raw.DefaultObjectAccessControls.Delete(bucket, string(entity))
  513. configureACLCall(ctx, s.userProject, req)
  514. return run(ctx, func(ctx context.Context) error { return req.Context(ctx).Do() }, s.retry, s.idempotent)
  515. }
  516. func (c *httpStorageClient) ListDefaultObjectACLs(ctx context.Context, bucket string, opts ...storageOption) ([]ACLRule, error) {
  517. s := callSettings(c.settings, opts...)
  518. var acls *raw.ObjectAccessControls
  519. var err error
  520. req := c.raw.DefaultObjectAccessControls.List(bucket)
  521. configureACLCall(ctx, s.userProject, req)
  522. err = run(ctx, func(ctx context.Context) error {
  523. acls, err = req.Context(ctx).Do()
  524. return err
  525. }, s.retry, true)
  526. if err != nil {
  527. return nil, err
  528. }
  529. return toObjectACLRules(acls.Items), nil
  530. }
  531. func (c *httpStorageClient) UpdateDefaultObjectACL(ctx context.Context, bucket string, entity ACLEntity, role ACLRole, opts ...storageOption) error {
  532. s := callSettings(c.settings, opts...)
  533. type setRequest interface {
  534. Do(opts ...googleapi.CallOption) (*raw.ObjectAccessControl, error)
  535. Header() http.Header
  536. }
  537. acl := &raw.ObjectAccessControl{
  538. Bucket: bucket,
  539. Entity: string(entity),
  540. Role: string(role),
  541. }
  542. var err error
  543. req := c.raw.DefaultObjectAccessControls.Update(bucket, string(entity), acl)
  544. configureACLCall(ctx, s.userProject, req)
  545. return run(ctx, func(ctx context.Context) error {
  546. _, err = req.Context(ctx).Do()
  547. return err
  548. }, s.retry, s.idempotent)
  549. }
  550. // Bucket ACL methods.
  551. func (c *httpStorageClient) DeleteBucketACL(ctx context.Context, bucket string, entity ACLEntity, opts ...storageOption) error {
  552. s := callSettings(c.settings, opts...)
  553. req := c.raw.BucketAccessControls.Delete(bucket, string(entity))
  554. configureACLCall(ctx, s.userProject, req)
  555. return run(ctx, func(ctx context.Context) error { return req.Context(ctx).Do() }, s.retry, s.idempotent)
  556. }
  557. func (c *httpStorageClient) ListBucketACLs(ctx context.Context, bucket string, opts ...storageOption) ([]ACLRule, error) {
  558. s := callSettings(c.settings, opts...)
  559. var acls *raw.BucketAccessControls
  560. var err error
  561. req := c.raw.BucketAccessControls.List(bucket)
  562. configureACLCall(ctx, s.userProject, req)
  563. err = run(ctx, func(ctx context.Context) error {
  564. acls, err = req.Context(ctx).Do()
  565. return err
  566. }, s.retry, true)
  567. if err != nil {
  568. return nil, err
  569. }
  570. return toBucketACLRules(acls.Items), nil
  571. }
  572. func (c *httpStorageClient) UpdateBucketACL(ctx context.Context, bucket string, entity ACLEntity, role ACLRole, opts ...storageOption) error {
  573. s := callSettings(c.settings, opts...)
  574. acl := &raw.BucketAccessControl{
  575. Bucket: bucket,
  576. Entity: string(entity),
  577. Role: string(role),
  578. }
  579. req := c.raw.BucketAccessControls.Update(bucket, string(entity), acl)
  580. configureACLCall(ctx, s.userProject, req)
  581. var err error
  582. return run(ctx, func(ctx context.Context) error {
  583. _, err = req.Context(ctx).Do()
  584. return err
  585. }, s.retry, s.idempotent)
  586. }
  587. // configureACLCall sets the context, user project and headers on the apiary library call.
  588. // This will panic if the call does not have the correct methods.
  589. func configureACLCall(ctx context.Context, userProject string, call interface{ Header() http.Header }) {
  590. vc := reflect.ValueOf(call)
  591. vc.MethodByName("Context").Call([]reflect.Value{reflect.ValueOf(ctx)})
  592. if userProject != "" {
  593. vc.MethodByName("UserProject").Call([]reflect.Value{reflect.ValueOf(userProject)})
  594. }
  595. setClientHeader(call.Header())
  596. }
  597. // Object ACL methods.
  598. func (c *httpStorageClient) DeleteObjectACL(ctx context.Context, bucket, object string, entity ACLEntity, opts ...storageOption) error {
  599. s := callSettings(c.settings, opts...)
  600. req := c.raw.ObjectAccessControls.Delete(bucket, object, string(entity))
  601. configureACLCall(ctx, s.userProject, req)
  602. return run(ctx, func(ctx context.Context) error { return req.Context(ctx).Do() }, s.retry, s.idempotent)
  603. }
  604. // ListObjectACLs retrieves object ACL entries. By default, it operates on the latest generation of this object.
  605. // Selecting a specific generation of this object is not currently supported by the client.
  606. func (c *httpStorageClient) ListObjectACLs(ctx context.Context, bucket, object string, opts ...storageOption) ([]ACLRule, error) {
  607. s := callSettings(c.settings, opts...)
  608. var acls *raw.ObjectAccessControls
  609. var err error
  610. req := c.raw.ObjectAccessControls.List(bucket, object)
  611. configureACLCall(ctx, s.userProject, req)
  612. err = run(ctx, func(ctx context.Context) error {
  613. acls, err = req.Context(ctx).Do()
  614. return err
  615. }, s.retry, s.idempotent)
  616. if err != nil {
  617. return nil, err
  618. }
  619. return toObjectACLRules(acls.Items), nil
  620. }
  621. func (c *httpStorageClient) UpdateObjectACL(ctx context.Context, bucket, object string, entity ACLEntity, role ACLRole, opts ...storageOption) error {
  622. s := callSettings(c.settings, opts...)
  623. type setRequest interface {
  624. Do(opts ...googleapi.CallOption) (*raw.ObjectAccessControl, error)
  625. Header() http.Header
  626. }
  627. acl := &raw.ObjectAccessControl{
  628. Bucket: bucket,
  629. Entity: string(entity),
  630. Role: string(role),
  631. }
  632. var err error
  633. req := c.raw.ObjectAccessControls.Update(bucket, object, string(entity), acl)
  634. configureACLCall(ctx, s.userProject, req)
  635. return run(ctx, func(ctx context.Context) error {
  636. _, err = req.Context(ctx).Do()
  637. return err
  638. }, s.retry, s.idempotent)
  639. }
  640. // Media operations.
  641. func (c *httpStorageClient) ComposeObject(ctx context.Context, req *composeObjectRequest, opts ...storageOption) (*ObjectAttrs, error) {
  642. s := callSettings(c.settings, opts...)
  643. rawReq := &raw.ComposeRequest{}
  644. // Compose requires a non-empty Destination, so we always set it,
  645. // even if the caller-provided ObjectAttrs is the zero value.
  646. rawReq.Destination = req.dstObject.attrs.toRawObject(req.dstBucket)
  647. if req.sendCRC32C {
  648. rawReq.Destination.Crc32c = encodeUint32(req.dstObject.attrs.CRC32C)
  649. }
  650. for _, src := range req.srcs {
  651. srcObj := &raw.ComposeRequestSourceObjects{
  652. Name: src.name,
  653. }
  654. if err := applyConds("ComposeFrom source", src.gen, src.conds, composeSourceObj{srcObj}); err != nil {
  655. return nil, err
  656. }
  657. rawReq.SourceObjects = append(rawReq.SourceObjects, srcObj)
  658. }
  659. call := c.raw.Objects.Compose(req.dstBucket, req.dstObject.name, rawReq)
  660. if err := applyConds("ComposeFrom destination", defaultGen, req.dstObject.conds, call); err != nil {
  661. return nil, err
  662. }
  663. if s.userProject != "" {
  664. call.UserProject(s.userProject)
  665. }
  666. if req.predefinedACL != "" {
  667. call.DestinationPredefinedAcl(req.predefinedACL)
  668. }
  669. if err := setEncryptionHeaders(call.Header(), req.dstObject.encryptionKey, false); err != nil {
  670. return nil, err
  671. }
  672. var obj *raw.Object
  673. setClientHeader(call.Header())
  674. var err error
  675. retryCall := func(ctx context.Context) error { obj, err = call.Context(ctx).Do(); return err }
  676. if err := run(ctx, retryCall, s.retry, s.idempotent); err != nil {
  677. return nil, err
  678. }
  679. return newObject(obj), nil
  680. }
  681. func (c *httpStorageClient) RewriteObject(ctx context.Context, req *rewriteObjectRequest, opts ...storageOption) (*rewriteObjectResponse, error) {
  682. s := callSettings(c.settings, opts...)
  683. rawObject := req.dstObject.attrs.toRawObject("")
  684. call := c.raw.Objects.Rewrite(req.srcObject.bucket, req.srcObject.name, req.dstObject.bucket, req.dstObject.name, rawObject)
  685. call.Projection("full")
  686. if req.token != "" {
  687. call.RewriteToken(req.token)
  688. }
  689. if req.dstObject.keyName != "" {
  690. call.DestinationKmsKeyName(req.dstObject.keyName)
  691. }
  692. if req.predefinedACL != "" {
  693. call.DestinationPredefinedAcl(req.predefinedACL)
  694. }
  695. if err := applyConds("Copy destination", defaultGen, req.dstObject.conds, call); err != nil {
  696. return nil, err
  697. }
  698. if err := applySourceConds(req.srcObject.gen, req.srcObject.conds, call); err != nil {
  699. return nil, err
  700. }
  701. if s.userProject != "" {
  702. call.UserProject(s.userProject)
  703. }
  704. // Set destination encryption headers.
  705. if err := setEncryptionHeaders(call.Header(), req.dstObject.encryptionKey, false); err != nil {
  706. return nil, err
  707. }
  708. // Set source encryption headers.
  709. if err := setEncryptionHeaders(call.Header(), req.srcObject.encryptionKey, true); err != nil {
  710. return nil, err
  711. }
  712. if req.maxBytesRewrittenPerCall != 0 {
  713. call.MaxBytesRewrittenPerCall(req.maxBytesRewrittenPerCall)
  714. }
  715. var res *raw.RewriteResponse
  716. var err error
  717. setClientHeader(call.Header())
  718. retryCall := func(ctx context.Context) error { res, err = call.Context(ctx).Do(); return err }
  719. if err := run(ctx, retryCall, s.retry, s.idempotent); err != nil {
  720. return nil, err
  721. }
  722. r := &rewriteObjectResponse{
  723. done: res.Done,
  724. written: res.TotalBytesRewritten,
  725. size: res.ObjectSize,
  726. token: res.RewriteToken,
  727. resource: newObject(res.Resource),
  728. }
  729. return r, nil
  730. }
  731. func (c *httpStorageClient) NewRangeReader(ctx context.Context, params *newRangeReaderParams, opts ...storageOption) (r *Reader, err error) {
  732. ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.httpStorageClient.NewRangeReader")
  733. defer func() { trace.EndSpan(ctx, err) }()
  734. s := callSettings(c.settings, opts...)
  735. if c.config.useJSONforReads {
  736. return c.newRangeReaderJSON(ctx, params, s)
  737. }
  738. return c.newRangeReaderXML(ctx, params, s)
  739. }
  740. func (c *httpStorageClient) newRangeReaderXML(ctx context.Context, params *newRangeReaderParams, s *settings) (r *Reader, err error) {
  741. u := &url.URL{
  742. Scheme: c.scheme,
  743. Host: c.xmlHost,
  744. Path: fmt.Sprintf("/%s/%s", params.bucket, params.object),
  745. RawPath: fmt.Sprintf("/%s/%s", params.bucket, url.PathEscape(params.object)),
  746. }
  747. verb := "GET"
  748. if params.length == 0 {
  749. verb = "HEAD"
  750. }
  751. req, err := http.NewRequest(verb, u.String(), nil)
  752. if err != nil {
  753. return nil, err
  754. }
  755. if s.userProject != "" {
  756. req.Header.Set("X-Goog-User-Project", s.userProject)
  757. }
  758. if err := setRangeReaderHeaders(req.Header, params); err != nil {
  759. return nil, err
  760. }
  761. // Set custom headers passed in via the context. This is only required for XML;
  762. // for gRPC & JSON this is handled in the GAPIC and Apiary layers respectively.
  763. ctxHeaders := callctx.HeadersFromContext(ctx)
  764. for k, vals := range ctxHeaders {
  765. for _, v := range vals {
  766. req.Header.Add(k, v)
  767. }
  768. }
  769. reopen := readerReopen(ctx, req.Header, params, s,
  770. func(ctx context.Context) (*http.Response, error) { return c.hc.Do(req.WithContext(ctx)) },
  771. func() error { return setConditionsHeaders(req.Header, params.conds) },
  772. func() { req.URL.RawQuery = fmt.Sprintf("generation=%d", params.gen) })
  773. res, err := reopen(0)
  774. if err != nil {
  775. return nil, err
  776. }
  777. return parseReadResponse(res, params, reopen)
  778. }
  779. func (c *httpStorageClient) newRangeReaderJSON(ctx context.Context, params *newRangeReaderParams, s *settings) (r *Reader, err error) {
  780. call := c.raw.Objects.Get(params.bucket, params.object)
  781. setClientHeader(call.Header())
  782. call.Projection("full")
  783. if s.userProject != "" {
  784. call.UserProject(s.userProject)
  785. }
  786. if err := setRangeReaderHeaders(call.Header(), params); err != nil {
  787. return nil, err
  788. }
  789. reopen := readerReopen(ctx, call.Header(), params, s, func(ctx context.Context) (*http.Response, error) { return call.Context(ctx).Download() },
  790. func() error { return applyConds("NewReader", params.gen, params.conds, call) },
  791. func() { call.Generation(params.gen) })
  792. res, err := reopen(0)
  793. if err != nil {
  794. return nil, err
  795. }
  796. return parseReadResponse(res, params, reopen)
  797. }
// OpenWriter starts an upload of a new object and returns the write end of a
// pipe. Data written to the returned writer is consumed by a background
// goroutine, which passes the read end of the pipe as the media of an
// Objects.Insert call.
//
// Results are delivered through the callbacks in params rather than the
// return value: on failure the error is passed to params.setError and the
// read end of the pipe is closed with that error; on success the created
// object is passed to params.setObj. params.donec is closed when the
// goroutine exits. The returned error is always nil.
func (c *httpStorageClient) OpenWriter(params *openWriterParams, opts ...storageOption) (*io.PipeWriter, error) {
	s := callSettings(c.settings, opts...)
	errorf := params.setError
	setObj := params.setObj
	progress := params.progress
	attrs := params.attrs
	mediaOpts := []googleapi.MediaOption{
		googleapi.ChunkSize(params.chunkSize),
	}
	// Note: c here is the content type and shadows the receiver for the
	// duration of this if statement only.
	if c := attrs.ContentType; c != "" || params.forceEmptyContentType {
		mediaOpts = append(mediaOpts, googleapi.ContentType(c))
	}
	if params.chunkRetryDeadline != 0 {
		mediaOpts = append(mediaOpts, googleapi.ChunkRetryDeadline(params.chunkRetryDeadline))
	}
	pr, pw := io.Pipe()
	go func() {
		// Signal completion to whoever waits on donec, on every exit path.
		defer close(params.donec)
		rawObj := attrs.toRawObject(params.bucket)
		if params.sendCRC32C {
			rawObj.Crc32c = encodeUint32(attrs.CRC32C)
		}
		if attrs.MD5 != nil {
			rawObj.Md5Hash = base64.StdEncoding.EncodeToString(attrs.MD5)
		}
		call := c.raw.Objects.Insert(params.bucket, rawObj).
			Media(pr, mediaOpts...).
			Projection("full").
			Context(params.ctx).
			Name(params.attrs.Name)
		// Only the first argument of the progress callback is forwarded.
		call.ProgressUpdater(func(n, _ int64) { progress(n) })
		if attrs.KMSKeyName != "" {
			call.KmsKeyName(attrs.KMSKeyName)
		}
		if attrs.PredefinedACL != "" {
			call.PredefinedAcl(attrs.PredefinedACL)
		}
		if err := setEncryptionHeaders(call.Header(), params.encryptionKey, false); err != nil {
			errorf(err)
			// Closing the read end with the error makes it visible to the writer side.
			pr.CloseWithError(err)
			return
		}
		var resp *raw.Object
		err := applyConds("NewWriter", defaultGen, params.conds, call)
		if err == nil {
			if s.userProject != "" {
				call.UserProject(s.userProject)
			}
			// TODO(tritone): Remove this code when Uploads begin to support
			// retry attempt header injection with "client header" injection.
			setClientHeader(call.Header())
			// The internals that perform call.Do automatically retry both the initial
			// call to set up the upload as well as calls to upload individual chunks
			// for a resumable upload (as long as the chunk size is non-zero). Hence
			// there is no need to add retries here.
			// Retry only when the operation is idempotent or the retry policy is RetryAlways.
			var useRetry bool
			if (s.retry == nil || s.retry.policy == RetryIdempotent) && s.idempotent {
				useRetry = true
			} else if s.retry != nil && s.retry.policy == RetryAlways {
				useRetry = true
			}
			if useRetry {
				if s.retry != nil {
					call.WithRetry(s.retry.backoff, s.retry.shouldRetry)
				} else {
					// No retry config supplied: enable retries with nil backoff and
					// predicate arguments.
					call.WithRetry(nil, nil)
				}
			}
			resp, err = call.Do()
		}
		// Covers both a failed applyConds and a failed call.Do.
		if err != nil {
			errorf(err)
			pr.CloseWithError(err)
			return
		}
		setObj(newObject(resp))
	}()
	return pw, nil
}
  878. // IAM methods.
  879. func (c *httpStorageClient) GetIamPolicy(ctx context.Context, resource string, version int32, opts ...storageOption) (*iampb.Policy, error) {
  880. s := callSettings(c.settings, opts...)
  881. call := c.raw.Buckets.GetIamPolicy(resource).OptionsRequestedPolicyVersion(int64(version))
  882. setClientHeader(call.Header())
  883. if s.userProject != "" {
  884. call.UserProject(s.userProject)
  885. }
  886. var rp *raw.Policy
  887. err := run(ctx, func(ctx context.Context) error {
  888. var err error
  889. rp, err = call.Context(ctx).Do()
  890. return err
  891. }, s.retry, s.idempotent)
  892. if err != nil {
  893. return nil, err
  894. }
  895. return iamFromStoragePolicy(rp), nil
  896. }
  897. func (c *httpStorageClient) SetIamPolicy(ctx context.Context, resource string, policy *iampb.Policy, opts ...storageOption) error {
  898. s := callSettings(c.settings, opts...)
  899. rp := iamToStoragePolicy(policy)
  900. call := c.raw.Buckets.SetIamPolicy(resource, rp)
  901. setClientHeader(call.Header())
  902. if s.userProject != "" {
  903. call.UserProject(s.userProject)
  904. }
  905. return run(ctx, func(ctx context.Context) error {
  906. _, err := call.Context(ctx).Do()
  907. return err
  908. }, s.retry, s.idempotent)
  909. }
  910. func (c *httpStorageClient) TestIamPermissions(ctx context.Context, resource string, permissions []string, opts ...storageOption) ([]string, error) {
  911. s := callSettings(c.settings, opts...)
  912. call := c.raw.Buckets.TestIamPermissions(resource, permissions)
  913. setClientHeader(call.Header())
  914. if s.userProject != "" {
  915. call.UserProject(s.userProject)
  916. }
  917. var res *raw.TestIamPermissionsResponse
  918. err := run(ctx, func(ctx context.Context) error {
  919. var err error
  920. res, err = call.Context(ctx).Do()
  921. return err
  922. }, s.retry, s.idempotent)
  923. if err != nil {
  924. return nil, err
  925. }
  926. return res.Permissions, nil
  927. }
  928. // HMAC Key methods.
  929. func (c *httpStorageClient) GetHMACKey(ctx context.Context, project, accessID string, opts ...storageOption) (*HMACKey, error) {
  930. s := callSettings(c.settings, opts...)
  931. call := c.raw.Projects.HmacKeys.Get(project, accessID)
  932. if s.userProject != "" {
  933. call = call.UserProject(s.userProject)
  934. }
  935. var metadata *raw.HmacKeyMetadata
  936. var err error
  937. if err := run(ctx, func(ctx context.Context) error {
  938. metadata, err = call.Context(ctx).Do()
  939. return err
  940. }, s.retry, s.idempotent); err != nil {
  941. return nil, err
  942. }
  943. hk := &raw.HmacKey{
  944. Metadata: metadata,
  945. }
  946. return toHMACKeyFromRaw(hk, false)
  947. }
  948. func (c *httpStorageClient) ListHMACKeys(ctx context.Context, project, serviceAccountEmail string, showDeletedKeys bool, opts ...storageOption) *HMACKeysIterator {
  949. s := callSettings(c.settings, opts...)
  950. it := &HMACKeysIterator{
  951. ctx: ctx,
  952. raw: c.raw.Projects.HmacKeys,
  953. projectID: project,
  954. retry: s.retry,
  955. }
  956. fetch := func(pageSize int, pageToken string) (token string, err error) {
  957. call := c.raw.Projects.HmacKeys.List(project)
  958. setClientHeader(call.Header())
  959. if pageToken != "" {
  960. call = call.PageToken(pageToken)
  961. }
  962. if pageSize > 0 {
  963. call = call.MaxResults(int64(pageSize))
  964. }
  965. if showDeletedKeys {
  966. call = call.ShowDeletedKeys(true)
  967. }
  968. if s.userProject != "" {
  969. call = call.UserProject(s.userProject)
  970. }
  971. if serviceAccountEmail != "" {
  972. call = call.ServiceAccountEmail(serviceAccountEmail)
  973. }
  974. var resp *raw.HmacKeysMetadata
  975. err = run(it.ctx, func(ctx context.Context) error {
  976. resp, err = call.Context(ctx).Do()
  977. return err
  978. }, s.retry, s.idempotent)
  979. if err != nil {
  980. return "", err
  981. }
  982. for _, metadata := range resp.Items {
  983. hk := &raw.HmacKey{
  984. Metadata: metadata,
  985. }
  986. hkey, err := toHMACKeyFromRaw(hk, true)
  987. if err != nil {
  988. return "", err
  989. }
  990. it.hmacKeys = append(it.hmacKeys, hkey)
  991. }
  992. return resp.NextPageToken, nil
  993. }
  994. it.pageInfo, it.nextFunc = iterator.NewPageInfo(
  995. fetch,
  996. func() int { return len(it.hmacKeys) - it.index },
  997. func() interface{} {
  998. prev := it.hmacKeys
  999. it.hmacKeys = it.hmacKeys[:0]
  1000. it.index = 0
  1001. return prev
  1002. })
  1003. return it
  1004. }
  1005. func (c *httpStorageClient) UpdateHMACKey(ctx context.Context, project, serviceAccountEmail, accessID string, attrs *HMACKeyAttrsToUpdate, opts ...storageOption) (*HMACKey, error) {
  1006. s := callSettings(c.settings, opts...)
  1007. call := c.raw.Projects.HmacKeys.Update(project, accessID, &raw.HmacKeyMetadata{
  1008. Etag: attrs.Etag,
  1009. State: string(attrs.State),
  1010. })
  1011. if s.userProject != "" {
  1012. call = call.UserProject(s.userProject)
  1013. }
  1014. var metadata *raw.HmacKeyMetadata
  1015. var err error
  1016. if err := run(ctx, func(ctx context.Context) error {
  1017. metadata, err = call.Context(ctx).Do()
  1018. return err
  1019. }, s.retry, s.idempotent); err != nil {
  1020. return nil, err
  1021. }
  1022. hk := &raw.HmacKey{
  1023. Metadata: metadata,
  1024. }
  1025. return toHMACKeyFromRaw(hk, false)
  1026. }
  1027. func (c *httpStorageClient) CreateHMACKey(ctx context.Context, project, serviceAccountEmail string, opts ...storageOption) (*HMACKey, error) {
  1028. s := callSettings(c.settings, opts...)
  1029. call := c.raw.Projects.HmacKeys.Create(project, serviceAccountEmail)
  1030. if s.userProject != "" {
  1031. call = call.UserProject(s.userProject)
  1032. }
  1033. var hk *raw.HmacKey
  1034. if err := run(ctx, func(ctx context.Context) error {
  1035. h, err := call.Context(ctx).Do()
  1036. hk = h
  1037. return err
  1038. }, s.retry, s.idempotent); err != nil {
  1039. return nil, err
  1040. }
  1041. return toHMACKeyFromRaw(hk, true)
  1042. }
  1043. func (c *httpStorageClient) DeleteHMACKey(ctx context.Context, project string, accessID string, opts ...storageOption) error {
  1044. s := callSettings(c.settings, opts...)
  1045. call := c.raw.Projects.HmacKeys.Delete(project, accessID)
  1046. if s.userProject != "" {
  1047. call = call.UserProject(s.userProject)
  1048. }
  1049. return run(ctx, func(ctx context.Context) error {
  1050. return call.Context(ctx).Do()
  1051. }, s.retry, s.idempotent)
  1052. }
  1053. // Notification methods.
  1054. // ListNotifications returns all the Notifications configured for this bucket, as a map indexed by notification ID.
  1055. //
  1056. // Note: This API does not support pagination. However, entity limits cap the number of notifications on a single bucket,
  1057. // so all results will be returned in the first response. See https://cloud.google.com/storage/quotas#buckets.
  1058. func (c *httpStorageClient) ListNotifications(ctx context.Context, bucket string, opts ...storageOption) (n map[string]*Notification, err error) {
  1059. ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.httpStorageClient.ListNotifications")
  1060. defer func() { trace.EndSpan(ctx, err) }()
  1061. s := callSettings(c.settings, opts...)
  1062. call := c.raw.Notifications.List(bucket)
  1063. if s.userProject != "" {
  1064. call.UserProject(s.userProject)
  1065. }
  1066. var res *raw.Notifications
  1067. err = run(ctx, func(ctx context.Context) error {
  1068. res, err = call.Context(ctx).Do()
  1069. return err
  1070. }, s.retry, true)
  1071. if err != nil {
  1072. return nil, err
  1073. }
  1074. return notificationsToMap(res.Items), nil
  1075. }
  1076. func (c *httpStorageClient) CreateNotification(ctx context.Context, bucket string, n *Notification, opts ...storageOption) (ret *Notification, err error) {
  1077. ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.httpStorageClient.CreateNotification")
  1078. defer func() { trace.EndSpan(ctx, err) }()
  1079. s := callSettings(c.settings, opts...)
  1080. call := c.raw.Notifications.Insert(bucket, toRawNotification(n))
  1081. if s.userProject != "" {
  1082. call.UserProject(s.userProject)
  1083. }
  1084. var rn *raw.Notification
  1085. err = run(ctx, func(ctx context.Context) error {
  1086. rn, err = call.Context(ctx).Do()
  1087. return err
  1088. }, s.retry, s.idempotent)
  1089. if err != nil {
  1090. return nil, err
  1091. }
  1092. return toNotification(rn), nil
  1093. }
  1094. func (c *httpStorageClient) DeleteNotification(ctx context.Context, bucket string, id string, opts ...storageOption) (err error) {
  1095. ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.httpStorageClient.DeleteNotification")
  1096. defer func() { trace.EndSpan(ctx, err) }()
  1097. s := callSettings(c.settings, opts...)
  1098. call := c.raw.Notifications.Delete(bucket, id)
  1099. if s.userProject != "" {
  1100. call.UserProject(s.userProject)
  1101. }
  1102. return run(ctx, func(ctx context.Context) error {
  1103. return call.Context(ctx).Do()
  1104. }, s.retry, s.idempotent)
  1105. }
  1106. type httpReader struct {
  1107. body io.ReadCloser
  1108. seen int64
  1109. reopen func(seen int64) (*http.Response, error)
  1110. }
  1111. func (r *httpReader) Read(p []byte) (int, error) {
  1112. n := 0
  1113. for len(p[n:]) > 0 {
  1114. m, err := r.body.Read(p[n:])
  1115. n += m
  1116. r.seen += int64(m)
  1117. if err == nil || err == io.EOF {
  1118. return n, err
  1119. }
  1120. // Read failed (likely due to connection issues), but we will try to reopen
  1121. // the pipe and continue. Send a ranged read request that takes into account
  1122. // the number of bytes we've already seen.
  1123. res, err := r.reopen(r.seen)
  1124. if err != nil {
  1125. // reopen already retries
  1126. return n, err
  1127. }
  1128. r.body.Close()
  1129. r.body = res.Body
  1130. }
  1131. return n, nil
  1132. }
  1133. func (r *httpReader) Close() error {
  1134. return r.body.Close()
  1135. }
  1136. func setRangeReaderHeaders(h http.Header, params *newRangeReaderParams) error {
  1137. if params.readCompressed {
  1138. h.Set("Accept-Encoding", "gzip")
  1139. }
  1140. if err := setEncryptionHeaders(h, params.encryptionKey, false); err != nil {
  1141. return err
  1142. }
  1143. return nil
  1144. }
// readerReopen initiates a Read with offset and length, assuming we
// have already read seen bytes.
//
// It returns a function that, given the number of bytes already seen, sets
// the Range header on header, applies conditions, pins the generation when
// one is known, and performs the download through doDownload using run with
// the retry settings in s. The returned function mutates params.gen the first
// time a response carries an X-Goog-Generation header and no generation was
// specified.
func readerReopen(ctx context.Context, header http.Header, params *newRangeReaderParams, s *settings,
	doDownload func(context.Context) (*http.Response, error), applyConditions func() error, setGeneration func()) func(int64) (*http.Response, error) {
	return func(seen int64) (*http.Response, error) {
		// If the context has already expired, return immediately without making a
		// call.
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		start := params.offset + seen
		// Three Range shapes: a bare negative number (formatted without a
		// trailing dash), an open-ended range from start, or a bounded range.
		// No Range header is set when length is negative and start is zero,
		// or when length is zero.
		if params.length < 0 && start < 0 {
			header.Set("Range", fmt.Sprintf("bytes=%d", start))
		} else if params.length < 0 && start > 0 {
			header.Set("Range", fmt.Sprintf("bytes=%d-", start))
		} else if params.length > 0 {
			// The end character isn't affected by how many bytes we've seen.
			header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, params.offset+params.length-1))
		}
		// We wait to assign conditions here because the generation number can change in between reopen() runs.
		if err := applyConditions(); err != nil {
			return nil, err
		}
		// If an object generation is specified, include generation as query string parameters.
		if params.gen >= 0 {
			setGeneration()
		}
		var err error
		var res *http.Response
		err = run(ctx, func(ctx context.Context) error {
			res, err = doDownload(ctx)
			if err != nil {
				// Translate a 404 reported as a googleapi.Error into ErrObjectNotExist.
				var e *googleapi.Error
				if errors.As(err, &e) {
					if e.Code == http.StatusNotFound {
						return ErrObjectNotExist
					}
				}
				return err
			}
			if res.StatusCode == http.StatusNotFound {
				// this check is necessary only for XML
				res.Body.Close()
				return ErrObjectNotExist
			}
			if res.StatusCode < 200 || res.StatusCode > 299 {
				// Surface any other non-2xx response as a googleapi.Error carrying
				// the status code, headers, and body text. A failure to read the
				// body is ignored.
				body, _ := ioutil.ReadAll(res.Body)
				res.Body.Close()
				return &googleapi.Error{
					Code:   res.StatusCode,
					Header: res.Header,
					Body:   string(body),
				}
			}
			// A ranged request (start > 0, non-zero length) must be answered with
			// 206 Partial Content unless decompressive transcoding applies.
			partialContentNotSatisfied :=
				!decompressiveTranscoding(res) &&
					start > 0 && params.length != 0 &&
					res.StatusCode != http.StatusPartialContent
			if partialContentNotSatisfied {
				res.Body.Close()
				return errors.New("storage: partial request not satisfied")
			}
			// With "Content-Encoding": "gzip" aka decompressive transcoding, GCS serves
			// back the whole file regardless of the range count passed in as per:
			// https://cloud.google.com/storage/docs/transcoding#range,
			// thus we have to manually move the body forward by seen bytes.
			if decompressiveTranscoding(res) && seen > 0 {
				// NOTE(review): the result of the skip is discarded, so a short or
				// failed skip is not reported here.
				_, _ = io.CopyN(ioutil.Discard, res.Body, seen)
			}
			// If a generation hasn't been specified, and this is the first response we get, let's record the
			// generation. In future requests we'll use this generation as a precondition to avoid data races.
			if params.gen < 0 && res.Header.Get("X-Goog-Generation") != "" {
				gen64, err := strconv.ParseInt(res.Header.Get("X-Goog-Generation"), 10, 64)
				if err != nil {
					// NOTE(review): res.Body is not closed on this path.
					return err
				}
				params.gen = gen64
			}
			return nil
		}, s.retry, s.idempotent)
		if err != nil {
			return nil, err
		}
		return res, nil
	}
}
  1231. func parseReadResponse(res *http.Response, params *newRangeReaderParams, reopen func(int64) (*http.Response, error)) (*Reader, error) {
  1232. var err error
  1233. var (
  1234. size int64 // total size of object, even if a range was requested.
  1235. checkCRC bool
  1236. crc uint32
  1237. startOffset int64 // non-zero if range request.
  1238. )
  1239. if res.StatusCode == http.StatusPartialContent {
  1240. cr := strings.TrimSpace(res.Header.Get("Content-Range"))
  1241. if !strings.HasPrefix(cr, "bytes ") || !strings.Contains(cr, "/") {
  1242. return nil, fmt.Errorf("storage: invalid Content-Range %q", cr)
  1243. }
  1244. // Content range is formatted <first byte>-<last byte>/<total size>. We take
  1245. // the total size.
  1246. size, err = strconv.ParseInt(cr[strings.LastIndex(cr, "/")+1:], 10, 64)
  1247. if err != nil {
  1248. return nil, fmt.Errorf("storage: invalid Content-Range %q", cr)
  1249. }
  1250. dashIndex := strings.Index(cr, "-")
  1251. if dashIndex >= 0 {
  1252. startOffset, err = strconv.ParseInt(cr[len("bytes="):dashIndex], 10, 64)
  1253. if err != nil {
  1254. return nil, fmt.Errorf("storage: invalid Content-Range %q: %w", cr, err)
  1255. }
  1256. }
  1257. } else {
  1258. size = res.ContentLength
  1259. // Check the CRC iff all of the following hold:
  1260. // - We asked for content (length != 0).
  1261. // - We got all the content (status != PartialContent).
  1262. // - The server sent a CRC header.
  1263. // - The Go http stack did not uncompress the file.
  1264. // - We were not served compressed data that was uncompressed on download.
  1265. // The problem with the last two cases is that the CRC will not match -- GCS
  1266. // computes it on the compressed contents, but we compute it on the
  1267. // uncompressed contents.
  1268. if params.length != 0 && !res.Uncompressed && !uncompressedByServer(res) {
  1269. crc, checkCRC = parseCRC32c(res)
  1270. }
  1271. }
  1272. remain := res.ContentLength
  1273. body := res.Body
  1274. // If the user requested zero bytes, explicitly close and remove the request
  1275. // body.
  1276. if params.length == 0 {
  1277. remain = 0
  1278. body.Close()
  1279. body = emptyBody
  1280. }
  1281. var metaGen int64
  1282. if res.Header.Get("X-Goog-Metageneration") != "" {
  1283. metaGen, err = strconv.ParseInt(res.Header.Get("X-Goog-Metageneration"), 10, 64)
  1284. if err != nil {
  1285. return nil, err
  1286. }
  1287. }
  1288. var lm time.Time
  1289. if res.Header.Get("Last-Modified") != "" {
  1290. lm, err = http.ParseTime(res.Header.Get("Last-Modified"))
  1291. if err != nil {
  1292. return nil, err
  1293. }
  1294. }
  1295. attrs := ReaderObjectAttrs{
  1296. Size: size,
  1297. ContentType: res.Header.Get("Content-Type"),
  1298. ContentEncoding: res.Header.Get("Content-Encoding"),
  1299. CacheControl: res.Header.Get("Cache-Control"),
  1300. LastModified: lm,
  1301. StartOffset: startOffset,
  1302. Generation: params.gen,
  1303. Metageneration: metaGen,
  1304. }
  1305. return &Reader{
  1306. Attrs: attrs,
  1307. size: size,
  1308. remain: remain,
  1309. wantCRC: crc,
  1310. checkCRC: checkCRC,
  1311. reader: &httpReader{
  1312. reopen: reopen,
  1313. body: body,
  1314. },
  1315. }, nil
  1316. }