sync.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package clickhouse
  15. import (
  16. "fmt"
  17. "sort"
  18. "strings"
  19. "time"
  20. "yunion.io/x/jsonutils"
  21. "yunion.io/x/log"
  22. "yunion.io/x/pkg/sortedstring"
  23. "yunion.io/x/pkg/util/stringutils"
  24. "yunion.io/x/pkg/utils"
  25. "yunion.io/x/sqlchemy"
  26. )
  27. func findTtlColumn(cols []sqlchemy.IColumnSpec) sColumnTTL {
  28. ret := sColumnTTL{}
  29. for _, col := range cols {
  30. if clickCol, ok := col.(IClickhouseColumnSpec); ok {
  31. c, u := clickCol.GetTTL()
  32. if c > 0 && len(u) > 0 {
  33. ret = sColumnTTL{
  34. ColName: clickCol.Name(),
  35. sTTL: sTTL{
  36. Count: c,
  37. Unit: u,
  38. },
  39. }
  40. }
  41. }
  42. }
  43. return ret
  44. }
  45. func findOrderByColumns(cols []sqlchemy.IColumnSpec) []string {
  46. var ret []string
  47. for _, col := range cols {
  48. if clickCol, ok := col.(IClickhouseColumnSpec); ok {
  49. if (clickCol.IsOrderBy() || clickCol.IsPrimary()) && len(clickCol.PartitionBy()) == 0 {
  50. ret = append(ret, clickCol.Name())
  51. }
  52. }
  53. }
  54. return ret
  55. }
  56. func findPartitions(cols []sqlchemy.IColumnSpec) []string {
  57. parts := make([]string, 0)
  58. for i := range cols {
  59. if c, ok := cols[i].(IClickhouseColumnSpec); ok {
  60. part := strings.ReplaceAll(c.PartitionBy(), " ", "")
  61. if len(part) > 0 && !utils.IsInStringArray(part, parts) {
  62. parts = append(parts, part)
  63. }
  64. }
  65. }
  66. sort.Strings(parts)
  67. return parts
  68. }
  69. func arrayContainsWord(strs []string, word string) bool {
  70. for _, str := range strs {
  71. if stringutils.ContainsWord(str, word) {
  72. return true
  73. }
  74. }
  75. return false
  76. }
  77. func (clickhouse *SClickhouseBackend) CommitTableChangeSQL(ts sqlchemy.ITableSpec, changes sqlchemy.STableChanges) []string {
  78. needCopyTable := false
  79. alters := make([]string, 0)
  80. // first check if primary key is modifed
  81. changePrimary := false
  82. for _, col := range changes.RemoveColumns {
  83. if col.IsPrimary() {
  84. changePrimary = true
  85. }
  86. }
  87. for _, cols := range changes.UpdatedColumns {
  88. if cols.OldCol.IsPrimary() != cols.NewCol.IsPrimary() {
  89. changePrimary = true
  90. }
  91. }
  92. for _, col := range changes.AddColumns {
  93. if col.IsPrimary() {
  94. changePrimary = true
  95. }
  96. }
  97. if changePrimary {
  98. log.Infof("primary key changed")
  99. needCopyTable = true
  100. }
  101. // if changePrimary && oldHasPrimary {
  102. // sql := fmt.Sprintf("DROP PRIMARY KEY")
  103. // alters = append(alters, sql)
  104. // }
  105. /* IGNORE DROP STATEMENT */
  106. for _, col := range changes.RemoveColumns {
  107. sql := fmt.Sprintf("DROP COLUMN `%s`", col.Name())
  108. log.Debugf("skip ALTER TABLE %s %s;", ts.Name(), sql)
  109. // alters = append(alters, sql)
  110. // ignore drop statement
  111. // if the column is auto_increment integer column,
  112. // then need to drop auto_increment attribute
  113. if col.IsAutoIncrement() {
  114. // make sure the column is nullable
  115. col.SetNullable(true)
  116. log.Errorf("column %s is auto_increment, drop auto_inrement attribute", col.Name())
  117. col.SetAutoIncrement(false)
  118. sql := fmt.Sprintf("MODIFY COLUMN %s", col.DefinitionString())
  119. alters = append(alters, sql)
  120. }
  121. // if the column is not nullable but no default
  122. // then need to drop the not-nullable attribute
  123. if !col.IsNullable() && col.Default() == "" {
  124. col.SetNullable(true)
  125. sql := fmt.Sprintf("MODIFY COLUMN %s", col.DefinitionString())
  126. alters = append(alters, sql)
  127. log.Errorf("column %s is not nullable but no default, drop not nullable attribute", col.Name())
  128. }
  129. }
  130. oldPartitions := findPartitions(changes.OldColumns)
  131. for _, cols := range changes.UpdatedColumns {
  132. if cols.OldCol.Name() != cols.NewCol.Name() {
  133. sql := fmt.Sprintf("RENAME COLUMN %s TO %s", cols.OldCol.Name(), cols.NewCol.Name())
  134. alters = append(alters, sql)
  135. } else if cols.OldCol.IsNullable() && !cols.NewCol.IsNullable() && arrayContainsWord(oldPartitions, cols.NewCol.Name()) {
  136. needCopyTable = true
  137. } else {
  138. sql := fmt.Sprintf("MODIFY COLUMN %s", cols.NewCol.DefinitionString())
  139. alters = append(alters, sql)
  140. }
  141. }
  142. for _, col := range changes.AddColumns {
  143. sql := fmt.Sprintf("ADD COLUMN %s", col.DefinitionString())
  144. alters = append(alters, sql)
  145. }
  146. /*if changePrimary {
  147. primaries := make([]string, 0)
  148. for _, c := range ts.Columns() {
  149. if c.IsPrimary() {
  150. primaries = append(primaries, fmt.Sprintf("`%s`", c.Name()))
  151. }
  152. }
  153. if len(primaries) > 0 {
  154. sql := fmt.Sprintf("ADD PRIMARY KEY(%s)", strings.Join(primaries, ", "))
  155. alters = append(alters, sql)
  156. }
  157. }*/
  158. // check TTL
  159. {
  160. oldTtlSpec := findTtlColumn(changes.OldColumns)
  161. newTtlSpec := findTtlColumn(ts.Columns())
  162. log.Debugf("old: %s new: %s", jsonutils.Marshal(oldTtlSpec), jsonutils.Marshal(newTtlSpec))
  163. if oldTtlSpec != newTtlSpec {
  164. if oldTtlSpec.Count > 0 && newTtlSpec.Count == 0 {
  165. // remove
  166. sql := fmt.Sprintf("REMOVE TTL")
  167. alters = append(alters, sql)
  168. } else {
  169. // alter
  170. sql := fmt.Sprintf("MODIFY TTL `%s` + INTERVAL %d %s", newTtlSpec.ColName, newTtlSpec.Count, newTtlSpec.Unit)
  171. alters = append(alters, sql)
  172. }
  173. }
  174. }
  175. // check order by
  176. {
  177. oldOrderBys := findOrderByColumns(changes.OldColumns)
  178. newOrderBys := findOrderByColumns(ts.Columns())
  179. log.Debugf("old: %s new: %s", jsonutils.Marshal(oldOrderBys), jsonutils.Marshal(newOrderBys))
  180. if jsonutils.Marshal(oldOrderBys).String() != jsonutils.Marshal(newOrderBys).String() {
  181. // alter
  182. altered := false
  183. for i := range newOrderBys {
  184. if !utils.IsInStringArray(newOrderBys[i], oldOrderBys) {
  185. oldOrderBys = append(oldOrderBys, newOrderBys[i])
  186. altered = true
  187. }
  188. }
  189. if altered {
  190. sql := fmt.Sprintf("MODIFY ORDER BY (%s)", strings.Join(oldOrderBys, ", "))
  191. alters = append(alters, sql)
  192. }
  193. }
  194. }
  195. // check partitions
  196. newPartitions := findPartitions(ts.Columns())
  197. if !sortedstring.Equals(oldPartitions, newPartitions) {
  198. log.Infof("partition inconsistemt: old=%s new=%s", oldPartitions, newPartitions)
  199. needCopyTable = true
  200. }
  201. ret := make([]string, 0)
  202. // needCopyTable
  203. if needCopyTable {
  204. // create new table
  205. alterTableName := fmt.Sprintf("%s_tmp_%d", ts.Name(), time.Now().Unix())
  206. alterTable := ts.(*sqlchemy.STableSpec).Clone(alterTableName, 0)
  207. createSqls := alterTable.CreateSQLs()
  208. ret = append(ret, createSqls...)
  209. colNames := make([]string, 0)
  210. for _, c := range ts.Columns() {
  211. colNames = append(colNames, fmt.Sprintf("`%s`", c.Name()))
  212. }
  213. colNamesStr := strings.Join(colNames, ",")
  214. // copy data
  215. sql := fmt.Sprintf("INSERT INTO `%s` (%s) SELECT %s FROM `%s`", alterTableName, colNamesStr, colNamesStr, ts.Name())
  216. ret = append(ret, sql)
  217. // rename tables
  218. sql = fmt.Sprintf("RENAME TABLE `%s` TO `%s_backup`", ts.Name(), alterTableName)
  219. ret = append(ret, sql)
  220. sql = fmt.Sprintf("RENAME TABLE `%s` TO `%s`", alterTableName, ts.Name())
  221. ret = append(ret, sql)
  222. } else if len(alters) > 0 {
  223. tableSpec := ts.(*sqlchemy.STableSpec)
  224. if tableSpec.IsLinked {
  225. // if the table is a linked table, simply re-create the table
  226. ret = append(ret, fmt.Sprintf("DROP TABLE IF EXISTS `%s`", tableSpec.Name()))
  227. createSqls := tableSpec.CreateSQLs()
  228. ret = append(ret, createSqls...)
  229. } else {
  230. sql := fmt.Sprintf("ALTER TABLE `%s` %s;", ts.Name(), strings.Join(alters, ", "))
  231. ret = append(ret, sql)
  232. }
  233. }
  234. return ret
  235. }