m.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875
  1. /*
  2. * Copyright (c) 2000-2018, 达梦数据库有限公司.
  3. * All rights reserved.
  4. */
  5. package dm
  6. import (
  7. "bytes"
  8. "context"
  9. "database/sql"
  10. "database/sql/driver"
  11. "fmt"
  12. "sync"
  13. "sync/atomic"
  14. "gitee.com/chunanyong/dm/parser"
  15. "gitee.com/chunanyong/dm/util"
  16. "golang.org/x/text/encoding"
  17. )
  18. type DmConnection struct {
  19. filterable
  20. mu sync.Mutex
  21. dmConnector *DmConnector
  22. Access *dm_build_1345
  23. stmtMap map[int32]*DmStatement
  24. lastExecInfo *execRetInfo
  25. lexer *parser.Lexer
  26. encode encoding.Encoding
  27. encodeBuffer *bytes.Buffer
  28. transformReaderDst []byte
  29. transformReaderSrc []byte
  30. serverEncoding string
  31. GlobalServerSeries int
  32. ServerVersion string
  33. Malini2 bool
  34. Execute2 bool
  35. LobEmptyCompOrcl bool
  36. IsoLevel int32
  37. ReadOnly bool
  38. NewLobFlag bool
  39. sslEncrypt int
  40. MaxRowSize int32
  41. DDLAutoCommit bool
  42. BackslashEscape bool
  43. SvrStat int32
  44. SvrMode int32
  45. ConstParaOpt bool
  46. DbTimezone int16
  47. LifeTimeRemainder int16
  48. InstanceName string
  49. Schema string
  50. LastLoginIP string
  51. LastLoginTime string
  52. FailedAttempts int32
  53. LoginWarningID int32
  54. GraceTimeRemainder int32
  55. Guid string
  56. DbName string
  57. StandbyHost string
  58. StandbyPort int32
  59. StandbyCount int32
  60. SessionID int64
  61. OracleDateLanguage byte
  62. FormatDate string
  63. FormatTimestamp string
  64. FormatTimestampTZ string
  65. FormatTime string
  66. FormatTimeTZ string
  67. Local bool
  68. MsgVersion int32
  69. TrxStatus int32
  70. dscControl bool
  71. trxFinish bool
  72. autoCommit bool
  73. isBatch bool
  74. watching bool
  75. watcher chan<- context.Context
  76. closech chan struct{}
  77. finished chan<- struct{}
  78. canceled atomicError
  79. closed atomicBool
  80. }
  81. func (conn *DmConnection) setTrxFinish(status int32) {
  82. switch status & Dm_build_132 {
  83. case Dm_build_129, Dm_build_130, Dm_build_131:
  84. conn.trxFinish = true
  85. default:
  86. conn.trxFinish = false
  87. }
  88. }
  89. func (dmConn *DmConnection) init() {
  90. dmConn.stmtMap = make(map[int32]*DmStatement)
  91. dmConn.DbTimezone = 0
  92. dmConn.GlobalServerSeries = 0
  93. dmConn.MaxRowSize = 0
  94. dmConn.LobEmptyCompOrcl = false
  95. dmConn.ReadOnly = false
  96. dmConn.DDLAutoCommit = false
  97. dmConn.ConstParaOpt = false
  98. dmConn.IsoLevel = -1
  99. dmConn.Malini2 = true
  100. dmConn.NewLobFlag = true
  101. dmConn.Execute2 = true
  102. dmConn.serverEncoding = ENCODING_GB18030
  103. dmConn.TrxStatus = Dm_build_80
  104. dmConn.setTrxFinish(dmConn.TrxStatus)
  105. dmConn.OracleDateLanguage = byte(Locale)
  106. dmConn.lastExecInfo = NewExceInfo()
  107. dmConn.MsgVersion = Dm_build_13
  108. dmConn.idGenerator = dmConnIDGenerator
  109. }
  110. func (dmConn *DmConnection) reset() {
  111. dmConn.DbTimezone = 0
  112. dmConn.GlobalServerSeries = 0
  113. dmConn.MaxRowSize = 0
  114. dmConn.LobEmptyCompOrcl = false
  115. dmConn.ReadOnly = false
  116. dmConn.DDLAutoCommit = false
  117. dmConn.ConstParaOpt = false
  118. dmConn.IsoLevel = -1
  119. dmConn.Malini2 = true
  120. dmConn.NewLobFlag = true
  121. dmConn.Execute2 = true
  122. dmConn.serverEncoding = ENCODING_GB18030
  123. dmConn.TrxStatus = Dm_build_80
  124. dmConn.setTrxFinish(dmConn.TrxStatus)
  125. }
  126. func (dc *DmConnection) checkClosed() error {
  127. if dc.closed.IsSet() {
  128. return driver.ErrBadConn
  129. }
  130. return nil
  131. }
  132. func (dc *DmConnection) executeInner(query string, execType int16) (interface{}, error) {
  133. stmt, err := NewDmStmt(dc, query)
  134. if err != nil {
  135. return nil, err
  136. }
  137. if execType == Dm_build_97 {
  138. defer stmt.close()
  139. }
  140. stmt.innerUsed = true
  141. if stmt.dmConn.dmConnector.escapeProcess {
  142. stmt.nativeSql, err = stmt.dmConn.escape(stmt.nativeSql, stmt.dmConn.dmConnector.keyWords)
  143. if err != nil {
  144. stmt.close()
  145. return nil, err
  146. }
  147. }
  148. var optParamList []OptParameter
  149. if stmt.dmConn.ConstParaOpt {
  150. optParamList = make([]OptParameter, 0)
  151. stmt.nativeSql, optParamList, err = stmt.dmConn.execOpt(stmt.nativeSql, optParamList, stmt.dmConn.getServerEncoding())
  152. if err != nil {
  153. stmt.close()
  154. optParamList = nil
  155. }
  156. }
  157. if execType == Dm_build_96 && dc.dmConnector.enRsCache {
  158. rpv, err := rp.get(stmt, query)
  159. if err != nil {
  160. return nil, err
  161. }
  162. if rpv != nil {
  163. stmt.execInfo = rpv.execInfo
  164. dc.lastExecInfo = rpv.execInfo
  165. return newDmRows(rpv.getResultSet(stmt)), nil
  166. }
  167. }
  168. var info *execRetInfo
  169. if optParamList != nil && len(optParamList) > 0 {
  170. info, err = dc.Access.Dm_build_1428(stmt, optParamList)
  171. if err != nil {
  172. stmt.nativeSql = query
  173. info, err = dc.Access.Dm_build_1434(stmt, execType)
  174. }
  175. } else {
  176. info, err = dc.Access.Dm_build_1434(stmt, execType)
  177. }
  178. if err != nil {
  179. stmt.close()
  180. return nil, err
  181. }
  182. dc.lastExecInfo = info
  183. if execType == Dm_build_96 && info.hasResultSet {
  184. return newDmRows(newInnerRows(0, stmt, info)), nil
  185. } else {
  186. return newDmResult(stmt, info), nil
  187. }
  188. }
  189. func g2dbIsoLevel(isoLevel int32) int32 {
  190. switch isoLevel {
  191. case 1:
  192. return Dm_build_84
  193. case 2:
  194. return Dm_build_85
  195. case 4:
  196. return Dm_build_86
  197. case 6:
  198. return Dm_build_87
  199. default:
  200. return -1
  201. }
  202. }
  203. func (dc *DmConnection) Begin() (driver.Tx, error) {
  204. if len(dc.filterChain.filters) == 0 {
  205. return dc.begin()
  206. } else {
  207. return dc.filterChain.reset().DmConnectionBegin(dc)
  208. }
  209. }
  210. func (dc *DmConnection) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
  211. if len(dc.filterChain.filters) == 0 {
  212. return dc.beginTx(ctx, opts)
  213. }
  214. return dc.filterChain.reset().DmConnectionBeginTx(dc, ctx, opts)
  215. }
  216. func (dc *DmConnection) Commit() error {
  217. if len(dc.filterChain.filters) == 0 {
  218. return dc.commit()
  219. } else {
  220. return dc.filterChain.reset().DmConnectionCommit(dc)
  221. }
  222. }
  223. func (dc *DmConnection) Rollback() error {
  224. if len(dc.filterChain.filters) == 0 {
  225. return dc.rollback()
  226. } else {
  227. return dc.filterChain.reset().DmConnectionRollback(dc)
  228. }
  229. }
  230. func (dc *DmConnection) Close() error {
  231. if len(dc.filterChain.filters) == 0 {
  232. return dc.close()
  233. } else {
  234. return dc.filterChain.reset().DmConnectionClose(dc)
  235. }
  236. }
  237. func (dc *DmConnection) Ping(ctx context.Context) error {
  238. if len(dc.filterChain.filters) == 0 {
  239. return dc.ping(ctx)
  240. } else {
  241. return dc.filterChain.reset().DmConnectionPing(dc, ctx)
  242. }
  243. }
  244. func (dc *DmConnection) Exec(query string, args []driver.Value) (driver.Result, error) {
  245. if len(dc.filterChain.filters) == 0 {
  246. return dc.exec(query, args)
  247. }
  248. return dc.filterChain.reset().DmConnectionExec(dc, query, args)
  249. }
  250. func (dc *DmConnection) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
  251. if len(dc.filterChain.filters) == 0 {
  252. return dc.execContext(ctx, query, args)
  253. }
  254. return dc.filterChain.reset().DmConnectionExecContext(dc, ctx, query, args)
  255. }
  256. func (dc *DmConnection) Query(query string, args []driver.Value) (driver.Rows, error) {
  257. if len(dc.filterChain.filters) == 0 {
  258. return dc.query(query, args)
  259. }
  260. return dc.filterChain.reset().DmConnectionQuery(dc, query, args)
  261. }
  262. func (dc *DmConnection) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
  263. if len(dc.filterChain.filters) == 0 {
  264. return dc.queryContext(ctx, query, args)
  265. }
  266. return dc.filterChain.reset().DmConnectionQueryContext(dc, ctx, query, args)
  267. }
  268. func (dc *DmConnection) Prepare(query string) (driver.Stmt, error) {
  269. if len(dc.filterChain.filters) == 0 {
  270. return dc.prepare(query)
  271. }
  272. return dc.filterChain.reset().DmConnectionPrepare(dc, query)
  273. }
  274. func (dc *DmConnection) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
  275. if len(dc.filterChain.filters) == 0 {
  276. return dc.prepareContext(ctx, query)
  277. }
  278. return dc.filterChain.reset().DmConnectionPrepareContext(dc, ctx, query)
  279. }
  280. func (dc *DmConnection) ResetSession(ctx context.Context) error {
  281. if len(dc.filterChain.filters) == 0 {
  282. return dc.resetSession(ctx)
  283. }
  284. if err := dc.filterChain.reset().DmConnectionResetSession(dc, ctx); err != nil {
  285. return driver.ErrBadConn
  286. } else {
  287. return nil
  288. }
  289. }
  290. func (dc *DmConnection) CheckNamedValue(nv *driver.NamedValue) error {
  291. if len(dc.filterChain.filters) == 0 {
  292. return dc.checkNamedValue(nv)
  293. }
  294. return dc.filterChain.reset().DmConnectionCheckNamedValue(dc, nv)
  295. }
  296. func (dc *DmConnection) begin() (*DmConnection, error) {
  297. return dc.beginTx(context.Background(), driver.TxOptions{driver.IsolationLevel(sql.LevelDefault), false})
  298. }
  299. func (dc *DmConnection) beginTx(ctx context.Context, opts driver.TxOptions) (*DmConnection, error) {
  300. if err := dc.watchCancel(ctx); err != nil {
  301. return nil, err
  302. }
  303. defer dc.finish()
  304. err := dc.checkClosed()
  305. if err != nil {
  306. return nil, err
  307. }
  308. dc.autoCommit = false
  309. if dc.ReadOnly != opts.ReadOnly {
  310. dc.ReadOnly = opts.ReadOnly
  311. var readonly = 0
  312. if opts.ReadOnly {
  313. readonly = 1
  314. }
  315. dc.exec(fmt.Sprintf("SP_SET_SESSION_READONLY(%d)", readonly), nil)
  316. }
  317. if dc.IsoLevel != int32(opts.Isolation) {
  318. switch sql.IsolationLevel(opts.Isolation) {
  319. case sql.LevelDefault:
  320. dc.IsoLevel = int32(sql.LevelReadCommitted)
  321. case sql.LevelReadUncommitted, sql.LevelReadCommitted, sql.LevelSerializable:
  322. dc.IsoLevel = int32(opts.Isolation)
  323. case sql.LevelRepeatableRead:
  324. if dc.CompatibleMysql() {
  325. dc.IsoLevel = int32(sql.LevelReadCommitted)
  326. } else {
  327. return nil, ECGO_INVALID_TRAN_ISOLATION.throw()
  328. }
  329. default:
  330. return nil, ECGO_INVALID_TRAN_ISOLATION.throw()
  331. }
  332. err = dc.Access.Dm_build_1488(dc)
  333. if err != nil {
  334. return nil, err
  335. }
  336. }
  337. return dc, nil
  338. }
  339. func (dc *DmConnection) commit() error {
  340. err := dc.checkClosed()
  341. if err != nil {
  342. return err
  343. }
  344. defer func() {
  345. dc.autoCommit = dc.dmConnector.autoCommit
  346. if dc.ReadOnly {
  347. dc.exec("SP_SET_SESSION_READONLY(0)", nil)
  348. }
  349. }()
  350. if !dc.autoCommit {
  351. err = dc.Access.Commit()
  352. if err != nil {
  353. return err
  354. }
  355. dc.trxFinish = true
  356. return nil
  357. } else if !dc.dmConnector.alwayseAllowCommit {
  358. return ECGO_COMMIT_IN_AUTOCOMMIT_MODE.throw()
  359. }
  360. return nil
  361. }
  362. func (dc *DmConnection) rollback() error {
  363. err := dc.checkClosed()
  364. if err != nil {
  365. return err
  366. }
  367. defer func() {
  368. dc.autoCommit = dc.dmConnector.autoCommit
  369. if dc.ReadOnly {
  370. dc.exec("SP_SET_SESSION_READONLY(0)", nil)
  371. }
  372. }()
  373. if !dc.autoCommit {
  374. err = dc.Access.Rollback()
  375. if err != nil {
  376. return err
  377. }
  378. dc.trxFinish = true
  379. return nil
  380. } else if !dc.dmConnector.alwayseAllowCommit {
  381. return ECGO_ROLLBACK_IN_AUTOCOMMIT_MODE.throw()
  382. }
  383. return nil
  384. }
  385. func (dc *DmConnection) reconnect() error {
  386. err := dc.Access.Close()
  387. if err != nil {
  388. return err
  389. }
  390. for _, stmt := range dc.stmtMap {
  391. for id, rs := range stmt.rsMap {
  392. rs.Close()
  393. delete(stmt.rsMap, id)
  394. }
  395. }
  396. var newConn *DmConnection
  397. if dc.dmConnector.group != nil {
  398. if newConn, err = dc.dmConnector.group.connect(dc.dmConnector); err != nil {
  399. return err
  400. }
  401. } else {
  402. newConn, err = dc.dmConnector.connect(context.Background())
  403. }
  404. oldMap := dc.stmtMap
  405. newConn.mu = dc.mu
  406. newConn.filterable = dc.filterable
  407. *dc = *newConn
  408. for _, stmt := range oldMap {
  409. if stmt.closed {
  410. continue
  411. }
  412. err = dc.Access.Dm_build_1406(stmt)
  413. if err != nil {
  414. stmt.free()
  415. continue
  416. }
  417. if stmt.prepared || stmt.paramCount > 0 {
  418. if err = stmt.prepare(); err != nil {
  419. continue
  420. }
  421. }
  422. dc.stmtMap[stmt.id] = stmt
  423. }
  424. return nil
  425. }
  426. func (dc *DmConnection) cleanup() {
  427. dc.close()
  428. }
  429. func (dc *DmConnection) close() error {
  430. if !dc.closed.TrySet(true) {
  431. return nil
  432. }
  433. util.AbsorbPanic(func() {
  434. close(dc.closech)
  435. })
  436. if dc.Access == nil {
  437. return nil
  438. }
  439. dc.rollback()
  440. for _, stmt := range dc.stmtMap {
  441. stmt.free()
  442. }
  443. dc.Access.Close()
  444. return nil
  445. }
  446. func (dc *DmConnection) ping(ctx context.Context) error {
  447. if err := dc.watchCancel(ctx); err != nil {
  448. return err
  449. }
  450. defer dc.finish()
  451. rows, err := dc.query("select 1", nil)
  452. if err != nil {
  453. return err
  454. }
  455. return rows.close()
  456. }
  457. func (dc *DmConnection) exec(query string, args []driver.Value) (*DmResult, error) {
  458. err := dc.checkClosed()
  459. if err != nil {
  460. return nil, err
  461. }
  462. if args != nil && len(args) > 0 {
  463. stmt, err := dc.prepare(query)
  464. if err != nil {
  465. return nil, err
  466. }
  467. defer stmt.close()
  468. dc.lastExecInfo = stmt.execInfo
  469. return stmt.exec(args)
  470. } else {
  471. r1, err := dc.executeInner(query, Dm_build_97)
  472. if err != nil {
  473. return nil, err
  474. }
  475. if r2, ok := r1.(*DmResult); ok {
  476. return r2, nil
  477. } else {
  478. return nil, ECGO_NOT_EXEC_SQL.throw()
  479. }
  480. }
  481. }
  482. func (dc *DmConnection) execContext(ctx context.Context, query string, args []driver.NamedValue) (*DmResult, error) {
  483. if err := dc.watchCancel(ctx); err != nil {
  484. return nil, err
  485. }
  486. defer dc.finish()
  487. err := dc.checkClosed()
  488. if err != nil {
  489. return nil, err
  490. }
  491. if args != nil && len(args) > 0 {
  492. stmt, err := dc.prepare(query)
  493. if err != nil {
  494. return nil, err
  495. }
  496. defer stmt.close()
  497. dc.lastExecInfo = stmt.execInfo
  498. dargs, err := namedValueToValue(stmt, args)
  499. if err != nil {
  500. return nil, err
  501. }
  502. return stmt.exec(dargs)
  503. } else {
  504. r1, err := dc.executeInner(query, Dm_build_97)
  505. if err != nil {
  506. return nil, err
  507. }
  508. if r2, ok := r1.(*DmResult); ok {
  509. return r2, nil
  510. } else {
  511. return nil, ECGO_NOT_EXEC_SQL.throw()
  512. }
  513. }
  514. }
  515. func (dc *DmConnection) query(query string, args []driver.Value) (*DmRows, error) {
  516. err := dc.checkClosed()
  517. if err != nil {
  518. return nil, err
  519. }
  520. if args != nil && len(args) > 0 {
  521. stmt, err := dc.prepare(query)
  522. if err != nil {
  523. return nil, err
  524. }
  525. dc.lastExecInfo = stmt.execInfo
  526. stmt.innerUsed = true
  527. return stmt.query(args)
  528. } else {
  529. r1, err := dc.executeInner(query, Dm_build_96)
  530. if err != nil {
  531. return nil, err
  532. }
  533. if r2, ok := r1.(*DmRows); ok {
  534. return r2, nil
  535. } else {
  536. return nil, ECGO_NOT_QUERY_SQL.throw()
  537. }
  538. }
  539. }
  540. func (dc *DmConnection) queryContext(ctx context.Context, query string, args []driver.NamedValue) (*DmRows, error) {
  541. if err := dc.watchCancel(ctx); err != nil {
  542. return nil, err
  543. }
  544. defer dc.finish()
  545. err := dc.checkClosed()
  546. if err != nil {
  547. return nil, err
  548. }
  549. if args != nil && len(args) > 0 {
  550. stmt, err := dc.prepare(query)
  551. if err != nil {
  552. return nil, err
  553. }
  554. dc.lastExecInfo = stmt.execInfo
  555. stmt.innerUsed = true
  556. dargs, err := namedValueToValue(stmt, args)
  557. if err != nil {
  558. return nil, err
  559. }
  560. return stmt.query(dargs)
  561. } else {
  562. r1, err := dc.executeInner(query, Dm_build_96)
  563. if err != nil {
  564. return nil, err
  565. }
  566. if r2, ok := r1.(*DmRows); ok {
  567. return r2, nil
  568. } else {
  569. return nil, ECGO_NOT_QUERY_SQL.throw()
  570. }
  571. }
  572. }
  573. func (dc *DmConnection) prepare(query string) (stmt *DmStatement, err error) {
  574. if err = dc.checkClosed(); err != nil {
  575. return
  576. }
  577. if stmt, err = NewDmStmt(dc, query); err != nil {
  578. return
  579. }
  580. if err = stmt.prepare(); err != nil {
  581. stmt.close()
  582. stmt = nil
  583. return
  584. }
  585. return
  586. }
  587. func (dc *DmConnection) prepareContext(ctx context.Context, query string) (*DmStatement, error) {
  588. if err := dc.watchCancel(ctx); err != nil {
  589. return nil, err
  590. }
  591. defer dc.finish()
  592. return dc.prepare(query)
  593. }
  594. func (dc *DmConnection) resetSession(ctx context.Context) error {
  595. if err := dc.watchCancel(ctx); err != nil {
  596. return err
  597. }
  598. defer dc.finish()
  599. err := dc.checkClosed()
  600. if err != nil {
  601. return err
  602. }
  603. return nil
  604. }
  605. func (dc *DmConnection) checkNamedValue(nv *driver.NamedValue) error {
  606. var err error
  607. var cvt = converter{dc, false}
  608. nv.Value, err = cvt.ConvertValue(nv.Value)
  609. dc.isBatch = cvt.isBatch
  610. return err
  611. }
  612. func (dc *DmConnection) driverQuery(query string) (*DmStatement, *DmRows, error) {
  613. stmt, err := NewDmStmt(dc, query)
  614. if err != nil {
  615. return nil, nil, err
  616. }
  617. stmt.innerUsed = true
  618. stmt.innerExec = true
  619. info, err := dc.Access.Dm_build_1434(stmt, Dm_build_96)
  620. if err != nil {
  621. return nil, nil, err
  622. }
  623. dc.lastExecInfo = info
  624. stmt.innerExec = false
  625. return stmt, newDmRows(newInnerRows(0, stmt, info)), nil
  626. }
  627. func (dc *DmConnection) getIndexOnEPGroup() int32 {
  628. if dc.dmConnector.group == nil || dc.dmConnector.group.epList == nil {
  629. return -1
  630. }
  631. for i := 0; i < len(dc.dmConnector.group.epList); i++ {
  632. ep := dc.dmConnector.group.epList[i]
  633. if dc.dmConnector.host == ep.host && dc.dmConnector.port == ep.port {
  634. return int32(i)
  635. }
  636. }
  637. return -1
  638. }
  639. func (dc *DmConnection) getServerEncoding() string {
  640. if dc.dmConnector.charCode != "" {
  641. return dc.dmConnector.charCode
  642. }
  643. return dc.serverEncoding
  644. }
  645. func (dc *DmConnection) lobFetchAll() bool {
  646. return dc.dmConnector.lobMode == 2
  647. }
  648. func (conn *DmConnection) CompatibleOracle() bool {
  649. return conn.dmConnector.compatibleMode == COMPATIBLE_MODE_ORACLE
  650. }
  651. func (conn *DmConnection) CompatibleMysql() bool {
  652. return conn.dmConnector.compatibleMode == COMPATIBLE_MODE_MYSQL
  653. }
  654. func (conn *DmConnection) cancel(err error) {
  655. conn.canceled.Set(err)
  656. conn.close()
  657. }
  658. func (conn *DmConnection) finish() {
  659. if !conn.watching || conn.finished == nil {
  660. return
  661. }
  662. select {
  663. case conn.finished <- struct{}{}:
  664. conn.watching = false
  665. case <-conn.closech:
  666. }
  667. }
  668. func (conn *DmConnection) startWatcher() {
  669. watcher := make(chan context.Context, 1)
  670. conn.watcher = watcher
  671. finished := make(chan struct{})
  672. conn.finished = finished
  673. go func() {
  674. for {
  675. var ctx context.Context
  676. select {
  677. case ctx = <-watcher:
  678. case <-conn.closech:
  679. return
  680. }
  681. select {
  682. case <-ctx.Done():
  683. conn.cancel(ctx.Err())
  684. case <-finished:
  685. case <-conn.closech:
  686. return
  687. }
  688. }
  689. }()
  690. }
  691. func (conn *DmConnection) watchCancel(ctx context.Context) error {
  692. if conn.watching {
  693. conn.cleanup()
  694. return nil
  695. }
  696. if err := ctx.Err(); err != nil {
  697. return err
  698. }
  699. if ctx.Done() == nil {
  700. return nil
  701. }
  702. if conn.watcher == nil {
  703. return nil
  704. }
  705. conn.watching = true
  706. conn.watcher <- ctx
  707. return nil
  708. }
  709. type noCopy struct{}
  710. func (*noCopy) Lock() {}
  711. type atomicBool struct {
  712. _noCopy noCopy
  713. value uint32
  714. }
  715. func (ab *atomicBool) IsSet() bool {
  716. return atomic.LoadUint32(&ab.value) > 0
  717. }
  718. func (ab *atomicBool) Set(value bool) {
  719. if value {
  720. atomic.StoreUint32(&ab.value, 1)
  721. } else {
  722. atomic.StoreUint32(&ab.value, 0)
  723. }
  724. }
  725. func (ab *atomicBool) TrySet(value bool) bool {
  726. if value {
  727. return atomic.SwapUint32(&ab.value, 1) == 0
  728. }
  729. return atomic.SwapUint32(&ab.value, 0) > 0
  730. }
  731. type atomicError struct {
  732. _noCopy noCopy
  733. value atomic.Value
  734. }
  735. func (ae *atomicError) Set(value error) {
  736. ae.value.Store(value)
  737. }
  738. func (ae *atomicError) Value() error {
  739. if v := ae.value.Load(); v != nil {
  740. return v.(error)
  741. }
  742. return nil
  743. }