grpc_client.go 62 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889
  1. // Copyright 2022 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package storage
  15. import (
  16. "context"
  17. "encoding/base64"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "net/url"
  22. "os"
  23. "cloud.google.com/go/iam/apiv1/iampb"
  24. "cloud.google.com/go/internal/trace"
  25. gapic "cloud.google.com/go/storage/internal/apiv2"
  26. "cloud.google.com/go/storage/internal/apiv2/storagepb"
  27. "github.com/googleapis/gax-go/v2"
  28. "google.golang.org/api/googleapi"
  29. "google.golang.org/api/iterator"
  30. "google.golang.org/api/option"
  31. "google.golang.org/api/option/internaloption"
  32. "google.golang.org/grpc"
  33. "google.golang.org/grpc/codes"
  34. "google.golang.org/grpc/metadata"
  35. "google.golang.org/grpc/status"
  36. fieldmaskpb "google.golang.org/protobuf/types/known/fieldmaskpb"
  37. )
  38. const (
  39. // defaultConnPoolSize is the default number of channels
  40. // to initialize in the GAPIC gRPC connection pool. A larger
  41. // connection pool may be necessary for jobs that require
  42. // high throughput and/or leverage many concurrent streams
  43. // if not running via DirectPath.
  44. //
  45. // This is only used for the gRPC client.
  46. defaultConnPoolSize = 1
  47. // maxPerMessageWriteSize is the maximum amount of content that can be sent
  48. // per WriteObjectRequest message. A buffer reaching this amount will
  49. // precipitate a flush of the buffer. It is only used by the gRPC Writer
  50. // implementation.
  51. maxPerMessageWriteSize int = int(storagepb.ServiceConstants_MAX_WRITE_CHUNK_BYTES)
  52. // globalProjectAlias is the project ID alias used for global buckets.
  53. //
  54. // This is only used for the gRPC API.
  55. globalProjectAlias = "_"
  56. // msgEntityNotSupported indicates ACL entites using project ID are not currently supported.
  57. //
  58. // This is only used for the gRPC API.
  59. msgEntityNotSupported = "The gRPC API currently does not support ACL entities using project ID, use project numbers instead"
  60. )
  61. // defaultGRPCOptions returns a set of the default client options
  62. // for gRPC client initialization.
  63. func defaultGRPCOptions() []option.ClientOption {
  64. defaults := []option.ClientOption{
  65. option.WithGRPCConnectionPool(defaultConnPoolSize),
  66. }
  67. // Set emulator options for gRPC if an emulator was specified. Note that in a
  68. // hybrid client, STORAGE_EMULATOR_HOST will set the host to use for HTTP and
  69. // STORAGE_EMULATOR_HOST_GRPC will set the host to use for gRPC (when using a
  70. // local emulator, HTTP and gRPC must use different ports, so this is
  71. // necessary).
  72. //
  73. // TODO: When the newHybridClient is not longer used, remove
  74. // STORAGE_EMULATOR_HOST_GRPC and use STORAGE_EMULATOR_HOST for both the
  75. // HTTP and gRPC based clients.
  76. if host := os.Getenv("STORAGE_EMULATOR_HOST_GRPC"); host != "" {
  77. // Strip the scheme from the emulator host. WithEndpoint does not take a
  78. // scheme for gRPC.
  79. host = stripScheme(host)
  80. defaults = append(defaults,
  81. option.WithEndpoint(host),
  82. option.WithGRPCDialOption(grpc.WithInsecure()),
  83. option.WithoutAuthentication(),
  84. )
  85. } else {
  86. // Only enable DirectPath when the emulator is not being targeted.
  87. defaults = append(defaults, internaloption.EnableDirectPath(true))
  88. }
  89. return defaults
  90. }
  91. // grpcStorageClient is the gRPC API implementation of the transport-agnostic
  92. // storageClient interface.
  93. type grpcStorageClient struct {
  94. raw *gapic.Client
  95. settings *settings
  96. }
  97. // newGRPCStorageClient initializes a new storageClient that uses the gRPC
  98. // Storage API.
  99. func newGRPCStorageClient(ctx context.Context, opts ...storageOption) (storageClient, error) {
  100. s := initSettings(opts...)
  101. s.clientOption = append(defaultGRPCOptions(), s.clientOption...)
  102. config := newStorageConfig(s.clientOption...)
  103. if config.readAPIWasSet {
  104. return nil, errors.New("storage: GRPC is incompatible with any option that specifies an API for reads")
  105. }
  106. g, err := gapic.NewClient(ctx, s.clientOption...)
  107. if err != nil {
  108. return nil, err
  109. }
  110. return &grpcStorageClient{
  111. raw: g,
  112. settings: s,
  113. }, nil
  114. }
  115. func (c *grpcStorageClient) Close() error {
  116. return c.raw.Close()
  117. }
  118. // Top-level methods.
  119. func (c *grpcStorageClient) GetServiceAccount(ctx context.Context, project string, opts ...storageOption) (string, error) {
  120. s := callSettings(c.settings, opts...)
  121. req := &storagepb.GetServiceAccountRequest{
  122. Project: toProjectResource(project),
  123. }
  124. var resp *storagepb.ServiceAccount
  125. err := run(ctx, func(ctx context.Context) error {
  126. var err error
  127. resp, err = c.raw.GetServiceAccount(ctx, req, s.gax...)
  128. return err
  129. }, s.retry, s.idempotent)
  130. if err != nil {
  131. return "", err
  132. }
  133. return resp.EmailAddress, err
  134. }
  135. func (c *grpcStorageClient) CreateBucket(ctx context.Context, project, bucket string, attrs *BucketAttrs, enableObjectRetention *bool, opts ...storageOption) (*BucketAttrs, error) {
  136. if enableObjectRetention != nil {
  137. // TO-DO: implement ObjectRetention once available - see b/308194853
  138. return nil, status.Errorf(codes.Unimplemented, "storage: object retention is not supported in gRPC")
  139. }
  140. s := callSettings(c.settings, opts...)
  141. b := attrs.toProtoBucket()
  142. b.Project = toProjectResource(project)
  143. // If there is lifecycle information but no location, explicitly set
  144. // the location. This is a GCS quirk/bug.
  145. if b.GetLocation() == "" && b.GetLifecycle() != nil {
  146. b.Location = "US"
  147. }
  148. req := &storagepb.CreateBucketRequest{
  149. Parent: fmt.Sprintf("projects/%s", globalProjectAlias),
  150. Bucket: b,
  151. BucketId: bucket,
  152. }
  153. if attrs != nil {
  154. req.PredefinedAcl = attrs.PredefinedACL
  155. req.PredefinedDefaultObjectAcl = attrs.PredefinedDefaultObjectACL
  156. }
  157. var battrs *BucketAttrs
  158. err := run(ctx, func(ctx context.Context) error {
  159. res, err := c.raw.CreateBucket(ctx, req, s.gax...)
  160. battrs = newBucketFromProto(res)
  161. return err
  162. }, s.retry, s.idempotent)
  163. return battrs, err
  164. }
  165. func (c *grpcStorageClient) ListBuckets(ctx context.Context, project string, opts ...storageOption) *BucketIterator {
  166. s := callSettings(c.settings, opts...)
  167. it := &BucketIterator{
  168. ctx: ctx,
  169. projectID: project,
  170. }
  171. var gitr *gapic.BucketIterator
  172. fetch := func(pageSize int, pageToken string) (token string, err error) {
  173. var buckets []*storagepb.Bucket
  174. var next string
  175. err = run(it.ctx, func(ctx context.Context) error {
  176. // Initialize GAPIC-based iterator when pageToken is empty, which
  177. // indicates that this fetch call is attempting to get the first page.
  178. //
  179. // Note: Initializing the GAPIC-based iterator lazily is necessary to
  180. // capture the BucketIterator.Prefix set by the user *after* the
  181. // BucketIterator is returned to them from the veneer.
  182. if pageToken == "" {
  183. req := &storagepb.ListBucketsRequest{
  184. Parent: toProjectResource(it.projectID),
  185. Prefix: it.Prefix,
  186. }
  187. gitr = c.raw.ListBuckets(ctx, req, s.gax...)
  188. }
  189. buckets, next, err = gitr.InternalFetch(pageSize, pageToken)
  190. return err
  191. }, s.retry, s.idempotent)
  192. if err != nil {
  193. return "", err
  194. }
  195. for _, bkt := range buckets {
  196. b := newBucketFromProto(bkt)
  197. it.buckets = append(it.buckets, b)
  198. }
  199. return next, nil
  200. }
  201. it.pageInfo, it.nextFunc = iterator.NewPageInfo(
  202. fetch,
  203. func() int { return len(it.buckets) },
  204. func() interface{} { b := it.buckets; it.buckets = nil; return b })
  205. return it
  206. }
  207. // Bucket methods.
  208. func (c *grpcStorageClient) DeleteBucket(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) error {
  209. s := callSettings(c.settings, opts...)
  210. req := &storagepb.DeleteBucketRequest{
  211. Name: bucketResourceName(globalProjectAlias, bucket),
  212. }
  213. if err := applyBucketCondsProto("grpcStorageClient.DeleteBucket", conds, req); err != nil {
  214. return err
  215. }
  216. if s.userProject != "" {
  217. ctx = setUserProjectMetadata(ctx, s.userProject)
  218. }
  219. return run(ctx, func(ctx context.Context) error {
  220. return c.raw.DeleteBucket(ctx, req, s.gax...)
  221. }, s.retry, s.idempotent)
  222. }
  223. func (c *grpcStorageClient) GetBucket(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) (*BucketAttrs, error) {
  224. s := callSettings(c.settings, opts...)
  225. req := &storagepb.GetBucketRequest{
  226. Name: bucketResourceName(globalProjectAlias, bucket),
  227. ReadMask: &fieldmaskpb.FieldMask{Paths: []string{"*"}},
  228. }
  229. if err := applyBucketCondsProto("grpcStorageClient.GetBucket", conds, req); err != nil {
  230. return nil, err
  231. }
  232. if s.userProject != "" {
  233. ctx = setUserProjectMetadata(ctx, s.userProject)
  234. }
  235. var battrs *BucketAttrs
  236. err := run(ctx, func(ctx context.Context) error {
  237. res, err := c.raw.GetBucket(ctx, req, s.gax...)
  238. battrs = newBucketFromProto(res)
  239. return err
  240. }, s.retry, s.idempotent)
  241. if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
  242. return nil, ErrBucketNotExist
  243. }
  244. return battrs, err
  245. }
  246. func (c *grpcStorageClient) UpdateBucket(ctx context.Context, bucket string, uattrs *BucketAttrsToUpdate, conds *BucketConditions, opts ...storageOption) (*BucketAttrs, error) {
  247. s := callSettings(c.settings, opts...)
  248. b := uattrs.toProtoBucket()
  249. b.Name = bucketResourceName(globalProjectAlias, bucket)
  250. req := &storagepb.UpdateBucketRequest{
  251. Bucket: b,
  252. PredefinedAcl: uattrs.PredefinedACL,
  253. PredefinedDefaultObjectAcl: uattrs.PredefinedDefaultObjectACL,
  254. }
  255. if err := applyBucketCondsProto("grpcStorageClient.UpdateBucket", conds, req); err != nil {
  256. return nil, err
  257. }
  258. if s.userProject != "" {
  259. ctx = setUserProjectMetadata(ctx, s.userProject)
  260. }
  261. var paths []string
  262. fieldMask := &fieldmaskpb.FieldMask{
  263. Paths: paths,
  264. }
  265. if uattrs.CORS != nil {
  266. fieldMask.Paths = append(fieldMask.Paths, "cors")
  267. }
  268. if uattrs.DefaultEventBasedHold != nil {
  269. fieldMask.Paths = append(fieldMask.Paths, "default_event_based_hold")
  270. }
  271. if uattrs.RetentionPolicy != nil {
  272. fieldMask.Paths = append(fieldMask.Paths, "retention_policy")
  273. }
  274. if uattrs.VersioningEnabled != nil {
  275. fieldMask.Paths = append(fieldMask.Paths, "versioning")
  276. }
  277. if uattrs.RequesterPays != nil {
  278. fieldMask.Paths = append(fieldMask.Paths, "billing")
  279. }
  280. if uattrs.BucketPolicyOnly != nil || uattrs.UniformBucketLevelAccess != nil || uattrs.PublicAccessPrevention != PublicAccessPreventionUnknown {
  281. fieldMask.Paths = append(fieldMask.Paths, "iam_config")
  282. }
  283. if uattrs.Encryption != nil {
  284. fieldMask.Paths = append(fieldMask.Paths, "encryption")
  285. }
  286. if uattrs.Lifecycle != nil {
  287. fieldMask.Paths = append(fieldMask.Paths, "lifecycle")
  288. }
  289. if uattrs.Logging != nil {
  290. fieldMask.Paths = append(fieldMask.Paths, "logging")
  291. }
  292. if uattrs.Website != nil {
  293. fieldMask.Paths = append(fieldMask.Paths, "website")
  294. }
  295. if uattrs.PredefinedACL != "" {
  296. // In cases where PredefinedACL is set, Acl is cleared.
  297. fieldMask.Paths = append(fieldMask.Paths, "acl")
  298. }
  299. if uattrs.PredefinedDefaultObjectACL != "" {
  300. // In cases where PredefinedDefaultObjectACL is set, DefaultObjectAcl is cleared.
  301. fieldMask.Paths = append(fieldMask.Paths, "default_object_acl")
  302. }
  303. // Note: This API currently does not support entites using project ID.
  304. // Use project numbers in ACL entities. Pending b/233617896.
  305. if uattrs.acl != nil {
  306. // In cases where acl is set by UpdateBucketACL method.
  307. fieldMask.Paths = append(fieldMask.Paths, "acl")
  308. }
  309. if uattrs.defaultObjectACL != nil {
  310. // In cases where defaultObjectACL is set by UpdateBucketACL method.
  311. fieldMask.Paths = append(fieldMask.Paths, "default_object_acl")
  312. }
  313. if uattrs.StorageClass != "" {
  314. fieldMask.Paths = append(fieldMask.Paths, "storage_class")
  315. }
  316. if uattrs.RPO != RPOUnknown {
  317. fieldMask.Paths = append(fieldMask.Paths, "rpo")
  318. }
  319. if uattrs.Autoclass != nil {
  320. fieldMask.Paths = append(fieldMask.Paths, "autoclass")
  321. }
  322. for label := range uattrs.setLabels {
  323. fieldMask.Paths = append(fieldMask.Paths, fmt.Sprintf("labels.%s", label))
  324. }
  325. // Delete a label by not including it in Bucket.Labels but adding the key to the update mask.
  326. for label := range uattrs.deleteLabels {
  327. fieldMask.Paths = append(fieldMask.Paths, fmt.Sprintf("labels.%s", label))
  328. }
  329. req.UpdateMask = fieldMask
  330. var battrs *BucketAttrs
  331. err := run(ctx, func(ctx context.Context) error {
  332. res, err := c.raw.UpdateBucket(ctx, req, s.gax...)
  333. battrs = newBucketFromProto(res)
  334. return err
  335. }, s.retry, s.idempotent)
  336. return battrs, err
  337. }
  338. func (c *grpcStorageClient) LockBucketRetentionPolicy(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) error {
  339. s := callSettings(c.settings, opts...)
  340. req := &storagepb.LockBucketRetentionPolicyRequest{
  341. Bucket: bucketResourceName(globalProjectAlias, bucket),
  342. }
  343. if err := applyBucketCondsProto("grpcStorageClient.LockBucketRetentionPolicy", conds, req); err != nil {
  344. return err
  345. }
  346. return run(ctx, func(ctx context.Context) error {
  347. _, err := c.raw.LockBucketRetentionPolicy(ctx, req, s.gax...)
  348. return err
  349. }, s.retry, s.idempotent)
  350. }
  351. func (c *grpcStorageClient) ListObjects(ctx context.Context, bucket string, q *Query, opts ...storageOption) *ObjectIterator {
  352. s := callSettings(c.settings, opts...)
  353. it := &ObjectIterator{
  354. ctx: ctx,
  355. }
  356. if q != nil {
  357. it.query = *q
  358. }
  359. req := &storagepb.ListObjectsRequest{
  360. Parent: bucketResourceName(globalProjectAlias, bucket),
  361. Prefix: it.query.Prefix,
  362. Delimiter: it.query.Delimiter,
  363. Versions: it.query.Versions,
  364. LexicographicStart: it.query.StartOffset,
  365. LexicographicEnd: it.query.EndOffset,
  366. IncludeTrailingDelimiter: it.query.IncludeTrailingDelimiter,
  367. MatchGlob: it.query.MatchGlob,
  368. ReadMask: q.toFieldMask(), // a nil Query still results in a "*" FieldMask
  369. }
  370. if s.userProject != "" {
  371. ctx = setUserProjectMetadata(ctx, s.userProject)
  372. }
  373. fetch := func(pageSize int, pageToken string) (token string, err error) {
  374. // IncludeFoldersAsPrefixes is not supported for gRPC
  375. // TODO: remove this when support is added in the proto.
  376. if it.query.IncludeFoldersAsPrefixes {
  377. return "", status.Errorf(codes.Unimplemented, "storage: IncludeFoldersAsPrefixes is not supported in gRPC")
  378. }
  379. var objects []*storagepb.Object
  380. var gitr *gapic.ObjectIterator
  381. err = run(it.ctx, func(ctx context.Context) error {
  382. gitr = c.raw.ListObjects(ctx, req, s.gax...)
  383. it.ctx = ctx
  384. objects, token, err = gitr.InternalFetch(pageSize, pageToken)
  385. return err
  386. }, s.retry, s.idempotent)
  387. if err != nil {
  388. if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
  389. err = ErrBucketNotExist
  390. }
  391. return "", err
  392. }
  393. for _, obj := range objects {
  394. b := newObjectFromProto(obj)
  395. it.items = append(it.items, b)
  396. }
  397. // Response is always non-nil after a successful request.
  398. res := gitr.Response.(*storagepb.ListObjectsResponse)
  399. for _, prefix := range res.GetPrefixes() {
  400. it.items = append(it.items, &ObjectAttrs{Prefix: prefix})
  401. }
  402. return token, nil
  403. }
  404. it.pageInfo, it.nextFunc = iterator.NewPageInfo(
  405. fetch,
  406. func() int { return len(it.items) },
  407. func() interface{} { b := it.items; it.items = nil; return b })
  408. return it
  409. }
  410. // Object metadata methods.
  411. func (c *grpcStorageClient) DeleteObject(ctx context.Context, bucket, object string, gen int64, conds *Conditions, opts ...storageOption) error {
  412. s := callSettings(c.settings, opts...)
  413. req := &storagepb.DeleteObjectRequest{
  414. Bucket: bucketResourceName(globalProjectAlias, bucket),
  415. Object: object,
  416. }
  417. if err := applyCondsProto("grpcStorageClient.DeleteObject", gen, conds, req); err != nil {
  418. return err
  419. }
  420. if s.userProject != "" {
  421. ctx = setUserProjectMetadata(ctx, s.userProject)
  422. }
  423. err := run(ctx, func(ctx context.Context) error {
  424. return c.raw.DeleteObject(ctx, req, s.gax...)
  425. }, s.retry, s.idempotent)
  426. if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
  427. return ErrObjectNotExist
  428. }
  429. return err
  430. }
  431. func (c *grpcStorageClient) GetObject(ctx context.Context, bucket, object string, gen int64, encryptionKey []byte, conds *Conditions, opts ...storageOption) (*ObjectAttrs, error) {
  432. s := callSettings(c.settings, opts...)
  433. req := &storagepb.GetObjectRequest{
  434. Bucket: bucketResourceName(globalProjectAlias, bucket),
  435. Object: object,
  436. // ProjectionFull by default.
  437. ReadMask: &fieldmaskpb.FieldMask{Paths: []string{"*"}},
  438. }
  439. if err := applyCondsProto("grpcStorageClient.GetObject", gen, conds, req); err != nil {
  440. return nil, err
  441. }
  442. if s.userProject != "" {
  443. ctx = setUserProjectMetadata(ctx, s.userProject)
  444. }
  445. if encryptionKey != nil {
  446. req.CommonObjectRequestParams = toProtoCommonObjectRequestParams(encryptionKey)
  447. }
  448. var attrs *ObjectAttrs
  449. err := run(ctx, func(ctx context.Context) error {
  450. res, err := c.raw.GetObject(ctx, req, s.gax...)
  451. attrs = newObjectFromProto(res)
  452. return err
  453. }, s.retry, s.idempotent)
  454. if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
  455. return nil, ErrObjectNotExist
  456. }
  457. return attrs, err
  458. }
  459. func (c *grpcStorageClient) UpdateObject(ctx context.Context, params *updateObjectParams, opts ...storageOption) (*ObjectAttrs, error) {
  460. uattrs := params.uattrs
  461. if params.overrideRetention != nil || uattrs.Retention != nil {
  462. // TO-DO: implement ObjectRetention once available - see b/308194853
  463. return nil, status.Errorf(codes.Unimplemented, "storage: object retention is not supported in gRPC")
  464. }
  465. s := callSettings(c.settings, opts...)
  466. o := uattrs.toProtoObject(bucketResourceName(globalProjectAlias, params.bucket), params.object)
  467. // For Update, generation is passed via the object message rather than a field on the request.
  468. if params.gen >= 0 {
  469. o.Generation = params.gen
  470. }
  471. req := &storagepb.UpdateObjectRequest{
  472. Object: o,
  473. PredefinedAcl: uattrs.PredefinedACL,
  474. }
  475. if err := applyCondsProto("grpcStorageClient.UpdateObject", defaultGen, params.conds, req); err != nil {
  476. return nil, err
  477. }
  478. if s.userProject != "" {
  479. ctx = setUserProjectMetadata(ctx, s.userProject)
  480. }
  481. if params.encryptionKey != nil {
  482. req.CommonObjectRequestParams = toProtoCommonObjectRequestParams(params.encryptionKey)
  483. }
  484. fieldMask := &fieldmaskpb.FieldMask{Paths: nil}
  485. if uattrs.EventBasedHold != nil {
  486. fieldMask.Paths = append(fieldMask.Paths, "event_based_hold")
  487. }
  488. if uattrs.TemporaryHold != nil {
  489. fieldMask.Paths = append(fieldMask.Paths, "temporary_hold")
  490. }
  491. if uattrs.ContentType != nil {
  492. fieldMask.Paths = append(fieldMask.Paths, "content_type")
  493. }
  494. if uattrs.ContentLanguage != nil {
  495. fieldMask.Paths = append(fieldMask.Paths, "content_language")
  496. }
  497. if uattrs.ContentEncoding != nil {
  498. fieldMask.Paths = append(fieldMask.Paths, "content_encoding")
  499. }
  500. if uattrs.ContentDisposition != nil {
  501. fieldMask.Paths = append(fieldMask.Paths, "content_disposition")
  502. }
  503. if uattrs.CacheControl != nil {
  504. fieldMask.Paths = append(fieldMask.Paths, "cache_control")
  505. }
  506. if !uattrs.CustomTime.IsZero() {
  507. fieldMask.Paths = append(fieldMask.Paths, "custom_time")
  508. }
  509. // Note: This API currently does not support entites using project ID.
  510. // Use project numbers in ACL entities. Pending b/233617896.
  511. if uattrs.ACL != nil || len(uattrs.PredefinedACL) > 0 {
  512. fieldMask.Paths = append(fieldMask.Paths, "acl")
  513. }
  514. if uattrs.Metadata != nil {
  515. // We don't support deleting a specific metadata key; metadata is deleted
  516. // as a whole if provided an empty map, so we do not use dot notation here
  517. if len(uattrs.Metadata) == 0 {
  518. fieldMask.Paths = append(fieldMask.Paths, "metadata")
  519. } else {
  520. // We can, however, use dot notation for adding keys
  521. for key := range uattrs.Metadata {
  522. fieldMask.Paths = append(fieldMask.Paths, fmt.Sprintf("metadata.%s", key))
  523. }
  524. }
  525. }
  526. req.UpdateMask = fieldMask
  527. var attrs *ObjectAttrs
  528. err := run(ctx, func(ctx context.Context) error {
  529. res, err := c.raw.UpdateObject(ctx, req, s.gax...)
  530. attrs = newObjectFromProto(res)
  531. return err
  532. }, s.retry, s.idempotent)
  533. if e, ok := status.FromError(err); ok && e.Code() == codes.NotFound {
  534. return nil, ErrObjectNotExist
  535. }
  536. return attrs, err
  537. }
  538. // Default Object ACL methods.
  539. func (c *grpcStorageClient) DeleteDefaultObjectACL(ctx context.Context, bucket string, entity ACLEntity, opts ...storageOption) error {
  540. // There is no separate API for PATCH in gRPC.
  541. // Make a GET call first to retrieve BucketAttrs.
  542. attrs, err := c.GetBucket(ctx, bucket, nil, opts...)
  543. if err != nil {
  544. return err
  545. }
  546. // Delete the entity and copy other remaining ACL entities.
  547. // Note: This API currently does not support entites using project ID.
  548. // Use project numbers in ACL entities. Pending b/233617896.
  549. // Return error if entity is not found or a project ID is used.
  550. invalidEntity := true
  551. var acl []ACLRule
  552. for _, a := range attrs.DefaultObjectACL {
  553. if a.Entity != entity {
  554. acl = append(acl, a)
  555. }
  556. if a.Entity == entity {
  557. invalidEntity = false
  558. }
  559. }
  560. if invalidEntity {
  561. return fmt.Errorf("storage: entity %v was not found on bucket %v, got %v. %v", entity, bucket, attrs.DefaultObjectACL, msgEntityNotSupported)
  562. }
  563. uattrs := &BucketAttrsToUpdate{defaultObjectACL: acl}
  564. // Call UpdateBucket with a MetagenerationMatch precondition set.
  565. if _, err = c.UpdateBucket(ctx, bucket, uattrs, &BucketConditions{MetagenerationMatch: attrs.MetaGeneration}, opts...); err != nil {
  566. return err
  567. }
  568. return nil
  569. }
  570. func (c *grpcStorageClient) ListDefaultObjectACLs(ctx context.Context, bucket string, opts ...storageOption) ([]ACLRule, error) {
  571. attrs, err := c.GetBucket(ctx, bucket, nil, opts...)
  572. if err != nil {
  573. return nil, err
  574. }
  575. return attrs.DefaultObjectACL, nil
  576. }
  577. func (c *grpcStorageClient) UpdateDefaultObjectACL(ctx context.Context, bucket string, entity ACLEntity, role ACLRole, opts ...storageOption) error {
  578. // There is no separate API for PATCH in gRPC.
  579. // Make a GET call first to retrieve BucketAttrs.
  580. attrs, err := c.GetBucket(ctx, bucket, nil, opts...)
  581. if err != nil {
  582. return err
  583. }
  584. // Note: This API currently does not support entites using project ID.
  585. // Use project numbers in ACL entities. Pending b/233617896.
  586. var acl []ACLRule
  587. aclRule := ACLRule{Entity: entity, Role: role}
  588. acl = append(attrs.DefaultObjectACL, aclRule)
  589. uattrs := &BucketAttrsToUpdate{defaultObjectACL: acl}
  590. // Call UpdateBucket with a MetagenerationMatch precondition set.
  591. if _, err = c.UpdateBucket(ctx, bucket, uattrs, &BucketConditions{MetagenerationMatch: attrs.MetaGeneration}, opts...); err != nil {
  592. return err
  593. }
  594. return nil
  595. }
  596. // Bucket ACL methods.
  597. func (c *grpcStorageClient) DeleteBucketACL(ctx context.Context, bucket string, entity ACLEntity, opts ...storageOption) error {
  598. // There is no separate API for PATCH in gRPC.
  599. // Make a GET call first to retrieve BucketAttrs.
  600. attrs, err := c.GetBucket(ctx, bucket, nil, opts...)
  601. if err != nil {
  602. return err
  603. }
  604. // Delete the entity and copy other remaining ACL entities.
  605. // Note: This API currently does not support entites using project ID.
  606. // Use project numbers in ACL entities. Pending b/233617896.
  607. // Return error if entity is not found or a project ID is used.
  608. invalidEntity := true
  609. var acl []ACLRule
  610. for _, a := range attrs.ACL {
  611. if a.Entity != entity {
  612. acl = append(acl, a)
  613. }
  614. if a.Entity == entity {
  615. invalidEntity = false
  616. }
  617. }
  618. if invalidEntity {
  619. return fmt.Errorf("storage: entity %v was not found on bucket %v, got %v. %v", entity, bucket, attrs.ACL, msgEntityNotSupported)
  620. }
  621. uattrs := &BucketAttrsToUpdate{acl: acl}
  622. // Call UpdateBucket with a MetagenerationMatch precondition set.
  623. if _, err = c.UpdateBucket(ctx, bucket, uattrs, &BucketConditions{MetagenerationMatch: attrs.MetaGeneration}, opts...); err != nil {
  624. return err
  625. }
  626. return nil
  627. }
  628. func (c *grpcStorageClient) ListBucketACLs(ctx context.Context, bucket string, opts ...storageOption) ([]ACLRule, error) {
  629. attrs, err := c.GetBucket(ctx, bucket, nil, opts...)
  630. if err != nil {
  631. return nil, err
  632. }
  633. return attrs.ACL, nil
  634. }
  635. func (c *grpcStorageClient) UpdateBucketACL(ctx context.Context, bucket string, entity ACLEntity, role ACLRole, opts ...storageOption) error {
  636. // There is no separate API for PATCH in gRPC.
  637. // Make a GET call first to retrieve BucketAttrs.
  638. attrs, err := c.GetBucket(ctx, bucket, nil, opts...)
  639. if err != nil {
  640. return err
  641. }
  642. // Note: This API currently does not support entites using project ID.
  643. // Use project numbers in ACL entities. Pending b/233617896.
  644. var acl []ACLRule
  645. aclRule := ACLRule{Entity: entity, Role: role}
  646. acl = append(attrs.ACL, aclRule)
  647. uattrs := &BucketAttrsToUpdate{acl: acl}
  648. // Call UpdateBucket with a MetagenerationMatch precondition set.
  649. if _, err = c.UpdateBucket(ctx, bucket, uattrs, &BucketConditions{MetagenerationMatch: attrs.MetaGeneration}, opts...); err != nil {
  650. return err
  651. }
  652. return nil
  653. }
  654. // Object ACL methods.
  655. func (c *grpcStorageClient) DeleteObjectACL(ctx context.Context, bucket, object string, entity ACLEntity, opts ...storageOption) error {
  656. // There is no separate API for PATCH in gRPC.
  657. // Make a GET call first to retrieve ObjectAttrs.
  658. attrs, err := c.GetObject(ctx, bucket, object, defaultGen, nil, nil, opts...)
  659. if err != nil {
  660. return err
  661. }
  662. // Delete the entity and copy other remaining ACL entities.
  663. // Note: This API currently does not support entites using project ID.
  664. // Use project numbers in ACL entities. Pending b/233617896.
  665. // Return error if entity is not found or a project ID is used.
  666. invalidEntity := true
  667. var acl []ACLRule
  668. for _, a := range attrs.ACL {
  669. if a.Entity != entity {
  670. acl = append(acl, a)
  671. }
  672. if a.Entity == entity {
  673. invalidEntity = false
  674. }
  675. }
  676. if invalidEntity {
  677. return fmt.Errorf("storage: entity %v was not found on bucket %v, got %v. %v", entity, bucket, attrs.ACL, msgEntityNotSupported)
  678. }
  679. uattrs := &ObjectAttrsToUpdate{ACL: acl}
  680. // Call UpdateObject with the specified metageneration.
  681. params := &updateObjectParams{bucket: bucket, object: object, uattrs: uattrs, gen: defaultGen, conds: &Conditions{MetagenerationMatch: attrs.Metageneration}}
  682. if _, err = c.UpdateObject(ctx, params, opts...); err != nil {
  683. return err
  684. }
  685. return nil
  686. }
  687. // ListObjectACLs retrieves object ACL entries. By default, it operates on the latest generation of this object.
  688. // Selecting a specific generation of this object is not currently supported by the client.
  689. func (c *grpcStorageClient) ListObjectACLs(ctx context.Context, bucket, object string, opts ...storageOption) ([]ACLRule, error) {
  690. o, err := c.GetObject(ctx, bucket, object, defaultGen, nil, nil, opts...)
  691. if err != nil {
  692. return nil, err
  693. }
  694. return o.ACL, nil
  695. }
  696. func (c *grpcStorageClient) UpdateObjectACL(ctx context.Context, bucket, object string, entity ACLEntity, role ACLRole, opts ...storageOption) error {
  697. // There is no separate API for PATCH in gRPC.
  698. // Make a GET call first to retrieve ObjectAttrs.
  699. attrs, err := c.GetObject(ctx, bucket, object, defaultGen, nil, nil, opts...)
  700. if err != nil {
  701. return err
  702. }
  703. // Note: This API currently does not support entites using project ID.
  704. // Use project numbers in ACL entities. Pending b/233617896.
  705. var acl []ACLRule
  706. aclRule := ACLRule{Entity: entity, Role: role}
  707. acl = append(attrs.ACL, aclRule)
  708. uattrs := &ObjectAttrsToUpdate{ACL: acl}
  709. // Call UpdateObject with the specified metageneration.
  710. params := &updateObjectParams{bucket: bucket, object: object, uattrs: uattrs, gen: defaultGen, conds: &Conditions{MetagenerationMatch: attrs.Metageneration}}
  711. if _, err = c.UpdateObject(ctx, params, opts...); err != nil {
  712. return err
  713. }
  714. return nil
  715. }
  716. // Media operations.
  717. func (c *grpcStorageClient) ComposeObject(ctx context.Context, req *composeObjectRequest, opts ...storageOption) (*ObjectAttrs, error) {
  718. s := callSettings(c.settings, opts...)
  719. if s.userProject != "" {
  720. ctx = setUserProjectMetadata(ctx, s.userProject)
  721. }
  722. dstObjPb := req.dstObject.attrs.toProtoObject(req.dstBucket)
  723. dstObjPb.Name = req.dstObject.name
  724. if req.sendCRC32C {
  725. dstObjPb.Checksums.Crc32C = &req.dstObject.attrs.CRC32C
  726. }
  727. srcs := []*storagepb.ComposeObjectRequest_SourceObject{}
  728. for _, src := range req.srcs {
  729. srcObjPb := &storagepb.ComposeObjectRequest_SourceObject{Name: src.name, ObjectPreconditions: &storagepb.ComposeObjectRequest_SourceObject_ObjectPreconditions{}}
  730. if src.gen >= 0 {
  731. srcObjPb.Generation = src.gen
  732. }
  733. if err := applyCondsProto("ComposeObject source", defaultGen, src.conds, srcObjPb.ObjectPreconditions); err != nil {
  734. return nil, err
  735. }
  736. srcs = append(srcs, srcObjPb)
  737. }
  738. rawReq := &storagepb.ComposeObjectRequest{
  739. Destination: dstObjPb,
  740. SourceObjects: srcs,
  741. }
  742. if err := applyCondsProto("ComposeObject destination", defaultGen, req.dstObject.conds, rawReq); err != nil {
  743. return nil, err
  744. }
  745. if req.predefinedACL != "" {
  746. rawReq.DestinationPredefinedAcl = req.predefinedACL
  747. }
  748. if req.dstObject.encryptionKey != nil {
  749. rawReq.CommonObjectRequestParams = toProtoCommonObjectRequestParams(req.dstObject.encryptionKey)
  750. }
  751. var obj *storagepb.Object
  752. var err error
  753. if err := run(ctx, func(ctx context.Context) error {
  754. obj, err = c.raw.ComposeObject(ctx, rawReq, s.gax...)
  755. return err
  756. }, s.retry, s.idempotent); err != nil {
  757. return nil, err
  758. }
  759. return newObjectFromProto(obj), nil
  760. }
  761. func (c *grpcStorageClient) RewriteObject(ctx context.Context, req *rewriteObjectRequest, opts ...storageOption) (*rewriteObjectResponse, error) {
  762. s := callSettings(c.settings, opts...)
  763. obj := req.dstObject.attrs.toProtoObject("")
  764. call := &storagepb.RewriteObjectRequest{
  765. SourceBucket: bucketResourceName(globalProjectAlias, req.srcObject.bucket),
  766. SourceObject: req.srcObject.name,
  767. RewriteToken: req.token,
  768. DestinationBucket: bucketResourceName(globalProjectAlias, req.dstObject.bucket),
  769. DestinationName: req.dstObject.name,
  770. Destination: obj,
  771. DestinationKmsKey: req.dstObject.keyName,
  772. DestinationPredefinedAcl: req.predefinedACL,
  773. CommonObjectRequestParams: toProtoCommonObjectRequestParams(req.dstObject.encryptionKey),
  774. }
  775. // The userProject, whether source or destination project, is decided by the code calling the interface.
  776. if s.userProject != "" {
  777. ctx = setUserProjectMetadata(ctx, s.userProject)
  778. }
  779. if err := applyCondsProto("Copy destination", defaultGen, req.dstObject.conds, call); err != nil {
  780. return nil, err
  781. }
  782. if err := applySourceCondsProto(req.srcObject.gen, req.srcObject.conds, call); err != nil {
  783. return nil, err
  784. }
  785. if len(req.dstObject.encryptionKey) > 0 {
  786. call.CommonObjectRequestParams = toProtoCommonObjectRequestParams(req.dstObject.encryptionKey)
  787. }
  788. if len(req.srcObject.encryptionKey) > 0 {
  789. srcParams := toProtoCommonObjectRequestParams(req.srcObject.encryptionKey)
  790. call.CopySourceEncryptionAlgorithm = srcParams.GetEncryptionAlgorithm()
  791. call.CopySourceEncryptionKeyBytes = srcParams.GetEncryptionKeyBytes()
  792. call.CopySourceEncryptionKeySha256Bytes = srcParams.GetEncryptionKeySha256Bytes()
  793. }
  794. call.MaxBytesRewrittenPerCall = req.maxBytesRewrittenPerCall
  795. var res *storagepb.RewriteResponse
  796. var err error
  797. retryCall := func(ctx context.Context) error { res, err = c.raw.RewriteObject(ctx, call, s.gax...); return err }
  798. if err := run(ctx, retryCall, s.retry, s.idempotent); err != nil {
  799. return nil, err
  800. }
  801. r := &rewriteObjectResponse{
  802. done: res.GetDone(),
  803. written: res.GetTotalBytesRewritten(),
  804. size: res.GetObjectSize(),
  805. token: res.GetRewriteToken(),
  806. resource: newObjectFromProto(res.GetResource()),
  807. }
  808. return r, nil
  809. }
  810. func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRangeReaderParams, opts ...storageOption) (r *Reader, err error) {
  811. ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.NewRangeReader")
  812. defer func() { trace.EndSpan(ctx, err) }()
  813. s := callSettings(c.settings, opts...)
  814. if s.userProject != "" {
  815. ctx = setUserProjectMetadata(ctx, s.userProject)
  816. }
  817. b := bucketResourceName(globalProjectAlias, params.bucket)
  818. req := &storagepb.ReadObjectRequest{
  819. Bucket: b,
  820. Object: params.object,
  821. CommonObjectRequestParams: toProtoCommonObjectRequestParams(params.encryptionKey),
  822. }
  823. // The default is a negative value, which means latest.
  824. if params.gen >= 0 {
  825. req.Generation = params.gen
  826. }
  827. // Define a function that initiates a Read with offset and length, assuming
  828. // we have already read seen bytes.
  829. reopen := func(seen int64) (*readStreamResponse, context.CancelFunc, error) {
  830. // If the context has already expired, return immediately without making
  831. // we call.
  832. if err := ctx.Err(); err != nil {
  833. return nil, nil, err
  834. }
  835. cc, cancel := context.WithCancel(ctx)
  836. req.ReadOffset = params.offset + seen
  837. // Only set a ReadLimit if length is greater than zero, because <= 0 means
  838. // to read it all.
  839. if params.length > 0 {
  840. req.ReadLimit = params.length - seen
  841. }
  842. if err := applyCondsProto("gRPCReader.reopen", params.gen, params.conds, req); err != nil {
  843. cancel()
  844. return nil, nil, err
  845. }
  846. var stream storagepb.Storage_ReadObjectClient
  847. var msg *storagepb.ReadObjectResponse
  848. var err error
  849. err = run(cc, func(ctx context.Context) error {
  850. stream, err = c.raw.ReadObject(cc, req, s.gax...)
  851. if err != nil {
  852. return err
  853. }
  854. msg, err = stream.Recv()
  855. // These types of errors show up on the Recv call, rather than the
  856. // initialization of the stream via ReadObject above.
  857. if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
  858. return ErrObjectNotExist
  859. }
  860. return err
  861. }, s.retry, s.idempotent)
  862. if err != nil {
  863. // Close the stream context we just created to ensure we don't leak
  864. // resources.
  865. cancel()
  866. return nil, nil, err
  867. }
  868. return &readStreamResponse{stream, msg}, cancel, nil
  869. }
  870. res, cancel, err := reopen(0)
  871. if err != nil {
  872. return nil, err
  873. }
  874. // The first message was Recv'd on stream open, use it to populate the
  875. // object metadata.
  876. msg := res.response
  877. obj := msg.GetMetadata()
  878. // This is the size of the entire object, even if only a range was requested.
  879. size := obj.GetSize()
  880. r = &Reader{
  881. Attrs: ReaderObjectAttrs{
  882. Size: size,
  883. ContentType: obj.GetContentType(),
  884. ContentEncoding: obj.GetContentEncoding(),
  885. CacheControl: obj.GetCacheControl(),
  886. LastModified: obj.GetUpdateTime().AsTime(),
  887. Metageneration: obj.GetMetageneration(),
  888. Generation: obj.GetGeneration(),
  889. },
  890. reader: &gRPCReader{
  891. stream: res.stream,
  892. reopen: reopen,
  893. cancel: cancel,
  894. size: size,
  895. // Store the content from the first Recv in the
  896. // client buffer for reading later.
  897. leftovers: msg.GetChecksummedData().GetContent(),
  898. settings: s,
  899. zeroRange: params.length == 0,
  900. },
  901. }
  902. cr := msg.GetContentRange()
  903. if cr != nil {
  904. r.Attrs.StartOffset = cr.GetStart()
  905. r.remain = cr.GetEnd() - cr.GetStart()
  906. } else {
  907. r.remain = size
  908. }
  909. // For a zero-length request, explicitly close the stream and set remaining
  910. // bytes to zero.
  911. if params.length == 0 {
  912. r.remain = 0
  913. r.reader.Close()
  914. }
  915. // Only support checksums when reading an entire object, not a range.
  916. if checksums := msg.GetObjectChecksums(); checksums != nil && checksums.Crc32C != nil && params.offset == 0 && params.length < 0 {
  917. r.wantCRC = checksums.GetCrc32C()
  918. r.checkCRC = true
  919. }
  920. return r, nil
  921. }
  922. func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storageOption) (*io.PipeWriter, error) {
  923. s := callSettings(c.settings, opts...)
  924. var offset int64
  925. errorf := params.setError
  926. progress := params.progress
  927. setObj := params.setObj
  928. pr, pw := io.Pipe()
  929. gw := newGRPCWriter(c, params, pr)
  930. gw.settings = s
  931. if s.userProject != "" {
  932. gw.ctx = setUserProjectMetadata(gw.ctx, s.userProject)
  933. }
  934. // This function reads the data sent to the pipe and sends sets of messages
  935. // on the gRPC client-stream as the buffer is filled.
  936. go func() {
  937. defer close(params.donec)
  938. // Loop until there is an error or the Object has been finalized.
  939. for {
  940. // Note: This blocks until either the buffer is full or EOF is read.
  941. recvd, doneReading, err := gw.read()
  942. if err != nil {
  943. err = checkCanceled(err)
  944. errorf(err)
  945. pr.CloseWithError(err)
  946. return
  947. }
  948. if params.attrs.Retention != nil {
  949. // TO-DO: remove once ObjectRetention is available - see b/308194853
  950. err = status.Errorf(codes.Unimplemented, "storage: object retention is not supported in gRPC")
  951. errorf(err)
  952. pr.CloseWithError(err)
  953. return
  954. }
  955. // The chunk buffer is full, but there is no end in sight. This
  956. // means that either:
  957. // 1. A resumable upload will need to be used to send
  958. // multiple chunks, until we are done reading data. Start a
  959. // resumable upload if it has not already been started.
  960. // 2. ChunkSize of zero may also have a full buffer, but a resumable
  961. // session should not be initiated in this case.
  962. if !doneReading && gw.upid == "" && params.chunkSize != 0 {
  963. err = gw.startResumableUpload()
  964. if err != nil {
  965. err = checkCanceled(err)
  966. errorf(err)
  967. pr.CloseWithError(err)
  968. return
  969. }
  970. }
  971. o, off, err := gw.uploadBuffer(recvd, offset, doneReading)
  972. if err != nil {
  973. err = checkCanceled(err)
  974. errorf(err)
  975. pr.CloseWithError(err)
  976. return
  977. }
  978. // At this point, the current buffer has been uploaded. For resumable
  979. // uploads and chunkSize = 0, capture the committed offset here in case
  980. // the upload was not finalized and another chunk is to be uploaded. Call
  981. // the progress function for resumable uploads only.
  982. if gw.upid != "" || gw.chunkSize == 0 {
  983. offset = off
  984. }
  985. if gw.upid != "" {
  986. progress(offset)
  987. }
  988. // When we are done reading data without errors, set the object and
  989. // finish.
  990. if doneReading {
  991. // Build Object from server's response.
  992. setObj(newObjectFromProto(o))
  993. return
  994. }
  995. }
  996. }()
  997. return pw, nil
  998. }
  999. // IAM methods.
  1000. func (c *grpcStorageClient) GetIamPolicy(ctx context.Context, resource string, version int32, opts ...storageOption) (*iampb.Policy, error) {
  1001. // TODO: Need a way to set UserProject, potentially in X-Goog-User-Project system parameter.
  1002. s := callSettings(c.settings, opts...)
  1003. req := &iampb.GetIamPolicyRequest{
  1004. Resource: bucketResourceName(globalProjectAlias, resource),
  1005. Options: &iampb.GetPolicyOptions{
  1006. RequestedPolicyVersion: version,
  1007. },
  1008. }
  1009. var rp *iampb.Policy
  1010. err := run(ctx, func(ctx context.Context) error {
  1011. var err error
  1012. rp, err = c.raw.GetIamPolicy(ctx, req, s.gax...)
  1013. return err
  1014. }, s.retry, s.idempotent)
  1015. return rp, err
  1016. }
  1017. func (c *grpcStorageClient) SetIamPolicy(ctx context.Context, resource string, policy *iampb.Policy, opts ...storageOption) error {
  1018. // TODO: Need a way to set UserProject, potentially in X-Goog-User-Project system parameter.
  1019. s := callSettings(c.settings, opts...)
  1020. req := &iampb.SetIamPolicyRequest{
  1021. Resource: bucketResourceName(globalProjectAlias, resource),
  1022. Policy: policy,
  1023. }
  1024. return run(ctx, func(ctx context.Context) error {
  1025. _, err := c.raw.SetIamPolicy(ctx, req, s.gax...)
  1026. return err
  1027. }, s.retry, s.idempotent)
  1028. }
  1029. func (c *grpcStorageClient) TestIamPermissions(ctx context.Context, resource string, permissions []string, opts ...storageOption) ([]string, error) {
  1030. // TODO: Need a way to set UserProject, potentially in X-Goog-User-Project system parameter.
  1031. s := callSettings(c.settings, opts...)
  1032. req := &iampb.TestIamPermissionsRequest{
  1033. Resource: bucketResourceName(globalProjectAlias, resource),
  1034. Permissions: permissions,
  1035. }
  1036. var res *iampb.TestIamPermissionsResponse
  1037. err := run(ctx, func(ctx context.Context) error {
  1038. var err error
  1039. res, err = c.raw.TestIamPermissions(ctx, req, s.gax...)
  1040. return err
  1041. }, s.retry, s.idempotent)
  1042. if err != nil {
  1043. return nil, err
  1044. }
  1045. return res.Permissions, nil
  1046. }
  1047. // HMAC Key methods.
  1048. func (c *grpcStorageClient) GetHMACKey(ctx context.Context, project, accessID string, opts ...storageOption) (*HMACKey, error) {
  1049. s := callSettings(c.settings, opts...)
  1050. req := &storagepb.GetHmacKeyRequest{
  1051. AccessId: accessID,
  1052. Project: toProjectResource(project),
  1053. }
  1054. if s.userProject != "" {
  1055. ctx = setUserProjectMetadata(ctx, s.userProject)
  1056. }
  1057. var metadata *storagepb.HmacKeyMetadata
  1058. err := run(ctx, func(ctx context.Context) error {
  1059. var err error
  1060. metadata, err = c.raw.GetHmacKey(ctx, req, s.gax...)
  1061. return err
  1062. }, s.retry, s.idempotent)
  1063. if err != nil {
  1064. return nil, err
  1065. }
  1066. return toHMACKeyFromProto(metadata), nil
  1067. }
  1068. func (c *grpcStorageClient) ListHMACKeys(ctx context.Context, project, serviceAccountEmail string, showDeletedKeys bool, opts ...storageOption) *HMACKeysIterator {
  1069. s := callSettings(c.settings, opts...)
  1070. req := &storagepb.ListHmacKeysRequest{
  1071. Project: toProjectResource(project),
  1072. ServiceAccountEmail: serviceAccountEmail,
  1073. ShowDeletedKeys: showDeletedKeys,
  1074. }
  1075. if s.userProject != "" {
  1076. ctx = setUserProjectMetadata(ctx, s.userProject)
  1077. }
  1078. it := &HMACKeysIterator{
  1079. ctx: ctx,
  1080. projectID: project,
  1081. retry: s.retry,
  1082. }
  1083. fetch := func(pageSize int, pageToken string) (token string, err error) {
  1084. var hmacKeys []*storagepb.HmacKeyMetadata
  1085. err = run(it.ctx, func(ctx context.Context) error {
  1086. gitr := c.raw.ListHmacKeys(ctx, req, s.gax...)
  1087. hmacKeys, token, err = gitr.InternalFetch(pageSize, pageToken)
  1088. return err
  1089. }, s.retry, s.idempotent)
  1090. if err != nil {
  1091. return "", err
  1092. }
  1093. for _, hkmd := range hmacKeys {
  1094. hk := toHMACKeyFromProto(hkmd)
  1095. it.hmacKeys = append(it.hmacKeys, hk)
  1096. }
  1097. return token, nil
  1098. }
  1099. it.pageInfo, it.nextFunc = iterator.NewPageInfo(
  1100. fetch,
  1101. func() int { return len(it.hmacKeys) - it.index },
  1102. func() interface{} {
  1103. prev := it.hmacKeys
  1104. it.hmacKeys = it.hmacKeys[:0]
  1105. it.index = 0
  1106. return prev
  1107. })
  1108. return it
  1109. }
  1110. func (c *grpcStorageClient) UpdateHMACKey(ctx context.Context, project, serviceAccountEmail, accessID string, attrs *HMACKeyAttrsToUpdate, opts ...storageOption) (*HMACKey, error) {
  1111. s := callSettings(c.settings, opts...)
  1112. hk := &storagepb.HmacKeyMetadata{
  1113. AccessId: accessID,
  1114. Project: toProjectResource(project),
  1115. ServiceAccountEmail: serviceAccountEmail,
  1116. State: string(attrs.State),
  1117. Etag: attrs.Etag,
  1118. }
  1119. var paths []string
  1120. fieldMask := &fieldmaskpb.FieldMask{
  1121. Paths: paths,
  1122. }
  1123. if attrs.State != "" {
  1124. fieldMask.Paths = append(fieldMask.Paths, "state")
  1125. }
  1126. req := &storagepb.UpdateHmacKeyRequest{
  1127. HmacKey: hk,
  1128. UpdateMask: fieldMask,
  1129. }
  1130. if s.userProject != "" {
  1131. ctx = setUserProjectMetadata(ctx, s.userProject)
  1132. }
  1133. var metadata *storagepb.HmacKeyMetadata
  1134. err := run(ctx, func(ctx context.Context) error {
  1135. var err error
  1136. metadata, err = c.raw.UpdateHmacKey(ctx, req, s.gax...)
  1137. return err
  1138. }, s.retry, s.idempotent)
  1139. if err != nil {
  1140. return nil, err
  1141. }
  1142. return toHMACKeyFromProto(metadata), nil
  1143. }
  1144. func (c *grpcStorageClient) CreateHMACKey(ctx context.Context, project, serviceAccountEmail string, opts ...storageOption) (*HMACKey, error) {
  1145. s := callSettings(c.settings, opts...)
  1146. req := &storagepb.CreateHmacKeyRequest{
  1147. Project: toProjectResource(project),
  1148. ServiceAccountEmail: serviceAccountEmail,
  1149. }
  1150. if s.userProject != "" {
  1151. ctx = setUserProjectMetadata(ctx, s.userProject)
  1152. }
  1153. var res *storagepb.CreateHmacKeyResponse
  1154. err := run(ctx, func(ctx context.Context) error {
  1155. var err error
  1156. res, err = c.raw.CreateHmacKey(ctx, req, s.gax...)
  1157. return err
  1158. }, s.retry, s.idempotent)
  1159. if err != nil {
  1160. return nil, err
  1161. }
  1162. key := toHMACKeyFromProto(res.Metadata)
  1163. key.Secret = base64.StdEncoding.EncodeToString(res.SecretKeyBytes)
  1164. return key, nil
  1165. }
  1166. func (c *grpcStorageClient) DeleteHMACKey(ctx context.Context, project string, accessID string, opts ...storageOption) error {
  1167. s := callSettings(c.settings, opts...)
  1168. req := &storagepb.DeleteHmacKeyRequest{
  1169. AccessId: accessID,
  1170. Project: toProjectResource(project),
  1171. }
  1172. if s.userProject != "" {
  1173. ctx = setUserProjectMetadata(ctx, s.userProject)
  1174. }
  1175. return run(ctx, func(ctx context.Context) error {
  1176. return c.raw.DeleteHmacKey(ctx, req, s.gax...)
  1177. }, s.retry, s.idempotent)
  1178. }
  1179. // Notification methods.
  1180. func (c *grpcStorageClient) ListNotifications(ctx context.Context, bucket string, opts ...storageOption) (n map[string]*Notification, err error) {
  1181. ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.ListNotifications")
  1182. defer func() { trace.EndSpan(ctx, err) }()
  1183. s := callSettings(c.settings, opts...)
  1184. if s.userProject != "" {
  1185. ctx = setUserProjectMetadata(ctx, s.userProject)
  1186. }
  1187. req := &storagepb.ListNotificationConfigsRequest{
  1188. Parent: bucketResourceName(globalProjectAlias, bucket),
  1189. }
  1190. var notifications []*storagepb.NotificationConfig
  1191. err = run(ctx, func(ctx context.Context) error {
  1192. gitr := c.raw.ListNotificationConfigs(ctx, req, s.gax...)
  1193. for {
  1194. // PageSize is not set and fallbacks to the API default pageSize of 100.
  1195. items, nextPageToken, err := gitr.InternalFetch(int(req.GetPageSize()), req.GetPageToken())
  1196. if err != nil {
  1197. return err
  1198. }
  1199. notifications = append(notifications, items...)
  1200. // If there are no more results, nextPageToken is empty and err is nil.
  1201. if nextPageToken == "" {
  1202. return err
  1203. }
  1204. req.PageToken = nextPageToken
  1205. }
  1206. }, s.retry, s.idempotent)
  1207. if err != nil {
  1208. return nil, err
  1209. }
  1210. return notificationsToMapFromProto(notifications), nil
  1211. }
  1212. func (c *grpcStorageClient) CreateNotification(ctx context.Context, bucket string, n *Notification, opts ...storageOption) (ret *Notification, err error) {
  1213. ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.CreateNotification")
  1214. defer func() { trace.EndSpan(ctx, err) }()
  1215. s := callSettings(c.settings, opts...)
  1216. req := &storagepb.CreateNotificationConfigRequest{
  1217. Parent: bucketResourceName(globalProjectAlias, bucket),
  1218. NotificationConfig: toProtoNotification(n),
  1219. }
  1220. var pbn *storagepb.NotificationConfig
  1221. err = run(ctx, func(ctx context.Context) error {
  1222. var err error
  1223. pbn, err = c.raw.CreateNotificationConfig(ctx, req, s.gax...)
  1224. return err
  1225. }, s.retry, s.idempotent)
  1226. if err != nil {
  1227. return nil, err
  1228. }
  1229. return toNotificationFromProto(pbn), err
  1230. }
  1231. func (c *grpcStorageClient) DeleteNotification(ctx context.Context, bucket string, id string, opts ...storageOption) (err error) {
  1232. ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.DeleteNotification")
  1233. defer func() { trace.EndSpan(ctx, err) }()
  1234. s := callSettings(c.settings, opts...)
  1235. req := &storagepb.DeleteNotificationConfigRequest{Name: id}
  1236. return run(ctx, func(ctx context.Context) error {
  1237. return c.raw.DeleteNotificationConfig(ctx, req, s.gax...)
  1238. }, s.retry, s.idempotent)
  1239. }
  1240. // setUserProjectMetadata appends a project ID to the outgoing Context metadata
  1241. // via the x-goog-user-project system parameter defined at
  1242. // https://cloud.google.com/apis/docs/system-parameters. This is only for
  1243. // billing purposes, and is generally optional, except for requester-pays
  1244. // buckets.
  1245. func setUserProjectMetadata(ctx context.Context, project string) context.Context {
  1246. return metadata.AppendToOutgoingContext(ctx, "x-goog-user-project", project)
  1247. }
  1248. type readStreamResponse struct {
  1249. stream storagepb.Storage_ReadObjectClient
  1250. response *storagepb.ReadObjectResponse
  1251. }
  1252. type gRPCReader struct {
  1253. seen, size int64
  1254. zeroRange bool
  1255. stream storagepb.Storage_ReadObjectClient
  1256. reopen func(seen int64) (*readStreamResponse, context.CancelFunc, error)
  1257. leftovers []byte
  1258. cancel context.CancelFunc
  1259. settings *settings
  1260. }
  1261. // Read reads bytes into the user's buffer from an open gRPC stream.
  1262. func (r *gRPCReader) Read(p []byte) (int, error) {
  1263. // The entire object has been read by this reader, return EOF.
  1264. if r.size == r.seen || r.zeroRange {
  1265. return 0, io.EOF
  1266. }
  1267. // No stream to read from, either never initialized or Close was called.
  1268. // Note: There is a potential concurrency issue if multiple routines are
  1269. // using the same reader. One encounters an error and the stream is closed
  1270. // and then reopened while the other routine attempts to read from it.
  1271. if r.stream == nil {
  1272. return 0, fmt.Errorf("reader has been closed")
  1273. }
  1274. var n int
  1275. // Read leftovers and return what was available to conform to the Reader
  1276. // interface: https://pkg.go.dev/io#Reader.
  1277. if len(r.leftovers) > 0 {
  1278. n = copy(p, r.leftovers)
  1279. r.seen += int64(n)
  1280. r.leftovers = r.leftovers[n:]
  1281. return n, nil
  1282. }
  1283. // Attempt to Recv the next message on the stream.
  1284. msg, err := r.recv()
  1285. if err != nil {
  1286. return 0, err
  1287. }
  1288. // TODO: Determine if we need to capture incremental CRC32C for this
  1289. // chunk. The Object CRC32C checksum is captured when directed to read
  1290. // the entire Object. If directed to read a range, we may need to
  1291. // calculate the range's checksum for verification if the checksum is
  1292. // present in the response here.
  1293. // TODO: Figure out if we need to support decompressive transcoding
  1294. // https://cloud.google.com/storage/docs/transcoding.
  1295. content := msg.GetChecksummedData().GetContent()
  1296. n = copy(p[n:], content)
  1297. leftover := len(content) - n
  1298. if leftover > 0 {
  1299. // Wasn't able to copy all of the data in the message, store for
  1300. // future Read calls.
  1301. r.leftovers = content[n:]
  1302. }
  1303. r.seen += int64(n)
  1304. return n, nil
  1305. }
  1306. // Close cancels the read stream's context in order for it to be closed and
  1307. // collected.
  1308. func (r *gRPCReader) Close() error {
  1309. if r.cancel != nil {
  1310. r.cancel()
  1311. }
  1312. r.stream = nil
  1313. return nil
  1314. }
  1315. // recv attempts to Recv the next message on the stream. In the event
  1316. // that a retryable error is encountered, the stream will be closed, reopened,
  1317. // and Recv again. This will attempt to Recv until one of the following is true:
  1318. //
  1319. // * Recv is successful
  1320. // * A non-retryable error is encountered
  1321. // * The Reader's context is canceled
  1322. //
  1323. // The last error received is the one that is returned, which could be from
  1324. // an attempt to reopen the stream.
  1325. func (r *gRPCReader) recv() (*storagepb.ReadObjectResponse, error) {
  1326. msg, err := r.stream.Recv()
  1327. var shouldRetry = ShouldRetry
  1328. if r.settings.retry != nil && r.settings.retry.shouldRetry != nil {
  1329. shouldRetry = r.settings.retry.shouldRetry
  1330. }
  1331. if err != nil && shouldRetry(err) {
  1332. // This will "close" the existing stream and immediately attempt to
  1333. // reopen the stream, but will backoff if further attempts are necessary.
  1334. // Reopening the stream Recvs the first message, so if retrying is
  1335. // successful, the next logical chunk will be returned.
  1336. msg, err = r.reopenStream()
  1337. }
  1338. return msg, err
  1339. }
  1340. // reopenStream "closes" the existing stream and attempts to reopen a stream and
  1341. // sets the Reader's stream and cancelStream properties in the process.
  1342. func (r *gRPCReader) reopenStream() (*storagepb.ReadObjectResponse, error) {
  1343. // Close existing stream and initialize new stream with updated offset.
  1344. r.Close()
  1345. res, cancel, err := r.reopen(r.seen)
  1346. if err != nil {
  1347. return nil, err
  1348. }
  1349. r.stream = res.stream
  1350. r.cancel = cancel
  1351. return res.response, nil
  1352. }
  1353. func newGRPCWriter(c *grpcStorageClient, params *openWriterParams, r io.Reader) *gRPCWriter {
  1354. size := params.chunkSize
  1355. // Round up chunksize to nearest 256KiB
  1356. if size%googleapi.MinUploadChunkSize != 0 {
  1357. size += googleapi.MinUploadChunkSize - (size % googleapi.MinUploadChunkSize)
  1358. }
  1359. // A completely bufferless upload is not possible as it is in JSON because
  1360. // the buffer must be provided to the message. However use the minimum size
  1361. // possible in this case.
  1362. if params.chunkSize == 0 {
  1363. size = googleapi.MinUploadChunkSize
  1364. }
  1365. return &gRPCWriter{
  1366. buf: make([]byte, size),
  1367. c: c,
  1368. ctx: params.ctx,
  1369. reader: r,
  1370. bucket: params.bucket,
  1371. attrs: params.attrs,
  1372. conds: params.conds,
  1373. encryptionKey: params.encryptionKey,
  1374. sendCRC32C: params.sendCRC32C,
  1375. chunkSize: params.chunkSize,
  1376. forceEmptyContentType: params.forceEmptyContentType,
  1377. }
  1378. }
  1379. // gRPCWriter is a wrapper around the the gRPC client-stream API that manages
  1380. // sending chunks of data provided by the user over the stream.
  1381. type gRPCWriter struct {
  1382. c *grpcStorageClient
  1383. buf []byte
  1384. reader io.Reader
  1385. ctx context.Context
  1386. bucket string
  1387. attrs *ObjectAttrs
  1388. conds *Conditions
  1389. encryptionKey []byte
  1390. settings *settings
  1391. sendCRC32C bool
  1392. chunkSize int
  1393. forceEmptyContentType bool
  1394. // The gRPC client-stream used for sending buffers.
  1395. stream storagepb.Storage_BidiWriteObjectClient
  1396. // The Resumable Upload ID started by a gRPC-based Writer.
  1397. upid string
  1398. }
  1399. // startResumableUpload initializes a Resumable Upload with gRPC and sets the
  1400. // upload ID on the Writer.
  1401. func (w *gRPCWriter) startResumableUpload() error {
  1402. spec, err := w.writeObjectSpec()
  1403. if err != nil {
  1404. return err
  1405. }
  1406. req := &storagepb.StartResumableWriteRequest{
  1407. WriteObjectSpec: spec,
  1408. CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey),
  1409. }
  1410. // TODO: Currently the checksums are only sent on the request to initialize
  1411. // the upload, but in the future, we must also support sending it
  1412. // on the *last* message of the stream.
  1413. req.ObjectChecksums = toProtoChecksums(w.sendCRC32C, w.attrs)
  1414. return run(w.ctx, func(ctx context.Context) error {
  1415. upres, err := w.c.raw.StartResumableWrite(w.ctx, req)
  1416. w.upid = upres.GetUploadId()
  1417. return err
  1418. }, w.settings.retry, w.settings.idempotent)
  1419. }
  1420. // queryProgress is a helper that queries the status of the resumable upload
  1421. // associated with the given upload ID.
  1422. func (w *gRPCWriter) queryProgress() (int64, error) {
  1423. var persistedSize int64
  1424. err := run(w.ctx, func(ctx context.Context) error {
  1425. q, err := w.c.raw.QueryWriteStatus(w.ctx, &storagepb.QueryWriteStatusRequest{
  1426. UploadId: w.upid,
  1427. })
  1428. persistedSize = q.GetPersistedSize()
  1429. return err
  1430. }, w.settings.retry, true)
  1431. // q.GetCommittedSize() will return 0 if q is nil.
  1432. return persistedSize, err
  1433. }
  1434. // uploadBuffer uploads the buffer at the given offset using a bi-directional
  1435. // Write stream. It will open a new stream if necessary (on the first call or
  1436. // after resuming from failure). The resulting write offset after uploading the
  1437. // buffer is returned, as well as well as the final Object if the upload is
  1438. // completed.
  1439. //
  1440. // Returns object, persisted size, and any error that is not retriable.
  1441. func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*storagepb.Object, int64, error) {
  1442. var shouldRetry = ShouldRetry
  1443. if w.settings.retry != nil && w.settings.retry.shouldRetry != nil {
  1444. shouldRetry = w.settings.retry.shouldRetry
  1445. }
  1446. var err error
  1447. var lastWriteOfEntireObject bool
  1448. sent := 0
  1449. writeOffset := start
  1450. toWrite := w.buf[:recvd]
  1451. // Send a request with as many bytes as possible.
  1452. // Loop until all bytes are sent.
  1453. for {
  1454. bytesNotYetSent := recvd - sent
  1455. remainingDataFitsInSingleReq := bytesNotYetSent <= maxPerMessageWriteSize
  1456. if remainingDataFitsInSingleReq && doneReading {
  1457. lastWriteOfEntireObject = true
  1458. }
  1459. // Send the maximum amount of bytes we can, unless we don't have that many.
  1460. bytesToSendInCurrReq := maxPerMessageWriteSize
  1461. if remainingDataFitsInSingleReq {
  1462. bytesToSendInCurrReq = bytesNotYetSent
  1463. }
  1464. // Prepare chunk section for upload.
  1465. data := toWrite[sent : sent+bytesToSendInCurrReq]
  1466. req := &storagepb.BidiWriteObjectRequest{
  1467. Data: &storagepb.BidiWriteObjectRequest_ChecksummedData{
  1468. ChecksummedData: &storagepb.ChecksummedData{
  1469. Content: data,
  1470. },
  1471. },
  1472. WriteOffset: writeOffset,
  1473. FinishWrite: lastWriteOfEntireObject,
  1474. Flush: remainingDataFitsInSingleReq && !lastWriteOfEntireObject,
  1475. StateLookup: remainingDataFitsInSingleReq && !lastWriteOfEntireObject,
  1476. }
  1477. // Open a new stream if necessary and set the first_message field on
  1478. // the request. The first message on the WriteObject stream must either
  1479. // be the Object or the Resumable Upload ID.
  1480. if w.stream == nil {
  1481. hds := []string{"x-goog-request-params", fmt.Sprintf("bucket=projects/_/buckets/%s", url.QueryEscape(w.bucket))}
  1482. ctx := gax.InsertMetadataIntoOutgoingContext(w.ctx, hds...)
  1483. w.stream, err = w.c.raw.BidiWriteObject(ctx)
  1484. if err != nil {
  1485. return nil, 0, err
  1486. }
  1487. if w.upid != "" { // resumable upload
  1488. req.FirstMessage = &storagepb.BidiWriteObjectRequest_UploadId{UploadId: w.upid}
  1489. } else { // non-resumable
  1490. spec, err := w.writeObjectSpec()
  1491. if err != nil {
  1492. return nil, 0, err
  1493. }
  1494. req.FirstMessage = &storagepb.BidiWriteObjectRequest_WriteObjectSpec{
  1495. WriteObjectSpec: spec,
  1496. }
  1497. req.CommonObjectRequestParams = toProtoCommonObjectRequestParams(w.encryptionKey)
  1498. // For a non-resumable upload, checksums must be sent in this message.
  1499. // TODO: Currently the checksums are only sent on the first message
  1500. // of the stream, but in the future, we must also support sending it
  1501. // on the *last* message of the stream (instead of the first).
  1502. req.ObjectChecksums = toProtoChecksums(w.sendCRC32C, w.attrs)
  1503. }
  1504. }
  1505. err = w.stream.Send(req)
  1506. if err == io.EOF {
  1507. // err was io.EOF. The client-side of a stream only gets an EOF on Send
  1508. // when the backend closes the stream and wants to return an error
  1509. // status.
  1510. // Receive from the stream Recv() until it returns a non-nil error
  1511. // to receive the server's status as an error. We may get multiple
  1512. // messages before the error due to buffering.
  1513. err = nil
  1514. for err == nil {
  1515. _, err = w.stream.Recv()
  1516. }
  1517. // Drop the stream reference as a new one will need to be created if
  1518. // we retry.
  1519. w.stream = nil
  1520. // Drop the stream reference as a new one will need to be created if
  1521. // we can retry the upload
  1522. w.stream = nil
  1523. // Retriable errors mean we should start over and attempt to
  1524. // resend the entire buffer via a new stream.
  1525. // If not retriable, falling through will return the error received.
  1526. if shouldRetry(err) {
  1527. // TODO: Add test case for failure modes of querying progress.
  1528. writeOffset, err = w.determineOffset(start)
  1529. if err != nil {
  1530. return nil, 0, err
  1531. }
  1532. sent = int(writeOffset) - int(start)
  1533. // Continue sending requests, opening a new stream and resending
  1534. // any bytes not yet persisted as per QueryWriteStatus
  1535. continue
  1536. }
  1537. }
  1538. if err != nil {
  1539. return nil, 0, err
  1540. }
  1541. // Update the immediate stream's sent total and the upload offset with
  1542. // the data sent.
  1543. sent += len(data)
  1544. writeOffset += int64(len(data))
  1545. // Not done sending data, do not attempt to commit it yet, loop around
  1546. // and send more data.
  1547. if recvd-sent > 0 {
  1548. continue
  1549. }
  1550. // The buffer has been uploaded and there is still more data to be
  1551. // uploaded, but this is not a resumable upload session. Therefore,
  1552. // don't check persisted data.
  1553. if !lastWriteOfEntireObject && w.chunkSize == 0 {
  1554. return nil, writeOffset, nil
  1555. }
  1556. // Done sending the data in the buffer (remainingDataFitsInSingleReq
  1557. // should == true if we reach this code).
  1558. // If we are done sending the whole object, close the stream and get the final
  1559. // object. Otherwise, receive from the stream to confirm the persisted data.
  1560. if !lastWriteOfEntireObject {
  1561. resp, err := w.stream.Recv()
  1562. // Retriable errors mean we should start over and attempt to
  1563. // resend the entire buffer via a new stream.
  1564. // If not retriable, falling through will return the error received
  1565. // from closing the stream.
  1566. if shouldRetry(err) {
  1567. writeOffset, err = w.determineOffset(start)
  1568. if err != nil {
  1569. return nil, 0, err
  1570. }
  1571. sent = int(writeOffset) - int(start)
  1572. // Drop the stream reference as a new one will need to be created.
  1573. w.stream = nil
  1574. continue
  1575. }
  1576. if err != nil {
  1577. return nil, 0, err
  1578. }
  1579. if resp.GetPersistedSize() != writeOffset {
  1580. // Retry if not all bytes were persisted.
  1581. writeOffset = resp.GetPersistedSize()
  1582. sent = int(writeOffset) - int(start)
  1583. continue
  1584. }
  1585. } else {
  1586. // If the object is done uploading, close the send stream to signal
  1587. // to the server that we are done sending so that we can receive
  1588. // from the stream without blocking.
  1589. err = w.stream.CloseSend()
  1590. if err != nil {
  1591. // CloseSend() retries the send internally. It never returns an
  1592. // error in the current implementation, but we check it anyway in
  1593. // case that it does in the future.
  1594. return nil, 0, err
  1595. }
  1596. // Stream receives do not block once send is closed, but we may not
  1597. // receive the response with the object right away; loop until we
  1598. // receive the object or error out.
  1599. var obj *storagepb.Object
  1600. for obj == nil {
  1601. resp, err := w.stream.Recv()
  1602. if err != nil {
  1603. return nil, 0, err
  1604. }
  1605. obj = resp.GetResource()
  1606. }
  1607. // Even though we received the object response, continue reading
  1608. // until we receive a non-nil error, to ensure the stream does not
  1609. // leak even if the context isn't cancelled. See:
  1610. // https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream
  1611. for err == nil {
  1612. _, err = w.stream.Recv()
  1613. }
  1614. return obj, writeOffset, nil
  1615. }
  1616. return nil, writeOffset, nil
  1617. }
  1618. }
  1619. // determineOffset either returns the offset given to it in the case of a simple
  1620. // upload, or queries the write status in the case a resumable upload is being
  1621. // used.
  1622. func (w *gRPCWriter) determineOffset(offset int64) (int64, error) {
  1623. // For a Resumable Upload, we must start from however much data
  1624. // was committed.
  1625. if w.upid != "" {
  1626. committed, err := w.queryProgress()
  1627. if err != nil {
  1628. return 0, err
  1629. }
  1630. offset = committed
  1631. }
  1632. return offset, nil
  1633. }
  1634. // writeObjectSpec constructs a WriteObjectSpec proto using the Writer's
  1635. // ObjectAttrs and applies its Conditions. This is only used for gRPC.
  1636. func (w *gRPCWriter) writeObjectSpec() (*storagepb.WriteObjectSpec, error) {
  1637. // To avoid modifying the ObjectAttrs embeded in the calling writer, deref
  1638. // the ObjectAttrs pointer to make a copy, then assign the desired name to
  1639. // the attribute.
  1640. attrs := *w.attrs
  1641. spec := &storagepb.WriteObjectSpec{
  1642. Resource: attrs.toProtoObject(w.bucket),
  1643. }
  1644. // WriteObject doesn't support the generation condition, so use default.
  1645. if err := applyCondsProto("WriteObject", defaultGen, w.conds, spec); err != nil {
  1646. return nil, err
  1647. }
  1648. return spec, nil
  1649. }
  1650. // read copies the data in the reader to the given buffer and reports how much
  1651. // data was read into the buffer and if there is no more data to read (EOF).
  1652. // Furthermore, if the attrs.ContentType is unset, the first bytes of content
  1653. // will be sniffed for a matching content type unless forceEmptyContentType is enabled.
  1654. func (w *gRPCWriter) read() (int, bool, error) {
  1655. if w.attrs.ContentType == "" && !w.forceEmptyContentType {
  1656. w.reader, w.attrs.ContentType = gax.DetermineContentType(w.reader)
  1657. }
  1658. // Set n to -1 to start the Read loop.
  1659. var n, recvd int = -1, 0
  1660. var err error
  1661. for err == nil && n != 0 {
  1662. // The routine blocks here until data is received.
  1663. n, err = w.reader.Read(w.buf[recvd:])
  1664. recvd += n
  1665. }
  1666. var done bool
  1667. if err == io.EOF {
  1668. done = true
  1669. err = nil
  1670. }
  1671. return recvd, done, err
  1672. }
  1673. func checkCanceled(err error) error {
  1674. if status.Code(err) == codes.Canceled {
  1675. return context.Canceled
  1676. }
  1677. return err
  1678. }