zc.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. /*
  2. * Copyright (c) 2000-2018, 达梦数据库有限公司.
  3. * All rights reserved.
  4. */
  5. package dm
  6. import (
  7. "context"
  8. "database/sql/driver"
  9. "reflect"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. )
  14. type filter interface {
  15. DmDriverOpen(filterChain *filterChain, d *DmDriver, dsn string) (*DmConnection, error)
  16. DmDriverOpenConnector(filterChain *filterChain, d *DmDriver, dsn string) (*DmConnector, error)
  17. DmConnectorConnect(filterChain *filterChain, c *DmConnector, ctx context.Context) (*DmConnection, error)
  18. DmConnectorDriver(filterChain *filterChain, c *DmConnector) *DmDriver
  19. DmConnectionBegin(filterChain *filterChain, c *DmConnection) (*DmConnection, error)
  20. DmConnectionBeginTx(filterChain *filterChain, c *DmConnection, ctx context.Context, opts driver.TxOptions) (*DmConnection, error)
  21. DmConnectionCommit(filterChain *filterChain, c *DmConnection) error
  22. DmConnectionRollback(filterChain *filterChain, c *DmConnection) error
  23. DmConnectionClose(filterChain *filterChain, c *DmConnection) error
  24. DmConnectionPing(filterChain *filterChain, c *DmConnection, ctx context.Context) error
  25. DmConnectionExec(filterChain *filterChain, c *DmConnection, query string, args []driver.Value) (*DmResult, error)
  26. DmConnectionExecContext(filterChain *filterChain, c *DmConnection, ctx context.Context, query string, args []driver.NamedValue) (*DmResult, error)
  27. DmConnectionQuery(filterChain *filterChain, c *DmConnection, query string, args []driver.Value) (*DmRows, error)
  28. DmConnectionQueryContext(filterChain *filterChain, c *DmConnection, ctx context.Context, query string, args []driver.NamedValue) (*DmRows, error)
  29. DmConnectionPrepare(filterChain *filterChain, c *DmConnection, query string) (*DmStatement, error)
  30. DmConnectionPrepareContext(filterChain *filterChain, c *DmConnection, ctx context.Context, query string) (*DmStatement, error)
  31. DmConnectionResetSession(filterChain *filterChain, c *DmConnection, ctx context.Context) error
  32. DmConnectionCheckNamedValue(filterChain *filterChain, c *DmConnection, nv *driver.NamedValue) error
  33. DmStatementClose(filterChain *filterChain, s *DmStatement) error
  34. DmStatementNumInput(filterChain *filterChain, s *DmStatement) int
  35. DmStatementExec(filterChain *filterChain, s *DmStatement, args []driver.Value) (*DmResult, error)
  36. DmStatementExecContext(filterChain *filterChain, s *DmStatement, ctx context.Context, args []driver.NamedValue) (*DmResult, error)
  37. DmStatementQuery(filterChain *filterChain, s *DmStatement, args []driver.Value) (*DmRows, error)
  38. DmStatementQueryContext(filterChain *filterChain, s *DmStatement, ctx context.Context, args []driver.NamedValue) (*DmRows, error)
  39. DmStatementCheckNamedValue(filterChain *filterChain, s *DmStatement, nv *driver.NamedValue) error
  40. DmResultLastInsertId(filterChain *filterChain, r *DmResult) (int64, error)
  41. DmResultRowsAffected(filterChain *filterChain, r *DmResult) (int64, error)
  42. DmRowsColumns(filterChain *filterChain, r *DmRows) []string
  43. DmRowsClose(filterChain *filterChain, r *DmRows) error
  44. DmRowsNext(filterChain *filterChain, r *DmRows, dest []driver.Value) error
  45. DmRowsHasNextResultSet(filterChain *filterChain, r *DmRows) bool
  46. DmRowsNextResultSet(filterChain *filterChain, r *DmRows) error
  47. DmRowsColumnTypeScanType(filterChain *filterChain, r *DmRows, index int) reflect.Type
  48. DmRowsColumnTypeDatabaseTypeName(filterChain *filterChain, r *DmRows, index int) string
  49. DmRowsColumnTypeLength(filterChain *filterChain, r *DmRows, index int) (length int64, ok bool)
  50. DmRowsColumnTypeNullable(filterChain *filterChain, r *DmRows, index int) (nullable, ok bool)
  51. DmRowsColumnTypePrecisionScale(filterChain *filterChain, r *DmRows, index int) (precision, scale int64, ok bool)
  52. }
  // IDGenerator is an int64 counter used to hand out increasing identifiers.
  type IDGenerator int64

  // Package-level counters, each starting at zero. Judging by their names
  // there is one per kind of driver object (driver, connector, connection,
  // statement, result, rows).
  var (
  	dmDriverIDGenerator  = new(IDGenerator)
  	dmConntorIDGenerator = new(IDGenerator) // spelling kept as-is; the name may be referenced elsewhere
  	dmConnIDGenerator    = new(IDGenerator)
  	dmStmtIDGenerator    = new(IDGenerator)
  	dmResultIDGenerator  = new(IDGenerator)
  	dmRowsIDGenerator    = new(IDGenerator)
  )

  // incrementAndGet atomically adds one to the counter and returns the
  // updated value, so the first call on a fresh generator yields 1.
  func (g *IDGenerator) incrementAndGet() int64 {
  	counter := (*int64)(g)
  	return atomic.AddInt64(counter, 1)
  }
  // RWSiteEnum identifies a target site: PRIMARY, STANDBY or ANYSITE.
  type RWSiteEnum int

  // Site values; the numeric values are 0, 1 and 2 respectively.
  const (
  	PRIMARY RWSiteEnum = 0
  	STANDBY RWSiteEnum = 1
  	ANYSITE RWSiteEnum = 2
  )
  69. var (
  70. goMapMu sync.RWMutex
  71. goMap = make(map[string]goRun, 2)
  72. )
  73. type filterable struct {
  74. filterChain *filterChain
  75. rwInfo *rwInfo
  76. logInfo *logInfo
  77. recoverInfo *recoverInfo
  78. statInfo *statInfo
  79. objId int64
  80. idGenerator *IDGenerator
  81. }
  82. func runLog() {
  83. goMapMu.Lock()
  84. _, ok := goMap["log"]
  85. if !ok {
  86. goMap["log"] = &logWriter{
  87. flushQueue: make(chan []byte, LogFlushQueueSize),
  88. date: time.Now().Format("2006-01-02"),
  89. logFile: nil,
  90. flushFreq: LogFlushFreq,
  91. filePath: LogDir,
  92. filePrefix: "dm_go",
  93. buffer: Dm_build_935(),
  94. }
  95. go goMap["log"].doRun()
  96. }
  97. goMapMu.Unlock()
  98. }
  99. func runStat() {
  100. goMapMu.Lock()
  101. _, ok := goMap["stat"]
  102. if !ok {
  103. goMap["stat"] = newStatFlusher()
  104. go goMap["stat"].doRun()
  105. }
  106. goMapMu.Unlock()
  107. }
  108. func (f *filterable) createFilterChain(bc *DmConnector, props *Properties) {
  109. var filters = make([]filter, 0, 5)
  110. if bc != nil {
  111. if LogLevel != LOG_OFF {
  112. filters = append(filters, &logFilter{})
  113. f.logInfo = &logInfo{logRecord: new(LogRecord)}
  114. runLog()
  115. }
  116. if StatEnable {
  117. filters = append(filters, &statFilter{})
  118. f.statInfo = newStatInfo()
  119. goStatMu.Lock()
  120. if goStat == nil {
  121. goStat = newGoStat(1000)
  122. }
  123. goStatMu.Unlock()
  124. runStat()
  125. }
  126. if bc.doSwitch != DO_SWITCH_OFF {
  127. filters = append(filters, &reconnectFilter{})
  128. f.recoverInfo = newRecoverInfo()
  129. }
  130. if bc.rwSeparate {
  131. filters = append(filters, &rwFilter{})
  132. f.rwInfo = newRwInfo()
  133. }
  134. } else if props != nil {
  135. if ParseLogLevel(props) != LOG_OFF {
  136. filters = append(filters, &logFilter{})
  137. f.logInfo = &logInfo{logRecord: new(LogRecord)}
  138. runLog()
  139. }
  140. if props.GetBool("statEnable", StatEnable) {
  141. filters = append(filters, &statFilter{})
  142. f.statInfo = newStatInfo()
  143. goStatMu.Lock()
  144. if goStat == nil {
  145. goStat = newGoStat(1000)
  146. }
  147. goStatMu.Unlock()
  148. runStat()
  149. }
  150. if props.GetInt(DoSwitchKey, int(DO_SWITCH_OFF), 0, 2) != int(DO_SWITCH_OFF) {
  151. filters = append(filters, &reconnectFilter{})
  152. f.recoverInfo = newRecoverInfo()
  153. }
  154. if props.GetBool("rwSeparate", false) {
  155. filters = append(filters, &rwFilter{})
  156. f.rwInfo = newRwInfo()
  157. }
  158. }
  159. f.filterChain = newFilterChain(filters)
  160. }
  161. func (f *filterable) resetFilterable(src *filterable) {
  162. f.filterChain = src.filterChain
  163. f.logInfo = src.logInfo
  164. f.rwInfo = src.rwInfo
  165. f.statInfo = src.statInfo
  166. }
  167. func (f *filterable) getID() int64 {
  168. if f.objId < 0 {
  169. f.objId = f.idGenerator.incrementAndGet()
  170. }
  171. return f.objId
  172. }
  173. type logInfo struct {
  174. logRecord *LogRecord
  175. lastExecuteStartNano time.Time
  176. }
  177. type rwInfo struct {
  178. distribute RWSiteEnum
  179. rwCounter *rwCounter
  180. connStandby *DmConnection
  181. connCurrent *DmConnection
  182. tryRecoverTs int64
  183. stmtStandby *DmStatement
  184. stmtCurrent *DmStatement
  185. readOnly bool
  186. }
  187. func newRwInfo() *rwInfo {
  188. rwInfo := new(rwInfo)
  189. rwInfo.distribute = PRIMARY
  190. rwInfo.readOnly = true
  191. return rwInfo
  192. }
  193. func (rwi *rwInfo) cleanup() {
  194. rwi.distribute = PRIMARY
  195. rwi.rwCounter = nil
  196. rwi.connStandby = nil
  197. rwi.connCurrent = nil
  198. rwi.stmtStandby = nil
  199. rwi.stmtCurrent = nil
  200. }
  201. func (rwi *rwInfo) toPrimary() RWSiteEnum {
  202. if rwi.distribute != PRIMARY {
  203. rwi.rwCounter.countPrimary()
  204. }
  205. rwi.distribute = PRIMARY
  206. return rwi.distribute
  207. }
  208. func (rwi *rwInfo) toAny() RWSiteEnum {
  209. rwi.distribute = rwi.rwCounter.count(ANYSITE, rwi.connStandby)
  210. return rwi.distribute
  211. }
  // recoverInfo is the state createFilterChain attaches when the reconnect
  // filter is enabled.
  type recoverInfo struct {
  	checkEpRecoverTs int64
  }

  // newRecoverInfo returns a recoverInfo whose checkEpRecoverTs is zero.
  func newRecoverInfo() *recoverInfo {
  	return &recoverInfo{checkEpRecoverTs: 0}
  }
  220. type statInfo struct {
  221. constructNano int64
  222. connStat *connectionStat
  223. lastExecuteStartNano int64
  224. lastExecuteTimeNano int64
  225. lastExecuteType ExecuteTypeEnum
  226. firstResultSet bool
  227. lastExecuteSql string
  228. sqlStat *sqlStat
  229. sql string
  230. cursorIndex int
  231. closeCount int
  232. readStringLength int64
  233. readBytesLength int64
  234. openInputStreamCount int
  235. openReaderCount int
  236. }
  237. var (
  238. goStatMu sync.RWMutex
  239. goStat *GoStat
  240. )
  241. func newStatInfo() *statInfo {
  242. si := new(statInfo)
  243. return si
  244. }
  245. func (si *statInfo) init(conn *DmConnection) {
  246. si.connStat = goStat.createConnStat(conn)
  247. }
  248. func (si *statInfo) setConstructNano() {
  249. si.constructNano = time.Now().UnixNano()
  250. }
  251. func (si *statInfo) getConstructNano() int64 {
  252. return si.constructNano
  253. }
  254. func (si *statInfo) getConnStat() *connectionStat {
  255. return si.connStat
  256. }
  257. func (si *statInfo) getLastExecuteStartNano() int64 {
  258. return si.lastExecuteStartNano
  259. }
  260. func (si *statInfo) setLastExecuteStartNano(lastExecuteStartNano int64) {
  261. si.lastExecuteStartNano = lastExecuteStartNano
  262. }
  263. func (si *statInfo) getLastExecuteTimeNano() int64 {
  264. return si.lastExecuteTimeNano
  265. }
  266. func (si *statInfo) setLastExecuteTimeNano(lastExecuteTimeNano int64) {
  267. si.lastExecuteTimeNano = lastExecuteTimeNano
  268. }
  269. func (si *statInfo) getLastExecuteType() ExecuteTypeEnum {
  270. return si.lastExecuteType
  271. }
  272. func (si *statInfo) setLastExecuteType(lastExecuteType ExecuteTypeEnum) {
  273. si.lastExecuteType = lastExecuteType
  274. }
  275. func (si *statInfo) isFirstResultSet() bool {
  276. return si.firstResultSet
  277. }
  278. func (si *statInfo) setFirstResultSet(firstResultSet bool) {
  279. si.firstResultSet = firstResultSet
  280. }
  281. func (si *statInfo) getLastExecuteSql() string {
  282. return si.lastExecuteSql
  283. }
  284. func (si *statInfo) setLastExecuteSql(lastExecuteSql string) {
  285. si.lastExecuteSql = lastExecuteSql
  286. }
  287. func (si *statInfo) getSqlStat() *sqlStat {
  288. return si.sqlStat
  289. }
  290. func (si *statInfo) setSqlStat(sqlStat *sqlStat) {
  291. si.sqlStat = sqlStat
  292. }
  293. func (si *statInfo) setConnStat(connStat *connectionStat) {
  294. si.connStat = connStat
  295. }
  296. func (si *statInfo) setConstructNanoWithConstructNano(constructNano int64) {
  297. si.constructNano = constructNano
  298. }
  299. func (si *statInfo) afterExecute(nanoSpan int64) {
  300. si.lastExecuteTimeNano = nanoSpan
  301. }
  302. func (si *statInfo) beforeExecute() {
  303. si.lastExecuteStartNano = time.Now().UnixNano()
  304. }
  305. func (si *statInfo) getSql() string {
  306. return si.sql
  307. }
  308. func (si *statInfo) setSql(sql string) {
  309. si.sql = sql
  310. }
  311. func (si *statInfo) getCursorIndex() int {
  312. return si.cursorIndex
  313. }
  314. func (si *statInfo) setCursorIndex(cursorIndex int) {
  315. si.cursorIndex = cursorIndex
  316. }
  317. func (si *statInfo) getCloseCount() int {
  318. return si.closeCount
  319. }
  320. func (si *statInfo) setCloseCount(closeCount int) {
  321. si.closeCount = closeCount
  322. }
  323. func (si *statInfo) getReadStringLength() int64 {
  324. return si.readStringLength
  325. }
  326. func (si *statInfo) setReadStringLength(readStringLength int64) {
  327. si.readStringLength = readStringLength
  328. }
  329. func (si *statInfo) getReadBytesLength() int64 {
  330. return si.readBytesLength
  331. }
  332. func (si *statInfo) setReadBytesLength(readBytesLength int64) {
  333. si.readBytesLength = readBytesLength
  334. }
  335. func (si *statInfo) getOpenInputStreamCount() int {
  336. return si.openInputStreamCount
  337. }
  338. func (si *statInfo) setOpenInputStreamCount(openInputStreamCount int) {
  339. si.openInputStreamCount = openInputStreamCount
  340. }
  341. func (si *statInfo) getOpenReaderCount() int {
  342. return si.openReaderCount
  343. }
  344. func (si *statInfo) setOpenReaderCount(openReaderCount int) {
  345. si.openReaderCount = openReaderCount
  346. }
  347. func (si *statInfo) incrementCloseCount() {
  348. si.closeCount++
  349. }