database.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package cloudcommon
  15. import (
  16. "context"
  17. "database/sql"
  18. "strings"
  19. "time"
  20. "github.com/mattn/go-sqlite3"
  21. "yunion.io/x/jsonutils"
  22. "yunion.io/x/log"
  23. "yunion.io/x/pkg/errors"
  24. "yunion.io/x/sqlchemy"
  25. _ "yunion.io/x/sqlchemy/backends"
  26. noapi "yunion.io/x/onecloud/pkg/apis/notify"
  27. "yunion.io/x/onecloud/pkg/cloudcommon/consts"
  28. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  29. "yunion.io/x/onecloud/pkg/cloudcommon/db/lockman"
  30. "yunion.io/x/onecloud/pkg/cloudcommon/etcd"
  31. "yunion.io/x/onecloud/pkg/cloudcommon/informer"
  32. "yunion.io/x/onecloud/pkg/cloudcommon/notifyclient"
  33. common_options "yunion.io/x/onecloud/pkg/cloudcommon/options"
  34. "yunion.io/x/onecloud/pkg/util/dbutils"
  35. )
  36. func InitDBConn(options *common_options.DBOptions) {
  37. dialect, sqlStr, err := options.GetDBConnection()
  38. if err != nil {
  39. log.Fatalf("Invalid SqlConnection string: %s error: %v", options.SqlConnection, err)
  40. }
  41. backend := sqlchemy.MySQLBackend
  42. switch dialect {
  43. case "dm":
  44. backend = sqlchemy.DamengBackend
  45. dialect = "dm"
  46. sqlStr = "dm://" + sqlStr
  47. case "sqlite3":
  48. backend = sqlchemy.SQLiteBackend
  49. dialect = "sqlite3_with_extensions"
  50. sql.Register(dialect,
  51. &sqlite3.SQLiteDriver{
  52. Extensions: []string{
  53. "/opt/yunion/share/sqlite/inet",
  54. },
  55. },
  56. )
  57. case "clickhouse":
  58. log.Fatalf("cannot use clickhouse as primary database")
  59. }
  60. log.Infof("database dialect: %s sqlStr: %s", dialect, sqlStr)
  61. // save configuration to consts
  62. consts.SetDefaultDB(dialect, sqlStr)
  63. dbConn, err := sql.Open(dialect, sqlStr)
  64. if err != nil {
  65. panic(err)
  66. }
  67. sqlchemy.SetDBWithNameBackend(dbConn, sqlchemy.DefaultDB, backend)
  68. if options.DbMaxWaitTimeoutSeconds <= 300 {
  69. options.DbMaxWaitTimeoutSeconds = 3600
  70. }
  71. // ConnMaxLifetime is the maximum amount of time a connection may be reused.
  72. // mysql default max_waitimeout is 28800 seconds, 1 hour should be enough
  73. // but if user set a customized mysql max_waittimeout, the value should be adjusted accordingly
  74. dbConn.SetConnMaxLifetime(time.Duration(options.DbMaxWaitTimeoutSeconds) * time.Second)
  75. // ConnMaxIdleTime should be half of ConnMaxLifetime
  76. dbConn.SetConnMaxIdleTime(time.Duration(options.DbMaxWaitTimeoutSeconds/2) * time.Second)
  77. }
  78. func InitClickhouseConn(options *common_options.DBOptions) {
  79. dialect, sqlStr, err := options.GetClickhouseConnStr()
  80. if err == nil {
  81. // connect to clickcloud
  82. // force convert sqlstr from clickhouse v2 to v1
  83. sqlStr, err = dbutils.ClickhouseSqlStrV2ToV1(sqlStr)
  84. if err != nil {
  85. log.Fatalf("fail to convert clickhouse sqlstr from v2 to v1: %s", err)
  86. }
  87. err = dbutils.ValidateClickhouseV1Str(sqlStr)
  88. if err != nil {
  89. log.Fatalf("invalid clickhouse sqlstr: %s", err)
  90. }
  91. click, err := sql.Open(dialect, sqlStr)
  92. if err != nil {
  93. panic(err)
  94. }
  95. sqlchemy.SetDBWithNameBackend(click, db.ClickhouseDB, sqlchemy.ClickhouseBackend)
  96. if options.OpsLogWithClickhouse {
  97. consts.OpsLogWithClickhouse = true
  98. }
  99. }
  100. }
  101. func InitDB(options *common_options.DBOptions) {
  102. if options.DebugSqlchemy {
  103. log.Warningf("debug Sqlchemy is turned on")
  104. sqlchemy.DEBUG_SQLCHEMY = true
  105. }
  106. log.Infof("Registered SQL drivers: %s", strings.Join(sql.Drivers(), ", "))
  107. consts.QueryOffsetOptimization = options.QueryOffsetOptimization
  108. if options.HistoricalUniqueName {
  109. consts.EnableHistoricalUniqueName()
  110. } else {
  111. consts.DisableHistoricalUniqueName()
  112. }
  113. if options.OpsLogMaxKeepMonths > 0 {
  114. consts.SetSplitableMaxKeepMonths(options.OpsLogMaxKeepMonths)
  115. }
  116. if options.SplitableMaxDurationHours > 0 {
  117. consts.SetSplitableMaxDurationHours(options.SplitableMaxDurationHours)
  118. }
  119. InitDBConn(options)
  120. InitClickhouseConn(options)
  121. switch options.LockmanMethod {
  122. case common_options.LockMethodInMemory, "":
  123. log.Infof("using inmemory lockman")
  124. lm := lockman.NewInMemoryLockManager()
  125. lockman.Init(lm)
  126. case common_options.LockMethodEtcd:
  127. log.Infof("using etcd lockman")
  128. tlsCfg, err := options.GetEtcdTLSConfig()
  129. if err != nil {
  130. log.Fatalln(err.Error())
  131. }
  132. lm, err := lockman.NewEtcdLockManager(&lockman.SEtcdLockManagerConfig{
  133. Endpoints: options.EtcdEndpoints,
  134. Username: options.EtcdUsername,
  135. Password: options.EtcdPassword,
  136. LockTTL: options.EtcdLockTTL,
  137. LockPrefix: options.EtcdLockPrefix,
  138. TLS: tlsCfg,
  139. })
  140. if err != nil {
  141. log.Fatalf("etcd lockman: %v", err)
  142. }
  143. lockman.Init(lm)
  144. }
  145. // lm := lockman.NewNoopLockManager()
  146. if options.EnableDBChecksumTables && len(options.DBChecksumHashAlgorithm) > 0 {
  147. consts.SetDefaultDBChecksumHashAlgorithm(options.DBChecksumHashAlgorithm)
  148. }
  149. initDBNotifier()
  150. startInitInformer(options)
  151. }
  152. func initDBNotifier() {
  153. db.SetChecksumTestFailedNotifier(func(obj *jsonutils.JSONDict) {
  154. notifyclient.SystemExceptionNotifyWithResult(context.TODO(), noapi.ActionChecksumTest, noapi.TOPIC_RESOURCE_DB_TABLE_RECORD, noapi.ResultFailed, obj)
  155. })
  156. }
  157. // startInitInformer starts goroutine init informer backend
  158. func startInitInformer(options *common_options.DBOptions) {
  159. go func() {
  160. if len(options.EtcdEndpoints) == 0 {
  161. return
  162. }
  163. for {
  164. log.Infof("using etcd as resource informer backend")
  165. if err := initInformer(options); err != nil {
  166. log.Errorf("Init informer error: %v", err)
  167. time.Sleep(10 * time.Second)
  168. } else {
  169. break
  170. }
  171. }
  172. }()
  173. }
  174. func initInformer(options *common_options.DBOptions) error {
  175. tlsCfg, err := options.GetEtcdTLSConfig()
  176. if err != nil {
  177. return errors.Wrap(err, "get etcd informer backend tls config")
  178. }
  179. informerBackend, err := informer.NewEtcdBackend(&etcd.SEtcdOptions{
  180. EtcdEndpoint: options.EtcdEndpoints,
  181. EtcdTimeoutSeconds: 5,
  182. EtcdRequestTimeoutSeconds: 2,
  183. EtcdLeaseExpireSeconds: 5,
  184. EtcdEnabldSsl: options.EtcdUseTLS,
  185. TLSConfig: tlsCfg,
  186. }, nil)
  187. if err != nil {
  188. return errors.Wrap(err, "new etcd informer backend")
  189. }
  190. informer.Init(informerBackend)
  191. return nil
  192. }
  193. func CloseDB() {
  194. sqlchemy.CloseDB()
  195. }