| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package clickhouse
- import (
- "fmt"
- "sort"
- "strings"
- "time"
- "yunion.io/x/jsonutils"
- "yunion.io/x/log"
- "yunion.io/x/pkg/sortedstring"
- "yunion.io/x/pkg/util/stringutils"
- "yunion.io/x/pkg/utils"
- "yunion.io/x/sqlchemy"
- )
- func findTtlColumn(cols []sqlchemy.IColumnSpec) sColumnTTL {
- ret := sColumnTTL{}
- for _, col := range cols {
- if clickCol, ok := col.(IClickhouseColumnSpec); ok {
- c, u := clickCol.GetTTL()
- if c > 0 && len(u) > 0 {
- ret = sColumnTTL{
- ColName: clickCol.Name(),
- sTTL: sTTL{
- Count: c,
- Unit: u,
- },
- }
- }
- }
- }
- return ret
- }
- func findOrderByColumns(cols []sqlchemy.IColumnSpec) []string {
- var ret []string
- for _, col := range cols {
- if clickCol, ok := col.(IClickhouseColumnSpec); ok {
- if (clickCol.IsOrderBy() || clickCol.IsPrimary()) && len(clickCol.PartitionBy()) == 0 {
- ret = append(ret, clickCol.Name())
- }
- }
- }
- return ret
- }
- func findPartitions(cols []sqlchemy.IColumnSpec) []string {
- parts := make([]string, 0)
- for i := range cols {
- if c, ok := cols[i].(IClickhouseColumnSpec); ok {
- part := strings.ReplaceAll(c.PartitionBy(), " ", "")
- if len(part) > 0 && !utils.IsInStringArray(part, parts) {
- parts = append(parts, part)
- }
- }
- }
- sort.Strings(parts)
- return parts
- }
- func arrayContainsWord(strs []string, word string) bool {
- for _, str := range strs {
- if stringutils.ContainsWord(str, word) {
- return true
- }
- }
- return false
- }
- func (clickhouse *SClickhouseBackend) CommitTableChangeSQL(ts sqlchemy.ITableSpec, changes sqlchemy.STableChanges) []string {
- needCopyTable := false
- alters := make([]string, 0)
- // first check if primary key is modifed
- changePrimary := false
- for _, col := range changes.RemoveColumns {
- if col.IsPrimary() {
- changePrimary = true
- }
- }
- for _, cols := range changes.UpdatedColumns {
- if cols.OldCol.IsPrimary() != cols.NewCol.IsPrimary() {
- changePrimary = true
- }
- }
- for _, col := range changes.AddColumns {
- if col.IsPrimary() {
- changePrimary = true
- }
- }
- if changePrimary {
- log.Infof("primary key changed")
- needCopyTable = true
- }
- // if changePrimary && oldHasPrimary {
- // sql := fmt.Sprintf("DROP PRIMARY KEY")
- // alters = append(alters, sql)
- // }
- /* IGNORE DROP STATEMENT */
- for _, col := range changes.RemoveColumns {
- sql := fmt.Sprintf("DROP COLUMN `%s`", col.Name())
- log.Debugf("skip ALTER TABLE %s %s;", ts.Name(), sql)
- // alters = append(alters, sql)
- // ignore drop statement
- // if the column is auto_increment integer column,
- // then need to drop auto_increment attribute
- if col.IsAutoIncrement() {
- // make sure the column is nullable
- col.SetNullable(true)
- log.Errorf("column %s is auto_increment, drop auto_inrement attribute", col.Name())
- col.SetAutoIncrement(false)
- sql := fmt.Sprintf("MODIFY COLUMN %s", col.DefinitionString())
- alters = append(alters, sql)
- }
- // if the column is not nullable but no default
- // then need to drop the not-nullable attribute
- if !col.IsNullable() && col.Default() == "" {
- col.SetNullable(true)
- sql := fmt.Sprintf("MODIFY COLUMN %s", col.DefinitionString())
- alters = append(alters, sql)
- log.Errorf("column %s is not nullable but no default, drop not nullable attribute", col.Name())
- }
- }
- oldPartitions := findPartitions(changes.OldColumns)
- for _, cols := range changes.UpdatedColumns {
- if cols.OldCol.Name() != cols.NewCol.Name() {
- sql := fmt.Sprintf("RENAME COLUMN %s TO %s", cols.OldCol.Name(), cols.NewCol.Name())
- alters = append(alters, sql)
- } else if cols.OldCol.IsNullable() && !cols.NewCol.IsNullable() && arrayContainsWord(oldPartitions, cols.NewCol.Name()) {
- needCopyTable = true
- } else {
- sql := fmt.Sprintf("MODIFY COLUMN %s", cols.NewCol.DefinitionString())
- alters = append(alters, sql)
- }
- }
- for _, col := range changes.AddColumns {
- sql := fmt.Sprintf("ADD COLUMN %s", col.DefinitionString())
- alters = append(alters, sql)
- }
- /*if changePrimary {
- primaries := make([]string, 0)
- for _, c := range ts.Columns() {
- if c.IsPrimary() {
- primaries = append(primaries, fmt.Sprintf("`%s`", c.Name()))
- }
- }
- if len(primaries) > 0 {
- sql := fmt.Sprintf("ADD PRIMARY KEY(%s)", strings.Join(primaries, ", "))
- alters = append(alters, sql)
- }
- }*/
- // check TTL
- {
- oldTtlSpec := findTtlColumn(changes.OldColumns)
- newTtlSpec := findTtlColumn(ts.Columns())
- log.Debugf("old: %s new: %s", jsonutils.Marshal(oldTtlSpec), jsonutils.Marshal(newTtlSpec))
- if oldTtlSpec != newTtlSpec {
- if oldTtlSpec.Count > 0 && newTtlSpec.Count == 0 {
- // remove
- sql := fmt.Sprintf("REMOVE TTL")
- alters = append(alters, sql)
- } else {
- // alter
- sql := fmt.Sprintf("MODIFY TTL `%s` + INTERVAL %d %s", newTtlSpec.ColName, newTtlSpec.Count, newTtlSpec.Unit)
- alters = append(alters, sql)
- }
- }
- }
- // check order by
- {
- oldOrderBys := findOrderByColumns(changes.OldColumns)
- newOrderBys := findOrderByColumns(ts.Columns())
- log.Debugf("old: %s new: %s", jsonutils.Marshal(oldOrderBys), jsonutils.Marshal(newOrderBys))
- if jsonutils.Marshal(oldOrderBys).String() != jsonutils.Marshal(newOrderBys).String() {
- // alter
- altered := false
- for i := range newOrderBys {
- if !utils.IsInStringArray(newOrderBys[i], oldOrderBys) {
- oldOrderBys = append(oldOrderBys, newOrderBys[i])
- altered = true
- }
- }
- if altered {
- sql := fmt.Sprintf("MODIFY ORDER BY (%s)", strings.Join(oldOrderBys, ", "))
- alters = append(alters, sql)
- }
- }
- }
- // check partitions
- newPartitions := findPartitions(ts.Columns())
- if !sortedstring.Equals(oldPartitions, newPartitions) {
- log.Infof("partition inconsistemt: old=%s new=%s", oldPartitions, newPartitions)
- needCopyTable = true
- }
- ret := make([]string, 0)
- // needCopyTable
- if needCopyTable {
- // create new table
- alterTableName := fmt.Sprintf("%s_tmp_%d", ts.Name(), time.Now().Unix())
- alterTable := ts.(*sqlchemy.STableSpec).Clone(alterTableName, 0)
- createSqls := alterTable.CreateSQLs()
- ret = append(ret, createSqls...)
- colNames := make([]string, 0)
- for _, c := range ts.Columns() {
- colNames = append(colNames, fmt.Sprintf("`%s`", c.Name()))
- }
- colNamesStr := strings.Join(colNames, ",")
- // copy data
- sql := fmt.Sprintf("INSERT INTO `%s` (%s) SELECT %s FROM `%s`", alterTableName, colNamesStr, colNamesStr, ts.Name())
- ret = append(ret, sql)
- // rename tables
- sql = fmt.Sprintf("RENAME TABLE `%s` TO `%s_backup`", ts.Name(), alterTableName)
- ret = append(ret, sql)
- sql = fmt.Sprintf("RENAME TABLE `%s` TO `%s`", alterTableName, ts.Name())
- ret = append(ret, sql)
- } else if len(alters) > 0 {
- tableSpec := ts.(*sqlchemy.STableSpec)
- if tableSpec.IsLinked {
- // if the table is a linked table, simply re-create the table
- ret = append(ret, fmt.Sprintf("DROP TABLE IF EXISTS `%s`", tableSpec.Name()))
- createSqls := tableSpec.CreateSQLs()
- ret = append(ret, createSqls...)
- } else {
- sql := fmt.Sprintf("ALTER TABLE `%s` %s;", ts.Name(), strings.Join(alters, ", "))
- ret = append(ret, sql)
- }
- }
- return ret
- }
|