// singleflight.go
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package singleflight provides a duplicate function call suppression
// mechanism.
package singleflight // import "golang.org/x/sync/singleflight"
import (
	"bytes"
	"errors"
	"fmt"
	"runtime"
	"runtime/debug"
	"sync"
)
  15. // errGoexit indicates the runtime.Goexit was called in
  16. // the user given function.
  17. var errGoexit = errors.New("runtime.Goexit was called")
  18. // A panicError is an arbitrary value recovered from a panic
  19. // with the stack trace during the execution of given function.
  20. type panicError struct {
  21. value interface{}
  22. stack []byte
  23. }
  24. // Error implements error interface.
  25. func (p *panicError) Error() string {
  26. return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
  27. }
  28. func (p *panicError) Unwrap() error {
  29. err, ok := p.value.(error)
  30. if !ok {
  31. return nil
  32. }
  33. return err
  34. }
  35. func newPanicError(v interface{}) error {
  36. stack := debug.Stack()
  37. // The first line of the stack trace is of the form "goroutine N [status]:"
  38. // but by the time the panic reaches Do the goroutine may no longer exist
  39. // and its status will have changed. Trim out the misleading line.
  40. if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
  41. stack = stack[line+1:]
  42. }
  43. return &panicError{value: v, stack: stack}
  44. }
  45. // call is an in-flight or completed singleflight.Do call
  46. type call struct {
  47. wg sync.WaitGroup
  48. // These fields are written once before the WaitGroup is done
  49. // and are only read after the WaitGroup is done.
  50. val interface{}
  51. err error
  52. // These fields are read and written with the singleflight
  53. // mutex held before the WaitGroup is done, and are read but
  54. // not written after the WaitGroup is done.
  55. dups int
  56. chans []chan<- Result
  57. }
  58. // Group represents a class of work and forms a namespace in
  59. // which units of work can be executed with duplicate suppression.
  60. type Group struct {
  61. mu sync.Mutex // protects m
  62. m map[string]*call // lazily initialized
  63. }
  64. // Result holds the results of Do, so they can be passed
  65. // on a channel.
  66. type Result struct {
  67. Val interface{}
  68. Err error
  69. Shared bool
  70. }
  71. // Do executes and returns the results of the given function, making
  72. // sure that only one execution is in-flight for a given key at a
  73. // time. If a duplicate comes in, the duplicate caller waits for the
  74. // original to complete and receives the same results.
  75. // The return value shared indicates whether v was given to multiple callers.
  76. func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
  77. g.mu.Lock()
  78. if g.m == nil {
  79. g.m = make(map[string]*call)
  80. }
  81. if c, ok := g.m[key]; ok {
  82. c.dups++
  83. g.mu.Unlock()
  84. c.wg.Wait()
  85. if e, ok := c.err.(*panicError); ok {
  86. panic(e)
  87. } else if c.err == errGoexit {
  88. runtime.Goexit()
  89. }
  90. return c.val, c.err, true
  91. }
  92. c := new(call)
  93. c.wg.Add(1)
  94. g.m[key] = c
  95. g.mu.Unlock()
  96. g.doCall(c, key, fn)
  97. return c.val, c.err, c.dups > 0
  98. }
  99. // DoChan is like Do but returns a channel that will receive the
  100. // results when they are ready.
  101. //
  102. // The returned channel will not be closed.
  103. func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
  104. ch := make(chan Result, 1)
  105. g.mu.Lock()
  106. if g.m == nil {
  107. g.m = make(map[string]*call)
  108. }
  109. if c, ok := g.m[key]; ok {
  110. c.dups++
  111. c.chans = append(c.chans, ch)
  112. g.mu.Unlock()
  113. return ch
  114. }
  115. c := &call{chans: []chan<- Result{ch}}
  116. c.wg.Add(1)
  117. g.m[key] = c
  118. g.mu.Unlock()
  119. go g.doCall(c, key, fn)
  120. return ch
  121. }
  122. // doCall handles the single call for a key.
  123. func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
  124. normalReturn := false
  125. recovered := false
  126. // use double-defer to distinguish panic from runtime.Goexit,
  127. // more details see https://golang.org/cl/134395
  128. defer func() {
  129. // the given function invoked runtime.Goexit
  130. if !normalReturn && !recovered {
  131. c.err = errGoexit
  132. }
  133. g.mu.Lock()
  134. defer g.mu.Unlock()
  135. c.wg.Done()
  136. if g.m[key] == c {
  137. delete(g.m, key)
  138. }
  139. if e, ok := c.err.(*panicError); ok {
  140. // In order to prevent the waiting channels from being blocked forever,
  141. // needs to ensure that this panic cannot be recovered.
  142. if len(c.chans) > 0 {
  143. go panic(e)
  144. select {} // Keep this goroutine around so that it will appear in the crash dump.
  145. } else {
  146. panic(e)
  147. }
  148. } else if c.err == errGoexit {
  149. // Already in the process of goexit, no need to call again
  150. } else {
  151. // Normal return
  152. for _, ch := range c.chans {
  153. ch <- Result{c.val, c.err, c.dups > 0}
  154. }
  155. }
  156. }()
  157. func() {
  158. defer func() {
  159. if !normalReturn {
  160. // Ideally, we would wait to take a stack trace until we've determined
  161. // whether this is a panic or a runtime.Goexit.
  162. //
  163. // Unfortunately, the only way we can distinguish the two is to see
  164. // whether the recover stopped the goroutine from terminating, and by
  165. // the time we know that, the part of the stack trace relevant to the
  166. // panic has been discarded.
  167. if r := recover(); r != nil {
  168. c.err = newPanicError(r)
  169. }
  170. }
  171. }()
  172. c.val, c.err = fn()
  173. normalReturn = true
  174. }()
  175. if !normalReturn {
  176. recovered = true
  177. }
  178. }
  179. // Forget tells the singleflight to forget about a key. Future calls
  180. // to Do for this key will call the function rather than waiting for
  181. // an earlier call to complete.
  182. func (g *Group) Forget(key string) {
  183. g.mu.Lock()
  184. delete(g.m, key)
  185. g.mu.Unlock()
  186. }