sync.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package sqlchemy
  15. import (
  16. "fmt"
  17. "sort"
  18. "strings"
  19. "yunion.io/x/log"
  20. "yunion.io/x/pkg/errors"
  21. "yunion.io/x/pkg/utils"
  22. )
  23. func (ts *STableSpec) fetchIndexesAndConstraints() ([]STableIndex, []STableConstraint, error) {
  24. return ts.Database().backend.FetchIndexesAndConstraints(ts)
  25. }
  26. func compareColumnSpec(c1, c2 IColumnSpec) int {
  27. return strings.Compare(c1.Name(), c2.Name())
  28. }
  29. func compareColumnIndex(c1, c2 IColumnSpec) int {
  30. i1 := c1.GetColIndex()
  31. i2 := c2.GetColIndex()
  32. return i1 - i2
  33. }
  34. type SUpdateColumnSpec struct {
  35. OldCol IColumnSpec
  36. NewCol IColumnSpec
  37. }
  38. func DiffCols(tableName string, cols1 []IColumnSpec, cols2 []IColumnSpec) ([]IColumnSpec, []SUpdateColumnSpec, []IColumnSpec) {
  39. sort.Slice(cols1, func(i, j int) bool {
  40. return compareColumnSpec(cols1[i], cols1[j]) < 0
  41. })
  42. sort.Slice(cols2, func(i, j int) bool {
  43. return compareColumnSpec(cols2[i], cols2[j]) < 0
  44. })
  45. // for i := range cols1 {
  46. // log.Debugf("%s %v", cols1[i].DefinitionString(), cols1[i].IsPrimary())
  47. // }
  48. // for i := range cols2 {
  49. // log.Debugf("%s %v", cols2[i].DefinitionString(), cols2[i].IsPrimary())
  50. // }
  51. i := 0
  52. j := 0
  53. remove := make([]IColumnSpec, 0)
  54. update := make([]SUpdateColumnSpec, 0)
  55. add := make([]IColumnSpec, 0)
  56. for i < len(cols1) || j < len(cols2) {
  57. if i < len(cols1) && j < len(cols2) {
  58. comp := compareColumnSpec(cols1[i], cols2[j])
  59. if comp == 0 {
  60. if cols1[i].DefinitionString() != cols2[j].DefinitionString() || cols1[i].IsPrimary() != cols2[j].IsPrimary() {
  61. log.Infof("UPDATE %s: %s(primary:%v) => %s(primary:%v)", tableName, cols1[i].DefinitionString(), cols1[i].IsPrimary(), cols2[j].DefinitionString(), cols2[j].IsPrimary())
  62. update = append(update, SUpdateColumnSpec{
  63. OldCol: cols1[i],
  64. NewCol: cols2[j],
  65. })
  66. }
  67. i++
  68. j++
  69. } else if comp > 0 {
  70. add = append(add, cols2[j])
  71. j++
  72. } else {
  73. remove = append(remove, cols1[i])
  74. i++
  75. }
  76. } else if i < len(cols1) {
  77. remove = append(remove, cols1[i])
  78. i++
  79. } else if j < len(cols2) {
  80. add = append(add, cols2[j])
  81. j++
  82. }
  83. }
  84. for i := 0; i < len(add); {
  85. intCol := add[i].(iColumnInternal)
  86. if len(intCol.Oldname()) > 0 {
  87. // find delete column
  88. rmIdx := -1
  89. for j := range remove {
  90. if remove[j].Name() == intCol.Oldname() {
  91. // remove from
  92. rmIdx = j
  93. break
  94. }
  95. }
  96. if rmIdx >= 0 {
  97. oldCol := remove[rmIdx]
  98. {
  99. // remove from remove
  100. copy(remove[rmIdx:], remove[rmIdx+1:])
  101. remove = remove[:len(remove)-1]
  102. }
  103. {
  104. // remove from add
  105. copy(add[i:], add[i+1:])
  106. add = add[:len(add)-1]
  107. }
  108. {
  109. update = append(update, SUpdateColumnSpec{
  110. OldCol: oldCol,
  111. NewCol: intCol,
  112. })
  113. }
  114. // do not increase i
  115. continue
  116. }
  117. }
  118. i++
  119. }
  120. return remove, update, add
  121. }
  122. func diffIndexes2(exists []STableIndex, defs []STableIndex) (diff []STableIndex) {
  123. diff = make([]STableIndex, 0)
  124. for i := 0; i < len(exists); i++ {
  125. findDef := false
  126. for j := 0; j < len(defs); j++ {
  127. if defs[j].IsIdentical(exists[i].columns...) {
  128. findDef = true
  129. break
  130. }
  131. }
  132. if !findDef {
  133. diff = append(diff, exists[i])
  134. }
  135. }
  136. return
  137. }
  138. func diffIndexes(exists []STableIndex, defs []STableIndex) (added []STableIndex, removed []STableIndex) {
  139. return diffIndexes2(defs, exists), diffIndexes2(exists, defs)
  140. }
  141. // DropForeignKeySQL returns the SQL statements to do droping foreignkey for a TableSpec
  142. func (ts *STableSpec) DropForeignKeySQL() []string {
  143. ret := make([]string, 0)
  144. db := ts.Database()
  145. if db == nil {
  146. panic("DropForeignKeySQL empty database")
  147. }
  148. if db.backend == nil {
  149. panic("DropForeignKeySQL empty backend")
  150. }
  151. qChar := db.backend.QuoteChar()
  152. if db.backend.IsSupportIndexAndContraints() {
  153. _, constraints, err := ts.fetchIndexesAndConstraints()
  154. if err != nil {
  155. if errors.Cause(err) != ErrTableNotExists {
  156. log.Errorf("fetchIndexesAndConstraints fail %s", err)
  157. }
  158. return nil
  159. }
  160. for _, constraint := range constraints {
  161. sql := fmt.Sprintf("ALTER TABLE %s%s%s DROP FOREIGN KEY %s%s%s", qChar, ts.name, qChar, qChar, constraint.name, qChar)
  162. ret = append(ret, sql)
  163. log.Infof("%s;", sql)
  164. }
  165. }
  166. return ret
  167. }
  168. // Exists checks wheter a table exists
  169. func (ts *STableSpec) Exists() bool {
  170. tables := ts.Database().GetTables()
  171. in, _ := utils.InStringArray(ts.name, tables)
  172. return in
  173. }
  174. // Drop drop table
  175. func (ts *STableSpec) Drop() error {
  176. if !ts.Exists() {
  177. return nil
  178. }
  179. db := ts.Database()
  180. if db == nil {
  181. panic("DropForeignKeySQL empty database")
  182. }
  183. if db.backend == nil {
  184. panic("DropForeignKeySQL empty backend")
  185. }
  186. sql := db.backend.DropTableSQL(ts.name)
  187. _, err := db.Exec(sql)
  188. if err != nil {
  189. log.Errorf("exec sql error %s: %s", sql, err)
  190. return errors.Wrap(err, "Exec")
  191. }
  192. return nil
  193. }
  194. type STableChanges struct {
  195. // indexes
  196. RemoveIndexes []STableIndex
  197. AddIndexes []STableIndex
  198. // Columns
  199. RemoveColumns []IColumnSpec
  200. UpdatedColumns []SUpdateColumnSpec
  201. AddColumns []IColumnSpec
  202. OldColumns []IColumnSpec
  203. }
  204. // SyncSQL returns SQL statements that make table in database consistent with TableSpec definitions
  205. // by comparing table definition derived from TableSpec and that in database
  206. func (ts *STableSpec) SyncSQL() []string {
  207. if !ts.Exists() {
  208. log.Debugf("table %s not created yet", ts.name)
  209. return ts.CreateSQLs()
  210. }
  211. var addIndexes, removeIndexes []STableIndex
  212. if ts.Database().backend.IsSupportIndexAndContraints() {
  213. indexes, _, err := ts.fetchIndexesAndConstraints()
  214. if err != nil {
  215. if errors.Cause(err) != ErrTableNotExists {
  216. log.Errorf("fetchIndexesAndConstraints fail %s", err)
  217. }
  218. return nil
  219. }
  220. addIndexes, removeIndexes = diffIndexes(indexes, ts._indexes)
  221. }
  222. cols, err := ts.Database().backend.FetchTableColumnSpecs(ts)
  223. if err != nil {
  224. log.Errorf("fetchColumnDefs fail: %s", err)
  225. return nil
  226. }
  227. remove, update, add := DiffCols(ts.name, cols, ts.Columns())
  228. return ts.Database().backend.CommitTableChangeSQL(ts, STableChanges{
  229. RemoveIndexes: removeIndexes,
  230. AddIndexes: addIndexes,
  231. RemoveColumns: remove,
  232. UpdatedColumns: update,
  233. AddColumns: add,
  234. OldColumns: cols,
  235. })
  236. }
  237. // Sync executes the SQLs to synchronize the DB definion of s SQL database
  238. // by applying the SQL statements generated by SyncSQL()
  239. func (ts *STableSpec) Sync() error {
  240. sqls := ts.SyncSQL()
  241. if sqls != nil {
  242. for _, sql := range sqls {
  243. log.Infof(sql)
  244. _, err := ts.Database().Exec(sql)
  245. if err != nil {
  246. log.Errorf("exec sql error %s: %s", sql, err)
  247. return err
  248. }
  249. }
  250. }
  251. return nil
  252. }
  253. // CheckSync checks whether the table in database consistent with TableSpec
  254. func (ts *STableSpec) CheckSync() error {
  255. sqls := ts.SyncSQL()
  256. if len(sqls) > 0 {
  257. for _, sql := range sqls {
  258. fmt.Println(sql)
  259. }
  260. return fmt.Errorf("DB table %q not in sync", ts.name)
  261. }
  262. return nil
  263. }