clickhouse.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package clickhouse
  15. import (
  16. "bytes"
  17. "fmt"
  18. "reflect"
  19. "strings"
  20. _ "github.com/ClickHouse/clickhouse-go"
  21. "yunion.io/x/pkg/errors"
  22. "yunion.io/x/pkg/gotypes"
  23. "yunion.io/x/pkg/tristate"
  24. "yunion.io/x/pkg/util/stringutils"
  25. "yunion.io/x/pkg/utils"
  26. "yunion.io/x/sqlchemy"
  27. )
  28. func init() {
  29. sqlchemy.RegisterBackend(&SClickhouseBackend{})
  30. }
  31. type SClickhouseBackend struct {
  32. sqlchemy.SBaseBackend
  33. }
  34. func (click *SClickhouseBackend) Name() sqlchemy.DBBackendName {
  35. return sqlchemy.ClickhouseBackend
  36. }
  37. func (click *SClickhouseBackend) CaseInsensitiveLikeString() string {
  38. return "ILIKE"
  39. }
  40. func (click *SClickhouseBackend) RegexpWhereClause(cond *sqlchemy.SRegexpConition) string {
  41. var buf bytes.Buffer
  42. buf.WriteString("match(")
  43. buf.WriteString(cond.GetLeft().Reference())
  44. buf.WriteString(", ")
  45. buf.WriteString(sqlchemy.VarConditionWhereClause(cond.GetRight()))
  46. buf.WriteString(")")
  47. return buf.String()
  48. }
  49. // CanUpdate returns wether the backend supports update
  50. func (click *SClickhouseBackend) CanUpdate() bool {
  51. return true
  52. }
  53. // CanInsert returns wether the backend supports Insert
  54. func (click *SClickhouseBackend) CanInsert() bool {
  55. return true
  56. }
  57. // CanInsertOrUpdate returns weather the backend supports InsertOrUpdate
  58. func (click *SClickhouseBackend) CanInsertOrUpdate() bool {
  59. return false
  60. }
  61. func (click *SClickhouseBackend) IsSupportIndexAndContraints() bool {
  62. return false
  63. }
  64. func (click *SClickhouseBackend) CanSupportRowAffected() bool {
  65. return false
  66. }
  67. func (click *SClickhouseBackend) CurrentUTCTimeStampString() string {
  68. return "NOW('UTC')"
  69. }
  70. func (click *SClickhouseBackend) CurrentTimeStampString() string {
  71. return "NOW()"
  72. }
  73. func (click *SClickhouseBackend) UnionAllString() string {
  74. return "UNION ALL"
  75. }
  76. func (click *SClickhouseBackend) UnionDistinctString() string {
  77. return "UNION DISTINCT"
  78. }
  79. func (click *SClickhouseBackend) SupportMixedInsertVariables() bool {
  80. return false
  81. }
  82. func (click *SClickhouseBackend) UpdateSQLTemplate() string {
  83. return "ALTER TABLE `{{ .Table }}` UPDATE {{ .Columns }} WHERE {{ .Conditions }}"
  84. }
  85. func MySQLExtraOptions(hostport, database, table, user, passwd string) sqlchemy.TableExtraOptions {
  86. return sqlchemy.TableExtraOptions{
  87. EXTRA_OPTION_ENGINE_KEY: EXTRA_OPTION_ENGINE_VALUE_MYSQL,
  88. EXTRA_OPTION_CLICKHOUSE_MYSQL_HOSTPORT_KEY: hostport,
  89. EXTRA_OPTION_CLICKHOUSE_MYSQL_DATABASE_KEY: database,
  90. EXTRA_OPTION_CLICKHOUSE_MYSQL_TABLE_KEY: table,
  91. EXTRA_OPTION_CLICKHOUSE_MYSQL_USERNAME_KEY: user,
  92. EXTRA_OPTION_CLICKHOUSE_MYSQL_PASSWORD_KEY: passwd,
  93. }
  94. }
  95. func (click *SClickhouseBackend) GetCreateSQLs(ts sqlchemy.ITableSpec) []string {
  96. cols := make([]string, 0)
  97. primaries := make([]string, 0)
  98. orderbys := make([]string, 0)
  99. partitions := make([]string, 0)
  100. var ttlCol IClickhouseColumnSpec
  101. for _, c := range ts.Columns() {
  102. cols = append(cols, c.DefinitionString())
  103. if c.IsPrimary() {
  104. primaries = append(primaries, fmt.Sprintf("`%s`", c.Name()))
  105. }
  106. if cc, ok := c.(IClickhouseColumnSpec); ok {
  107. if cc.IsOrderBy() {
  108. orderbys = append(orderbys, fmt.Sprintf("`%s`", c.Name()))
  109. }
  110. partition := cc.PartitionBy()
  111. if len(partition) > 0 && !utils.IsInStringArray(partition, partitions) {
  112. partitions = append(partitions, partition)
  113. }
  114. ttlC, ttlU := cc.GetTTL()
  115. if ttlC > 0 && len(ttlU) > 0 {
  116. ttlCol = cc
  117. }
  118. }
  119. }
  120. createSql := fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s` (\n%s\n) ENGINE = ", ts.Name(), strings.Join(cols, ",\n"))
  121. extraOpts := ts.GetExtraOptions()
  122. engine := extraOpts.Get(EXTRA_OPTION_ENGINE_KEY)
  123. switch engine {
  124. case EXTRA_OPTION_ENGINE_VALUE_MYSQL:
  125. // mysql
  126. createSql += fmt.Sprintf("MySQL('%s', '%s', '%s', '%s', '%s')",
  127. extraOpts.Get(EXTRA_OPTION_CLICKHOUSE_MYSQL_HOSTPORT_KEY),
  128. extraOpts.Get(EXTRA_OPTION_CLICKHOUSE_MYSQL_DATABASE_KEY),
  129. extraOpts.Get(EXTRA_OPTION_CLICKHOUSE_MYSQL_TABLE_KEY),
  130. extraOpts.Get(EXTRA_OPTION_CLICKHOUSE_MYSQL_USERNAME_KEY),
  131. extraOpts.Get(EXTRA_OPTION_CLICKHOUSE_MYSQL_PASSWORD_KEY),
  132. )
  133. default:
  134. // mergetree
  135. createSql += "MergeTree()"
  136. if len(orderbys) == 0 {
  137. orderbys = primaries
  138. }
  139. if len(partitions) > 0 {
  140. createSql += fmt.Sprintf("\nPARTITION BY (%s)", strings.Join(partitions, ", "))
  141. }
  142. if len(primaries) > 0 {
  143. createSql += fmt.Sprintf("\nPRIMARY KEY (%s)", strings.Join(primaries, ", "))
  144. newOrderBys := make([]string, len(primaries))
  145. copy(newOrderBys, primaries)
  146. for _, f := range orderbys {
  147. if !utils.IsInStringArray(f, newOrderBys) {
  148. newOrderBys = append(newOrderBys, f)
  149. }
  150. }
  151. orderbys = newOrderBys
  152. }
  153. if len(orderbys) > 0 {
  154. createSql += fmt.Sprintf("\nORDER BY (%s)", strings.Join(orderbys, ", "))
  155. } else {
  156. createSql += "\nORDER BY tuple()"
  157. }
  158. if ttlCol != nil {
  159. ttlCount, ttlUnit := ttlCol.GetTTL()
  160. createSql += fmt.Sprintf("\nTTL `%s` + INTERVAL %d %s", ttlCol.Name(), ttlCount, ttlUnit)
  161. }
  162. // set default time zone of table to UTC
  163. createSql += "\nSETTINGS index_granularity=8192"
  164. }
  165. return []string{
  166. createSql,
  167. }
  168. }
  169. func (click *SClickhouseBackend) FetchTableColumnSpecs(ts sqlchemy.ITableSpec) ([]sqlchemy.IColumnSpec, error) {
  170. sql := fmt.Sprintf("DESCRIBE `%s`", ts.Name())
  171. query := ts.Database().NewRawQuery(sql, "name", "type", "default_type", "default_expression", "comment", "codec_expression", "ttl_expression")
  172. infos := make([]sSqlColumnInfo, 0)
  173. err := query.All(&infos)
  174. if err != nil {
  175. return nil, errors.Wrap(err, "describe table")
  176. }
  177. specs := make([]sqlchemy.IColumnSpec, 0)
  178. for _, info := range infos {
  179. spec := info.toColumnSpec()
  180. specs = append(specs, spec)
  181. }
  182. sql = fmt.Sprintf("SHOW CREATE TABLE `%s`", ts.Name())
  183. query = ts.Database().NewRawQuery(sql, "statement")
  184. row := query.Row()
  185. var defStr string
  186. err = row.Scan(&defStr)
  187. if err != nil {
  188. return nil, errors.Wrap(err, "show create table")
  189. }
  190. primaries, orderbys, partitions, ttl := parseCreateTable(defStr)
  191. var ttlCfg sColumnTTL
  192. if len(ttl) > 0 {
  193. ttlCfg, err = parseTTLExpression(ttl)
  194. if err != nil {
  195. return nil, errors.Wrap(err, "parseTTLExpression")
  196. }
  197. }
  198. for _, spec := range specs {
  199. if utils.IsInStringArray(spec.Name(), primaries) {
  200. spec.SetPrimary(true)
  201. }
  202. if clickSpec, ok := spec.(IClickhouseColumnSpec); ok {
  203. if utils.IsInStringArray(clickSpec.Name(), orderbys) {
  204. clickSpec.SetOrderBy(true)
  205. }
  206. for _, part := range partitions {
  207. if stringutils.ContainsWord(part, clickSpec.Name()) {
  208. clickSpec.SetPartitionBy(part)
  209. }
  210. }
  211. if ttlCfg.ColName == clickSpec.Name() {
  212. clickSpec.SetTTL(ttlCfg.Count, ttlCfg.Unit)
  213. }
  214. }
  215. }
  216. return specs, nil
  217. }
  218. func (click *SClickhouseBackend) GetColumnSpecByFieldType(table *sqlchemy.STableSpec, fieldType reflect.Type, fieldname string, tagmap map[string]string, isPointer bool) sqlchemy.IColumnSpec {
  219. extraOpts := table.GetExtraOptions()
  220. engine := extraOpts.Get(EXTRA_OPTION_ENGINE_KEY)
  221. isMySQLEngine := false
  222. switch engine {
  223. case EXTRA_OPTION_ENGINE_VALUE_MYSQL:
  224. isMySQLEngine = true
  225. }
  226. colSpec := click.getColumnSpecByFieldTypeInternal(table, fieldType, fieldname, tagmap, isPointer)
  227. if isMySQLEngine && colSpec.IsPrimary() {
  228. colSpec.SetPrimary(false)
  229. }
  230. return colSpec
  231. }
  232. func (click *SClickhouseBackend) getColumnSpecByFieldTypeInternal(table *sqlchemy.STableSpec, fieldType reflect.Type, fieldname string, tagmap map[string]string, isPointer bool) sqlchemy.IColumnSpec {
  233. switch fieldType {
  234. case tristate.TriStateType:
  235. col := NewTristateColumn(table.Name(), fieldname, tagmap, isPointer)
  236. return &col
  237. case gotypes.TimeType:
  238. col := NewDateTimeColumn(fieldname, tagmap, isPointer)
  239. return &col
  240. }
  241. switch fieldType.Kind() {
  242. case reflect.String:
  243. col := NewTextColumn(fieldname, "String", tagmap, isPointer)
  244. return &col
  245. case reflect.Int, reflect.Int32:
  246. col := NewIntegerColumn(fieldname, "Int32", tagmap, isPointer)
  247. return &col
  248. case reflect.Int8:
  249. col := NewIntegerColumn(fieldname, "Int8", tagmap, isPointer)
  250. return &col
  251. case reflect.Int16:
  252. col := NewIntegerColumn(fieldname, "Int16", tagmap, isPointer)
  253. return &col
  254. case reflect.Int64:
  255. col := NewIntegerColumn(fieldname, "Int64", tagmap, isPointer)
  256. return &col
  257. case reflect.Uint, reflect.Uint32:
  258. col := NewIntegerColumn(fieldname, "UInt32", tagmap, isPointer)
  259. return &col
  260. case reflect.Uint8:
  261. col := NewIntegerColumn(fieldname, "UInt8", tagmap, isPointer)
  262. return &col
  263. case reflect.Uint16:
  264. col := NewIntegerColumn(fieldname, "UInt16", tagmap, isPointer)
  265. return &col
  266. case reflect.Uint64:
  267. col := NewIntegerColumn(fieldname, "UInt64", tagmap, isPointer)
  268. return &col
  269. case reflect.Bool:
  270. col := NewBooleanColumn(fieldname, tagmap, isPointer)
  271. return &col
  272. case reflect.Float32:
  273. if _, ok := tagmap[sqlchemy.TAG_WIDTH]; ok {
  274. col := NewDecimalColumn(fieldname, tagmap, isPointer)
  275. return &col
  276. }
  277. col := NewFloatColumn(fieldname, "Float32", tagmap, isPointer)
  278. return &col
  279. case reflect.Float64:
  280. if _, ok := tagmap[sqlchemy.TAG_WIDTH]; ok {
  281. col := NewDecimalColumn(fieldname, tagmap, isPointer)
  282. return &col
  283. }
  284. col := NewFloatColumn(fieldname, "Float64", tagmap, isPointer)
  285. return &col
  286. case reflect.Map, reflect.Slice:
  287. col := NewCompoundColumn(fieldname, tagmap, isPointer)
  288. return &col
  289. }
  290. if fieldType.Implements(gotypes.ISerializableType) {
  291. col := NewCompoundColumn(fieldname, tagmap, isPointer)
  292. return &col
  293. }
  294. return nil
  295. }