grpclb.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535
  1. /*
  2. *
  3. * Copyright 2016 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. // Package grpclb defines a grpclb balancer.
  19. //
  20. // To install grpclb balancer, import this package as:
  21. //
  22. // import _ "google.golang.org/grpc/balancer/grpclb"
  23. package grpclb
  24. import (
  25. "context"
  26. "errors"
  27. "fmt"
  28. "sync"
  29. "time"
  30. "google.golang.org/grpc"
  31. "google.golang.org/grpc/balancer"
  32. "google.golang.org/grpc/balancer/base"
  33. grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
  34. "google.golang.org/grpc/connectivity"
  35. "google.golang.org/grpc/credentials"
  36. "google.golang.org/grpc/grpclog"
  37. "google.golang.org/grpc/internal"
  38. "google.golang.org/grpc/internal/backoff"
  39. internalgrpclog "google.golang.org/grpc/internal/grpclog"
  40. "google.golang.org/grpc/internal/pretty"
  41. "google.golang.org/grpc/internal/resolver/dns"
  42. "google.golang.org/grpc/resolver"
  43. "google.golang.org/grpc/resolver/manual"
  44. "google.golang.org/protobuf/types/known/durationpb"
  45. lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
  46. )
  47. const (
  48. lbTokenKey = "lb-token"
  49. defaultFallbackTimeout = 10 * time.Second
  50. grpclbName = "grpclb"
  51. )
  52. var errServerTerminatedConnection = errors.New("grpclb: failed to recv server list: server terminated connection")
  53. var logger = grpclog.Component("grpclb")
  54. func convertDuration(d *durationpb.Duration) time.Duration {
  55. if d == nil {
  56. return 0
  57. }
  58. return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond
  59. }
// Client API for LoadBalancer service.
// Mostly copied from generated pb.go file.
// To avoid circular dependency.
//
// loadBalancerClient issues the BalanceLoad RPC over the wrapped ClientConn.
type loadBalancerClient struct {
	// cc is the connection on which BalanceLoad streams are created.
	cc *grpc.ClientConn
}
  66. func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...grpc.CallOption) (*balanceLoadClientStream, error) {
  67. desc := &grpc.StreamDesc{
  68. StreamName: "BalanceLoad",
  69. ServerStreams: true,
  70. ClientStreams: true,
  71. }
  72. stream, err := c.cc.NewStream(ctx, desc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...)
  73. if err != nil {
  74. return nil, err
  75. }
  76. x := &balanceLoadClientStream{stream}
  77. return x, nil
  78. }
// balanceLoadClientStream wraps a grpc.ClientStream so that Send and Recv can
// work with the typed LoadBalanceRequest and LoadBalanceResponse messages.
type balanceLoadClientStream struct {
	grpc.ClientStream
}
  82. func (x *balanceLoadClientStream) Send(m *lbpb.LoadBalanceRequest) error {
  83. return x.ClientStream.SendMsg(m)
  84. }
  85. func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) {
  86. m := new(lbpb.LoadBalanceResponse)
  87. if err := x.ClientStream.RecvMsg(m); err != nil {
  88. return nil, err
  89. }
  90. return m, nil
  91. }
  92. func init() {
  93. balancer.Register(newLBBuilder())
  94. dns.EnableSRVLookups = true
  95. }
  96. // newLBBuilder creates a builder for grpclb.
  97. func newLBBuilder() balancer.Builder {
  98. return newLBBuilderWithFallbackTimeout(defaultFallbackTimeout)
  99. }
  100. // newLBBuilderWithFallbackTimeout creates a grpclb builder with the given
  101. // fallbackTimeout. If no response is received from the remote balancer within
  102. // fallbackTimeout, the backend addresses from the resolved address list will be
  103. // used.
  104. //
  105. // Only call this function when a non-default fallback timeout is needed.
  106. func newLBBuilderWithFallbackTimeout(fallbackTimeout time.Duration) balancer.Builder {
  107. return &lbBuilder{
  108. fallbackTimeout: fallbackTimeout,
  109. }
  110. }
// lbBuilder is the balancer.Builder for grpclb. It only carries configuration
// that Build copies into each balancer it creates.
type lbBuilder struct {
	// fallbackTimeout is handed to every lbBalancer created by Build.
	fallbackTimeout time.Duration
}
// Name returns grpclbName, the name of this balancer builder.
func (b *lbBuilder) Name() string {
	return grpclbName
}
  117. func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
  118. // This generates a manual resolver builder with a fixed scheme. This
  119. // scheme will be used to dial to remote LB, so we can send filtered
  120. // address updates to remote LB ClientConn using this manual resolver.
  121. mr := manual.NewBuilderWithScheme("grpclb-internal")
  122. // ResolveNow() on this manual resolver is forwarded to the parent
  123. // ClientConn, so when grpclb client loses contact with the remote balancer,
  124. // the parent ClientConn's resolver will re-resolve.
  125. mr.ResolveNowCallback = cc.ResolveNow
  126. lb := &lbBalancer{
  127. cc: newLBCacheClientConn(cc),
  128. dialTarget: opt.Target.Endpoint(),
  129. target: opt.Target.Endpoint(),
  130. opt: opt,
  131. fallbackTimeout: b.fallbackTimeout,
  132. doneCh: make(chan struct{}),
  133. manualResolver: mr,
  134. subConns: make(map[resolver.Address]balancer.SubConn),
  135. scStates: make(map[balancer.SubConn]connectivity.State),
  136. picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
  137. clientStats: newRPCStats(),
  138. backoff: backoff.DefaultExponential, // TODO: make backoff configurable.
  139. }
  140. lb.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[grpclb %p] ", lb))
  141. var err error
  142. if opt.CredsBundle != nil {
  143. lb.grpclbClientConnCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBalancer)
  144. if err != nil {
  145. lb.logger.Warningf("Failed to create credentials used for connecting to grpclb: %v", err)
  146. }
  147. lb.grpclbBackendCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBackendFromBalancer)
  148. if err != nil {
  149. lb.logger.Warningf("Failed to create credentials used for connecting to backends returned by grpclb: %v", err)
  150. }
  151. }
  152. return lb
  153. }
// lbBalancer is the grpclb balancer created by lbBuilder.Build. It keeps the
// state needed to talk to a remote balancer, to manage backend SubConns and
// to fall back to resolver-provided backends.
type lbBalancer struct {
	// cc is the caching wrapper Build creates around the parent ClientConn.
	cc         *lbCacheClientConn
	dialTarget string // user's dial target
	target     string // same as dialTarget unless overridden in service config
	// opt holds the build options that were passed to Build.
	opt balancer.BuildOptions
	// logger prefixes every message with this balancer's address.
	logger *internalgrpclog.PrefixLogger

	// usePickFirst reports whether pick_first is used for backends. It is
	// read when refreshing SubConns and when publishing state.
	usePickFirst bool

	// grpclbClientConnCreds is the creds bundle to be used to connect to grpclb
	// servers. If it's nil, use the TransportCredentials from BuildOptions
	// instead.
	grpclbClientConnCreds credentials.Bundle
	// grpclbBackendCreds is the creds bundle to be used for addresses that are
	// returned by grpclb server. If it's nil, don't set anything when creating
	// SubConns.
	grpclbBackendCreds credentials.Bundle

	// fallbackTimeout is how long fallbackToBackendsAfter waits before it
	// considers entering fallback.
	fallbackTimeout time.Duration
	// doneCh is closed by Close; fallbackToBackendsAfter stops waiting once it
	// is closed.
	doneCh chan struct{}

	// manualResolver is used in the remote LB ClientConn inside grpclb. When
	// resolved address updates are received by grpclb, filtered updates will be
	// sent to remote LB ClientConn through this resolver.
	manualResolver *manual.Resolver
	// The ClientConn to talk to the remote balancer.
	ccRemoteLB *remoteBalancerCCWrapper
	// backoff for calling remote balancer.
	backoff backoff.Strategy

	// Support client side load reporting. Each picker gets a reference to this,
	// and will update its content.
	clientStats *rpcStats

	mu sync.Mutex // guards everything following.
	// The full server list including drops, used to check if the newly received
	// serverList contains anything new. Each generated picker will also have a
	// reference to this list to do the first layer pick.
	fullServerList []*lbpb.Server
	// Backend addresses. It's kept so the addresses are available when
	// switching between round_robin and pickfirst.
	backendAddrs []resolver.Address
	// All backends addresses, with metadata set to nil. This list contains all
	// backend addresses in the same order and with the same duplicates as in
	// serverlist. When generating picker, a SubConn slice with the same order
	// but with only READY SCs will be generated.
	backendAddrsWithoutMetadata []resolver.Address
	// Roundrobin functionalities.
	state    connectivity.State                      // aggregated state of the SubConns in subConns
	subConns map[resolver.Address]balancer.SubConn   // Used to new/shutdown SubConn.
	scStates map[balancer.SubConn]connectivity.State // Used to filter READY SubConns.
	picker   balancer.Picker                         // picker most recently generated by regeneratePicker
	// Support fallback to resolved backend addresses if there's no response
	// from remote balancer within fallbackTimeout.
	remoteBalancerConnected bool
	serverListReceived      bool
	inFallback              bool
	// resolvedBackendAddrs is resolvedAddrs minus remote balancers. It's set
	// when resolved address updates are received, and read in the goroutine
	// handling fallback.
	resolvedBackendAddrs []resolver.Address
	connErr              error // the last connection error
}
  211. // regeneratePicker takes a snapshot of the balancer, and generates a picker from
  212. // it. The picker
  213. // - always returns ErrTransientFailure if the balancer is in TransientFailure,
  214. // - does two layer roundrobin pick otherwise.
  215. //
  216. // Caller must hold lb.mu.
  217. func (lb *lbBalancer) regeneratePicker(resetDrop bool) {
  218. if lb.state == connectivity.TransientFailure {
  219. lb.picker = base.NewErrPicker(fmt.Errorf("all SubConns are in TransientFailure, last connection error: %v", lb.connErr))
  220. return
  221. }
  222. if lb.state == connectivity.Connecting {
  223. lb.picker = base.NewErrPicker(balancer.ErrNoSubConnAvailable)
  224. return
  225. }
  226. var readySCs []balancer.SubConn
  227. if lb.usePickFirst {
  228. for _, sc := range lb.subConns {
  229. readySCs = append(readySCs, sc)
  230. break
  231. }
  232. } else {
  233. for _, a := range lb.backendAddrsWithoutMetadata {
  234. if sc, ok := lb.subConns[a]; ok {
  235. if st, ok := lb.scStates[sc]; ok && st == connectivity.Ready {
  236. readySCs = append(readySCs, sc)
  237. }
  238. }
  239. }
  240. }
  241. if len(readySCs) <= 0 {
  242. // If there's no ready SubConns, always re-pick. This is to avoid drops
  243. // unless at least one SubConn is ready. Otherwise we may drop more
  244. // often than want because of drops + re-picks(which become re-drops).
  245. //
  246. // This doesn't seem to be necessary after the connecting check above.
  247. // Kept for safety.
  248. lb.picker = base.NewErrPicker(balancer.ErrNoSubConnAvailable)
  249. return
  250. }
  251. if lb.inFallback {
  252. lb.picker = newRRPicker(readySCs)
  253. return
  254. }
  255. if resetDrop {
  256. lb.picker = newLBPicker(lb.fullServerList, readySCs, lb.clientStats)
  257. return
  258. }
  259. prevLBPicker, ok := lb.picker.(*lbPicker)
  260. if !ok {
  261. lb.picker = newLBPicker(lb.fullServerList, readySCs, lb.clientStats)
  262. return
  263. }
  264. prevLBPicker.updateReadySCs(readySCs)
  265. }
  266. // aggregateSubConnStats calculate the aggregated state of SubConns in
  267. // lb.SubConns. These SubConns are subconns in use (when switching between
  268. // fallback and grpclb). lb.scState contains states for all SubConns, including
  269. // those in cache (SubConns are cached for 10 seconds after shutdown).
  270. //
  271. // The aggregated state is:
  272. // - If at least one SubConn in Ready, the aggregated state is Ready;
  273. // - Else if at least one SubConn in Connecting or IDLE, the aggregated state is Connecting;
  274. // - It's OK to consider IDLE as Connecting. SubConns never stay in IDLE,
  275. // they start to connect immediately. But there's a race between the overall
  276. // state is reported, and when the new SubConn state arrives. And SubConns
  277. // never go back to IDLE.
  278. // - Else the aggregated state is TransientFailure.
  279. func (lb *lbBalancer) aggregateSubConnStates() connectivity.State {
  280. var numConnecting uint64
  281. for _, sc := range lb.subConns {
  282. if state, ok := lb.scStates[sc]; ok {
  283. switch state {
  284. case connectivity.Ready:
  285. return connectivity.Ready
  286. case connectivity.Connecting, connectivity.Idle:
  287. numConnecting++
  288. }
  289. }
  290. }
  291. if numConnecting > 0 {
  292. return connectivity.Connecting
  293. }
  294. return connectivity.TransientFailure
  295. }
// UpdateSubConnState is unused; NewSubConn's options always specify
// updateSubConnState as the listener. If it is called anyway, the call is
// logged as an error and otherwise ignored.
func (lb *lbBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
	lb.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, scs)
}
  301. func (lb *lbBalancer) updateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
  302. s := scs.ConnectivityState
  303. if lb.logger.V(2) {
  304. lb.logger.Infof("SubConn state change: %p, %v", sc, s)
  305. }
  306. lb.mu.Lock()
  307. defer lb.mu.Unlock()
  308. oldS, ok := lb.scStates[sc]
  309. if !ok {
  310. if lb.logger.V(2) {
  311. lb.logger.Infof("Received state change for an unknown SubConn: %p, %v", sc, s)
  312. }
  313. return
  314. }
  315. lb.scStates[sc] = s
  316. switch s {
  317. case connectivity.Idle:
  318. sc.Connect()
  319. case connectivity.Shutdown:
  320. // When an address was removed by resolver, b called Shutdown but kept
  321. // the sc's state in scStates. Remove state for this sc here.
  322. delete(lb.scStates, sc)
  323. case connectivity.TransientFailure:
  324. lb.connErr = scs.ConnectionError
  325. }
  326. // Force regenerate picker if
  327. // - this sc became ready from not-ready
  328. // - this sc became not-ready from ready
  329. lb.updateStateAndPicker((oldS == connectivity.Ready) != (s == connectivity.Ready), false)
  330. // Enter fallback when the aggregated state is not Ready and the connection
  331. // to remote balancer is lost.
  332. if lb.state != connectivity.Ready {
  333. if !lb.inFallback && !lb.remoteBalancerConnected {
  334. // Enter fallback.
  335. lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
  336. }
  337. }
  338. }
  339. // updateStateAndPicker re-calculate the aggregated state, and regenerate picker
  340. // if overall state is changed.
  341. //
  342. // If forceRegeneratePicker is true, picker will be regenerated.
  343. func (lb *lbBalancer) updateStateAndPicker(forceRegeneratePicker bool, resetDrop bool) {
  344. oldAggrState := lb.state
  345. lb.state = lb.aggregateSubConnStates()
  346. // Regenerate picker when one of the following happens:
  347. // - caller wants to regenerate
  348. // - the aggregated state changed
  349. if forceRegeneratePicker || (lb.state != oldAggrState) {
  350. lb.regeneratePicker(resetDrop)
  351. }
  352. var cc balancer.ClientConn = lb.cc
  353. if lb.usePickFirst {
  354. // Bypass the caching layer that would wrap the picker.
  355. cc = lb.cc.ClientConn
  356. }
  357. cc.UpdateState(balancer.State{ConnectivityState: lb.state, Picker: lb.picker})
  358. }
  359. // fallbackToBackendsAfter blocks for fallbackTimeout and falls back to use
  360. // resolved backends (backends received from resolver, not from remote balancer)
  361. // if no connection to remote balancers was successful.
  362. func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) {
  363. timer := time.NewTimer(fallbackTimeout)
  364. defer timer.Stop()
  365. select {
  366. case <-timer.C:
  367. case <-lb.doneCh:
  368. return
  369. }
  370. lb.mu.Lock()
  371. if lb.inFallback || lb.serverListReceived {
  372. lb.mu.Unlock()
  373. return
  374. }
  375. // Enter fallback.
  376. lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
  377. lb.mu.Unlock()
  378. }
  379. func (lb *lbBalancer) handleServiceConfig(gc *grpclbServiceConfig) {
  380. lb.mu.Lock()
  381. defer lb.mu.Unlock()
  382. // grpclb uses the user's dial target to populate the `Name` field of the
  383. // `InitialLoadBalanceRequest` message sent to the remote balancer. But when
  384. // grpclb is used a child policy in the context of RLS, we want the `Name`
  385. // field to be populated with the value received from the RLS server. To
  386. // support this use case, an optional "target_name" field has been added to
  387. // the grpclb LB policy's config. If specified, it overrides the name of
  388. // the target to be sent to the remote balancer; if not, the target to be
  389. // sent to the balancer will continue to be obtained from the target URI
  390. // passed to the gRPC client channel. Whenever that target to be sent to the
  391. // balancer is updated, we need to restart the stream to the balancer as
  392. // this target is sent in the first message on the stream.
  393. if gc != nil {
  394. target := lb.dialTarget
  395. if gc.ServiceName != "" {
  396. target = gc.ServiceName
  397. }
  398. if target != lb.target {
  399. lb.target = target
  400. if lb.ccRemoteLB != nil {
  401. lb.ccRemoteLB.cancelRemoteBalancerCall()
  402. }
  403. }
  404. }
  405. newUsePickFirst := childIsPickFirst(gc)
  406. if lb.usePickFirst == newUsePickFirst {
  407. return
  408. }
  409. if lb.logger.V(2) {
  410. lb.logger.Infof("Switching mode. Is pick_first used for backends? %v", newUsePickFirst)
  411. }
  412. lb.refreshSubConns(lb.backendAddrs, lb.inFallback, newUsePickFirst)
  413. }
// ResolverError does nothing; the error passed in is discarded.
func (lb *lbBalancer) ResolverError(error) {
	// Ignore resolver errors. GRPCLB is not selected unless the resolver
	// works at least once.
}
  418. func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
  419. if lb.logger.V(2) {
  420. lb.logger.Infof("UpdateClientConnState: %s", pretty.ToJSON(ccs))
  421. }
  422. gc, _ := ccs.BalancerConfig.(*grpclbServiceConfig)
  423. lb.handleServiceConfig(gc)
  424. backendAddrs := ccs.ResolverState.Addresses
  425. var remoteBalancerAddrs []resolver.Address
  426. if sd := grpclbstate.Get(ccs.ResolverState); sd != nil {
  427. // Override any balancer addresses provided via
  428. // ccs.ResolverState.Addresses.
  429. remoteBalancerAddrs = sd.BalancerAddresses
  430. }
  431. if len(backendAddrs)+len(remoteBalancerAddrs) == 0 {
  432. // There should be at least one address, either grpclb server or
  433. // fallback. Empty address is not valid.
  434. return balancer.ErrBadResolverState
  435. }
  436. if len(remoteBalancerAddrs) == 0 {
  437. if lb.ccRemoteLB != nil {
  438. lb.ccRemoteLB.close()
  439. lb.ccRemoteLB = nil
  440. }
  441. } else if lb.ccRemoteLB == nil {
  442. // First time receiving resolved addresses, create a cc to remote
  443. // balancers.
  444. if err := lb.newRemoteBalancerCCWrapper(); err != nil {
  445. return err
  446. }
  447. // Start the fallback goroutine.
  448. go lb.fallbackToBackendsAfter(lb.fallbackTimeout)
  449. }
  450. if lb.ccRemoteLB != nil {
  451. // cc to remote balancers uses lb.manualResolver. Send the updated remote
  452. // balancer addresses to it through manualResolver.
  453. lb.manualResolver.UpdateState(resolver.State{Addresses: remoteBalancerAddrs})
  454. }
  455. lb.mu.Lock()
  456. lb.resolvedBackendAddrs = backendAddrs
  457. if len(remoteBalancerAddrs) == 0 || lb.inFallback {
  458. // If there's no remote balancer address in ClientConn update, grpclb
  459. // enters fallback mode immediately.
  460. //
  461. // If a new update is received while grpclb is in fallback, update the
  462. // list of backends being used to the new fallback backends.
  463. lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
  464. }
  465. lb.mu.Unlock()
  466. return nil
  467. }
  468. func (lb *lbBalancer) Close() {
  469. select {
  470. case <-lb.doneCh:
  471. return
  472. default:
  473. }
  474. close(lb.doneCh)
  475. if lb.ccRemoteLB != nil {
  476. lb.ccRemoteLB.close()
  477. }
  478. lb.cc.close()
  479. }
// ExitIdle is a no-op.
func (lb *lbBalancer) ExitIdle() {}