| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445 |
- // Copyright 2022 Google LLC
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package storage
- import (
- "context"
- "encoding/base64"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "net/http"
- "net/url"
- "os"
- "reflect"
- "strconv"
- "strings"
- "time"
- "cloud.google.com/go/iam/apiv1/iampb"
- "cloud.google.com/go/internal/optional"
- "cloud.google.com/go/internal/trace"
- "github.com/googleapis/gax-go/v2/callctx"
- "golang.org/x/oauth2/google"
- "google.golang.org/api/googleapi"
- "google.golang.org/api/iterator"
- "google.golang.org/api/option"
- "google.golang.org/api/option/internaloption"
- raw "google.golang.org/api/storage/v1"
- "google.golang.org/api/transport"
- htransport "google.golang.org/api/transport/http"
- )
- // httpStorageClient is the HTTP-JSON API implementation of the transport-agnostic
- // storageClient interface.
- type httpStorageClient struct {
- creds *google.Credentials
- hc *http.Client
- xmlHost string
- raw *raw.Service
- scheme string
- settings *settings
- config *storageConfig
- }
- // newHTTPStorageClient initializes a new storageClient that uses the HTTP-JSON
- // Storage API.
- func newHTTPStorageClient(ctx context.Context, opts ...storageOption) (storageClient, error) {
- s := initSettings(opts...)
- o := s.clientOption
- config := newStorageConfig(o...)
- var creds *google.Credentials
- // In general, it is recommended to use raw.NewService instead of htransport.NewClient
- // since raw.NewService configures the correct default endpoints when initializing the
- // internal http client. However, in our case, "NewRangeReader" in reader.go needs to
- // access the http client directly to make requests, so we create the client manually
- // here so it can be re-used by both reader.go and raw.NewService. This means we need to
- // manually configure the default endpoint options on the http client. Furthermore, we
- // need to account for STORAGE_EMULATOR_HOST override when setting the default endpoints.
- if host := os.Getenv("STORAGE_EMULATOR_HOST"); host == "" {
- // Prepend default options to avoid overriding options passed by the user.
- o = append([]option.ClientOption{option.WithScopes(ScopeFullControl, "https://www.googleapis.com/auth/cloud-platform"), option.WithUserAgent(userAgent)}, o...)
- o = append(o, internaloption.WithDefaultEndpointTemplate("https://storage.UNIVERSE_DOMAIN/storage/v1/"),
- internaloption.WithDefaultMTLSEndpoint("https://storage.mtls.googleapis.com/storage/v1/"),
- internaloption.WithDefaultUniverseDomain("googleapis.com"),
- )
- // Don't error out here. The user may have passed in their own HTTP
- // client which does not auth with ADC or other common conventions.
- c, err := transport.Creds(ctx, o...)
- if err == nil {
- creds = c
- o = append(o, internaloption.WithCredentials(creds))
- }
- } else {
- var hostURL *url.URL
- if strings.Contains(host, "://") {
- h, err := url.Parse(host)
- if err != nil {
- return nil, err
- }
- hostURL = h
- } else {
- // Add scheme for user if not supplied in STORAGE_EMULATOR_HOST
- // URL is only parsed correctly if it has a scheme, so we build it ourselves
- hostURL = &url.URL{Scheme: "http", Host: host}
- }
- hostURL.Path = "storage/v1/"
- endpoint := hostURL.String()
- // Append the emulator host as default endpoint for the user
- o = append([]option.ClientOption{option.WithoutAuthentication()}, o...)
- o = append(o, internaloption.WithDefaultEndpoint(endpoint))
- o = append(o, internaloption.WithDefaultMTLSEndpoint(endpoint))
- }
- s.clientOption = o
- // htransport selects the correct endpoint among WithEndpoint (user override), WithDefaultEndpoint, and WithDefaultMTLSEndpoint.
- hc, ep, err := htransport.NewClient(ctx, s.clientOption...)
- if err != nil {
- return nil, fmt.Errorf("dialing: %w", err)
- }
- // RawService should be created with the chosen endpoint to take account of user override.
- rawService, err := raw.NewService(ctx, option.WithEndpoint(ep), option.WithHTTPClient(hc))
- if err != nil {
- return nil, fmt.Errorf("storage client: %w", err)
- }
- // Update xmlHost and scheme with the chosen endpoint.
- u, err := url.Parse(ep)
- if err != nil {
- return nil, fmt.Errorf("supplied endpoint %q is not valid: %w", ep, err)
- }
- return &httpStorageClient{
- creds: creds,
- hc: hc,
- xmlHost: u.Host,
- raw: rawService,
- scheme: u.Scheme,
- settings: s,
- config: &config,
- }, nil
- }
- func (c *httpStorageClient) Close() error {
- c.hc.CloseIdleConnections()
- return nil
- }
- // Top-level methods.
- func (c *httpStorageClient) GetServiceAccount(ctx context.Context, project string, opts ...storageOption) (string, error) {
- s := callSettings(c.settings, opts...)
- call := c.raw.Projects.ServiceAccount.Get(project)
- var res *raw.ServiceAccount
- err := run(ctx, func(ctx context.Context) error {
- var err error
- res, err = call.Context(ctx).Do()
- return err
- }, s.retry, s.idempotent)
- if err != nil {
- return "", err
- }
- return res.EmailAddress, nil
- }
- func (c *httpStorageClient) CreateBucket(ctx context.Context, project, bucket string, attrs *BucketAttrs, enableObjectRetention *bool, opts ...storageOption) (*BucketAttrs, error) {
- s := callSettings(c.settings, opts...)
- var bkt *raw.Bucket
- if attrs != nil {
- bkt = attrs.toRawBucket()
- } else {
- bkt = &raw.Bucket{}
- }
- bkt.Name = bucket
- // If there is lifecycle information but no location, explicitly set
- // the location. This is a GCS quirk/bug.
- if bkt.Location == "" && bkt.Lifecycle != nil {
- bkt.Location = "US"
- }
- req := c.raw.Buckets.Insert(project, bkt)
- setClientHeader(req.Header())
- if attrs != nil && attrs.PredefinedACL != "" {
- req.PredefinedAcl(attrs.PredefinedACL)
- }
- if attrs != nil && attrs.PredefinedDefaultObjectACL != "" {
- req.PredefinedDefaultObjectAcl(attrs.PredefinedDefaultObjectACL)
- }
- if enableObjectRetention != nil {
- req.EnableObjectRetention(*enableObjectRetention)
- }
- var battrs *BucketAttrs
- err := run(ctx, func(ctx context.Context) error {
- b, err := req.Context(ctx).Do()
- if err != nil {
- return err
- }
- battrs, err = newBucket(b)
- return err
- }, s.retry, s.idempotent)
- return battrs, err
- }
- func (c *httpStorageClient) ListBuckets(ctx context.Context, project string, opts ...storageOption) *BucketIterator {
- s := callSettings(c.settings, opts...)
- it := &BucketIterator{
- ctx: ctx,
- projectID: project,
- }
- fetch := func(pageSize int, pageToken string) (token string, err error) {
- req := c.raw.Buckets.List(it.projectID)
- setClientHeader(req.Header())
- req.Projection("full")
- req.Prefix(it.Prefix)
- req.PageToken(pageToken)
- if pageSize > 0 {
- req.MaxResults(int64(pageSize))
- }
- var resp *raw.Buckets
- err = run(it.ctx, func(ctx context.Context) error {
- resp, err = req.Context(ctx).Do()
- return err
- }, s.retry, s.idempotent)
- if err != nil {
- return "", err
- }
- for _, item := range resp.Items {
- b, err := newBucket(item)
- if err != nil {
- return "", err
- }
- it.buckets = append(it.buckets, b)
- }
- return resp.NextPageToken, nil
- }
- it.pageInfo, it.nextFunc = iterator.NewPageInfo(
- fetch,
- func() int { return len(it.buckets) },
- func() interface{} { b := it.buckets; it.buckets = nil; return b })
- return it
- }
- // Bucket methods.
- func (c *httpStorageClient) DeleteBucket(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) error {
- s := callSettings(c.settings, opts...)
- req := c.raw.Buckets.Delete(bucket)
- setClientHeader(req.Header())
- if err := applyBucketConds("httpStorageClient.DeleteBucket", conds, req); err != nil {
- return err
- }
- if s.userProject != "" {
- req.UserProject(s.userProject)
- }
- return run(ctx, func(ctx context.Context) error { return req.Context(ctx).Do() }, s.retry, s.idempotent)
- }
- func (c *httpStorageClient) GetBucket(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) (*BucketAttrs, error) {
- s := callSettings(c.settings, opts...)
- req := c.raw.Buckets.Get(bucket).Projection("full")
- setClientHeader(req.Header())
- err := applyBucketConds("httpStorageClient.GetBucket", conds, req)
- if err != nil {
- return nil, err
- }
- if s.userProject != "" {
- req.UserProject(s.userProject)
- }
- var resp *raw.Bucket
- err = run(ctx, func(ctx context.Context) error {
- resp, err = req.Context(ctx).Do()
- return err
- }, s.retry, s.idempotent)
- var e *googleapi.Error
- if ok := errors.As(err, &e); ok && e.Code == http.StatusNotFound {
- return nil, ErrBucketNotExist
- }
- if err != nil {
- return nil, err
- }
- return newBucket(resp)
- }
- func (c *httpStorageClient) UpdateBucket(ctx context.Context, bucket string, uattrs *BucketAttrsToUpdate, conds *BucketConditions, opts ...storageOption) (*BucketAttrs, error) {
- s := callSettings(c.settings, opts...)
- rb := uattrs.toRawBucket()
- req := c.raw.Buckets.Patch(bucket, rb).Projection("full")
- setClientHeader(req.Header())
- err := applyBucketConds("httpStorageClient.UpdateBucket", conds, req)
- if err != nil {
- return nil, err
- }
- if s.userProject != "" {
- req.UserProject(s.userProject)
- }
- if uattrs != nil && uattrs.PredefinedACL != "" {
- req.PredefinedAcl(uattrs.PredefinedACL)
- }
- if uattrs != nil && uattrs.PredefinedDefaultObjectACL != "" {
- req.PredefinedDefaultObjectAcl(uattrs.PredefinedDefaultObjectACL)
- }
- var rawBucket *raw.Bucket
- err = run(ctx, func(ctx context.Context) error {
- rawBucket, err = req.Context(ctx).Do()
- return err
- }, s.retry, s.idempotent)
- if err != nil {
- return nil, err
- }
- return newBucket(rawBucket)
- }
- func (c *httpStorageClient) LockBucketRetentionPolicy(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) error {
- s := callSettings(c.settings, opts...)
- var metageneration int64
- if conds != nil {
- metageneration = conds.MetagenerationMatch
- }
- req := c.raw.Buckets.LockRetentionPolicy(bucket, metageneration)
- return run(ctx, func(ctx context.Context) error {
- _, err := req.Context(ctx).Do()
- return err
- }, s.retry, s.idempotent)
- }
- func (c *httpStorageClient) ListObjects(ctx context.Context, bucket string, q *Query, opts ...storageOption) *ObjectIterator {
- s := callSettings(c.settings, opts...)
- it := &ObjectIterator{
- ctx: ctx,
- }
- if q != nil {
- it.query = *q
- }
- fetch := func(pageSize int, pageToken string) (string, error) {
- req := c.raw.Objects.List(bucket)
- setClientHeader(req.Header())
- projection := it.query.Projection
- if projection == ProjectionDefault {
- projection = ProjectionFull
- }
- req.Projection(projection.String())
- req.Delimiter(it.query.Delimiter)
- req.Prefix(it.query.Prefix)
- req.StartOffset(it.query.StartOffset)
- req.EndOffset(it.query.EndOffset)
- req.Versions(it.query.Versions)
- req.IncludeTrailingDelimiter(it.query.IncludeTrailingDelimiter)
- req.MatchGlob(it.query.MatchGlob)
- req.IncludeFoldersAsPrefixes(it.query.IncludeFoldersAsPrefixes)
- if selection := it.query.toFieldSelection(); selection != "" {
- req.Fields("nextPageToken", googleapi.Field(selection))
- }
- req.PageToken(pageToken)
- if s.userProject != "" {
- req.UserProject(s.userProject)
- }
- if pageSize > 0 {
- req.MaxResults(int64(pageSize))
- }
- var resp *raw.Objects
- var err error
- err = run(it.ctx, func(ctx context.Context) error {
- resp, err = req.Context(ctx).Do()
- return err
- }, s.retry, s.idempotent)
- if err != nil {
- var e *googleapi.Error
- if ok := errors.As(err, &e); ok && e.Code == http.StatusNotFound {
- err = ErrBucketNotExist
- }
- return "", err
- }
- for _, item := range resp.Items {
- it.items = append(it.items, newObject(item))
- }
- for _, prefix := range resp.Prefixes {
- it.items = append(it.items, &ObjectAttrs{Prefix: prefix})
- }
- return resp.NextPageToken, nil
- }
- it.pageInfo, it.nextFunc = iterator.NewPageInfo(
- fetch,
- func() int { return len(it.items) },
- func() interface{} { b := it.items; it.items = nil; return b })
- return it
- }
- // Object metadata methods.
- func (c *httpStorageClient) DeleteObject(ctx context.Context, bucket, object string, gen int64, conds *Conditions, opts ...storageOption) error {
- s := callSettings(c.settings, opts...)
- req := c.raw.Objects.Delete(bucket, object).Context(ctx)
- if err := applyConds("Delete", gen, conds, req); err != nil {
- return err
- }
- if s.userProject != "" {
- req.UserProject(s.userProject)
- }
- err := run(ctx, func(ctx context.Context) error { return req.Context(ctx).Do() }, s.retry, s.idempotent)
- var e *googleapi.Error
- if ok := errors.As(err, &e); ok && e.Code == http.StatusNotFound {
- return ErrObjectNotExist
- }
- return err
- }
- func (c *httpStorageClient) GetObject(ctx context.Context, bucket, object string, gen int64, encryptionKey []byte, conds *Conditions, opts ...storageOption) (*ObjectAttrs, error) {
- s := callSettings(c.settings, opts...)
- req := c.raw.Objects.Get(bucket, object).Projection("full").Context(ctx)
- if err := applyConds("Attrs", gen, conds, req); err != nil {
- return nil, err
- }
- if s.userProject != "" {
- req.UserProject(s.userProject)
- }
- if err := setEncryptionHeaders(req.Header(), encryptionKey, false); err != nil {
- return nil, err
- }
- var obj *raw.Object
- var err error
- err = run(ctx, func(ctx context.Context) error {
- obj, err = req.Context(ctx).Do()
- return err
- }, s.retry, s.idempotent)
- var e *googleapi.Error
- if ok := errors.As(err, &e); ok && e.Code == http.StatusNotFound {
- return nil, ErrObjectNotExist
- }
- if err != nil {
- return nil, err
- }
- return newObject(obj), nil
- }
- func (c *httpStorageClient) UpdateObject(ctx context.Context, params *updateObjectParams, opts ...storageOption) (*ObjectAttrs, error) {
- uattrs := params.uattrs
- s := callSettings(c.settings, opts...)
- var attrs ObjectAttrs
- // Lists of fields to send, and set to null, in the JSON.
- var forceSendFields, nullFields []string
- if uattrs.ContentType != nil {
- attrs.ContentType = optional.ToString(uattrs.ContentType)
- // For ContentType, sending the empty string is a no-op.
- // Instead we send a null.
- if attrs.ContentType == "" {
- nullFields = append(nullFields, "ContentType")
- } else {
- forceSendFields = append(forceSendFields, "ContentType")
- }
- }
- if uattrs.ContentLanguage != nil {
- attrs.ContentLanguage = optional.ToString(uattrs.ContentLanguage)
- // For ContentLanguage it's an error to send the empty string.
- // Instead we send a null.
- if attrs.ContentLanguage == "" {
- nullFields = append(nullFields, "ContentLanguage")
- } else {
- forceSendFields = append(forceSendFields, "ContentLanguage")
- }
- }
- if uattrs.ContentEncoding != nil {
- attrs.ContentEncoding = optional.ToString(uattrs.ContentEncoding)
- forceSendFields = append(forceSendFields, "ContentEncoding")
- }
- if uattrs.ContentDisposition != nil {
- attrs.ContentDisposition = optional.ToString(uattrs.ContentDisposition)
- forceSendFields = append(forceSendFields, "ContentDisposition")
- }
- if uattrs.CacheControl != nil {
- attrs.CacheControl = optional.ToString(uattrs.CacheControl)
- forceSendFields = append(forceSendFields, "CacheControl")
- }
- if uattrs.EventBasedHold != nil {
- attrs.EventBasedHold = optional.ToBool(uattrs.EventBasedHold)
- forceSendFields = append(forceSendFields, "EventBasedHold")
- }
- if uattrs.TemporaryHold != nil {
- attrs.TemporaryHold = optional.ToBool(uattrs.TemporaryHold)
- forceSendFields = append(forceSendFields, "TemporaryHold")
- }
- if !uattrs.CustomTime.IsZero() {
- attrs.CustomTime = uattrs.CustomTime
- forceSendFields = append(forceSendFields, "CustomTime")
- }
- if uattrs.Metadata != nil {
- attrs.Metadata = uattrs.Metadata
- if len(attrs.Metadata) == 0 {
- // Sending the empty map is a no-op. We send null instead.
- nullFields = append(nullFields, "Metadata")
- } else {
- forceSendFields = append(forceSendFields, "Metadata")
- }
- }
- if uattrs.ACL != nil {
- attrs.ACL = uattrs.ACL
- // It's an error to attempt to delete the ACL, so
- // we don't append to nullFields here.
- forceSendFields = append(forceSendFields, "Acl")
- }
- if uattrs.Retention != nil {
- // For ObjectRetention it's an error to send empty fields.
- // Instead we send a null as the user's intention is to remove.
- if uattrs.Retention.Mode == "" && uattrs.Retention.RetainUntil.IsZero() {
- nullFields = append(nullFields, "Retention")
- } else {
- attrs.Retention = uattrs.Retention
- forceSendFields = append(forceSendFields, "Retention")
- }
- }
- rawObj := attrs.toRawObject(params.bucket)
- rawObj.ForceSendFields = forceSendFields
- rawObj.NullFields = nullFields
- call := c.raw.Objects.Patch(params.bucket, params.object, rawObj).Projection("full")
- if err := applyConds("Update", params.gen, params.conds, call); err != nil {
- return nil, err
- }
- if s.userProject != "" {
- call.UserProject(s.userProject)
- }
- if uattrs.PredefinedACL != "" {
- call.PredefinedAcl(uattrs.PredefinedACL)
- }
- if err := setEncryptionHeaders(call.Header(), params.encryptionKey, false); err != nil {
- return nil, err
- }
- if params.overrideRetention != nil {
- call.OverrideUnlockedRetention(*params.overrideRetention)
- }
- var obj *raw.Object
- var err error
- err = run(ctx, func(ctx context.Context) error { obj, err = call.Context(ctx).Do(); return err }, s.retry, s.idempotent)
- var e *googleapi.Error
- if errors.As(err, &e) && e.Code == http.StatusNotFound {
- return nil, ErrObjectNotExist
- }
- if err != nil {
- return nil, err
- }
- return newObject(obj), nil
- }
- // Default Object ACL methods.
- func (c *httpStorageClient) DeleteDefaultObjectACL(ctx context.Context, bucket string, entity ACLEntity, opts ...storageOption) error {
- s := callSettings(c.settings, opts...)
- req := c.raw.DefaultObjectAccessControls.Delete(bucket, string(entity))
- configureACLCall(ctx, s.userProject, req)
- return run(ctx, func(ctx context.Context) error { return req.Context(ctx).Do() }, s.retry, s.idempotent)
- }
- func (c *httpStorageClient) ListDefaultObjectACLs(ctx context.Context, bucket string, opts ...storageOption) ([]ACLRule, error) {
- s := callSettings(c.settings, opts...)
- var acls *raw.ObjectAccessControls
- var err error
- req := c.raw.DefaultObjectAccessControls.List(bucket)
- configureACLCall(ctx, s.userProject, req)
- err = run(ctx, func(ctx context.Context) error {
- acls, err = req.Context(ctx).Do()
- return err
- }, s.retry, true)
- if err != nil {
- return nil, err
- }
- return toObjectACLRules(acls.Items), nil
- }
- func (c *httpStorageClient) UpdateDefaultObjectACL(ctx context.Context, bucket string, entity ACLEntity, role ACLRole, opts ...storageOption) error {
- s := callSettings(c.settings, opts...)
- type setRequest interface {
- Do(opts ...googleapi.CallOption) (*raw.ObjectAccessControl, error)
- Header() http.Header
- }
- acl := &raw.ObjectAccessControl{
- Bucket: bucket,
- Entity: string(entity),
- Role: string(role),
- }
- var err error
- req := c.raw.DefaultObjectAccessControls.Update(bucket, string(entity), acl)
- configureACLCall(ctx, s.userProject, req)
- return run(ctx, func(ctx context.Context) error {
- _, err = req.Context(ctx).Do()
- return err
- }, s.retry, s.idempotent)
- }
- // Bucket ACL methods.
- func (c *httpStorageClient) DeleteBucketACL(ctx context.Context, bucket string, entity ACLEntity, opts ...storageOption) error {
- s := callSettings(c.settings, opts...)
- req := c.raw.BucketAccessControls.Delete(bucket, string(entity))
- configureACLCall(ctx, s.userProject, req)
- return run(ctx, func(ctx context.Context) error { return req.Context(ctx).Do() }, s.retry, s.idempotent)
- }
- func (c *httpStorageClient) ListBucketACLs(ctx context.Context, bucket string, opts ...storageOption) ([]ACLRule, error) {
- s := callSettings(c.settings, opts...)
- var acls *raw.BucketAccessControls
- var err error
- req := c.raw.BucketAccessControls.List(bucket)
- configureACLCall(ctx, s.userProject, req)
- err = run(ctx, func(ctx context.Context) error {
- acls, err = req.Context(ctx).Do()
- return err
- }, s.retry, true)
- if err != nil {
- return nil, err
- }
- return toBucketACLRules(acls.Items), nil
- }
- func (c *httpStorageClient) UpdateBucketACL(ctx context.Context, bucket string, entity ACLEntity, role ACLRole, opts ...storageOption) error {
- s := callSettings(c.settings, opts...)
- acl := &raw.BucketAccessControl{
- Bucket: bucket,
- Entity: string(entity),
- Role: string(role),
- }
- req := c.raw.BucketAccessControls.Update(bucket, string(entity), acl)
- configureACLCall(ctx, s.userProject, req)
- var err error
- return run(ctx, func(ctx context.Context) error {
- _, err = req.Context(ctx).Do()
- return err
- }, s.retry, s.idempotent)
- }
- // configureACLCall sets the context, user project and headers on the apiary library call.
- // This will panic if the call does not have the correct methods.
- func configureACLCall(ctx context.Context, userProject string, call interface{ Header() http.Header }) {
- vc := reflect.ValueOf(call)
- vc.MethodByName("Context").Call([]reflect.Value{reflect.ValueOf(ctx)})
- if userProject != "" {
- vc.MethodByName("UserProject").Call([]reflect.Value{reflect.ValueOf(userProject)})
- }
- setClientHeader(call.Header())
- }
- // Object ACL methods.
- func (c *httpStorageClient) DeleteObjectACL(ctx context.Context, bucket, object string, entity ACLEntity, opts ...storageOption) error {
- s := callSettings(c.settings, opts...)
- req := c.raw.ObjectAccessControls.Delete(bucket, object, string(entity))
- configureACLCall(ctx, s.userProject, req)
- return run(ctx, func(ctx context.Context) error { return req.Context(ctx).Do() }, s.retry, s.idempotent)
- }
- // ListObjectACLs retrieves object ACL entries. By default, it operates on the latest generation of this object.
- // Selecting a specific generation of this object is not currently supported by the client.
- func (c *httpStorageClient) ListObjectACLs(ctx context.Context, bucket, object string, opts ...storageOption) ([]ACLRule, error) {
- s := callSettings(c.settings, opts...)
- var acls *raw.ObjectAccessControls
- var err error
- req := c.raw.ObjectAccessControls.List(bucket, object)
- configureACLCall(ctx, s.userProject, req)
- err = run(ctx, func(ctx context.Context) error {
- acls, err = req.Context(ctx).Do()
- return err
- }, s.retry, s.idempotent)
- if err != nil {
- return nil, err
- }
- return toObjectACLRules(acls.Items), nil
- }
- func (c *httpStorageClient) UpdateObjectACL(ctx context.Context, bucket, object string, entity ACLEntity, role ACLRole, opts ...storageOption) error {
- s := callSettings(c.settings, opts...)
- type setRequest interface {
- Do(opts ...googleapi.CallOption) (*raw.ObjectAccessControl, error)
- Header() http.Header
- }
- acl := &raw.ObjectAccessControl{
- Bucket: bucket,
- Entity: string(entity),
- Role: string(role),
- }
- var err error
- req := c.raw.ObjectAccessControls.Update(bucket, object, string(entity), acl)
- configureACLCall(ctx, s.userProject, req)
- return run(ctx, func(ctx context.Context) error {
- _, err = req.Context(ctx).Do()
- return err
- }, s.retry, s.idempotent)
- }
- // Media operations.
- func (c *httpStorageClient) ComposeObject(ctx context.Context, req *composeObjectRequest, opts ...storageOption) (*ObjectAttrs, error) {
- s := callSettings(c.settings, opts...)
- rawReq := &raw.ComposeRequest{}
- // Compose requires a non-empty Destination, so we always set it,
- // even if the caller-provided ObjectAttrs is the zero value.
- rawReq.Destination = req.dstObject.attrs.toRawObject(req.dstBucket)
- if req.sendCRC32C {
- rawReq.Destination.Crc32c = encodeUint32(req.dstObject.attrs.CRC32C)
- }
- for _, src := range req.srcs {
- srcObj := &raw.ComposeRequestSourceObjects{
- Name: src.name,
- }
- if err := applyConds("ComposeFrom source", src.gen, src.conds, composeSourceObj{srcObj}); err != nil {
- return nil, err
- }
- rawReq.SourceObjects = append(rawReq.SourceObjects, srcObj)
- }
- call := c.raw.Objects.Compose(req.dstBucket, req.dstObject.name, rawReq)
- if err := applyConds("ComposeFrom destination", defaultGen, req.dstObject.conds, call); err != nil {
- return nil, err
- }
- if s.userProject != "" {
- call.UserProject(s.userProject)
- }
- if req.predefinedACL != "" {
- call.DestinationPredefinedAcl(req.predefinedACL)
- }
- if err := setEncryptionHeaders(call.Header(), req.dstObject.encryptionKey, false); err != nil {
- return nil, err
- }
- var obj *raw.Object
- setClientHeader(call.Header())
- var err error
- retryCall := func(ctx context.Context) error { obj, err = call.Context(ctx).Do(); return err }
- if err := run(ctx, retryCall, s.retry, s.idempotent); err != nil {
- return nil, err
- }
- return newObject(obj), nil
- }
- func (c *httpStorageClient) RewriteObject(ctx context.Context, req *rewriteObjectRequest, opts ...storageOption) (*rewriteObjectResponse, error) {
- s := callSettings(c.settings, opts...)
- rawObject := req.dstObject.attrs.toRawObject("")
- call := c.raw.Objects.Rewrite(req.srcObject.bucket, req.srcObject.name, req.dstObject.bucket, req.dstObject.name, rawObject)
- call.Projection("full")
- if req.token != "" {
- call.RewriteToken(req.token)
- }
- if req.dstObject.keyName != "" {
- call.DestinationKmsKeyName(req.dstObject.keyName)
- }
- if req.predefinedACL != "" {
- call.DestinationPredefinedAcl(req.predefinedACL)
- }
- if err := applyConds("Copy destination", defaultGen, req.dstObject.conds, call); err != nil {
- return nil, err
- }
- if err := applySourceConds(req.srcObject.gen, req.srcObject.conds, call); err != nil {
- return nil, err
- }
- if s.userProject != "" {
- call.UserProject(s.userProject)
- }
- // Set destination encryption headers.
- if err := setEncryptionHeaders(call.Header(), req.dstObject.encryptionKey, false); err != nil {
- return nil, err
- }
- // Set source encryption headers.
- if err := setEncryptionHeaders(call.Header(), req.srcObject.encryptionKey, true); err != nil {
- return nil, err
- }
- if req.maxBytesRewrittenPerCall != 0 {
- call.MaxBytesRewrittenPerCall(req.maxBytesRewrittenPerCall)
- }
- var res *raw.RewriteResponse
- var err error
- setClientHeader(call.Header())
- retryCall := func(ctx context.Context) error { res, err = call.Context(ctx).Do(); return err }
- if err := run(ctx, retryCall, s.retry, s.idempotent); err != nil {
- return nil, err
- }
- r := &rewriteObjectResponse{
- done: res.Done,
- written: res.TotalBytesRewritten,
- size: res.ObjectSize,
- token: res.RewriteToken,
- resource: newObject(res.Resource),
- }
- return r, nil
- }
- func (c *httpStorageClient) NewRangeReader(ctx context.Context, params *newRangeReaderParams, opts ...storageOption) (r *Reader, err error) {
- ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.httpStorageClient.NewRangeReader")
- defer func() { trace.EndSpan(ctx, err) }()
- s := callSettings(c.settings, opts...)
- if c.config.useJSONforReads {
- return c.newRangeReaderJSON(ctx, params, s)
- }
- return c.newRangeReaderXML(ctx, params, s)
- }
- func (c *httpStorageClient) newRangeReaderXML(ctx context.Context, params *newRangeReaderParams, s *settings) (r *Reader, err error) {
- u := &url.URL{
- Scheme: c.scheme,
- Host: c.xmlHost,
- Path: fmt.Sprintf("/%s/%s", params.bucket, params.object),
- RawPath: fmt.Sprintf("/%s/%s", params.bucket, url.PathEscape(params.object)),
- }
- verb := "GET"
- if params.length == 0 {
- verb = "HEAD"
- }
- req, err := http.NewRequest(verb, u.String(), nil)
- if err != nil {
- return nil, err
- }
- if s.userProject != "" {
- req.Header.Set("X-Goog-User-Project", s.userProject)
- }
- if err := setRangeReaderHeaders(req.Header, params); err != nil {
- return nil, err
- }
- // Set custom headers passed in via the context. This is only required for XML;
- // for gRPC & JSON this is handled in the GAPIC and Apiary layers respectively.
- ctxHeaders := callctx.HeadersFromContext(ctx)
- for k, vals := range ctxHeaders {
- for _, v := range vals {
- req.Header.Add(k, v)
- }
- }
- reopen := readerReopen(ctx, req.Header, params, s,
- func(ctx context.Context) (*http.Response, error) { return c.hc.Do(req.WithContext(ctx)) },
- func() error { return setConditionsHeaders(req.Header, params.conds) },
- func() { req.URL.RawQuery = fmt.Sprintf("generation=%d", params.gen) })
- res, err := reopen(0)
- if err != nil {
- return nil, err
- }
- return parseReadResponse(res, params, reopen)
- }
- func (c *httpStorageClient) newRangeReaderJSON(ctx context.Context, params *newRangeReaderParams, s *settings) (r *Reader, err error) {
- call := c.raw.Objects.Get(params.bucket, params.object)
- setClientHeader(call.Header())
- call.Projection("full")
- if s.userProject != "" {
- call.UserProject(s.userProject)
- }
- if err := setRangeReaderHeaders(call.Header(), params); err != nil {
- return nil, err
- }
- reopen := readerReopen(ctx, call.Header(), params, s, func(ctx context.Context) (*http.Response, error) { return call.Context(ctx).Download() },
- func() error { return applyConds("NewReader", params.gen, params.conds, call) },
- func() { call.Generation(params.gen) })
- res, err := reopen(0)
- if err != nil {
- return nil, err
- }
- return parseReadResponse(res, params, reopen)
- }
- func (c *httpStorageClient) OpenWriter(params *openWriterParams, opts ...storageOption) (*io.PipeWriter, error) {
- s := callSettings(c.settings, opts...)
- errorf := params.setError
- setObj := params.setObj
- progress := params.progress
- attrs := params.attrs
- mediaOpts := []googleapi.MediaOption{
- googleapi.ChunkSize(params.chunkSize),
- }
- if c := attrs.ContentType; c != "" || params.forceEmptyContentType {
- mediaOpts = append(mediaOpts, googleapi.ContentType(c))
- }
- if params.chunkRetryDeadline != 0 {
- mediaOpts = append(mediaOpts, googleapi.ChunkRetryDeadline(params.chunkRetryDeadline))
- }
- pr, pw := io.Pipe()
- go func() {
- defer close(params.donec)
- rawObj := attrs.toRawObject(params.bucket)
- if params.sendCRC32C {
- rawObj.Crc32c = encodeUint32(attrs.CRC32C)
- }
- if attrs.MD5 != nil {
- rawObj.Md5Hash = base64.StdEncoding.EncodeToString(attrs.MD5)
- }
- call := c.raw.Objects.Insert(params.bucket, rawObj).
- Media(pr, mediaOpts...).
- Projection("full").
- Context(params.ctx).
- Name(params.attrs.Name)
- call.ProgressUpdater(func(n, _ int64) { progress(n) })
- if attrs.KMSKeyName != "" {
- call.KmsKeyName(attrs.KMSKeyName)
- }
- if attrs.PredefinedACL != "" {
- call.PredefinedAcl(attrs.PredefinedACL)
- }
- if err := setEncryptionHeaders(call.Header(), params.encryptionKey, false); err != nil {
- errorf(err)
- pr.CloseWithError(err)
- return
- }
- var resp *raw.Object
- err := applyConds("NewWriter", defaultGen, params.conds, call)
- if err == nil {
- if s.userProject != "" {
- call.UserProject(s.userProject)
- }
- // TODO(tritone): Remove this code when Uploads begin to support
- // retry attempt header injection with "client header" injection.
- setClientHeader(call.Header())
- // The internals that perform call.Do automatically retry both the initial
- // call to set up the upload as well as calls to upload individual chunks
- // for a resumable upload (as long as the chunk size is non-zero). Hence
- // there is no need to add retries here.
- // Retry only when the operation is idempotent or the retry policy is RetryAlways.
- var useRetry bool
- if (s.retry == nil || s.retry.policy == RetryIdempotent) && s.idempotent {
- useRetry = true
- } else if s.retry != nil && s.retry.policy == RetryAlways {
- useRetry = true
- }
- if useRetry {
- if s.retry != nil {
- call.WithRetry(s.retry.backoff, s.retry.shouldRetry)
- } else {
- call.WithRetry(nil, nil)
- }
- }
- resp, err = call.Do()
- }
- if err != nil {
- errorf(err)
- pr.CloseWithError(err)
- return
- }
- setObj(newObject(resp))
- }()
- return pw, nil
- }
- // IAM methods.
- func (c *httpStorageClient) GetIamPolicy(ctx context.Context, resource string, version int32, opts ...storageOption) (*iampb.Policy, error) {
- s := callSettings(c.settings, opts...)
- call := c.raw.Buckets.GetIamPolicy(resource).OptionsRequestedPolicyVersion(int64(version))
- setClientHeader(call.Header())
- if s.userProject != "" {
- call.UserProject(s.userProject)
- }
- var rp *raw.Policy
- err := run(ctx, func(ctx context.Context) error {
- var err error
- rp, err = call.Context(ctx).Do()
- return err
- }, s.retry, s.idempotent)
- if err != nil {
- return nil, err
- }
- return iamFromStoragePolicy(rp), nil
- }
- func (c *httpStorageClient) SetIamPolicy(ctx context.Context, resource string, policy *iampb.Policy, opts ...storageOption) error {
- s := callSettings(c.settings, opts...)
- rp := iamToStoragePolicy(policy)
- call := c.raw.Buckets.SetIamPolicy(resource, rp)
- setClientHeader(call.Header())
- if s.userProject != "" {
- call.UserProject(s.userProject)
- }
- return run(ctx, func(ctx context.Context) error {
- _, err := call.Context(ctx).Do()
- return err
- }, s.retry, s.idempotent)
- }
- func (c *httpStorageClient) TestIamPermissions(ctx context.Context, resource string, permissions []string, opts ...storageOption) ([]string, error) {
- s := callSettings(c.settings, opts...)
- call := c.raw.Buckets.TestIamPermissions(resource, permissions)
- setClientHeader(call.Header())
- if s.userProject != "" {
- call.UserProject(s.userProject)
- }
- var res *raw.TestIamPermissionsResponse
- err := run(ctx, func(ctx context.Context) error {
- var err error
- res, err = call.Context(ctx).Do()
- return err
- }, s.retry, s.idempotent)
- if err != nil {
- return nil, err
- }
- return res.Permissions, nil
- }
- // HMAC Key methods.
- func (c *httpStorageClient) GetHMACKey(ctx context.Context, project, accessID string, opts ...storageOption) (*HMACKey, error) {
- s := callSettings(c.settings, opts...)
- call := c.raw.Projects.HmacKeys.Get(project, accessID)
- if s.userProject != "" {
- call = call.UserProject(s.userProject)
- }
- var metadata *raw.HmacKeyMetadata
- var err error
- if err := run(ctx, func(ctx context.Context) error {
- metadata, err = call.Context(ctx).Do()
- return err
- }, s.retry, s.idempotent); err != nil {
- return nil, err
- }
- hk := &raw.HmacKey{
- Metadata: metadata,
- }
- return toHMACKeyFromRaw(hk, false)
- }
- func (c *httpStorageClient) ListHMACKeys(ctx context.Context, project, serviceAccountEmail string, showDeletedKeys bool, opts ...storageOption) *HMACKeysIterator {
- s := callSettings(c.settings, opts...)
- it := &HMACKeysIterator{
- ctx: ctx,
- raw: c.raw.Projects.HmacKeys,
- projectID: project,
- retry: s.retry,
- }
- fetch := func(pageSize int, pageToken string) (token string, err error) {
- call := c.raw.Projects.HmacKeys.List(project)
- setClientHeader(call.Header())
- if pageToken != "" {
- call = call.PageToken(pageToken)
- }
- if pageSize > 0 {
- call = call.MaxResults(int64(pageSize))
- }
- if showDeletedKeys {
- call = call.ShowDeletedKeys(true)
- }
- if s.userProject != "" {
- call = call.UserProject(s.userProject)
- }
- if serviceAccountEmail != "" {
- call = call.ServiceAccountEmail(serviceAccountEmail)
- }
- var resp *raw.HmacKeysMetadata
- err = run(it.ctx, func(ctx context.Context) error {
- resp, err = call.Context(ctx).Do()
- return err
- }, s.retry, s.idempotent)
- if err != nil {
- return "", err
- }
- for _, metadata := range resp.Items {
- hk := &raw.HmacKey{
- Metadata: metadata,
- }
- hkey, err := toHMACKeyFromRaw(hk, true)
- if err != nil {
- return "", err
- }
- it.hmacKeys = append(it.hmacKeys, hkey)
- }
- return resp.NextPageToken, nil
- }
- it.pageInfo, it.nextFunc = iterator.NewPageInfo(
- fetch,
- func() int { return len(it.hmacKeys) - it.index },
- func() interface{} {
- prev := it.hmacKeys
- it.hmacKeys = it.hmacKeys[:0]
- it.index = 0
- return prev
- })
- return it
- }
- func (c *httpStorageClient) UpdateHMACKey(ctx context.Context, project, serviceAccountEmail, accessID string, attrs *HMACKeyAttrsToUpdate, opts ...storageOption) (*HMACKey, error) {
- s := callSettings(c.settings, opts...)
- call := c.raw.Projects.HmacKeys.Update(project, accessID, &raw.HmacKeyMetadata{
- Etag: attrs.Etag,
- State: string(attrs.State),
- })
- if s.userProject != "" {
- call = call.UserProject(s.userProject)
- }
- var metadata *raw.HmacKeyMetadata
- var err error
- if err := run(ctx, func(ctx context.Context) error {
- metadata, err = call.Context(ctx).Do()
- return err
- }, s.retry, s.idempotent); err != nil {
- return nil, err
- }
- hk := &raw.HmacKey{
- Metadata: metadata,
- }
- return toHMACKeyFromRaw(hk, false)
- }
- func (c *httpStorageClient) CreateHMACKey(ctx context.Context, project, serviceAccountEmail string, opts ...storageOption) (*HMACKey, error) {
- s := callSettings(c.settings, opts...)
- call := c.raw.Projects.HmacKeys.Create(project, serviceAccountEmail)
- if s.userProject != "" {
- call = call.UserProject(s.userProject)
- }
- var hk *raw.HmacKey
- if err := run(ctx, func(ctx context.Context) error {
- h, err := call.Context(ctx).Do()
- hk = h
- return err
- }, s.retry, s.idempotent); err != nil {
- return nil, err
- }
- return toHMACKeyFromRaw(hk, true)
- }
- func (c *httpStorageClient) DeleteHMACKey(ctx context.Context, project string, accessID string, opts ...storageOption) error {
- s := callSettings(c.settings, opts...)
- call := c.raw.Projects.HmacKeys.Delete(project, accessID)
- if s.userProject != "" {
- call = call.UserProject(s.userProject)
- }
- return run(ctx, func(ctx context.Context) error {
- return call.Context(ctx).Do()
- }, s.retry, s.idempotent)
- }
- // Notification methods.
- // ListNotifications returns all the Notifications configured for this bucket, as a map indexed by notification ID.
- //
- // Note: This API does not support pagination. However, entity limits cap the number of notifications on a single bucket,
- // so all results will be returned in the first response. See https://cloud.google.com/storage/quotas#buckets.
- func (c *httpStorageClient) ListNotifications(ctx context.Context, bucket string, opts ...storageOption) (n map[string]*Notification, err error) {
- ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.httpStorageClient.ListNotifications")
- defer func() { trace.EndSpan(ctx, err) }()
- s := callSettings(c.settings, opts...)
- call := c.raw.Notifications.List(bucket)
- if s.userProject != "" {
- call.UserProject(s.userProject)
- }
- var res *raw.Notifications
- err = run(ctx, func(ctx context.Context) error {
- res, err = call.Context(ctx).Do()
- return err
- }, s.retry, true)
- if err != nil {
- return nil, err
- }
- return notificationsToMap(res.Items), nil
- }
- func (c *httpStorageClient) CreateNotification(ctx context.Context, bucket string, n *Notification, opts ...storageOption) (ret *Notification, err error) {
- ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.httpStorageClient.CreateNotification")
- defer func() { trace.EndSpan(ctx, err) }()
- s := callSettings(c.settings, opts...)
- call := c.raw.Notifications.Insert(bucket, toRawNotification(n))
- if s.userProject != "" {
- call.UserProject(s.userProject)
- }
- var rn *raw.Notification
- err = run(ctx, func(ctx context.Context) error {
- rn, err = call.Context(ctx).Do()
- return err
- }, s.retry, s.idempotent)
- if err != nil {
- return nil, err
- }
- return toNotification(rn), nil
- }
- func (c *httpStorageClient) DeleteNotification(ctx context.Context, bucket string, id string, opts ...storageOption) (err error) {
- ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.httpStorageClient.DeleteNotification")
- defer func() { trace.EndSpan(ctx, err) }()
- s := callSettings(c.settings, opts...)
- call := c.raw.Notifications.Delete(bucket, id)
- if s.userProject != "" {
- call.UserProject(s.userProject)
- }
- return run(ctx, func(ctx context.Context) error {
- return call.Context(ctx).Do()
- }, s.retry, s.idempotent)
- }
// httpReader reads an HTTP response body and transparently re-requests the
// remaining bytes when a read fails part-way through.
type httpReader struct {
	// body is the response body currently being read.
	body io.ReadCloser
	// seen counts the bytes delivered to callers so far.
	seen int64
	// reopen issues a new request that accounts for the seen bytes already
	// delivered and returns its response.
	reopen func(seen int64) (*http.Response, error)
}

