pool.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. // Copyright (c) 2018 David Crawshaw <david@zentus.com>
  2. // Copyright (c) 2021 Ross Light <rosss@zombiezen.com>
  3. //
  4. // Permission to use, copy, modify, and distribute this software for any
  5. // purpose with or without fee is hereby granted, provided that the above
  6. // copyright notice and this permission notice appear in all copies.
  7. //
  8. // THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  9. // WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  10. // MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  11. // ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  12. // WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  13. // ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  14. // OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. //
  16. // SPDX-License-Identifier: ISC
  17. package sqlitex
  18. import (
  19. "context"
  20. "fmt"
  21. "sync"
  22. "zombiezen.com/go/sqlite"
  23. )
  24. // Pool is a pool of SQLite connections.
  25. // It is safe for use by multiple goroutines concurrently.
  26. type Pool struct {
  27. free chan *sqlite.Conn
  28. closed chan struct{}
  29. mu sync.Mutex
  30. all map[*sqlite.Conn]context.CancelFunc
  31. }
  32. // Open opens a fixed-size pool of SQLite connections.
  33. // A flags value of 0 defaults to:
  34. //
  35. // SQLITE_OPEN_READWRITE
  36. // SQLITE_OPEN_CREATE
  37. // SQLITE_OPEN_WAL
  38. // SQLITE_OPEN_URI
  39. // SQLITE_OPEN_NOMUTEX
  40. func Open(uri string, flags sqlite.OpenFlags, poolSize int) (pool *Pool, err error) {
  41. if uri == ":memory:" {
  42. return nil, strerror{msg: `sqlite: ":memory:" does not work with multiple connections, use "file::memory:?mode=memory"`}
  43. }
  44. p := &Pool{
  45. free: make(chan *sqlite.Conn, poolSize),
  46. closed: make(chan struct{}),
  47. }
  48. defer func() {
  49. // If an error occurred, call Close outside the lock so this doesn't deadlock.
  50. if err != nil {
  51. p.Close()
  52. }
  53. }()
  54. if flags == 0 {
  55. flags = sqlite.OpenReadWrite |
  56. sqlite.OpenCreate |
  57. sqlite.OpenWAL |
  58. sqlite.OpenURI
  59. }
  60. // TODO(maybe)
  61. // sqlitex_pool is also defined in package sqlite
  62. // const sqlitex_pool = sqlite.OpenFlags(0x01000000)
  63. // flags |= sqlitex_pool
  64. p.all = make(map[*sqlite.Conn]context.CancelFunc)
  65. for i := 0; i < poolSize; i++ {
  66. conn, err := sqlite.OpenConn(uri, flags)
  67. if err != nil {
  68. return nil, err
  69. }
  70. p.free <- conn
  71. p.all[conn] = func() {}
  72. }
  73. return p, nil
  74. }
  75. // Get returns an SQLite connection from the Pool.
  76. //
  77. // If no Conn is available, Get will block until at least one Conn is returned
  78. // with Put, or until either the Pool is closed or the context is canceled. If
  79. // no Conn can be obtained, nil is returned.
  80. //
  81. // The provided context is also used to control the execution lifetime of the
  82. // connection. See Conn.SetInterrupt for details.
  83. //
  84. // Applications must ensure that all non-nil Conns returned from Get are
  85. // returned to the same Pool with Put.
  86. //
  87. // Although ctx historically may be nil, this is not a recommended design
  88. // pattern.
  89. func (p *Pool) Get(ctx context.Context) *sqlite.Conn {
  90. if ctx == nil {
  91. ctx = context.Background()
  92. }
  93. select {
  94. case conn := <-p.free:
  95. ctx, cancel := context.WithCancel(ctx)
  96. // TODO(maybe)
  97. // conn.SetTracer(&tracer{ctx: ctx})
  98. conn.SetInterrupt(ctx.Done())
  99. p.mu.Lock()
  100. defer p.mu.Unlock()
  101. p.all[conn] = cancel
  102. return conn
  103. case <-ctx.Done():
  104. case <-p.closed:
  105. }
  106. return nil
  107. }
  108. // Put puts an SQLite connection back into the Pool.
  109. //
  110. // Put will panic if the conn was not originally created by p. Put(nil) is a
  111. // no-op.
  112. //
  113. // Applications must ensure that all non-nil Conns returned from Get are
  114. // returned to the same Pool with Put.
  115. func (p *Pool) Put(conn *sqlite.Conn) {
  116. if conn == nil {
  117. // See https://github.com/zombiezen/go-sqlite/issues/17
  118. return
  119. }
  120. query := conn.CheckReset()
  121. if query != "" {
  122. panic(fmt.Sprintf(
  123. "connection returned to pool has active statement: %q",
  124. query))
  125. }
  126. p.mu.Lock()
  127. cancel, found := p.all[conn]
  128. if found {
  129. p.all[conn] = func() {}
  130. }
  131. p.mu.Unlock()
  132. if !found {
  133. panic("sqlite.Pool.Put: connection not created by this pool")
  134. }
  135. conn.SetInterrupt(nil)
  136. cancel()
  137. p.free <- conn
  138. }
  139. // Close interrupts and closes all the connections in the Pool,
  140. // blocking until all connections are returned to the Pool.
  141. func (p *Pool) Close() (err error) {
  142. close(p.closed)
  143. p.mu.Lock()
  144. n := len(p.all)
  145. cancelList := make([]context.CancelFunc, 0, n)
  146. for conn, cancel := range p.all {
  147. cancelList = append(cancelList, cancel)
  148. p.all[conn] = func() {}
  149. }
  150. p.mu.Unlock()
  151. for _, cancel := range cancelList {
  152. cancel()
  153. }
  154. for closed := 0; closed < n; closed++ {
  155. conn := <-p.free
  156. if err2 := conn.Close(); err == nil {
  157. err = err2
  158. }
  159. }
  160. return
  161. }
  162. type strerror struct {
  163. msg string
  164. }
  165. func (err strerror) Error() string { return err.msg }
  166. // TODO(maybe)
  167. // type tracer struct {
  168. // ctx context.Context
  169. // ctxStack []context.Context
  170. // taskStack []*trace.Task
  171. // }
  172. // func (t *tracer) pctx() context.Context {
  173. // if len(t.ctxStack) != 0 {
  174. // return t.ctxStack[len(t.ctxStack)-1]
  175. // }
  176. // return t.ctx
  177. // }
  178. // func (t *tracer) Push(name string) {
  179. // ctx, task := trace.NewTask(t.pctx(), name)
  180. // t.ctxStack = append(t.ctxStack, ctx)
  181. // t.taskStack = append(t.taskStack, task)
  182. // }
  183. // func (t *tracer) Pop() {
  184. // t.taskStack[len(t.taskStack)-1].End()
  185. // t.taskStack = t.taskStack[:len(t.taskStack)-1]
  186. // t.ctxStack = t.ctxStack[:len(t.ctxStack)-1]
  187. // }
  188. // func (t *tracer) NewTask(name string) sqlite.TracerTask {
  189. // ctx, task := trace.NewTask(t.pctx(), name)
  190. // return &tracerTask{
  191. // ctx: ctx,
  192. // task: task,
  193. // }
  194. // }
  195. // type tracerTask struct {
  196. // ctx context.Context
  197. // task *trace.Task
  198. // region *trace.Region
  199. // }
  200. // func (t *tracerTask) StartRegion(regionType string) {
  201. // if t.region != nil {
  202. // panic("sqlitex.tracerTask.StartRegion: already in region")
  203. // }
  204. // t.region = trace.StartRegion(t.ctx, regionType)
  205. // }
  206. // func (t *tracerTask) EndRegion() {
  207. // t.region.End()
  208. // t.region = nil
  209. // }
  210. // func (t *tracerTask) End() {
  211. // t.task.End()
  212. // }