clickhouse.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. package clickhouse
  2. import (
  3. "bufio"
  4. "context"
  5. "database/sql"
  6. "database/sql/driver"
  7. "errors"
  8. "fmt"
  9. "net"
  10. "reflect"
  11. "regexp"
  12. "sync"
  13. "time"
  14. "github.com/ClickHouse/clickhouse-go/lib/binary"
  15. "github.com/ClickHouse/clickhouse-go/lib/column"
  16. "github.com/ClickHouse/clickhouse-go/lib/data"
  17. "github.com/ClickHouse/clickhouse-go/lib/protocol"
  18. "github.com/ClickHouse/clickhouse-go/lib/types"
  19. )
  20. type (
  21. Date = types.Date
  22. DateTime = types.DateTime
  23. UUID = types.UUID
  24. )
  25. type ExternalTable struct {
  26. Name string
  27. Values [][]driver.Value
  28. Columns []column.Column
  29. }
  30. var (
  31. ErrInsertInNotBatchMode = errors.New("insert statement supported only in the batch mode (use begin/commit)")
  32. ErrLimitDataRequestInTx = errors.New("data request has already been prepared in transaction")
  33. )
  34. var (
  35. splitInsertRe = regexp.MustCompile(`(?i)\sVALUES\s*\(`)
  36. )
  37. type logger func(format string, v ...interface{})
  38. type clickhouse struct {
  39. sync.Mutex
  40. data.ServerInfo
  41. data.ClientInfo
  42. logf logger
  43. conn *connect
  44. block *data.Block
  45. buffer *bufio.Writer
  46. decoder *binary.Decoder
  47. encoder *binary.Encoder
  48. settings *querySettings
  49. compress bool
  50. blockSize int
  51. inTransaction bool
  52. checkConnLiveness bool
  53. }
  54. func (ch *clickhouse) Prepare(query string) (driver.Stmt, error) {
  55. return ch.prepareContext(context.Background(), query)
  56. }
  57. func (ch *clickhouse) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
  58. return ch.prepareContext(ctx, query)
  59. }
  60. func (ch *clickhouse) prepareContext(ctx context.Context, query string) (driver.Stmt, error) {
  61. ch.logf("[prepare] %s", query)
  62. switch {
  63. case ch.conn.closed:
  64. return nil, driver.ErrBadConn
  65. case ch.block != nil:
  66. return nil, ErrLimitDataRequestInTx
  67. case isInsert(query):
  68. if !ch.inTransaction {
  69. return nil, ErrInsertInNotBatchMode
  70. }
  71. return ch.insert(ctx, query)
  72. }
  73. return &stmt{
  74. ch: ch,
  75. query: query,
  76. numInput: numInput(query),
  77. }, nil
  78. }
  79. func (ch *clickhouse) insert(ctx context.Context, query string) (_ driver.Stmt, err error) {
  80. if err := ch.sendQuery(ctx, splitInsertRe.Split(query, -1)[0]+" VALUES ", nil); err != nil {
  81. return nil, err
  82. }
  83. if ch.block, err = ch.readMeta(); err != nil {
  84. return nil, err
  85. }
  86. return &stmt{
  87. ch: ch,
  88. isInsert: true,
  89. }, nil
  90. }
  91. func (ch *clickhouse) Begin() (driver.Tx, error) {
  92. return ch.beginTx(context.Background(), txOptions{})
  93. }
  94. func (ch *clickhouse) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
  95. return ch.beginTx(ctx, txOptions{
  96. Isolation: int(opts.Isolation),
  97. ReadOnly: opts.ReadOnly,
  98. })
  99. }
  100. type txOptions struct {
  101. Isolation int
  102. ReadOnly bool
  103. }
  104. func (ch *clickhouse) beginTx(ctx context.Context, opts txOptions) (*clickhouse, error) {
  105. ch.logf("[begin] tx=%t, data=%t", ch.inTransaction, ch.block != nil)
  106. switch {
  107. case ch.inTransaction:
  108. return nil, sql.ErrTxDone
  109. case ch.conn.closed:
  110. return nil, driver.ErrBadConn
  111. }
  112. // Perform a stale connection check. We only perform this check in beginTx,
  113. // because database/sql retries driver.ErrBadConn only for first request,
  114. // but beginTx doesn't perform any other network interaction.
  115. if ch.checkConnLiveness {
  116. if err := ch.conn.connCheck(); err != nil {
  117. ch.logf("[begin] closing bad idle connection: %w", err)
  118. ch.Close()
  119. return ch, driver.ErrBadConn
  120. }
  121. }
  122. if finish := ch.watchCancel(ctx); finish != nil {
  123. defer finish()
  124. }
  125. ch.block = nil
  126. ch.inTransaction = true
  127. return ch, nil
  128. }
  129. func (ch *clickhouse) Commit() error {
  130. ch.logf("[commit] tx=%t, data=%t", ch.inTransaction, ch.block != nil)
  131. defer func() {
  132. if ch.block != nil {
  133. ch.block.Reset()
  134. ch.block = nil
  135. }
  136. ch.inTransaction = false
  137. }()
  138. switch {
  139. case !ch.inTransaction:
  140. return sql.ErrTxDone
  141. case ch.conn.closed:
  142. return driver.ErrBadConn
  143. }
  144. if ch.block != nil {
  145. if err := ch.writeBlock(ch.block, ""); err != nil {
  146. return err
  147. }
  148. // Send empty block as marker of end of data.
  149. if err := ch.writeBlock(&data.Block{}, ""); err != nil {
  150. return err
  151. }
  152. if err := ch.encoder.Flush(); err != nil {
  153. return err
  154. }
  155. return ch.process()
  156. }
  157. return nil
  158. }
  159. func (ch *clickhouse) Rollback() error {
  160. ch.logf("[rollback] tx=%t, data=%t", ch.inTransaction, ch.block != nil)
  161. if !ch.inTransaction {
  162. return sql.ErrTxDone
  163. }
  164. if ch.block != nil {
  165. ch.block.Reset()
  166. }
  167. ch.block = nil
  168. ch.buffer = nil
  169. ch.inTransaction = false
  170. return ch.conn.Close()
  171. }
  172. func (ch *clickhouse) CheckNamedValue(nv *driver.NamedValue) error {
  173. switch nv.Value.(type) {
  174. case ExternalTable, column.IP, column.UUID:
  175. return nil
  176. case nil, []byte, int8, int16, int32, int64, uint8, uint16, uint32, uint64, float32, float64, string, time.Time:
  177. return nil
  178. }
  179. switch v := nv.Value.(type) {
  180. case
  181. []int, []int8, []int16, []int32, []int64,
  182. []uint, []uint8, []uint16, []uint32, []uint64,
  183. []float32, []float64,
  184. []string:
  185. return nil
  186. case net.IP, *net.IP:
  187. return nil
  188. case driver.Valuer:
  189. value, err := v.Value()
  190. if err != nil {
  191. return err
  192. }
  193. nv.Value = value
  194. default:
  195. switch value := reflect.ValueOf(nv.Value); value.Kind() {
  196. case reflect.Slice:
  197. return nil
  198. case reflect.Bool:
  199. nv.Value = uint8(0)
  200. if value.Bool() {
  201. nv.Value = uint8(1)
  202. }
  203. case reflect.Int8:
  204. nv.Value = int8(value.Int())
  205. case reflect.Int16:
  206. nv.Value = int16(value.Int())
  207. case reflect.Int32:
  208. nv.Value = int32(value.Int())
  209. case reflect.Int64:
  210. nv.Value = value.Int()
  211. case reflect.Uint8:
  212. nv.Value = uint8(value.Uint())
  213. case reflect.Uint16:
  214. nv.Value = uint16(value.Uint())
  215. case reflect.Uint32:
  216. nv.Value = uint32(value.Uint())
  217. case reflect.Uint64:
  218. nv.Value = uint64(value.Uint())
  219. case reflect.Float32:
  220. nv.Value = float32(value.Float())
  221. case reflect.Float64:
  222. nv.Value = float64(value.Float())
  223. case reflect.String:
  224. nv.Value = value.String()
  225. }
  226. }
  227. return nil
  228. }
  229. func (ch *clickhouse) Close() error {
  230. ch.block = nil
  231. return ch.conn.Close()
  232. }
  233. func (ch *clickhouse) process() error {
  234. packet, err := ch.decoder.Uvarint()
  235. if err != nil {
  236. return err
  237. }
  238. for {
  239. switch packet {
  240. case protocol.ServerPong:
  241. ch.logf("[process] <- pong")
  242. return nil
  243. case protocol.ServerException:
  244. ch.logf("[process] <- exception")
  245. return ch.exception()
  246. case protocol.ServerProgress:
  247. progress, err := ch.progress()
  248. if err != nil {
  249. return err
  250. }
  251. ch.logf("[process] <- progress: rows=%d, bytes=%d, total rows=%d",
  252. progress.rows,
  253. progress.bytes,
  254. progress.totalRows,
  255. )
  256. case protocol.ServerProfileInfo:
  257. profileInfo, err := ch.profileInfo()
  258. if err != nil {
  259. return err
  260. }
  261. ch.logf("[process] <- profiling: rows=%d, bytes=%d, blocks=%d", profileInfo.rows, profileInfo.bytes, profileInfo.blocks)
  262. case protocol.ServerData:
  263. block, err := ch.readBlock()
  264. if err != nil {
  265. return err
  266. }
  267. ch.logf("[process] <- data: packet=%d, columns=%d, rows=%d", packet, block.NumColumns, block.NumRows)
  268. case protocol.ServerEndOfStream:
  269. ch.logf("[process] <- end of stream")
  270. return nil
  271. default:
  272. ch.conn.Close()
  273. return fmt.Errorf("[process] unexpected packet [%d] from server", packet)
  274. }
  275. if packet, err = ch.decoder.Uvarint(); err != nil {
  276. return err
  277. }
  278. }
  279. }
  280. func (ch *clickhouse) cancel() error {
  281. ch.logf("[cancel request]")
  282. // even if we fail to write the cancel, we still need to close
  283. err := ch.encoder.Uvarint(protocol.ClientCancel)
  284. if err == nil {
  285. err = ch.encoder.Flush()
  286. }
  287. // return the close error if there was one, otherwise return the write error
  288. if cerr := ch.conn.Close(); cerr != nil {
  289. return cerr
  290. }
  291. return err
  292. }
  293. func (ch *clickhouse) watchCancel(ctx context.Context) func() {
  294. if done := ctx.Done(); done != nil {
  295. finished := make(chan struct{})
  296. go func() {
  297. select {
  298. case <-done:
  299. ch.cancel()
  300. finished <- struct{}{}
  301. ch.logf("[cancel] <- done")
  302. case <-finished:
  303. ch.logf("[cancel] <- finished")
  304. }
  305. }()
  306. return func() {
  307. select {
  308. case <-finished:
  309. case finished <- struct{}{}:
  310. }
  311. }
  312. }
  313. return func() {}
  314. }
  315. func (ch *clickhouse) ExecContext(ctx context.Context, query string,
  316. args []driver.NamedValue) (driver.Result, error) {
  317. finish := ch.watchCancel(ctx)
  318. defer finish()
  319. stmt, err := ch.PrepareContext(ctx, query)
  320. if err != nil {
  321. return nil, err
  322. }
  323. dargs := make([]driver.Value, len(args))
  324. for i, nv := range args {
  325. dargs[i] = nv.Value
  326. }
  327. return stmt.Exec(dargs)
  328. }