// Read fills p from the current body. A nil error or io.EOF from the body is
// returned to the caller as-is together with the bytes read so far. Any other
// error triggers reopen with the number of bytes seen; if that succeeds the
// old body is closed, the new one takes its place, and reading continues into
// the unfilled remainder of p. If reopen fails, its error is returned.
func (r *httpReader) Read(p []byte) (int, error) {
	filled := 0
	for filled < len(p) {
		got, readErr := r.body.Read(p[filled:])
		filled += got
		r.seen += int64(got)

		switch readErr {
		case nil, io.EOF:
			return filled, readErr
		}

		// The read failed (likely a connection problem). Ask for a fresh
		// response covering only the bytes not yet delivered, then carry on.
		resp, reopenErr := r.reopen(r.seen)
		if reopenErr != nil {
			// reopen already retries, so give up here.
			return filled, reopenErr
		}
		r.body.Close()
		r.body = resp.Body
	}
	return filled, nil
}

// Close closes the body currently being read.
func (r *httpReader) Close() error {
	return r.body.Close()
}
- func setRangeReaderHeaders(h http.Header, params *newRangeReaderParams) error {
- if params.readCompressed {
- h.Set("Accept-Encoding", "gzip")
- }
- if err := setEncryptionHeaders(h, params.encryptionKey, false); err != nil {
- return err
- }
- return nil
- }
- // readerReopen initiates a Read with offset and length, assuming we
- // have already read seen bytes.
- func readerReopen(ctx context.Context, header http.Header, params *newRangeReaderParams, s *settings,
- doDownload func(context.Context) (*http.Response, error), applyConditions func() error, setGeneration func()) func(int64) (*http.Response, error) {
- return func(seen int64) (*http.Response, error) {
- // If the context has already expired, return immediately without making a
- // call.
- if err := ctx.Err(); err != nil {
- return nil, err
- }
- start := params.offset + seen
- if params.length < 0 && start < 0 {
- header.Set("Range", fmt.Sprintf("bytes=%d", start))
- } else if params.length < 0 && start > 0 {
- header.Set("Range", fmt.Sprintf("bytes=%d-", start))
- } else if params.length > 0 {
- // The end character isn't affected by how many bytes we've seen.
- header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, params.offset+params.length-1))
- }
- // We wait to assign conditions here because the generation number can change in between reopen() runs.
- if err := applyConditions(); err != nil {
- return nil, err
- }
- // If an object generation is specified, include generation as query string parameters.
- if params.gen >= 0 {
- setGeneration()
- }
- var err error
- var res *http.Response
- err = run(ctx, func(ctx context.Context) error {
- res, err = doDownload(ctx)
- if err != nil {
- var e *googleapi.Error
- if errors.As(err, &e) {
- if e.Code == http.StatusNotFound {
- return ErrObjectNotExist
- }
- }
- return err
- }
- if res.StatusCode == http.StatusNotFound {
- // this check is necessary only for XML
- res.Body.Close()
- return ErrObjectNotExist
- }
- if res.StatusCode < 200 || res.StatusCode > 299 {
- body, _ := ioutil.ReadAll(res.Body)
- res.Body.Close()
- return &googleapi.Error{
- Code: res.StatusCode,
- Header: res.Header,
- Body: string(body),
- }
- }
- partialContentNotSatisfied :=
- !decompressiveTranscoding(res) &&
- start > 0 && params.length != 0 &&
- res.StatusCode != http.StatusPartialContent
- if partialContentNotSatisfied {
- res.Body.Close()
- return errors.New("storage: partial request not satisfied")
- }
- // With "Content-Encoding": "gzip" aka decompressive transcoding, GCS serves
- // back the whole file regardless of the range count passed in as per:
- // https://cloud.google.com/storage/docs/transcoding#range,
- // thus we have to manually move the body forward by seen bytes.
- if decompressiveTranscoding(res) && seen > 0 {
- _, _ = io.CopyN(ioutil.Discard, res.Body, seen)
- }
- // If a generation hasn't been specified, and this is the first response we get, let's record the
- // generation. In future requests we'll use this generation as a precondition to avoid data races.
- if params.gen < 0 && res.Header.Get("X-Goog-Generation") != "" {
- gen64, err := strconv.ParseInt(res.Header.Get("X-Goog-Generation"), 10, 64)
- if err != nil {
- return err
- }
- params.gen = gen64
- }
- return nil
- }, s.retry, s.idempotent)
- if err != nil {
- return nil, err
- }
- return res, nil
- }
- }
- func parseReadResponse(res *http.Response, params *newRangeReaderParams, reopen func(int64) (*http.Response, error)) (*Reader, error) {
- var err error
- var (
- size int64 // total size of object, even if a range was requested.
- checkCRC bool
- crc uint32
- startOffset int64 // non-zero if range request.
- )
- if res.StatusCode == http.StatusPartialContent {
- cr := strings.TrimSpace(res.Header.Get("Content-Range"))
- if !strings.HasPrefix(cr, "bytes ") || !strings.Contains(cr, "/") {
- return nil, fmt.Errorf("storage: invalid Content-Range %q", cr)
- }
- // Content range is formatted <first byte>-<last byte>/<total size>. We take
- // the total size.
- size, err = strconv.ParseInt(cr[strings.LastIndex(cr, "/")+1:], 10, 64)
- if err != nil {
- return nil, fmt.Errorf("storage: invalid Content-Range %q", cr)
- }
- dashIndex := strings.Index(cr, "-")
- if dashIndex >= 0 {
- startOffset, err = strconv.ParseInt(cr[len("bytes="):dashIndex], 10, 64)
- if err != nil {
- return nil, fmt.Errorf("storage: invalid Content-Range %q: %w", cr, err)
- }
- }
- } else {
- size = res.ContentLength
- // Check the CRC iff all of the following hold:
- // - We asked for content (length != 0).
- // - We got all the content (status != PartialContent).
- // - The server sent a CRC header.
- // - The Go http stack did not uncompress the file.
- // - We were not served compressed data that was uncompressed on download.
- // The problem with the last two cases is that the CRC will not match -- GCS
- // computes it on the compressed contents, but we compute it on the
- // uncompressed contents.
- if params.length != 0 && !res.Uncompressed && !uncompressedByServer(res) {
- crc, checkCRC = parseCRC32c(res)
- }
- }
- remain := res.ContentLength
- body := res.Body
- // If the user requested zero bytes, explicitly close and remove the request
- // body.
- if params.length == 0 {
- remain = 0
- body.Close()
- body = emptyBody
- }
- var metaGen int64
- if res.Header.Get("X-Goog-Metageneration") != "" {
- metaGen, err = strconv.ParseInt(res.Header.Get("X-Goog-Metageneration"), 10, 64)
- if err != nil {
- return nil, err
- }
- }
- var lm time.Time
- if res.Header.Get("Last-Modified") != "" {
- lm, err = http.ParseTime(res.Header.Get("Last-Modified"))
- if err != nil {
- return nil, err
- }
- }
- attrs := ReaderObjectAttrs{
- Size: size,
- ContentType: res.Header.Get("Content-Type"),
- ContentEncoding: res.Header.Get("Content-Encoding"),
- CacheControl: res.Header.Get("Cache-Control"),
- LastModified: lm,
- StartOffset: startOffset,
- Generation: params.gen,
- Metageneration: metaGen,
- }
- return &Reader{
- Attrs: attrs,
- size: size,
- remain: remain,
- wantCRC: crc,
- checkCRC: checkCRC,
- reader: &httpReader{
- reopen: reopen,
- body: body,
- },
- }, nil
- }
|