zf.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516
  1. /*
  2. * Copyright (c) 2000-2018, 达梦数据库有限公司.
  3. * All rights reserved.
  4. */
  5. package dm
  6. import (
  7. "context"
  8. "database/sql/driver"
  9. "io"
  10. "reflect"
  11. "time"
  12. "gitee.com/chunanyong/dm/util"
  13. )
  // SQL_GET_DSC_EP_SITE is the query used to list DSC endpoints. For each row of
  // V$DSC_EP_INFO it selects the sequence number, a host taken from V$DM_MAL_INI
  // (MAL_HOST when MAL_INST_HOST is empty), the port from the DCR views for
  // group type 'DB', and the endpoint status, ordered by sequence number.
  const SQL_GET_DSC_EP_SITE = `SELECT dsc.ep_seqno, ` +
  	`(CASE mal.MAL_INST_HOST WHEN '' THEN mal.MAL_HOST ELSE mal.MAL_INST_HOST END) as ep_host, ` +
  	`dcr.EP_PORT, dsc.EP_STATUS ` +
  	`FROM V$DSC_EP_INFO dsc ` +
  	`LEFT join V$DM_MAL_INI mal on dsc.EP_NAME = mal.MAL_INST_NAME ` +
  	`LEFT join (SELECT grp.GROUP_TYPE GROUP_TYPE, ep.* FROM SYS."V$DCR_GROUP" grp, SYS."V$DCR_EP" ep where grp.GROUP_NAME = ep.GROUP_NAME) dcr ` +
  	`on dsc.EP_NAME = dcr.EP_NAME and GROUP_TYPE = 'DB' order by dsc.ep_seqno asc;`
  // reconnectFilter is a stateless filter whose methods delegate to the filter
  // chain and route returned errors through autoReconnect.
  type reconnectFilter struct{}
  26. // 一定抛错
  27. func (rf *reconnectFilter) autoReconnect(connection *DmConnection, err error) error {
  28. if dmErr, ok := err.(*DmError); ok {
  29. if dmErr.ErrCode == ECGO_COMMUNITION_ERROR.ErrCode || dmErr.ErrCode == ECGO_CONNECTION_CLOSED.ErrCode {
  30. if connection.dmConnector.driverReconnect {
  31. return rf.reconnect(connection, dmErr.getErrText())
  32. } else {
  33. connection.Access.Close()
  34. connection.closed.Set(true)
  35. return driver.ErrBadConn
  36. }
  37. }
  38. }
  39. return err
  40. }
  41. // 一定抛错
  42. func (rf *reconnectFilter) reconnect(connection *DmConnection, reason string) error {
  43. // 读写分离,重连需要处理备机
  44. var err error
  45. if connection.dmConnector.rwSeparate {
  46. err = RWUtil.reconnect(connection)
  47. } else {
  48. err = connection.reconnect()
  49. }
  50. if err != nil {
  51. connection.closed.Set(true)
  52. return ECGO_CONNECTION_SWITCH_FAILED.addDetailln(reason).throw()
  53. }
  54. // 重连成功
  55. connection.closed.Set(false)
  56. return ECGO_CONNECTION_SWITCHED.addDetailln(reason).throw()
  57. }
  58. func (rf *reconnectFilter) loadDscEpSites(conn *DmConnection) []*ep {
  59. stmt, rs, err := conn.driverQuery(SQL_GET_DSC_EP_SITE)
  60. if err != nil {
  61. return nil
  62. }
  63. defer func() {
  64. rs.close()
  65. stmt.close()
  66. }()
  67. epList := make([]*ep, 0)
  68. dest := make([]driver.Value, 4)
  69. for err = rs.next(dest); err != io.EOF; err = rs.next(dest) {
  70. ep := newEP(dest[1].(string), dest[2].(int32))
  71. ep.epSeqno = dest[0].(int32)
  72. if util.StringUtil.EqualsIgnoreCase(dest[3].(string), "OK") {
  73. ep.epStatus = EP_STATUS_OK
  74. } else {
  75. ep.epStatus = EP_STATUS_ERROR
  76. }
  77. epList = append(epList, ep)
  78. }
  79. return epList
  80. }
  81. func (rf *reconnectFilter) checkAndRecover(conn *DmConnection) error {
  82. if conn.dmConnector.doSwitch != DO_SWITCH_WHEN_EP_RECOVER {
  83. return nil
  84. }
  85. // check trx finish
  86. if !conn.trxFinish {
  87. return nil
  88. }
  89. var curIndex = conn.getIndexOnEPGroup()
  90. if curIndex == 0 || (time.Now().UnixNano()/1000000-conn.recoverInfo.checkEpRecoverTs) < int64(conn.dmConnector.switchInterval) {
  91. return nil
  92. }
  93. // check db recover
  94. var dscEps []*ep
  95. if conn.dmConnector.cluster == CLUSTER_TYPE_DSC {
  96. dscEps = rf.loadDscEpSites(conn)
  97. }
  98. if dscEps == nil || len(dscEps) == 0 {
  99. return nil
  100. }
  101. var recover = false
  102. for _, okEp := range dscEps {
  103. if okEp.epStatus != EP_STATUS_OK {
  104. continue
  105. }
  106. for i := int32(0); i < curIndex; i++ {
  107. ep := conn.dmConnector.group.epList[i]
  108. if okEp.host == ep.host && okEp.port == ep.port {
  109. recover = true
  110. break
  111. }
  112. }
  113. if recover {
  114. break
  115. }
  116. }
  117. conn.recoverInfo.checkEpRecoverTs = time.Now().UnixNano() / 1000000
  118. if !recover {
  119. return nil
  120. }
  121. if conn.dmConnector.driverReconnect {
  122. return conn.reconnect()
  123. } else {
  124. conn.Access.Close()
  125. conn.closed.Set(false)
  126. return ECGO_CONNECTION_CLOSED.throw()
  127. }
  128. //return driver.ErrBadConn
  129. // do reconnect
  130. //return conn.reconnect()
  131. }
  132. // DmDriver
  133. func (rf *reconnectFilter) DmDriverOpen(filterChain *filterChain, d *DmDriver, dsn string) (*DmConnection, error) {
  134. return filterChain.DmDriverOpen(d, dsn)
  135. }
  136. func (rf *reconnectFilter) DmDriverOpenConnector(filterChain *filterChain, d *DmDriver, dsn string) (*DmConnector, error) {
  137. return filterChain.DmDriverOpenConnector(d, dsn)
  138. }
  139. // DmConnector
  140. func (rf *reconnectFilter) DmConnectorConnect(filterChain *filterChain, c *DmConnector, ctx context.Context) (*DmConnection, error) {
  141. return filterChain.DmConnectorConnect(c, ctx)
  142. }
  143. func (rf *reconnectFilter) DmConnectorDriver(filterChain *filterChain, c *DmConnector) *DmDriver {
  144. return filterChain.DmConnectorDriver(c)
  145. }
  146. // DmConnection
  147. func (rf *reconnectFilter) DmConnectionBegin(filterChain *filterChain, c *DmConnection) (*DmConnection, error) {
  148. dc, err := filterChain.DmConnectionBegin(c)
  149. if err != nil {
  150. return nil, rf.autoReconnect(c, err)
  151. }
  152. return dc, err
  153. }
  154. func (rf *reconnectFilter) DmConnectionBeginTx(filterChain *filterChain, c *DmConnection, ctx context.Context, opts driver.TxOptions) (*DmConnection, error) {
  155. dc, err := filterChain.DmConnectionBeginTx(c, ctx, opts)
  156. if err != nil {
  157. return nil, rf.autoReconnect(c, err)
  158. }
  159. return dc, err
  160. }
  161. func (rf *reconnectFilter) DmConnectionCommit(filterChain *filterChain, c *DmConnection) error {
  162. if err := filterChain.DmConnectionCommit(c); err != nil {
  163. return rf.autoReconnect(c, err)
  164. }
  165. if err := rf.checkAndRecover(c); err != nil {
  166. return rf.autoReconnect(c, err)
  167. }
  168. return nil
  169. }
  170. func (rf *reconnectFilter) DmConnectionRollback(filterChain *filterChain, c *DmConnection) error {
  171. err := filterChain.DmConnectionRollback(c)
  172. if err != nil {
  173. err = rf.autoReconnect(c, err)
  174. }
  175. return err
  176. }
  177. func (rf *reconnectFilter) DmConnectionClose(filterChain *filterChain, c *DmConnection) error {
  178. err := filterChain.DmConnectionClose(c)
  179. if err != nil {
  180. err = rf.autoReconnect(c, err)
  181. }
  182. return err
  183. }
  184. func (rf *reconnectFilter) DmConnectionPing(filterChain *filterChain, c *DmConnection, ctx context.Context) error {
  185. err := filterChain.DmConnectionPing(c, ctx)
  186. if err != nil {
  187. err = rf.autoReconnect(c, err)
  188. }
  189. return err
  190. }
  191. func (rf *reconnectFilter) DmConnectionExec(filterChain *filterChain, c *DmConnection, query string, args []driver.Value) (*DmResult, error) {
  192. if err := rf.checkAndRecover(c); err != nil {
  193. return nil, rf.autoReconnect(c, err)
  194. }
  195. dr, err := filterChain.DmConnectionExec(c, query, args)
  196. if err != nil {
  197. return nil, rf.autoReconnect(c, err)
  198. }
  199. return dr, err
  200. }
  201. func (rf *reconnectFilter) DmConnectionExecContext(filterChain *filterChain, c *DmConnection, ctx context.Context, query string, args []driver.NamedValue) (*DmResult, error) {
  202. if err := rf.checkAndRecover(c); err != nil {
  203. return nil, rf.autoReconnect(c, err)
  204. }
  205. dr, err := filterChain.DmConnectionExecContext(c, ctx, query, args)
  206. if err != nil {
  207. return nil, rf.autoReconnect(c, err)
  208. }
  209. return dr, err
  210. }
  211. func (rf *reconnectFilter) DmConnectionQuery(filterChain *filterChain, c *DmConnection, query string, args []driver.Value) (*DmRows, error) {
  212. if err := rf.checkAndRecover(c); err != nil {
  213. return nil, rf.autoReconnect(c, err)
  214. }
  215. dr, err := filterChain.DmConnectionQuery(c, query, args)
  216. if err != nil {
  217. return nil, rf.autoReconnect(c, err)
  218. }
  219. return dr, err
  220. }
  221. func (rf *reconnectFilter) DmConnectionQueryContext(filterChain *filterChain, c *DmConnection, ctx context.Context, query string, args []driver.NamedValue) (*DmRows, error) {
  222. if err := rf.checkAndRecover(c); err != nil {
  223. return nil, rf.autoReconnect(c, err)
  224. }
  225. dr, err := filterChain.DmConnectionQueryContext(c, ctx, query, args)
  226. if err != nil {
  227. return nil, rf.autoReconnect(c, err)
  228. }
  229. return dr, err
  230. }
  231. func (rf *reconnectFilter) DmConnectionPrepare(filterChain *filterChain, c *DmConnection, query string) (*DmStatement, error) {
  232. ds, err := filterChain.DmConnectionPrepare(c, query)
  233. if err != nil {
  234. return nil, rf.autoReconnect(c, err)
  235. }
  236. return ds, err
  237. }
  238. func (rf *reconnectFilter) DmConnectionPrepareContext(filterChain *filterChain, c *DmConnection, ctx context.Context, query string) (*DmStatement, error) {
  239. ds, err := filterChain.DmConnectionPrepareContext(c, ctx, query)
  240. if err != nil {
  241. return nil, rf.autoReconnect(c, err)
  242. }
  243. return ds, err
  244. }
  245. func (rf *reconnectFilter) DmConnectionResetSession(filterChain *filterChain, c *DmConnection, ctx context.Context) error {
  246. err := filterChain.DmConnectionResetSession(c, ctx)
  247. if err != nil {
  248. err = rf.autoReconnect(c, err)
  249. }
  250. return err
  251. }
  252. func (rf *reconnectFilter) DmConnectionCheckNamedValue(filterChain *filterChain, c *DmConnection, nv *driver.NamedValue) error {
  253. err := filterChain.DmConnectionCheckNamedValue(c, nv)
  254. if err != nil {
  255. err = rf.autoReconnect(c, err)
  256. }
  257. return err
  258. }
  259. // DmStatement
  260. func (rf *reconnectFilter) DmStatementClose(filterChain *filterChain, s *DmStatement) error {
  261. err := filterChain.DmStatementClose(s)
  262. if err != nil {
  263. err = rf.autoReconnect(s.dmConn, err)
  264. }
  265. return err
  266. }
  267. func (rf *reconnectFilter) DmStatementNumInput(filterChain *filterChain, s *DmStatement) int {
  268. var ret int
  269. defer func() {
  270. err := recover()
  271. if err != nil {
  272. rf.autoReconnect(s.dmConn, err.(error))
  273. ret = 0
  274. }
  275. }()
  276. ret = filterChain.DmStatementNumInput(s)
  277. return ret
  278. }
  279. func (rf *reconnectFilter) DmStatementExec(filterChain *filterChain, s *DmStatement, args []driver.Value) (*DmResult, error) {
  280. if err := rf.checkAndRecover(s.dmConn); err != nil {
  281. return nil, rf.autoReconnect(s.dmConn, err)
  282. }
  283. dr, err := filterChain.DmStatementExec(s, args)
  284. if err != nil {
  285. return nil, rf.autoReconnect(s.dmConn, err)
  286. }
  287. return dr, err
  288. }
  289. func (rf *reconnectFilter) DmStatementExecContext(filterChain *filterChain, s *DmStatement, ctx context.Context, args []driver.NamedValue) (*DmResult, error) {
  290. if err := rf.checkAndRecover(s.dmConn); err != nil {
  291. return nil, rf.autoReconnect(s.dmConn, err)
  292. }
  293. dr, err := filterChain.DmStatementExecContext(s, ctx, args)
  294. if err != nil {
  295. return nil, rf.autoReconnect(s.dmConn, err)
  296. }
  297. return dr, err
  298. }
  299. func (rf *reconnectFilter) DmStatementQuery(filterChain *filterChain, s *DmStatement, args []driver.Value) (*DmRows, error) {
  300. if err := rf.checkAndRecover(s.dmConn); err != nil {
  301. return nil, rf.autoReconnect(s.dmConn, err)
  302. }
  303. dr, err := filterChain.DmStatementQuery(s, args)
  304. if err != nil {
  305. return nil, rf.autoReconnect(s.dmConn, err)
  306. }
  307. return dr, err
  308. }
  309. func (rf *reconnectFilter) DmStatementQueryContext(filterChain *filterChain, s *DmStatement, ctx context.Context, args []driver.NamedValue) (*DmRows, error) {
  310. if err := rf.checkAndRecover(s.dmConn); err != nil {
  311. return nil, rf.autoReconnect(s.dmConn, err)
  312. }
  313. dr, err := filterChain.DmStatementQueryContext(s, ctx, args)
  314. if err != nil {
  315. return nil, rf.autoReconnect(s.dmConn, err)
  316. }
  317. return dr, err
  318. }
  319. func (rf *reconnectFilter) DmStatementCheckNamedValue(filterChain *filterChain, s *DmStatement, nv *driver.NamedValue) error {
  320. err := filterChain.DmStatementCheckNamedValue(s, nv)
  321. if err != nil {
  322. err = rf.autoReconnect(s.dmConn, err)
  323. }
  324. return err
  325. }
  326. // DmResult
  327. func (rf *reconnectFilter) DmResultLastInsertId(filterChain *filterChain, r *DmResult) (int64, error) {
  328. i, err := filterChain.DmResultLastInsertId(r)
  329. if err != nil {
  330. err = rf.autoReconnect(r.dmStmt.dmConn, err)
  331. return 0, err
  332. }
  333. return i, err
  334. }
  335. func (rf *reconnectFilter) DmResultRowsAffected(filterChain *filterChain, r *DmResult) (int64, error) {
  336. i, err := filterChain.DmResultRowsAffected(r)
  337. if err != nil {
  338. err = rf.autoReconnect(r.dmStmt.dmConn, err)
  339. return 0, err
  340. }
  341. return i, err
  342. }
  343. // DmRows
  344. func (rf *reconnectFilter) DmRowsColumns(filterChain *filterChain, r *DmRows) []string {
  345. var ret []string
  346. defer func() {
  347. err := recover()
  348. if err != nil {
  349. rf.autoReconnect(r.CurrentRows.dmStmt.dmConn, err.(error))
  350. ret = nil
  351. }
  352. }()
  353. ret = filterChain.DmRowsColumns(r)
  354. return ret
  355. }
  356. func (rf *reconnectFilter) DmRowsClose(filterChain *filterChain, r *DmRows) error {
  357. err := filterChain.DmRowsClose(r)
  358. if err != nil {
  359. err = rf.autoReconnect(r.CurrentRows.dmStmt.dmConn, err)
  360. }
  361. return err
  362. }
  363. func (rf *reconnectFilter) DmRowsNext(filterChain *filterChain, r *DmRows, dest []driver.Value) error {
  364. err := filterChain.DmRowsNext(r, dest)
  365. if err != nil {
  366. err = rf.autoReconnect(r.CurrentRows.dmStmt.dmConn, err)
  367. }
  368. return err
  369. }
  370. func (rf *reconnectFilter) DmRowsHasNextResultSet(filterChain *filterChain, r *DmRows) bool {
  371. var ret bool
  372. defer func() {
  373. err := recover()
  374. if err != nil {
  375. rf.autoReconnect(r.CurrentRows.dmStmt.dmConn, err.(error))
  376. ret = false
  377. }
  378. }()
  379. ret = filterChain.DmRowsHasNextResultSet(r)
  380. return ret
  381. }
  382. func (rf *reconnectFilter) DmRowsNextResultSet(filterChain *filterChain, r *DmRows) error {
  383. err := filterChain.DmRowsNextResultSet(r)
  384. if err != nil {
  385. err = rf.autoReconnect(r.CurrentRows.dmStmt.dmConn, err)
  386. }
  387. return err
  388. }
  389. func (rf *reconnectFilter) DmRowsColumnTypeScanType(filterChain *filterChain, r *DmRows, index int) reflect.Type {
  390. var ret reflect.Type
  391. defer func() {
  392. err := recover()
  393. if err != nil {
  394. rf.autoReconnect(r.CurrentRows.dmStmt.dmConn, err.(error))
  395. ret = scanTypeUnknown
  396. }
  397. }()
  398. ret = filterChain.DmRowsColumnTypeScanType(r, index)
  399. return ret
  400. }
  401. func (rf *reconnectFilter) DmRowsColumnTypeDatabaseTypeName(filterChain *filterChain, r *DmRows, index int) string {
  402. var ret string
  403. defer func() {
  404. err := recover()
  405. if err != nil {
  406. rf.autoReconnect(r.CurrentRows.dmStmt.dmConn, err.(error))
  407. ret = ""
  408. }
  409. }()
  410. ret = filterChain.DmRowsColumnTypeDatabaseTypeName(r, index)
  411. return ret
  412. }
  413. func (rf *reconnectFilter) DmRowsColumnTypeLength(filterChain *filterChain, r *DmRows, index int) (length int64, ok bool) {
  414. defer func() {
  415. err := recover()
  416. if err != nil {
  417. rf.autoReconnect(r.CurrentRows.dmStmt.dmConn, err.(error))
  418. length, ok = 0, false
  419. }
  420. }()
  421. return filterChain.DmRowsColumnTypeLength(r, index)
  422. }
  423. func (rf *reconnectFilter) DmRowsColumnTypeNullable(filterChain *filterChain, r *DmRows, index int) (nullable, ok bool) {
  424. defer func() {
  425. err := recover()
  426. if err != nil {
  427. rf.autoReconnect(r.CurrentRows.dmStmt.dmConn, err.(error))
  428. nullable, ok = false, false
  429. }
  430. }()
  431. return filterChain.DmRowsColumnTypeNullable(r, index)
  432. }
  433. func (rf *reconnectFilter) DmRowsColumnTypePrecisionScale(filterChain *filterChain, r *DmRows, index int) (precision, scale int64, ok bool) {
  434. defer func() {
  435. err := recover()
  436. if err != nil {
  437. rf.autoReconnect(r.CurrentRows.dmStmt.dmConn, err.(error))
  438. precision, scale, ok = 0, 0, false
  439. }
  440. }()
  441. return filterChain.DmRowsColumnTypePrecisionScale(r, index)
  442. }