dial.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. // Copyright 2015 Google LLC.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Package grpc supports network connections to GRPC servers.
  5. // This package is not intended for use by end developers. Use the
  6. // google.golang.org/api/option package to configure API clients.
  7. package grpc
  8. import (
  9. "context"
  10. "errors"
  11. "log"
  12. "net"
  13. "os"
  14. "strings"
  15. "sync"
  16. "time"
  17. "cloud.google.com/go/compute/metadata"
  18. "go.opencensus.io/plugin/ocgrpc"
  19. "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
  20. "golang.org/x/oauth2"
  21. "golang.org/x/time/rate"
  22. "google.golang.org/api/internal"
  23. "google.golang.org/api/option"
  24. "google.golang.org/grpc"
  25. grpcgoogle "google.golang.org/grpc/credentials/google"
  26. grpcinsecure "google.golang.org/grpc/credentials/insecure"
  27. "google.golang.org/grpc/credentials/oauth"
  28. "google.golang.org/grpc/stats"
  29. // Install grpclb, which is required for direct path.
  30. _ "google.golang.org/grpc/balancer/grpclb"
  31. )
  32. // Check env to disable DirectPath traffic.
  33. const disableDirectPath = "GOOGLE_CLOUD_DISABLE_DIRECT_PATH"
  34. // Check env to decide if using google-c2p resolver for DirectPath traffic.
  35. const enableDirectPathXds = "GOOGLE_CLOUD_ENABLE_DIRECT_PATH_XDS"
  36. // Set at init time by dial_socketopt.go. If nil, socketopt is not supported.
  37. var timeoutDialerOption grpc.DialOption
  38. // Log rate limiter
  39. var logRateLimiter = rate.Sometimes{Interval: 1 * time.Second}
  40. // Assign to var for unit test replacement
  41. var dialContext = grpc.DialContext
  42. // otelStatsHandler is a singleton otelgrpc.clientHandler to be used across
  43. // all dial connections to avoid the memory leak documented in
  44. // https://github.com/open-telemetry/opentelemetry-go-contrib/issues/4226
  45. //
  46. // TODO: If 4226 has been fixed in opentelemetry-go-contrib, replace this
  47. // singleton with inline usage for simplicity.
  48. var (
  49. initOtelStatsHandlerOnce sync.Once
  50. otelStatsHandler stats.Handler
  51. )
  52. // otelGRPCStatsHandler returns singleton otelStatsHandler for reuse across all
  53. // dial connections.
  54. func otelGRPCStatsHandler() stats.Handler {
  55. initOtelStatsHandlerOnce.Do(func() {
  56. otelStatsHandler = otelgrpc.NewClientHandler()
  57. })
  58. return otelStatsHandler
  59. }
  60. // Dial returns a GRPC connection for use communicating with a Google cloud
  61. // service, configured with the given ClientOptions.
  62. func Dial(ctx context.Context, opts ...option.ClientOption) (*grpc.ClientConn, error) {
  63. o, err := processAndValidateOpts(opts)
  64. if err != nil {
  65. return nil, err
  66. }
  67. if o.GRPCConnPool != nil {
  68. return o.GRPCConnPool.Conn(), nil
  69. }
  70. // NOTE(cbro): We removed support for option.WithGRPCConnPool (GRPCConnPoolSize)
  71. // on 2020-02-12 because RoundRobin and WithBalancer are deprecated and we need to remove usages of it.
  72. //
  73. // Connection pooling is only done via DialPool.
  74. return dial(ctx, false, o)
  75. }
  76. // DialInsecure returns an insecure GRPC connection for use communicating
  77. // with fake or mock Google cloud service implementations, such as emulators.
  78. // The connection is configured with the given ClientOptions.
  79. func DialInsecure(ctx context.Context, opts ...option.ClientOption) (*grpc.ClientConn, error) {
  80. o, err := processAndValidateOpts(opts)
  81. if err != nil {
  82. return nil, err
  83. }
  84. return dial(ctx, true, o)
  85. }
  86. // DialPool returns a pool of GRPC connections for the given service.
  87. // This differs from the connection pooling implementation used by Dial, which uses a custom GRPC load balancer.
  88. // DialPool should be used instead of Dial when a pool is used by default or a different custom GRPC load balancer is needed.
  89. // The context and options are shared between each Conn in the pool.
  90. // The pool size is configured using the WithGRPCConnectionPool option.
  91. //
  92. // This API is subject to change as we further refine requirements. It will go away if gRPC stubs accept an interface instead of the concrete ClientConn type. See https://github.com/grpc/grpc-go/issues/1287.
  93. func DialPool(ctx context.Context, opts ...option.ClientOption) (ConnPool, error) {
  94. o, err := processAndValidateOpts(opts)
  95. if err != nil {
  96. return nil, err
  97. }
  98. if o.GRPCConnPool != nil {
  99. return o.GRPCConnPool, nil
  100. }
  101. poolSize := o.GRPCConnPoolSize
  102. if o.GRPCConn != nil {
  103. // WithGRPCConn is technically incompatible with WithGRPCConnectionPool.
  104. // Always assume pool size is 1 when a grpc.ClientConn is explicitly used.
  105. poolSize = 1
  106. }
  107. o.GRPCConnPoolSize = 0 // we don't *need* to set this to zero, but it's safe to.
  108. if poolSize == 0 || poolSize == 1 {
  109. // Fast path for common case for a connection pool with a single connection.
  110. conn, err := dial(ctx, false, o)
  111. if err != nil {
  112. return nil, err
  113. }
  114. return &singleConnPool{conn}, nil
  115. }
  116. pool := &roundRobinConnPool{}
  117. for i := 0; i < poolSize; i++ {
  118. conn, err := dial(ctx, false, o)
  119. if err != nil {
  120. defer pool.Close() // NOTE: error from Close is ignored.
  121. return nil, err
  122. }
  123. pool.conns = append(pool.conns, conn)
  124. }
  125. return pool, nil
  126. }
  127. func dial(ctx context.Context, insecure bool, o *internal.DialSettings) (*grpc.ClientConn, error) {
  128. if o.HTTPClient != nil {
  129. return nil, errors.New("unsupported HTTP client specified")
  130. }
  131. if o.GRPCConn != nil {
  132. return o.GRPCConn, nil
  133. }
  134. transportCreds, endpoint, err := internal.GetGRPCTransportConfigAndEndpoint(o)
  135. if err != nil {
  136. return nil, err
  137. }
  138. if insecure {
  139. transportCreds = grpcinsecure.NewCredentials()
  140. }
  141. // Initialize gRPC dial options with transport-level security options.
  142. grpcOpts := []grpc.DialOption{
  143. grpc.WithTransportCredentials(transportCreds),
  144. }
  145. // Authentication can only be sent when communicating over a secure connection.
  146. //
  147. // TODO: Should we be more lenient in the future and allow sending credentials
  148. // when dialing an insecure connection?
  149. if !o.NoAuth && !insecure {
  150. if o.APIKey != "" {
  151. grpcOpts = append(grpcOpts, grpc.WithPerRPCCredentials(grpcAPIKey{
  152. apiKey: o.APIKey,
  153. requestReason: o.RequestReason,
  154. }))
  155. } else {
  156. creds, err := internal.Creds(ctx, o)
  157. if err != nil {
  158. return nil, err
  159. }
  160. if o.TokenSource == nil {
  161. // We only validate non-tokensource creds, as TokenSource-based credentials
  162. // don't propagate universe.
  163. credsUniverseDomain, err := internal.GetUniverseDomain(creds)
  164. if err != nil {
  165. return nil, err
  166. }
  167. if o.GetUniverseDomain() != credsUniverseDomain {
  168. return nil, internal.ErrUniverseNotMatch(o.GetUniverseDomain(), credsUniverseDomain)
  169. }
  170. }
  171. grpcOpts = append(grpcOpts, grpc.WithPerRPCCredentials(grpcTokenSource{
  172. TokenSource: oauth.TokenSource{TokenSource: creds.TokenSource},
  173. quotaProject: internal.GetQuotaProject(creds, o.QuotaProject),
  174. requestReason: o.RequestReason,
  175. }))
  176. // Attempt Direct Path:
  177. logRateLimiter.Do(func() {
  178. logDirectPathMisconfig(endpoint, creds.TokenSource, o)
  179. })
  180. if isDirectPathEnabled(endpoint, o) && isTokenSourceDirectPathCompatible(creds.TokenSource, o) && metadata.OnGCE() {
  181. // Overwrite all of the previously specific DialOptions, DirectPath uses its own set of credentials and certificates.
  182. grpcOpts = []grpc.DialOption{
  183. grpc.WithCredentialsBundle(grpcgoogle.NewDefaultCredentialsWithOptions(
  184. grpcgoogle.DefaultCredentialsOptions{
  185. PerRPCCreds: oauth.TokenSource{TokenSource: creds.TokenSource},
  186. })),
  187. }
  188. if timeoutDialerOption != nil {
  189. grpcOpts = append(grpcOpts, timeoutDialerOption)
  190. }
  191. // Check if google-c2p resolver is enabled for DirectPath
  192. if isDirectPathXdsUsed(o) {
  193. // google-c2p resolver target must not have a port number
  194. if addr, _, err := net.SplitHostPort(endpoint); err == nil {
  195. endpoint = "google-c2p:///" + addr
  196. } else {
  197. endpoint = "google-c2p:///" + endpoint
  198. }
  199. } else {
  200. if !strings.HasPrefix(endpoint, "dns:///") {
  201. endpoint = "dns:///" + endpoint
  202. }
  203. grpcOpts = append(grpcOpts,
  204. // For now all DirectPath go clients will be using the following lb config, but in future
  205. // when different services need different configs, then we should change this to a
  206. // per-service config.
  207. grpc.WithDisableServiceConfig(),
  208. grpc.WithDefaultServiceConfig(`{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`))
  209. }
  210. // TODO(cbro): add support for system parameters (quota project, request reason) via chained interceptor.
  211. }
  212. }
  213. }
  214. // Add tracing, but before the other options, so that clients can override the
  215. // gRPC stats handler.
  216. // This assumes that gRPC options are processed in order, left to right.
  217. grpcOpts = addOCStatsHandler(grpcOpts, o)
  218. grpcOpts = addOpenTelemetryStatsHandler(grpcOpts, o)
  219. grpcOpts = append(grpcOpts, o.GRPCDialOpts...)
  220. if o.UserAgent != "" {
  221. grpcOpts = append(grpcOpts, grpc.WithUserAgent(o.UserAgent))
  222. }
  223. return dialContext(ctx, endpoint, grpcOpts...)
  224. }
  225. func addOCStatsHandler(opts []grpc.DialOption, settings *internal.DialSettings) []grpc.DialOption {
  226. if settings.TelemetryDisabled {
  227. return opts
  228. }
  229. return append(opts, grpc.WithStatsHandler(&ocgrpc.ClientHandler{}))
  230. }
  231. func addOpenTelemetryStatsHandler(opts []grpc.DialOption, settings *internal.DialSettings) []grpc.DialOption {
  232. if settings.TelemetryDisabled {
  233. return opts
  234. }
  235. return append(opts, grpc.WithStatsHandler(otelGRPCStatsHandler()))
  236. }
  237. // grpcTokenSource supplies PerRPCCredentials from an oauth.TokenSource.
  238. type grpcTokenSource struct {
  239. oauth.TokenSource
  240. // Additional metadata attached as headers.
  241. quotaProject string
  242. requestReason string
  243. }
  244. // GetRequestMetadata gets the request metadata as a map from a grpcTokenSource.
  245. func (ts grpcTokenSource) GetRequestMetadata(ctx context.Context, uri ...string) (
  246. map[string]string, error) {
  247. metadata, err := ts.TokenSource.GetRequestMetadata(ctx, uri...)
  248. if err != nil {
  249. return nil, err
  250. }
  251. // Attach system parameter
  252. if ts.quotaProject != "" {
  253. metadata["X-goog-user-project"] = ts.quotaProject
  254. }
  255. if ts.requestReason != "" {
  256. metadata["X-goog-request-reason"] = ts.requestReason
  257. }
  258. return metadata, nil
  259. }
  260. // grpcAPIKey supplies PerRPCCredentials from an API Key.
  261. type grpcAPIKey struct {
  262. apiKey string
  263. // Additional metadata attached as headers.
  264. requestReason string
  265. }
  266. // GetRequestMetadata gets the request metadata as a map from a grpcAPIKey.
  267. func (ts grpcAPIKey) GetRequestMetadata(ctx context.Context, uri ...string) (
  268. map[string]string, error) {
  269. metadata := map[string]string{
  270. "X-goog-api-key": ts.apiKey,
  271. }
  272. if ts.requestReason != "" {
  273. metadata["X-goog-request-reason"] = ts.requestReason
  274. }
  275. return metadata, nil
  276. }
  277. // RequireTransportSecurity indicates whether the credentials requires transport security.
  278. func (ts grpcAPIKey) RequireTransportSecurity() bool {
  279. return true
  280. }
  281. func isDirectPathEnabled(endpoint string, o *internal.DialSettings) bool {
  282. if !o.EnableDirectPath {
  283. return false
  284. }
  285. if !checkDirectPathEndPoint(endpoint) {
  286. return false
  287. }
  288. if strings.EqualFold(os.Getenv(disableDirectPath), "true") {
  289. return false
  290. }
  291. return true
  292. }
  293. func isDirectPathXdsUsed(o *internal.DialSettings) bool {
  294. // Method 1: Enable DirectPath xDS by env;
  295. if strings.EqualFold(os.Getenv(enableDirectPathXds), "true") {
  296. return true
  297. }
  298. // Method 2: Enable DirectPath xDS by option;
  299. if o.EnableDirectPathXds {
  300. return true
  301. }
  302. return false
  303. }
  304. func isTokenSourceDirectPathCompatible(ts oauth2.TokenSource, o *internal.DialSettings) bool {
  305. if ts == nil {
  306. return false
  307. }
  308. tok, err := ts.Token()
  309. if err != nil {
  310. return false
  311. }
  312. if tok == nil {
  313. return false
  314. }
  315. if o.AllowNonDefaultServiceAccount {
  316. return true
  317. }
  318. if source, _ := tok.Extra("oauth2.google.tokenSource").(string); source != "compute-metadata" {
  319. return false
  320. }
  321. if acct, _ := tok.Extra("oauth2.google.serviceAccount").(string); acct != "default" {
  322. return false
  323. }
  324. return true
  325. }
  326. func checkDirectPathEndPoint(endpoint string) bool {
  327. // Only [dns:///]host[:port] is supported, not other schemes (e.g., "tcp://" or "unix://").
  328. // Also don't try direct path if the user has chosen an alternate name resolver
  329. // (i.e., via ":///" prefix).
  330. //
  331. // TODO(cbro): once gRPC has introspectible options, check the user hasn't
  332. // provided a custom dialer in gRPC options.
  333. if strings.Contains(endpoint, "://") && !strings.HasPrefix(endpoint, "dns:///") {
  334. return false
  335. }
  336. if endpoint == "" {
  337. return false
  338. }
  339. return true
  340. }
  341. func logDirectPathMisconfig(endpoint string, ts oauth2.TokenSource, o *internal.DialSettings) {
  342. if isDirectPathXdsUsed(o) {
  343. // Case 1: does not enable DirectPath
  344. if !isDirectPathEnabled(endpoint, o) {
  345. log.Println("WARNING: DirectPath is misconfigured. Please set the EnableDirectPath option along with the EnableDirectPathXds option.")
  346. } else {
  347. // Case 2: credential is not correctly set
  348. if !isTokenSourceDirectPathCompatible(ts, o) {
  349. log.Println("WARNING: DirectPath is misconfigured. Please make sure the token source is fetched from GCE metadata server and the default service account is used.")
  350. }
  351. // Case 3: not running on GCE
  352. if !metadata.OnGCE() {
  353. log.Println("WARNING: DirectPath is misconfigured. DirectPath is only available in a GCE environment.")
  354. }
  355. }
  356. }
  357. }
  358. func processAndValidateOpts(opts []option.ClientOption) (*internal.DialSettings, error) {
  359. var o internal.DialSettings
  360. for _, opt := range opts {
  361. opt.Apply(&o)
  362. }
  363. if err := o.Validate(); err != nil {
  364. return nil, err
  365. }
  366. return &o, nil
  367. }
  368. type connPoolOption struct{ ConnPool }
  369. // WithConnPool returns a ClientOption that specifies the ConnPool
  370. // connection to use as the basis of communications.
  371. //
  372. // This is only to be used by Google client libraries internally, for example
  373. // when creating a longrunning API client that shares the same connection pool
  374. // as a service client.
  375. func WithConnPool(p ConnPool) option.ClientOption {
  376. return connPoolOption{p}
  377. }
  378. func (o connPoolOption) Apply(s *internal.DialSettings) {
  379. s.GRPCConnPool = o.ConnPool
  380. }