bootstrap.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. package clickhouse
  2. import (
  3. "bufio"
  4. "database/sql"
  5. "database/sql/driver"
  6. "fmt"
  7. "io"
  8. "log"
  9. "net/url"
  10. "os"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. "github.com/ClickHouse/clickhouse-go/lib/binary"
  17. "github.com/ClickHouse/clickhouse-go/lib/data"
  18. "github.com/ClickHouse/clickhouse-go/lib/protocol"
  19. )
  20. const (
  21. // DefaultDatabase when connecting to ClickHouse
  22. DefaultDatabase = "default"
  23. // DefaultUsername when connecting to ClickHouse
  24. DefaultUsername = "default"
  25. // DefaultConnTimeout when connecting to ClickHouse
  26. DefaultConnTimeout = 5 * time.Second
  27. // DefaultReadTimeout when reading query results
  28. DefaultReadTimeout = time.Minute
  29. // DefaultWriteTimeout when sending queries
  30. DefaultWriteTimeout = time.Minute
  31. )
  32. var (
  33. unixtime int64
  34. logOutput io.Writer = os.Stdout
  35. hostname, _ = os.Hostname()
  36. poolInit sync.Once
  37. )
  38. func init() {
  39. sql.Register("clickhouse", &bootstrap{})
  40. go func() {
  41. for tick := time.Tick(time.Second); ; {
  42. select {
  43. case <-tick:
  44. atomic.AddInt64(&unixtime, int64(time.Second))
  45. }
  46. }
  47. }()
  48. }
  49. func now() time.Time {
  50. return time.Unix(0, atomic.LoadInt64(&unixtime))
  51. }
  52. type bootstrap struct{}
  53. func (d *bootstrap) Open(dsn string) (driver.Conn, error) {
  54. return Open(dsn)
  55. }
  56. // SetLogOutput allows to change output of the default logger
  57. func SetLogOutput(output io.Writer) {
  58. logOutput = output
  59. }
  60. // Open the connection
  61. func Open(dsn string) (driver.Conn, error) {
  62. clickhouse, err := open(dsn)
  63. if err != nil {
  64. return nil, err
  65. }
  66. return clickhouse, err
  67. }
  68. func open(dsn string) (*clickhouse, error) {
  69. url, err := url.Parse(dsn)
  70. if err != nil {
  71. return nil, err
  72. }
  73. var (
  74. hosts = []string{url.Host}
  75. query = url.Query()
  76. secure = false
  77. skipVerify = false
  78. tlsConfigName = query.Get("tls_config")
  79. noDelay = true
  80. compress = false
  81. database = query.Get("database")
  82. username = query.Get("username")
  83. password = query.Get("password")
  84. blockSize = 1000000
  85. connTimeout = DefaultConnTimeout
  86. readTimeout = DefaultReadTimeout
  87. writeTimeout = DefaultWriteTimeout
  88. connOpenStrategy = connOpenRandom
  89. checkConnLiveness = true
  90. )
  91. if len(database) == 0 {
  92. database = DefaultDatabase
  93. }
  94. if len(username) == 0 {
  95. username = DefaultUsername
  96. }
  97. if v, err := strconv.ParseBool(query.Get("no_delay")); err == nil {
  98. noDelay = v
  99. }
  100. tlsConfig := getTLSConfigClone(tlsConfigName)
  101. if tlsConfigName != "" && tlsConfig == nil {
  102. return nil, fmt.Errorf("invalid tls_config - no config registered under name %s", tlsConfigName)
  103. }
  104. secure = tlsConfig != nil
  105. if v, err := strconv.ParseBool(query.Get("secure")); err == nil {
  106. secure = v
  107. }
  108. if v, err := strconv.ParseBool(query.Get("skip_verify")); err == nil {
  109. skipVerify = v
  110. }
  111. if duration, err := strconv.ParseFloat(query.Get("timeout"), 64); err == nil {
  112. connTimeout = time.Duration(duration * float64(time.Second))
  113. }
  114. if duration, err := strconv.ParseFloat(query.Get("read_timeout"), 64); err == nil {
  115. readTimeout = time.Duration(duration * float64(time.Second))
  116. }
  117. if duration, err := strconv.ParseFloat(query.Get("write_timeout"), 64); err == nil {
  118. writeTimeout = time.Duration(duration * float64(time.Second))
  119. }
  120. if size, err := strconv.ParseInt(query.Get("block_size"), 10, 64); err == nil {
  121. blockSize = int(size)
  122. }
  123. if altHosts := strings.Split(query.Get("alt_hosts"), ","); len(altHosts) != 0 {
  124. for _, host := range altHosts {
  125. if len(host) != 0 {
  126. hosts = append(hosts, host)
  127. }
  128. }
  129. }
  130. switch query.Get("connection_open_strategy") {
  131. case "random":
  132. connOpenStrategy = connOpenRandom
  133. case "in_order":
  134. connOpenStrategy = connOpenInOrder
  135. case "time_random":
  136. connOpenStrategy = connOpenTimeRandom
  137. }
  138. settings, err := makeQuerySettings(query)
  139. if err != nil {
  140. return nil, err
  141. }
  142. if v, err := strconv.ParseBool(query.Get("compress")); err == nil {
  143. compress = v
  144. }
  145. if v, err := strconv.ParseBool(query.Get("check_connection_liveness")); err == nil {
  146. checkConnLiveness = v
  147. }
  148. if secure {
  149. // There is no way to check the liveness of a secure connection, as long as there is no access to raw TCP net.Conn
  150. checkConnLiveness = false
  151. }
  152. var (
  153. ch = clickhouse{
  154. logf: func(string, ...interface{}) {},
  155. settings: settings,
  156. compress: compress,
  157. blockSize: blockSize,
  158. checkConnLiveness: checkConnLiveness,
  159. ServerInfo: data.ServerInfo{
  160. Timezone: time.Local,
  161. },
  162. }
  163. logger = log.New(logOutput, "[clickhouse]", 0)
  164. )
  165. if debug, err := strconv.ParseBool(url.Query().Get("debug")); err == nil && debug {
  166. ch.logf = logger.Printf
  167. }
  168. ch.logf("host(s)=%s, database=%s, username=%s",
  169. strings.Join(hosts, ", "),
  170. database,
  171. username,
  172. )
  173. options := connOptions{
  174. secure: secure,
  175. tlsConfig: tlsConfig,
  176. skipVerify: skipVerify,
  177. hosts: hosts,
  178. connTimeout: connTimeout,
  179. readTimeout: readTimeout,
  180. writeTimeout: writeTimeout,
  181. noDelay: noDelay,
  182. openStrategy: connOpenStrategy,
  183. logf: ch.logf,
  184. }
  185. if ch.conn, err = dial(options); err != nil {
  186. return nil, err
  187. }
  188. logger.SetPrefix(fmt.Sprintf("[clickhouse][connect=%d]", ch.conn.ident))
  189. ch.buffer = bufio.NewWriter(ch.conn)
  190. ch.decoder = binary.NewDecoderWithCompress(ch.conn)
  191. ch.encoder = binary.NewEncoderWithCompress(ch.buffer)
  192. if err := ch.hello(database, username, password); err != nil {
  193. ch.conn.Close()
  194. return nil, err
  195. }
  196. return &ch, nil
  197. }
  198. func (ch *clickhouse) hello(database, username, password string) error {
  199. ch.logf("[hello] -> %s", ch.ClientInfo)
  200. {
  201. ch.encoder.Uvarint(protocol.ClientHello)
  202. if err := ch.ClientInfo.Write(ch.encoder); err != nil {
  203. return err
  204. }
  205. {
  206. ch.encoder.String(database)
  207. ch.encoder.String(username)
  208. ch.encoder.String(password)
  209. }
  210. if err := ch.encoder.Flush(); err != nil {
  211. return err
  212. }
  213. }
  214. {
  215. packet, err := ch.decoder.Uvarint()
  216. if err != nil {
  217. return err
  218. }
  219. switch packet {
  220. case protocol.ServerException:
  221. return ch.exception()
  222. case protocol.ServerHello:
  223. if err := ch.ServerInfo.Read(ch.decoder); err != nil {
  224. return err
  225. }
  226. case protocol.ServerEndOfStream:
  227. ch.logf("[bootstrap] <- end of stream")
  228. return nil
  229. default:
  230. return fmt.Errorf("[hello] unexpected packet [%d] from server", packet)
  231. }
  232. }
  233. ch.logf("[hello] <- %s", ch.ServerInfo)
  234. return nil
  235. }