| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
7771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889 |
- // Copyright 2022 Google LLC
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package storage
- import (
- "context"
- "encoding/base64"
- "errors"
- "fmt"
- "io"
- "net/url"
- "os"
- "cloud.google.com/go/iam/apiv1/iampb"
- "cloud.google.com/go/internal/trace"
- gapic "cloud.google.com/go/storage/internal/apiv2"
- "cloud.google.com/go/storage/internal/apiv2/storagepb"
- "github.com/googleapis/gax-go/v2"
- "google.golang.org/api/googleapi"
- "google.golang.org/api/iterator"
- "google.golang.org/api/option"
- "google.golang.org/api/option/internaloption"
- "google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/metadata"
- "google.golang.org/grpc/status"
- fieldmaskpb "google.golang.org/protobuf/types/known/fieldmaskpb"
- )
const (
	// defaultConnPoolSize is the default number of channels
	// to initialize in the GAPIC gRPC connection pool. A larger
	// connection pool may be necessary for jobs that require
	// high throughput and/or leverage many concurrent streams
	// if not running via DirectPath.
	//
	// This is only used for the gRPC client.
	defaultConnPoolSize = 1
	// maxPerMessageWriteSize is the maximum amount of content that can be sent
	// per WriteObjectRequest message. A buffer reaching this amount will
	// precipitate a flush of the buffer. It is only used by the gRPC Writer
	// implementation.
	maxPerMessageWriteSize int = int(storagepb.ServiceConstants_MAX_WRITE_CHUNK_BYTES)
	// globalProjectAlias is the project ID alias used for global buckets.
	//
	// This is only used for the gRPC API.
	globalProjectAlias = "_"
	// msgEntityNotSupported indicates that ACL entities using a project ID
	// are not currently supported. It is appended to the errors returned by
	// the ACL delete methods when the requested entity cannot be found.
	//
	// This is only used for the gRPC API.
	msgEntityNotSupported = "The gRPC API currently does not support ACL entities using project ID, use project numbers instead"
)
- // defaultGRPCOptions returns a set of the default client options
- // for gRPC client initialization.
- func defaultGRPCOptions() []option.ClientOption {
- defaults := []option.ClientOption{
- option.WithGRPCConnectionPool(defaultConnPoolSize),
- }
- // Set emulator options for gRPC if an emulator was specified. Note that in a
- // hybrid client, STORAGE_EMULATOR_HOST will set the host to use for HTTP and
- // STORAGE_EMULATOR_HOST_GRPC will set the host to use for gRPC (when using a
- // local emulator, HTTP and gRPC must use different ports, so this is
- // necessary).
- //
- // TODO: When the newHybridClient is not longer used, remove
- // STORAGE_EMULATOR_HOST_GRPC and use STORAGE_EMULATOR_HOST for both the
- // HTTP and gRPC based clients.
- if host := os.Getenv("STORAGE_EMULATOR_HOST_GRPC"); host != "" {
- // Strip the scheme from the emulator host. WithEndpoint does not take a
- // scheme for gRPC.
- host = stripScheme(host)
- defaults = append(defaults,
- option.WithEndpoint(host),
- option.WithGRPCDialOption(grpc.WithInsecure()),
- option.WithoutAuthentication(),
- )
- } else {
- // Only enable DirectPath when the emulator is not being targeted.
- defaults = append(defaults, internaloption.EnableDirectPath(true))
- }
- return defaults
- }
// grpcStorageClient is the gRPC API implementation of the transport-agnostic
// storageClient interface.
type grpcStorageClient struct {
	// raw is the generated GAPIC client that every method delegates to.
	raw *gapic.Client
	// settings holds the client-level settings; each call combines them with
	// its own options through callSettings.
	settings *settings
}
- // newGRPCStorageClient initializes a new storageClient that uses the gRPC
- // Storage API.
- func newGRPCStorageClient(ctx context.Context, opts ...storageOption) (storageClient, error) {
- s := initSettings(opts...)
- s.clientOption = append(defaultGRPCOptions(), s.clientOption...)
- config := newStorageConfig(s.clientOption...)
- if config.readAPIWasSet {
- return nil, errors.New("storage: GRPC is incompatible with any option that specifies an API for reads")
- }
- g, err := gapic.NewClient(ctx, s.clientOption...)
- if err != nil {
- return nil, err
- }
- return &grpcStorageClient{
- raw: g,
- settings: s,
- }, nil
- }
// Close closes the underlying GAPIC client and returns whatever error that
// close reports.
func (c *grpcStorageClient) Close() error {
	return c.raw.Close()
}
- // Top-level methods.
- func (c *grpcStorageClient) GetServiceAccount(ctx context.Context, project string, opts ...storageOption) (string, error) {
- s := callSettings(c.settings, opts...)
- req := &storagepb.GetServiceAccountRequest{
- Project: toProjectResource(project),
- }
- var resp *storagepb.ServiceAccount
- err := run(ctx, func(ctx context.Context) error {
- var err error
- resp, err = c.raw.GetServiceAccount(ctx, req, s.gax...)
- return err
- }, s.retry, s.idempotent)
- if err != nil {
- return "", err
- }
- return resp.EmailAddress, err
- }
- func (c *grpcStorageClient) CreateBucket(ctx context.Context, project, bucket string, attrs *BucketAttrs, enableObjectRetention *bool, opts ...storageOption) (*BucketAttrs, error) {
- if enableObjectRetention != nil {
- // TO-DO: implement ObjectRetention once available - see b/308194853
- return nil, status.Errorf(codes.Unimplemented, "storage: object retention is not supported in gRPC")
- }
- s := callSettings(c.settings, opts...)
- b := attrs.toProtoBucket()
- b.Project = toProjectResource(project)
- // If there is lifecycle information but no location, explicitly set
- // the location. This is a GCS quirk/bug.
- if b.GetLocation() == "" && b.GetLifecycle() != nil {
- b.Location = "US"
- }
- req := &storagepb.CreateBucketRequest{
- Parent: fmt.Sprintf("projects/%s", globalProjectAlias),
- Bucket: b,
- BucketId: bucket,
- }
- if attrs != nil {
- req.PredefinedAcl = attrs.PredefinedACL
- req.PredefinedDefaultObjectAcl = attrs.PredefinedDefaultObjectACL
- }
- var battrs *BucketAttrs
- err := run(ctx, func(ctx context.Context) error {
- res, err := c.raw.CreateBucket(ctx, req, s.gax...)
- battrs = newBucketFromProto(res)
- return err
- }, s.retry, s.idempotent)
- return battrs, err
- }
- func (c *grpcStorageClient) ListBuckets(ctx context.Context, project string, opts ...storageOption) *BucketIterator {
- s := callSettings(c.settings, opts...)
- it := &BucketIterator{
- ctx: ctx,
- projectID: project,
- }
- var gitr *gapic.BucketIterator
- fetch := func(pageSize int, pageToken string) (token string, err error) {
- var buckets []*storagepb.Bucket
- var next string
- err = run(it.ctx, func(ctx context.Context) error {
- // Initialize GAPIC-based iterator when pageToken is empty, which
- // indicates that this fetch call is attempting to get the first page.
- //
- // Note: Initializing the GAPIC-based iterator lazily is necessary to
- // capture the BucketIterator.Prefix set by the user *after* the
- // BucketIterator is returned to them from the veneer.
- if pageToken == "" {
- req := &storagepb.ListBucketsRequest{
- Parent: toProjectResource(it.projectID),
- Prefix: it.Prefix,
- }
- gitr = c.raw.ListBuckets(ctx, req, s.gax...)
- }
- buckets, next, err = gitr.InternalFetch(pageSize, pageToken)
- return err
- }, s.retry, s.idempotent)
- if err != nil {
- return "", err
- }
- for _, bkt := range buckets {
- b := newBucketFromProto(bkt)
- it.buckets = append(it.buckets, b)
- }
- return next, nil
- }
- it.pageInfo, it.nextFunc = iterator.NewPageInfo(
- fetch,
- func() int { return len(it.buckets) },
- func() interface{} { b := it.buckets; it.buckets = nil; return b })
- return it
- }
- // Bucket methods.
- func (c *grpcStorageClient) DeleteBucket(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) error {
- s := callSettings(c.settings, opts...)
- req := &storagepb.DeleteBucketRequest{
- Name: bucketResourceName(globalProjectAlias, bucket),
- }
- if err := applyBucketCondsProto("grpcStorageClient.DeleteBucket", conds, req); err != nil {
- return err
- }
- if s.userProject != "" {
- ctx = setUserProjectMetadata(ctx, s.userProject)
- }
- return run(ctx, func(ctx context.Context) error {
- return c.raw.DeleteBucket(ctx, req, s.gax...)
- }, s.retry, s.idempotent)
- }
- func (c *grpcStorageClient) GetBucket(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) (*BucketAttrs, error) {
- s := callSettings(c.settings, opts...)
- req := &storagepb.GetBucketRequest{
- Name: bucketResourceName(globalProjectAlias, bucket),
- ReadMask: &fieldmaskpb.FieldMask{Paths: []string{"*"}},
- }
- if err := applyBucketCondsProto("grpcStorageClient.GetBucket", conds, req); err != nil {
- return nil, err
- }
- if s.userProject != "" {
- ctx = setUserProjectMetadata(ctx, s.userProject)
- }
- var battrs *BucketAttrs
- err := run(ctx, func(ctx context.Context) error {
- res, err := c.raw.GetBucket(ctx, req, s.gax...)
- battrs = newBucketFromProto(res)
- return err
- }, s.retry, s.idempotent)
- if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
- return nil, ErrBucketNotExist
- }
- return battrs, err
- }
- func (c *grpcStorageClient) UpdateBucket(ctx context.Context, bucket string, uattrs *BucketAttrsToUpdate, conds *BucketConditions, opts ...storageOption) (*BucketAttrs, error) {
- s := callSettings(c.settings, opts...)
- b := uattrs.toProtoBucket()
- b.Name = bucketResourceName(globalProjectAlias, bucket)
- req := &storagepb.UpdateBucketRequest{
- Bucket: b,
- PredefinedAcl: uattrs.PredefinedACL,
- PredefinedDefaultObjectAcl: uattrs.PredefinedDefaultObjectACL,
- }
- if err := applyBucketCondsProto("grpcStorageClient.UpdateBucket", conds, req); err != nil {
- return nil, err
- }
- if s.userProject != "" {
- ctx = setUserProjectMetadata(ctx, s.userProject)
- }
- var paths []string
- fieldMask := &fieldmaskpb.FieldMask{
- Paths: paths,
- }
- if uattrs.CORS != nil {
- fieldMask.Paths = append(fieldMask.Paths, "cors")
- }
- if uattrs.DefaultEventBasedHold != nil {
- fieldMask.Paths = append(fieldMask.Paths, "default_event_based_hold")
- }
- if uattrs.RetentionPolicy != nil {
- fieldMask.Paths = append(fieldMask.Paths, "retention_policy")
- }
- if uattrs.VersioningEnabled != nil {
- fieldMask.Paths = append(fieldMask.Paths, "versioning")
- }
- if uattrs.RequesterPays != nil {
- fieldMask.Paths = append(fieldMask.Paths, "billing")
- }
- if uattrs.BucketPolicyOnly != nil || uattrs.UniformBucketLevelAccess != nil || uattrs.PublicAccessPrevention != PublicAccessPreventionUnknown {
- fieldMask.Paths = append(fieldMask.Paths, "iam_config")
- }
- if uattrs.Encryption != nil {
- fieldMask.Paths = append(fieldMask.Paths, "encryption")
- }
- if uattrs.Lifecycle != nil {
- fieldMask.Paths = append(fieldMask.Paths, "lifecycle")
- }
- if uattrs.Logging != nil {
- fieldMask.Paths = append(fieldMask.Paths, "logging")
- }
- if uattrs.Website != nil {
- fieldMask.Paths = append(fieldMask.Paths, "website")
- }
- if uattrs.PredefinedACL != "" {
- // In cases where PredefinedACL is set, Acl is cleared.
- fieldMask.Paths = append(fieldMask.Paths, "acl")
- }
- if uattrs.PredefinedDefaultObjectACL != "" {
- // In cases where PredefinedDefaultObjectACL is set, DefaultObjectAcl is cleared.
- fieldMask.Paths = append(fieldMask.Paths, "default_object_acl")
- }
- // Note: This API currently does not support entites using project ID.
- // Use project numbers in ACL entities. Pending b/233617896.
- if uattrs.acl != nil {
- // In cases where acl is set by UpdateBucketACL method.
- fieldMask.Paths = append(fieldMask.Paths, "acl")
- }
- if uattrs.defaultObjectACL != nil {
- // In cases where defaultObjectACL is set by UpdateBucketACL method.
- fieldMask.Paths = append(fieldMask.Paths, "default_object_acl")
- }
- if uattrs.StorageClass != "" {
- fieldMask.Paths = append(fieldMask.Paths, "storage_class")
- }
- if uattrs.RPO != RPOUnknown {
- fieldMask.Paths = append(fieldMask.Paths, "rpo")
- }
- if uattrs.Autoclass != nil {
- fieldMask.Paths = append(fieldMask.Paths, "autoclass")
- }
- for label := range uattrs.setLabels {
- fieldMask.Paths = append(fieldMask.Paths, fmt.Sprintf("labels.%s", label))
- }
- // Delete a label by not including it in Bucket.Labels but adding the key to the update mask.
- for label := range uattrs.deleteLabels {
- fieldMask.Paths = append(fieldMask.Paths, fmt.Sprintf("labels.%s", label))
- }
- req.UpdateMask = fieldMask
- var battrs *BucketAttrs
- err := run(ctx, func(ctx context.Context) error {
- res, err := c.raw.UpdateBucket(ctx, req, s.gax...)
- battrs = newBucketFromProto(res)
- return err
- }, s.retry, s.idempotent)
- return battrs, err
- }
- func (c *grpcStorageClient) LockBucketRetentionPolicy(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) error {
- s := callSettings(c.settings, opts...)
- req := &storagepb.LockBucketRetentionPolicyRequest{
- Bucket: bucketResourceName(globalProjectAlias, bucket),
- }
- if err := applyBucketCondsProto("grpcStorageClient.LockBucketRetentionPolicy", conds, req); err != nil {
- return err
- }
- return run(ctx, func(ctx context.Context) error {
- _, err := c.raw.LockBucketRetentionPolicy(ctx, req, s.gax...)
- return err
- }, s.retry, s.idempotent)
- }
- func (c *grpcStorageClient) ListObjects(ctx context.Context, bucket string, q *Query, opts ...storageOption) *ObjectIterator {
- s := callSettings(c.settings, opts...)
- it := &ObjectIterator{
- ctx: ctx,
- }
- if q != nil {
- it.query = *q
- }
- req := &storagepb.ListObjectsRequest{
- Parent: bucketResourceName(globalProjectAlias, bucket),
- Prefix: it.query.Prefix,
- Delimiter: it.query.Delimiter,
- Versions: it.query.Versions,
- LexicographicStart: it.query.StartOffset,
- LexicographicEnd: it.query.EndOffset,
- IncludeTrailingDelimiter: it.query.IncludeTrailingDelimiter,
- MatchGlob: it.query.MatchGlob,
- ReadMask: q.toFieldMask(), // a nil Query still results in a "*" FieldMask
- }
- if s.userProject != "" {
- ctx = setUserProjectMetadata(ctx, s.userProject)
- }
- fetch := func(pageSize int, pageToken string) (token string, err error) {
- // IncludeFoldersAsPrefixes is not supported for gRPC
- // TODO: remove this when support is added in the proto.
- if it.query.IncludeFoldersAsPrefixes {
- return "", status.Errorf(codes.Unimplemented, "storage: IncludeFoldersAsPrefixes is not supported in gRPC")
- }
- var objects []*storagepb.Object
- var gitr *gapic.ObjectIterator
- err = run(it.ctx, func(ctx context.Context) error {
- gitr = c.raw.ListObjects(ctx, req, s.gax...)
- it.ctx = ctx
- objects, token, err = gitr.InternalFetch(pageSize, pageToken)
- return err
- }, s.retry, s.idempotent)
- if err != nil {
- if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
- err = ErrBucketNotExist
- }
- return "", err
- }
- for _, obj := range objects {
- b := newObjectFromProto(obj)
- it.items = append(it.items, b)
- }
- // Response is always non-nil after a successful request.
- res := gitr.Response.(*storagepb.ListObjectsResponse)
- for _, prefix := range res.GetPrefixes() {
- it.items = append(it.items, &ObjectAttrs{Prefix: prefix})
- }
- return token, nil
- }
- it.pageInfo, it.nextFunc = iterator.NewPageInfo(
- fetch,
- func() int { return len(it.items) },
- func() interface{} { b := it.items; it.items = nil; return b })
- return it
- }
- // Object metadata methods.
- func (c *grpcStorageClient) DeleteObject(ctx context.Context, bucket, object string, gen int64, conds *Conditions, opts ...storageOption) error {
- s := callSettings(c.settings, opts...)
- req := &storagepb.DeleteObjectRequest{
- Bucket: bucketResourceName(globalProjectAlias, bucket),
- Object: object,
- }
- if err := applyCondsProto("grpcStorageClient.DeleteObject", gen, conds, req); err != nil {
- return err
- }
- if s.userProject != "" {
- ctx = setUserProjectMetadata(ctx, s.userProject)
- }
- err := run(ctx, func(ctx context.Context) error {
- return c.raw.DeleteObject(ctx, req, s.gax...)
- }, s.retry, s.idempotent)
- if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
- return ErrObjectNotExist
- }
- return err
- }
- func (c *grpcStorageClient) GetObject(ctx context.Context, bucket, object string, gen int64, encryptionKey []byte, conds *Conditions, opts ...storageOption) (*ObjectAttrs, error) {
- s := callSettings(c.settings, opts...)
- req := &storagepb.GetObjectRequest{
- Bucket: bucketResourceName(globalProjectAlias, bucket),
- Object: object,
- // ProjectionFull by default.
- ReadMask: &fieldmaskpb.FieldMask{Paths: []string{"*"}},
- }
- if err := applyCondsProto("grpcStorageClient.GetObject", gen, conds, req); err != nil {
- return nil, err
- }
- if s.userProject != "" {
- ctx = setUserProjectMetadata(ctx, s.userProject)
- }
- if encryptionKey != nil {
- req.CommonObjectRequestParams = toProtoCommonObjectRequestParams(encryptionKey)
- }
- var attrs *ObjectAttrs
- err := run(ctx, func(ctx context.Context) error {
- res, err := c.raw.GetObject(ctx, req, s.gax...)
- attrs = newObjectFromProto(res)
- return err
- }, s.retry, s.idempotent)
- if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
- return nil, ErrObjectNotExist
- }
- return attrs, err
- }
- func (c *grpcStorageClient) UpdateObject(ctx context.Context, params *updateObjectParams, opts ...storageOption) (*ObjectAttrs, error) {
- uattrs := params.uattrs
- if params.overrideRetention != nil || uattrs.Retention != nil {
- // TO-DO: implement ObjectRetention once available - see b/308194853
- return nil, status.Errorf(codes.Unimplemented, "storage: object retention is not supported in gRPC")
- }
- s := callSettings(c.settings, opts...)
- o := uattrs.toProtoObject(bucketResourceName(globalProjectAlias, params.bucket), params.object)
- // For Update, generation is passed via the object message rather than a field on the request.
- if params.gen >= 0 {
- o.Generation = params.gen
- }
- req := &storagepb.UpdateObjectRequest{
- Object: o,
- PredefinedAcl: uattrs.PredefinedACL,
- }
- if err := applyCondsProto("grpcStorageClient.UpdateObject", defaultGen, params.conds, req); err != nil {
- return nil, err
- }
- if s.userProject != "" {
- ctx = setUserProjectMetadata(ctx, s.userProject)
- }
- if params.encryptionKey != nil {
- req.CommonObjectRequestParams = toProtoCommonObjectRequestParams(params.encryptionKey)
- }
- fieldMask := &fieldmaskpb.FieldMask{Paths: nil}
- if uattrs.EventBasedHold != nil {
- fieldMask.Paths = append(fieldMask.Paths, "event_based_hold")
- }
- if uattrs.TemporaryHold != nil {
- fieldMask.Paths = append(fieldMask.Paths, "temporary_hold")
- }
- if uattrs.ContentType != nil {
- fieldMask.Paths = append(fieldMask.Paths, "content_type")
- }
- if uattrs.ContentLanguage != nil {
- fieldMask.Paths = append(fieldMask.Paths, "content_language")
- }
- if uattrs.ContentEncoding != nil {
- fieldMask.Paths = append(fieldMask.Paths, "content_encoding")
- }
- if uattrs.ContentDisposition != nil {
- fieldMask.Paths = append(fieldMask.Paths, "content_disposition")
- }
- if uattrs.CacheControl != nil {
- fieldMask.Paths = append(fieldMask.Paths, "cache_control")
- }
- if !uattrs.CustomTime.IsZero() {
- fieldMask.Paths = append(fieldMask.Paths, "custom_time")
- }
- // Note: This API currently does not support entites using project ID.
- // Use project numbers in ACL entities. Pending b/233617896.
- if uattrs.ACL != nil || len(uattrs.PredefinedACL) > 0 {
- fieldMask.Paths = append(fieldMask.Paths, "acl")
- }
- if uattrs.Metadata != nil {
- // We don't support deleting a specific metadata key; metadata is deleted
- // as a whole if provided an empty map, so we do not use dot notation here
- if len(uattrs.Metadata) == 0 {
- fieldMask.Paths = append(fieldMask.Paths, "metadata")
- } else {
- // We can, however, use dot notation for adding keys
- for key := range uattrs.Metadata {
- fieldMask.Paths = append(fieldMask.Paths, fmt.Sprintf("metadata.%s", key))
- }
- }
- }
- req.UpdateMask = fieldMask
- var attrs *ObjectAttrs
- err := run(ctx, func(ctx context.Context) error {
- res, err := c.raw.UpdateObject(ctx, req, s.gax...)
- attrs = newObjectFromProto(res)
- return err
- }, s.retry, s.idempotent)
- if e, ok := status.FromError(err); ok && e.Code() == codes.NotFound {
- return nil, ErrObjectNotExist
- }
- return attrs, err
- }
- // Default Object ACL methods.
- func (c *grpcStorageClient) DeleteDefaultObjectACL(ctx context.Context, bucket string, entity ACLEntity, opts ...storageOption) error {
- // There is no separate API for PATCH in gRPC.
- // Make a GET call first to retrieve BucketAttrs.
- attrs, err := c.GetBucket(ctx, bucket, nil, opts...)
- if err != nil {
- return err
- }
- // Delete the entity and copy other remaining ACL entities.
- // Note: This API currently does not support entites using project ID.
- // Use project numbers in ACL entities. Pending b/233617896.
- // Return error if entity is not found or a project ID is used.
- invalidEntity := true
- var acl []ACLRule
- for _, a := range attrs.DefaultObjectACL {
- if a.Entity != entity {
- acl = append(acl, a)
- }
- if a.Entity == entity {
- invalidEntity = false
- }
- }
- if invalidEntity {
- return fmt.Errorf("storage: entity %v was not found on bucket %v, got %v. %v", entity, bucket, attrs.DefaultObjectACL, msgEntityNotSupported)
- }
- uattrs := &BucketAttrsToUpdate{defaultObjectACL: acl}
- // Call UpdateBucket with a MetagenerationMatch precondition set.
- if _, err = c.UpdateBucket(ctx, bucket, uattrs, &BucketConditions{MetagenerationMatch: attrs.MetaGeneration}, opts...); err != nil {
- return err
- }
- return nil
- }
- func (c *grpcStorageClient) ListDefaultObjectACLs(ctx context.Context, bucket string, opts ...storageOption) ([]ACLRule, error) {
- attrs, err := c.GetBucket(ctx, bucket, nil, opts...)
- if err != nil {
- return nil, err
- }
- return attrs.DefaultObjectACL, nil
- }
- func (c *grpcStorageClient) UpdateDefaultObjectACL(ctx context.Context, bucket string, entity ACLEntity, role ACLRole, opts ...storageOption) error {
- // There is no separate API for PATCH in gRPC.
- // Make a GET call first to retrieve BucketAttrs.
- attrs, err := c.GetBucket(ctx, bucket, nil, opts...)
- if err != nil {
- return err
- }
- // Note: This API currently does not support entites using project ID.
- // Use project numbers in ACL entities. Pending b/233617896.
- var acl []ACLRule
- aclRule := ACLRule{Entity: entity, Role: role}
- acl = append(attrs.DefaultObjectACL, aclRule)
- uattrs := &BucketAttrsToUpdate{defaultObjectACL: acl}
- // Call UpdateBucket with a MetagenerationMatch precondition set.
- if _, err = c.UpdateBucket(ctx, bucket, uattrs, &BucketConditions{MetagenerationMatch: attrs.MetaGeneration}, opts...); err != nil {
- return err
- }
- return nil
- }
- // Bucket ACL methods.
- func (c *grpcStorageClient) DeleteBucketACL(ctx context.Context, bucket string, entity ACLEntity, opts ...storageOption) error {
- // There is no separate API for PATCH in gRPC.
- // Make a GET call first to retrieve BucketAttrs.
- attrs, err := c.GetBucket(ctx, bucket, nil, opts...)
- if err != nil {
- return err
- }
- // Delete the entity and copy other remaining ACL entities.
- // Note: This API currently does not support entites using project ID.
- // Use project numbers in ACL entities. Pending b/233617896.
- // Return error if entity is not found or a project ID is used.
- invalidEntity := true
- var acl []ACLRule
- for _, a := range attrs.ACL {
- if a.Entity != entity {
- acl = append(acl, a)
- }
- if a.Entity == entity {
- invalidEntity = false
- }
- }
- if invalidEntity {
- return fmt.Errorf("storage: entity %v was not found on bucket %v, got %v. %v", entity, bucket, attrs.ACL, msgEntityNotSupported)
- }
- uattrs := &BucketAttrsToUpdate{acl: acl}
- // Call UpdateBucket with a MetagenerationMatch precondition set.
- if _, err = c.UpdateBucket(ctx, bucket, uattrs, &BucketConditions{MetagenerationMatch: attrs.MetaGeneration}, opts...); err != nil {
- return err
- }
- return nil
- }
- func (c *grpcStorageClient) ListBucketACLs(ctx context.Context, bucket string, opts ...storageOption) ([]ACLRule, error) {
- attrs, err := c.GetBucket(ctx, bucket, nil, opts...)
- if err != nil {
- return nil, err
- }
- return attrs.ACL, nil
- }
- func (c *grpcStorageClient) UpdateBucketACL(ctx context.Context, bucket string, entity ACLEntity, role ACLRole, opts ...storageOption) error {
- // There is no separate API for PATCH in gRPC.
- // Make a GET call first to retrieve BucketAttrs.
- attrs, err := c.GetBucket(ctx, bucket, nil, opts...)
- if err != nil {
- return err
- }
- // Note: This API currently does not support entites using project ID.
- // Use project numbers in ACL entities. Pending b/233617896.
- var acl []ACLRule
- aclRule := ACLRule{Entity: entity, Role: role}
- acl = append(attrs.ACL, aclRule)
- uattrs := &BucketAttrsToUpdate{acl: acl}
- // Call UpdateBucket with a MetagenerationMatch precondition set.
- if _, err = c.UpdateBucket(ctx, bucket, uattrs, &BucketConditions{MetagenerationMatch: attrs.MetaGeneration}, opts...); err != nil {
- return err
- }
- return nil
- }
- // Object ACL methods.
- func (c *grpcStorageClient) DeleteObjectACL(ctx context.Context, bucket, object string, entity ACLEntity, opts ...storageOption) error {
- // There is no separate API for PATCH in gRPC.
- // Make a GET call first to retrieve ObjectAttrs.
- attrs, err := c.GetObject(ctx, bucket, object, defaultGen, nil, nil, opts...)
- if err != nil {
- return err
- }
- // Delete the entity and copy other remaining ACL entities.
- // Note: This API currently does not support entites using project ID.
- // Use project numbers in ACL entities. Pending b/233617896.
- // Return error if entity is not found or a project ID is used.
- invalidEntity := true
- var acl []ACLRule
- for _, a := range attrs.ACL {
- if a.Entity != entity {
- acl = append(acl, a)
- }
- if a.Entity == entity {
- invalidEntity = false
- }
- }
- if invalidEntity {
- return fmt.Errorf("storage: entity %v was not found on bucket %v, got %v. %v", entity, bucket, attrs.ACL, msgEntityNotSupported)
- }
- uattrs := &ObjectAttrsToUpdate{ACL: acl}
- // Call UpdateObject with the specified metageneration.
- params := &updateObjectParams{bucket: bucket, object: object, uattrs: uattrs, gen: defaultGen, conds: &Conditions{MetagenerationMatch: attrs.Metageneration}}
- if _, err = c.UpdateObject(ctx, params, opts...); err != nil {
- return err
- }
- return nil
- }
- // ListObjectACLs retrieves object ACL entries. By default, it operates on the latest generation of this object.
- // Selecting a specific generation of this object is not currently supported by the client.
- func (c *grpcStorageClient) ListObjectACLs(ctx context.Context, bucket, object string, opts ...storageOption) ([]ACLRule, error) {
- o, err := c.GetObject(ctx, bucket, object, defaultGen, nil, nil, opts...)
- if err != nil {
- return nil, err
- }
- return o.ACL, nil
- }
- func (c *grpcStorageClient) UpdateObjectACL(ctx context.Context, bucket, object string, entity ACLEntity, role ACLRole, opts ...storageOption) error {
- // There is no separate API for PATCH in gRPC.
- // Make a GET call first to retrieve ObjectAttrs.
- attrs, err := c.GetObject(ctx, bucket, object, defaultGen, nil, nil, opts...)
- if err != nil {
- return err
- }
- // Note: This API currently does not support entites using project ID.
- // Use project numbers in ACL entities. Pending b/233617896.
- var acl []ACLRule
- aclRule := ACLRule{Entity: entity, Role: role}
- acl = append(attrs.ACL, aclRule)
- uattrs := &ObjectAttrsToUpdate{ACL: acl}
- // Call UpdateObject with the specified metageneration.
- params := &updateObjectParams{bucket: bucket, object: object, uattrs: uattrs, gen: defaultGen, conds: &Conditions{MetagenerationMatch: attrs.Metageneration}}
- if _, err = c.UpdateObject(ctx, params, opts...); err != nil {
- return err
- }
- return nil
- }
- // Media operations.
- func (c *grpcStorageClient) ComposeObject(ctx context.Context, req *composeObjectRequest, opts ...storageOption) (*ObjectAttrs, error) {
- s := callSettings(c.settings, opts...)
- if s.userProject != "" {
- ctx = setUserProjectMetadata(ctx, s.userProject)
- }
- dstObjPb := req.dstObject.attrs.toProtoObject(req.dstBucket)
- dstObjPb.Name = req.dstObject.name
- if req.sendCRC32C {
- dstObjPb.Checksums.Crc32C = &req.dstObject.attrs.CRC32C
- }
- srcs := []*storagepb.ComposeObjectRequest_SourceObject{}
- for _, src := range req.srcs {
- srcObjPb := &storagepb.ComposeObjectRequest_SourceObject{Name: src.name, ObjectPreconditions: &storagepb.ComposeObjectRequest_SourceObject_ObjectPreconditions{}}
- if src.gen >= 0 {
- srcObjPb.Generation = src.gen
- }
- if err := applyCondsProto("ComposeObject source", defaultGen, src.conds, srcObjPb.ObjectPreconditions); err != nil {
- return nil, err
- }
- srcs = append(srcs, srcObjPb)
- }
- rawReq := &storagepb.ComposeObjectRequest{
- Destination: dstObjPb,
- SourceObjects: srcs,
- }
- if err := applyCondsProto("ComposeObject destination", defaultGen, req.dstObject.conds, rawReq); err != nil {
- return nil, err
- }
- if req.predefinedACL != "" {
- rawReq.DestinationPredefinedAcl = req.predefinedACL
- }
- if req.dstObject.encryptionKey != nil {
- rawReq.CommonObjectRequestParams = toProtoCommonObjectRequestParams(req.dstObject.encryptionKey)
- }
- var obj *storagepb.Object
- var err error
- if err := run(ctx, func(ctx context.Context) error {
- obj, err = c.raw.ComposeObject(ctx, rawReq, s.gax...)
- return err
- }, s.retry, s.idempotent); err != nil {
- return nil, err
- }
- return newObjectFromProto(obj), nil
- }
- func (c *grpcStorageClient) RewriteObject(ctx context.Context, req *rewriteObjectRequest, opts ...storageOption) (*rewriteObjectResponse, error) {
- s := callSettings(c.settings, opts...)
- obj := req.dstObject.attrs.toProtoObject("")
- call := &storagepb.RewriteObjectRequest{
- SourceBucket: bucketResourceName(globalProjectAlias, req.srcObject.bucket),
- SourceObject: req.srcObject.name,
- RewriteToken: req.token,
- DestinationBucket: bucketResourceName(globalProjectAlias, req.dstObject.bucket),
- DestinationName: req.dstObject.name,
- Destination: obj,
- DestinationKmsKey: req.dstObject.keyName,
- DestinationPredefinedAcl: req.predefinedACL,
- CommonObjectRequestParams: toProtoCommonObjectRequestParams(req.dstObject.encryptionKey),
- }
- // The userProject, whether source or destination project, is decided by the code calling the interface.
- if s.userProject != "" {
- ctx = setUserProjectMetadata(ctx, s.userProject)
- }
- if err := applyCondsProto("Copy destination", defaultGen, req.dstObject.conds, call); err != nil {
- return nil, err
- }
- if err := applySourceCondsProto(req.srcObject.gen, req.srcObject.conds, call); err != nil {
- return nil, err
- }
- if len(req.dstObject.encryptionKey) > 0 {
- call.CommonObjectRequestParams = toProtoCommonObjectRequestParams(req.dstObject.encryptionKey)
- }
- if len(req.srcObject.encryptionKey) > 0 {
- srcParams := toProtoCommonObjectRequestParams(req.srcObject.encryptionKey)
- call.CopySourceEncryptionAlgorithm = srcParams.GetEncryptionAlgorithm()
- call.CopySourceEncryptionKeyBytes = srcParams.GetEncryptionKeyBytes()
- call.CopySourceEncryptionKeySha256Bytes = srcParams.GetEncryptionKeySha256Bytes()
- }
- call.MaxBytesRewrittenPerCall = req.maxBytesRewrittenPerCall
- var res *storagepb.RewriteResponse
- var err error
- retryCall := func(ctx context.Context) error { res, err = c.raw.RewriteObject(ctx, call, s.gax...); return err }
- if err := run(ctx, retryCall, s.retry, s.idempotent); err != nil {
- return nil, err
- }
- r := &rewriteObjectResponse{
- done: res.GetDone(),
- written: res.GetTotalBytesRewritten(),
- size: res.GetObjectSize(),
- token: res.GetRewriteToken(),
- resource: newObjectFromProto(res.GetResource()),
- }
- return r, nil
- }
- func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRangeReaderParams, opts ...storageOption) (r *Reader, err error) {
- ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.NewRangeReader")
- defer func() { trace.EndSpan(ctx, err) }()
- s := callSettings(c.settings, opts...)
- if s.userProject != "" {
- ctx = setUserProjectMetadata(ctx, s.userProject)
- }
- b := bucketResourceName(globalProjectAlias, params.bucket)
- req := &storagepb.ReadObjectRequest{
- Bucket: b,
- Object: params.object,
- CommonObjectRequestParams: toProtoCommonObjectRequestParams(params.encryptionKey),
- }
- // The default is a negative value, which means latest.
- if params.gen >= 0 {
- req.Generation = params.gen
- }
- // Define a function that initiates a Read with offset and length, assuming
- // we have already read seen bytes.
- reopen := func(seen int64) (*readStreamResponse, context.CancelFunc, error) {
- // If the context has already expired, return immediately without making
- // we call.
- if err := ctx.Err(); err != nil {
- return nil, nil, err
- }
- cc, cancel := context.WithCancel(ctx)
- req.ReadOffset = params.offset + seen
- // Only set a ReadLimit if length is greater than zero, because <= 0 means
- // to read it all.
- if params.length > 0 {
- req.ReadLimit = params.length - seen
- }
- if err := applyCondsProto("gRPCReader.reopen", params.gen, params.conds, req); err != nil {
- cancel()
- return nil, nil, err
- }
- var stream storagepb.Storage_ReadObjectClient
- var msg *storagepb.ReadObjectResponse
- var err error
- err = run(cc, func(ctx context.Context) error {
- stream, err = c.raw.ReadObject(cc, req, s.gax...)
- if err != nil {
- return err
- }
- msg, err = stream.Recv()
- // These types of errors show up on the Recv call, rather than the
- // initialization of the stream via ReadObject above.
- if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
- return ErrObjectNotExist
- }
- return err
- }, s.retry, s.idempotent)
- if err != nil {
- // Close the stream context we just created to ensure we don't leak
- // resources.
- cancel()
- return nil, nil, err
- }
- return &readStreamResponse{stream, msg}, cancel, nil
- }
- res, cancel, err := reopen(0)
- if err != nil {
- return nil, err
- }
- // The first message was Recv'd on stream open, use it to populate the
- // object metadata.
- msg := res.response
- obj := msg.GetMetadata()
- // This is the size of the entire object, even if only a range was requested.
- size := obj.GetSize()
- r = &Reader{
- Attrs: ReaderObjectAttrs{
- Size: size,
- ContentType: obj.GetContentType(),
- ContentEncoding: obj.GetContentEncoding(),
- CacheControl: obj.GetCacheControl(),
- LastModified: obj.GetUpdateTime().AsTime(),
- Metageneration: obj.GetMetageneration(),
- Generation: obj.GetGeneration(),
- },
- reader: &gRPCReader{
- stream: res.stream,
- reopen: reopen,
- cancel: cancel,
- size: size,
- // Store the content from the first Recv in the
- // client buffer for reading later.
- leftovers: msg.GetChecksummedData().GetContent(),
- settings: s,
- zeroRange: params.length == 0,
- },
- }
- cr := msg.GetContentRange()
- if cr != nil {
- r.Attrs.StartOffset = cr.GetStart()
- r.remain = cr.GetEnd() - cr.GetStart()
- } else {
- r.remain = size
- }
- // For a zero-length request, explicitly close the stream and set remaining
- // bytes to zero.
- if params.length == 0 {
- r.remain = 0
- r.reader.Close()
- }
- // Only support checksums when reading an entire object, not a range.
- if checksums := msg.GetObjectChecksums(); checksums != nil && checksums.Crc32C != nil && params.offset == 0 && params.length < 0 {
- r.wantCRC = checksums.GetCrc32C()
- r.checkCRC = true
- }
- return r, nil
- }
- func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storageOption) (*io.PipeWriter, error) {
- s := callSettings(c.settings, opts...)
- var offset int64
- errorf := params.setError
- progress := params.progress
- setObj := params.setObj
- pr, pw := io.Pipe()
- gw := newGRPCWriter(c, params, pr)
- gw.settings = s
- if s.userProject != "" {
- gw.ctx = setUserProjectMetadata(gw.ctx, s.userProject)
- }
- // This function reads the data sent to the pipe and sends sets of messages
- // on the gRPC client-stream as the buffer is filled.
- go func() {
- defer close(params.donec)
- // Loop until there is an error or the Object has been finalized.
- for {
- // Note: This blocks until either the buffer is full or EOF is read.
- recvd, doneReading, err := gw.read()
- if err != nil {
- err = checkCanceled(err)
- errorf(err)
- pr.CloseWithError(err)
- return
- }
- if params.attrs.Retention != nil {
- // TO-DO: remove once ObjectRetention is available - see b/308194853
- err = status.Errorf(codes.Unimplemented, "storage: object retention is not supported in gRPC")
- errorf(err)
- pr.CloseWithError(err)
- return
- }
- // The chunk buffer is full, but there is no end in sight. This
- // means that either:
- // 1. A resumable upload will need to be used to send
- // multiple chunks, until we are done reading data. Start a
- // resumable upload if it has not already been started.
- // 2. ChunkSize of zero may also have a full buffer, but a resumable
- // session should not be initiated in this case.
- if !doneReading && gw.upid == "" && params.chunkSize != 0 {
- err = gw.startResumableUpload()
- if err != nil {
- err = checkCanceled(err)
- errorf(err)
- pr.CloseWithError(err)
- return
- }
- }
- o, off, err := gw.uploadBuffer(recvd, offset, doneReading)
- if err != nil {
- err = checkCanceled(err)
- errorf(err)
- pr.CloseWithError(err)
- return
- }
- // At this point, the current buffer has been uploaded. For resumable
- // uploads and chunkSize = 0, capture the committed offset here in case
- // the upload was not finalized and another chunk is to be uploaded. Call
- // the progress function for resumable uploads only.
- if gw.upid != "" || gw.chunkSize == 0 {
- offset = off
- }
- if gw.upid != "" {
- progress(offset)
- }
- // When we are done reading data without errors, set the object and
- // finish.
- if doneReading {
- // Build Object from server's response.
- setObj(newObjectFromProto(o))
- return
- }
- }
- }()
- return pw, nil
- }
- // IAM methods.
- func (c *grpcStorageClient) GetIamPolicy(ctx context.Context, resource string, version int32, opts ...storageOption) (*iampb.Policy, error) {
- // TODO: Need a way to set UserProject, potentially in X-Goog-User-Project system parameter.
- s := callSettings(c.settings, opts...)
- req := &iampb.GetIamPolicyRequest{
- Resource: bucketResourceName(globalProjectAlias, resource),
- Options: &iampb.GetPolicyOptions{
- RequestedPolicyVersion: version,
- },
- }
- var rp *iampb.Policy
- err := run(ctx, func(ctx context.Context) error {
- var err error
- rp, err = c.raw.GetIamPolicy(ctx, req, s.gax...)
- return err
- }, s.retry, s.idempotent)
- return rp, err
- }
- func (c *grpcStorageClient) SetIamPolicy(ctx context.Context, resource string, policy *iampb.Policy, opts ...storageOption) error {
- // TODO: Need a way to set UserProject, potentially in X-Goog-User-Project system parameter.
- s := callSettings(c.settings, opts...)
- req := &iampb.SetIamPolicyRequest{
- Resource: bucketResourceName(globalProjectAlias, resource),
- Policy: policy,
- }
- return run(ctx, func(ctx context.Context) error {
- _, err := c.raw.SetIamPolicy(ctx, req, s.gax...)
- return err
- }, s.retry, s.idempotent)
- }
- func (c *grpcStorageClient) TestIamPermissions(ctx context.Context, resource string, permissions []string, opts ...storageOption) ([]string, error) {
- // TODO: Need a way to set UserProject, potentially in X-Goog-User-Project system parameter.
- s := callSettings(c.settings, opts...)
- req := &iampb.TestIamPermissionsRequest{
- Resource: bucketResourceName(globalProjectAlias, resource),
- Permissions: permissions,
- }
- var res *iampb.TestIamPermissionsResponse
- err := run(ctx, func(ctx context.Context) error {
- var err error
- res, err = c.raw.TestIamPermissions(ctx, req, s.gax...)
- return err
- }, s.retry, s.idempotent)
- if err != nil {
- return nil, err
- }
- return res.Permissions, nil
- }
- // HMAC Key methods.
- func (c *grpcStorageClient) GetHMACKey(ctx context.Context, project, accessID string, opts ...storageOption) (*HMACKey, error) {
- s := callSettings(c.settings, opts...)
- req := &storagepb.GetHmacKeyRequest{
- AccessId: accessID,
- Project: toProjectResource(project),
- }
- if s.userProject != "" {
- ctx = setUserProjectMetadata(ctx, s.userProject)
- }
- var metadata *storagepb.HmacKeyMetadata
- err := run(ctx, func(ctx context.Context) error {
- var err error
- metadata, err = c.raw.GetHmacKey(ctx, req, s.gax...)
- return err
- }, s.retry, s.idempotent)
- if err != nil {
- return nil, err
- }
- return toHMACKeyFromProto(metadata), nil
- }
- func (c *grpcStorageClient) ListHMACKeys(ctx context.Context, project, serviceAccountEmail string, showDeletedKeys bool, opts ...storageOption) *HMACKeysIterator {
- s := callSettings(c.settings, opts...)
- req := &storagepb.ListHmacKeysRequest{
- Project: toProjectResource(project),
- ServiceAccountEmail: serviceAccountEmail,
- ShowDeletedKeys: showDeletedKeys,
- }
- if s.userProject != "" {
- ctx = setUserProjectMetadata(ctx, s.userProject)
- }
- it := &HMACKeysIterator{
- ctx: ctx,
- projectID: project,
- retry: s.retry,
- }
- fetch := func(pageSize int, pageToken string) (token string, err error) {
- var hmacKeys []*storagepb.HmacKeyMetadata
- err = run(it.ctx, func(ctx context.Context) error {
- gitr := c.raw.ListHmacKeys(ctx, req, s.gax...)
- hmacKeys, token, err = gitr.InternalFetch(pageSize, pageToken)
- return err
- }, s.retry, s.idempotent)
- if err != nil {
- return "", err
- }
- for _, hkmd := range hmacKeys {
- hk := toHMACKeyFromProto(hkmd)
- it.hmacKeys = append(it.hmacKeys, hk)
- }
- return token, nil
- }
- it.pageInfo, it.nextFunc = iterator.NewPageInfo(
- fetch,
- func() int { return len(it.hmacKeys) - it.index },
- func() interface{} {
- prev := it.hmacKeys
- it.hmacKeys = it.hmacKeys[:0]
- it.index = 0
- return prev
- })
- return it
- }
- func (c *grpcStorageClient) UpdateHMACKey(ctx context.Context, project, serviceAccountEmail, accessID string, attrs *HMACKeyAttrsToUpdate, opts ...storageOption) (*HMACKey, error) {
- s := callSettings(c.settings, opts...)
- hk := &storagepb.HmacKeyMetadata{
- AccessId: accessID,
- Project: toProjectResource(project),
- ServiceAccountEmail: serviceAccountEmail,
- State: string(attrs.State),
- Etag: attrs.Etag,
- }
- var paths []string
- fieldMask := &fieldmaskpb.FieldMask{
- Paths: paths,
- }
- if attrs.State != "" {
- fieldMask.Paths = append(fieldMask.Paths, "state")
- }
- req := &storagepb.UpdateHmacKeyRequest{
- HmacKey: hk,
- UpdateMask: fieldMask,
- }
- if s.userProject != "" {
- ctx = setUserProjectMetadata(ctx, s.userProject)
- }
- var metadata *storagepb.HmacKeyMetadata
- err := run(ctx, func(ctx context.Context) error {
- var err error
- metadata, err = c.raw.UpdateHmacKey(ctx, req, s.gax...)
- return err
- }, s.retry, s.idempotent)
- if err != nil {
- return nil, err
- }
- return toHMACKeyFromProto(metadata), nil
- }
- func (c *grpcStorageClient) CreateHMACKey(ctx context.Context, project, serviceAccountEmail string, opts ...storageOption) (*HMACKey, error) {
- s := callSettings(c.settings, opts...)
- req := &storagepb.CreateHmacKeyRequest{
- Project: toProjectResource(project),
- ServiceAccountEmail: serviceAccountEmail,
- }
- if s.userProject != "" {
- ctx = setUserProjectMetadata(ctx, s.userProject)
- }
- var res *storagepb.CreateHmacKeyResponse
- err := run(ctx, func(ctx context.Context) error {
- var err error
- res, err = c.raw.CreateHmacKey(ctx, req, s.gax...)
- return err
- }, s.retry, s.idempotent)
- if err != nil {
- return nil, err
- }
- key := toHMACKeyFromProto(res.Metadata)
- key.Secret = base64.StdEncoding.EncodeToString(res.SecretKeyBytes)
- return key, nil
- }
- func (c *grpcStorageClient) DeleteHMACKey(ctx context.Context, project string, accessID string, opts ...storageOption) error {
- s := callSettings(c.settings, opts...)
- req := &storagepb.DeleteHmacKeyRequest{
- AccessId: accessID,
- Project: toProjectResource(project),
- }
- if s.userProject != "" {
- ctx = setUserProjectMetadata(ctx, s.userProject)
- }
- return run(ctx, func(ctx context.Context) error {
- return c.raw.DeleteHmacKey(ctx, req, s.gax...)
- }, s.retry, s.idempotent)
- }
- // Notification methods.
- func (c *grpcStorageClient) ListNotifications(ctx context.Context, bucket string, opts ...storageOption) (n map[string]*Notification, err error) {
- ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.ListNotifications")
- defer func() { trace.EndSpan(ctx, err) }()
- s := callSettings(c.settings, opts...)
- if s.userProject != "" {
- ctx = setUserProjectMetadata(ctx, s.userProject)
- }
- req := &storagepb.ListNotificationConfigsRequest{
- Parent: bucketResourceName(globalProjectAlias, bucket),
- }
- var notifications []*storagepb.NotificationConfig
- err = run(ctx, func(ctx context.Context) error {
- gitr := c.raw.ListNotificationConfigs(ctx, req, s.gax...)
- for {
- // PageSize is not set and fallbacks to the API default pageSize of 100.
- items, nextPageToken, err := gitr.InternalFetch(int(req.GetPageSize()), req.GetPageToken())
- if err != nil {
- return err
- }
- notifications = append(notifications, items...)
- // If there are no more results, nextPageToken is empty and err is nil.
- if nextPageToken == "" {
- return err
- }
- req.PageToken = nextPageToken
- }
- }, s.retry, s.idempotent)
- if err != nil {
- return nil, err
- }
- return notificationsToMapFromProto(notifications), nil
- }
- func (c *grpcStorageClient) CreateNotification(ctx context.Context, bucket string, n *Notification, opts ...storageOption) (ret *Notification, err error) {
- ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.CreateNotification")
- defer func() { trace.EndSpan(ctx, err) }()
- s := callSettings(c.settings, opts...)
- req := &storagepb.CreateNotificationConfigRequest{
- Parent: bucketResourceName(globalProjectAlias, bucket),
- NotificationConfig: toProtoNotification(n),
- }
- var pbn *storagepb.NotificationConfig
- err = run(ctx, func(ctx context.Context) error {
- var err error
- pbn, err = c.raw.CreateNotificationConfig(ctx, req, s.gax...)
- return err
- }, s.retry, s.idempotent)
- if err != nil {
- return nil, err
- }
- return toNotificationFromProto(pbn), err
- }
- func (c *grpcStorageClient) DeleteNotification(ctx context.Context, bucket string, id string, opts ...storageOption) (err error) {
- ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.DeleteNotification")
- defer func() { trace.EndSpan(ctx, err) }()
- s := callSettings(c.settings, opts...)
- req := &storagepb.DeleteNotificationConfigRequest{Name: id}
- return run(ctx, func(ctx context.Context) error {
- return c.raw.DeleteNotificationConfig(ctx, req, s.gax...)
- }, s.retry, s.idempotent)
- }
- // setUserProjectMetadata appends a project ID to the outgoing Context metadata
- // via the x-goog-user-project system parameter defined at
- // https://cloud.google.com/apis/docs/system-parameters. This is only for
- // billing purposes, and is generally optional, except for requester-pays
- // buckets.
- func setUserProjectMetadata(ctx context.Context, project string) context.Context {
- return metadata.AppendToOutgoingContext(ctx, "x-goog-user-project", project)
- }
- type readStreamResponse struct {
- stream storagepb.Storage_ReadObjectClient
- response *storagepb.ReadObjectResponse
- }
- type gRPCReader struct {
- seen, size int64
- zeroRange bool
- stream storagepb.Storage_ReadObjectClient
- reopen func(seen int64) (*readStreamResponse, context.CancelFunc, error)
- leftovers []byte
- cancel context.CancelFunc
- settings *settings
- }
- // Read reads bytes into the user's buffer from an open gRPC stream.
- func (r *gRPCReader) Read(p []byte) (int, error) {
- // The entire object has been read by this reader, return EOF.
- if r.size == r.seen || r.zeroRange {
- return 0, io.EOF
- }
- // No stream to read from, either never initialized or Close was called.
- // Note: There is a potential concurrency issue if multiple routines are
- // using the same reader. One encounters an error and the stream is closed
- // and then reopened while the other routine attempts to read from it.
- if r.stream == nil {
- return 0, fmt.Errorf("reader has been closed")
- }
- var n int
- // Read leftovers and return what was available to conform to the Reader
- // interface: https://pkg.go.dev/io#Reader.
- if len(r.leftovers) > 0 {
- n = copy(p, r.leftovers)
- r.seen += int64(n)
- r.leftovers = r.leftovers[n:]
- return n, nil
- }
- // Attempt to Recv the next message on the stream.
- msg, err := r.recv()
- if err != nil {
- return 0, err
- }
- // TODO: Determine if we need to capture incremental CRC32C for this
- // chunk. The Object CRC32C checksum is captured when directed to read
- // the entire Object. If directed to read a range, we may need to
- // calculate the range's checksum for verification if the checksum is
- // present in the response here.
- // TODO: Figure out if we need to support decompressive transcoding
- // https://cloud.google.com/storage/docs/transcoding.
- content := msg.GetChecksummedData().GetContent()
- n = copy(p[n:], content)
- leftover := len(content) - n
- if leftover > 0 {
- // Wasn't able to copy all of the data in the message, store for
- // future Read calls.
- r.leftovers = content[n:]
- }
- r.seen += int64(n)
- return n, nil
- }
- // Close cancels the read stream's context in order for it to be closed and
- // collected.
- func (r *gRPCReader) Close() error {
- if r.cancel != nil {
- r.cancel()
- }
- r.stream = nil
- return nil
- }
- // recv attempts to Recv the next message on the stream. In the event
- // that a retryable error is encountered, the stream will be closed, reopened,
- // and Recv again. This will attempt to Recv until one of the following is true:
- //
- // * Recv is successful
- // * A non-retryable error is encountered
- // * The Reader's context is canceled
- //
- // The last error received is the one that is returned, which could be from
- // an attempt to reopen the stream.
- func (r *gRPCReader) recv() (*storagepb.ReadObjectResponse, error) {
- msg, err := r.stream.Recv()
- var shouldRetry = ShouldRetry
- if r.settings.retry != nil && r.settings.retry.shouldRetry != nil {
- shouldRetry = r.settings.retry.shouldRetry
- }
- if err != nil && shouldRetry(err) {
- // This will "close" the existing stream and immediately attempt to
- // reopen the stream, but will backoff if further attempts are necessary.
- // Reopening the stream Recvs the first message, so if retrying is
- // successful, the next logical chunk will be returned.
- msg, err = r.reopenStream()
- }
- return msg, err
- }
- // reopenStream "closes" the existing stream and attempts to reopen a stream and
- // sets the Reader's stream and cancelStream properties in the process.
- func (r *gRPCReader) reopenStream() (*storagepb.ReadObjectResponse, error) {
- // Close existing stream and initialize new stream with updated offset.
- r.Close()
- res, cancel, err := r.reopen(r.seen)
- if err != nil {
- return nil, err
- }
- r.stream = res.stream
- r.cancel = cancel
- return res.response, nil
- }
- func newGRPCWriter(c *grpcStorageClient, params *openWriterParams, r io.Reader) *gRPCWriter {
- size := params.chunkSize
- // Round up chunksize to nearest 256KiB
- if size%googleapi.MinUploadChunkSize != 0 {
- size += googleapi.MinUploadChunkSize - (size % googleapi.MinUploadChunkSize)
- }
- // A completely bufferless upload is not possible as it is in JSON because
- // the buffer must be provided to the message. However use the minimum size
- // possible in this case.
- if params.chunkSize == 0 {
- size = googleapi.MinUploadChunkSize
- }
- return &gRPCWriter{
- buf: make([]byte, size),
- c: c,
- ctx: params.ctx,
- reader: r,
- bucket: params.bucket,
- attrs: params.attrs,
- conds: params.conds,
- encryptionKey: params.encryptionKey,
- sendCRC32C: params.sendCRC32C,
- chunkSize: params.chunkSize,
- forceEmptyContentType: params.forceEmptyContentType,
- }
- }
- // gRPCWriter is a wrapper around the the gRPC client-stream API that manages
- // sending chunks of data provided by the user over the stream.
- type gRPCWriter struct {
- c *grpcStorageClient
- buf []byte
- reader io.Reader
- ctx context.Context
- bucket string
- attrs *ObjectAttrs
- conds *Conditions
- encryptionKey []byte
- settings *settings
- sendCRC32C bool
- chunkSize int
- forceEmptyContentType bool
- // The gRPC client-stream used for sending buffers.
- stream storagepb.Storage_BidiWriteObjectClient
- // The Resumable Upload ID started by a gRPC-based Writer.
- upid string
- }
- // startResumableUpload initializes a Resumable Upload with gRPC and sets the
- // upload ID on the Writer.
- func (w *gRPCWriter) startResumableUpload() error {
- spec, err := w.writeObjectSpec()
- if err != nil {
- return err
- }
- req := &storagepb.StartResumableWriteRequest{
- WriteObjectSpec: spec,
- CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey),
- }
- // TODO: Currently the checksums are only sent on the request to initialize
- // the upload, but in the future, we must also support sending it
- // on the *last* message of the stream.
- req.ObjectChecksums = toProtoChecksums(w.sendCRC32C, w.attrs)
- return run(w.ctx, func(ctx context.Context) error {
- upres, err := w.c.raw.StartResumableWrite(w.ctx, req)
- w.upid = upres.GetUploadId()
- return err
- }, w.settings.retry, w.settings.idempotent)
- }
- // queryProgress is a helper that queries the status of the resumable upload
- // associated with the given upload ID.
- func (w *gRPCWriter) queryProgress() (int64, error) {
- var persistedSize int64
- err := run(w.ctx, func(ctx context.Context) error {
- q, err := w.c.raw.QueryWriteStatus(w.ctx, &storagepb.QueryWriteStatusRequest{
- UploadId: w.upid,
- })
- persistedSize = q.GetPersistedSize()
- return err
- }, w.settings.retry, true)
- // q.GetCommittedSize() will return 0 if q is nil.
- return persistedSize, err
- }
- // uploadBuffer uploads the buffer at the given offset using a bi-directional
- // Write stream. It will open a new stream if necessary (on the first call or
- // after resuming from failure). The resulting write offset after uploading the
- // buffer is returned, as well as the final Object if the upload is
- // completed.
- //
- // Returns object, persisted size, and any error that is not retriable.
- func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*storagepb.Object, int64, error) {
- var shouldRetry = ShouldRetry
- if w.settings.retry != nil && w.settings.retry.shouldRetry != nil {
- shouldRetry = w.settings.retry.shouldRetry
- }
- var err error
- var lastWriteOfEntireObject bool
- sent := 0
- writeOffset := start
- toWrite := w.buf[:recvd]
- // Send a request with as many bytes as possible.
- // Loop until all bytes are sent.
- for {
- bytesNotYetSent := recvd - sent
- remainingDataFitsInSingleReq := bytesNotYetSent <= maxPerMessageWriteSize
- if remainingDataFitsInSingleReq && doneReading {
- lastWriteOfEntireObject = true
- }
- // Send the maximum amount of bytes we can, unless we don't have that many.
- bytesToSendInCurrReq := maxPerMessageWriteSize
- if remainingDataFitsInSingleReq {
- bytesToSendInCurrReq = bytesNotYetSent
- }
- // Prepare chunk section for upload.
- data := toWrite[sent : sent+bytesToSendInCurrReq]
- req := &storagepb.BidiWriteObjectRequest{
- Data: &storagepb.BidiWriteObjectRequest_ChecksummedData{
- ChecksummedData: &storagepb.ChecksummedData{
- Content: data,
- },
- },
- WriteOffset: writeOffset,
- FinishWrite: lastWriteOfEntireObject,
- Flush: remainingDataFitsInSingleReq && !lastWriteOfEntireObject,
- StateLookup: remainingDataFitsInSingleReq && !lastWriteOfEntireObject,
- }
- // Open a new stream if necessary and set the first_message field on
- // the request. The first message on the WriteObject stream must either
- // be the Object or the Resumable Upload ID.
- if w.stream == nil {
- hds := []string{"x-goog-request-params", fmt.Sprintf("bucket=projects/_/buckets/%s", url.QueryEscape(w.bucket))}
- ctx := gax.InsertMetadataIntoOutgoingContext(w.ctx, hds...)
- w.stream, err = w.c.raw.BidiWriteObject(ctx)
- if err != nil {
- return nil, 0, err
- }
- if w.upid != "" { // resumable upload
- req.FirstMessage = &storagepb.BidiWriteObjectRequest_UploadId{UploadId: w.upid}
- } else { // non-resumable
- spec, err := w.writeObjectSpec()
- if err != nil {
- return nil, 0, err
- }
- req.FirstMessage = &storagepb.BidiWriteObjectRequest_WriteObjectSpec{
- WriteObjectSpec: spec,
- }
- req.CommonObjectRequestParams = toProtoCommonObjectRequestParams(w.encryptionKey)
- // For a non-resumable upload, checksums must be sent in this message.
- // TODO: Currently the checksums are only sent on the first message
- // of the stream, but in the future, we must also support sending it
- // on the *last* message of the stream (instead of the first).
- req.ObjectChecksums = toProtoChecksums(w.sendCRC32C, w.attrs)
- }
- }
- err = w.stream.Send(req)
- if err == io.EOF {
- // err was io.EOF. The client-side of a stream only gets an EOF on Send
- // when the backend closes the stream and wants to return an error
- // status.
- // Receive from the stream Recv() until it returns a non-nil error
- // to receive the server's status as an error. We may get multiple
- // messages before the error due to buffering.
- err = nil
- for err == nil {
- _, err = w.stream.Recv()
- }
- // Drop the stream reference as a new one will need to be created if
- // we retry.
- w.stream = nil
- // Drop the stream reference as a new one will need to be created if
- // we can retry the upload
- w.stream = nil
- // Retriable errors mean we should start over and attempt to
- // resend the entire buffer via a new stream.
- // If not retriable, falling through will return the error received.
- if shouldRetry(err) {
- // TODO: Add test case for failure modes of querying progress.
- writeOffset, err = w.determineOffset(start)
- if err != nil {
- return nil, 0, err
- }
- sent = int(writeOffset) - int(start)
- // Continue sending requests, opening a new stream and resending
- // any bytes not yet persisted as per QueryWriteStatus
- continue
- }
- }
- if err != nil {
- return nil, 0, err
- }
- // Update the immediate stream's sent total and the upload offset with
- // the data sent.
- sent += len(data)
- writeOffset += int64(len(data))
- // Not done sending data, do not attempt to commit it yet, loop around
- // and send more data.
- if recvd-sent > 0 {
- continue
- }
- // The buffer has been uploaded and there is still more data to be
- // uploaded, but this is not a resumable upload session. Therefore,
- // don't check persisted data.
- if !lastWriteOfEntireObject && w.chunkSize == 0 {
- return nil, writeOffset, nil
- }
- // Done sending the data in the buffer (remainingDataFitsInSingleReq
- // should == true if we reach this code).
- // If we are done sending the whole object, close the stream and get the final
- // object. Otherwise, receive from the stream to confirm the persisted data.
- if !lastWriteOfEntireObject {
- resp, err := w.stream.Recv()
- // Retriable errors mean we should start over and attempt to
- // resend the entire buffer via a new stream.
- // If not retriable, falling through will return the error received
- // from closing the stream.
- if shouldRetry(err) {
- writeOffset, err = w.determineOffset(start)
- if err != nil {
- return nil, 0, err
- }
- sent = int(writeOffset) - int(start)
- // Drop the stream reference as a new one will need to be created.
- w.stream = nil
- continue
- }
- if err != nil {
- return nil, 0, err
- }
- if resp.GetPersistedSize() != writeOffset {
- // Retry if not all bytes were persisted.
- writeOffset = resp.GetPersistedSize()
- sent = int(writeOffset) - int(start)
- continue
- }
- } else {
- // If the object is done uploading, close the send stream to signal
- // to the server that we are done sending so that we can receive
- // from the stream without blocking.
- err = w.stream.CloseSend()
- if err != nil {
- // CloseSend() retries the send internally. It never returns an
- // error in the current implementation, but we check it anyway in
- // case that it does in the future.
- return nil, 0, err
- }
- // Stream receives do not block once send is closed, but we may not
- // receive the response with the object right away; loop until we
- // receive the object or error out.
- var obj *storagepb.Object
- for obj == nil {
- resp, err := w.stream.Recv()
- if err != nil {
- return nil, 0, err
- }
- obj = resp.GetResource()
- }
- // Even though we received the object response, continue reading
- // until we receive a non-nil error, to ensure the stream does not
- // leak even if the context isn't cancelled. See:
- // https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream
- for err == nil {
- _, err = w.stream.Recv()
- }
- return obj, writeOffset, nil
- }
- return nil, writeOffset, nil
- }
- }
- // determineOffset either returns the offset given to it in the case of a simple
- // upload, or queries the write status in the case a resumable upload is being
- // used.
- func (w *gRPCWriter) determineOffset(offset int64) (int64, error) {
- // For a Resumable Upload, we must start from however much data
- // was committed.
- if w.upid != "" {
- committed, err := w.queryProgress()
- if err != nil {
- return 0, err
- }
- offset = committed
- }
- return offset, nil
- }
- // writeObjectSpec constructs a WriteObjectSpec proto using the Writer's
- // ObjectAttrs and applies its Conditions. This is only used for gRPC.
- func (w *gRPCWriter) writeObjectSpec() (*storagepb.WriteObjectSpec, error) {
- // To avoid modifying the ObjectAttrs embeded in the calling writer, deref
- // the ObjectAttrs pointer to make a copy, then assign the desired name to
- // the attribute.
- attrs := *w.attrs
- spec := &storagepb.WriteObjectSpec{
- Resource: attrs.toProtoObject(w.bucket),
- }
- // WriteObject doesn't support the generation condition, so use default.
- if err := applyCondsProto("WriteObject", defaultGen, w.conds, spec); err != nil {
- return nil, err
- }
- return spec, nil
- }
- // read copies the data in the reader to the given buffer and reports how much
- // data was read into the buffer and if there is no more data to read (EOF).
- // Furthermore, if the attrs.ContentType is unset, the first bytes of content
- // will be sniffed for a matching content type unless forceEmptyContentType is enabled.
- func (w *gRPCWriter) read() (int, bool, error) {
- if w.attrs.ContentType == "" && !w.forceEmptyContentType {
- w.reader, w.attrs.ContentType = gax.DetermineContentType(w.reader)
- }
- // Set n to -1 to start the Read loop.
- var n, recvd int = -1, 0
- var err error
- for err == nil && n != 0 {
- // The routine blocks here until data is received.
- n, err = w.reader.Read(w.buf[recvd:])
- recvd += n
- }
- var done bool
- if err == io.EOF {
- done = true
- err = nil
- }
- return recvd, done, err
- }
- func checkCanceled(err error) error {
- if status.Code(err) == codes.Canceled {
- return context.Canceled
- }
- return err
- }
|