| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- package clickhouse
- import (
- "database/sql/driver"
- "fmt"
- "io"
- "reflect"
- "sync"
- "time"
- "github.com/ClickHouse/clickhouse-go/lib/column"
- "github.com/ClickHouse/clickhouse-go/lib/data"
- "github.com/ClickHouse/clickhouse-go/lib/protocol"
- )
- type rows struct {
- ch *clickhouse
- err error
- mutex sync.RWMutex
- finish func()
- offset int
- block *data.Block
- totals *data.Block
- extremes *data.Block
- stream chan *data.Block
- columns []string
- blockColumns []column.Column
- }
- func (rows *rows) Columns() []string {
- return rows.columns
- }
- func (rows *rows) ColumnTypeScanType(idx int) reflect.Type {
- return rows.blockColumns[idx].ScanType()
- }
- func (rows *rows) ColumnTypeDatabaseTypeName(idx int) string {
- return rows.blockColumns[idx].CHType()
- }
- func (rows *rows) Next(dest []driver.Value) error {
- if rows.block == nil || int(rows.block.NumRows) <= rows.offset {
- switch block, ok := <-rows.stream; true {
- case !ok:
- if err := rows.error(); err != nil {
- return err
- }
- return io.EOF
- default:
- rows.block = block
- rows.offset = 0
- }
- }
- for i := range dest {
- dest[i] = rows.block.Values[i][rows.offset]
- }
- rows.offset++
- return nil
- }
- func (rows *rows) HasNextResultSet() bool {
- return rows.totals != nil || rows.extremes != nil
- }
- func (rows *rows) NextResultSet() error {
- switch {
- case rows.totals != nil:
- rows.block = rows.totals
- rows.offset = 0
- rows.totals = nil
- case rows.extremes != nil:
- rows.block = rows.extremes
- rows.offset = 0
- rows.extremes = nil
- default:
- return io.EOF
- }
- return nil
- }
- func (rows *rows) receiveData() error {
- defer close(rows.stream)
- var (
- err error
- packet uint64
- progress *progress
- profileInfo *profileInfo
- )
- for {
- if packet, err = rows.ch.decoder.Uvarint(); err != nil {
- return rows.setError(err)
- }
- switch packet {
- case protocol.ServerException:
- rows.ch.logf("[rows] <- exception")
- return rows.setError(rows.ch.exception())
- case protocol.ServerProgress:
- if progress, err = rows.ch.progress(); err != nil {
- return rows.setError(err)
- }
- rows.ch.logf("[rows] <- progress: rows=%d, bytes=%d, total rows=%d",
- progress.rows,
- progress.bytes,
- progress.totalRows,
- )
- case protocol.ServerProfileInfo:
- if profileInfo, err = rows.ch.profileInfo(); err != nil {
- return rows.setError(err)
- }
- rows.ch.logf("[rows] <- profiling: rows=%d, bytes=%d, blocks=%d", profileInfo.rows, profileInfo.bytes, profileInfo.blocks)
- case protocol.ServerData, protocol.ServerTotals, protocol.ServerExtremes:
- var (
- block *data.Block
- begin = time.Now()
- )
- if block, err = rows.ch.readBlock(); err != nil {
- return rows.setError(err)
- }
- rows.ch.logf("[rows] <- data: packet=%d, columns=%d, rows=%d, elapsed=%s", packet, block.NumColumns, block.NumRows, time.Since(begin))
- if block.NumRows == 0 {
- continue
- }
- switch packet {
- case protocol.ServerData:
- rows.stream <- block
- case protocol.ServerTotals:
- rows.totals = block
- case protocol.ServerExtremes:
- rows.extremes = block
- }
- case protocol.ServerEndOfStream:
- rows.ch.logf("[rows] <- end of stream")
- return nil
- default:
- rows.ch.conn.Close()
- rows.ch.logf("[rows] unexpected packet [%d]", packet)
- return rows.setError(fmt.Errorf("[rows] unexpected packet [%d] from server", packet))
- }
- }
- }
- func (rows *rows) Close() error {
- rows.ch.logf("[rows] close")
- rows.columns = nil
- for range rows.stream {
- }
- rows.finish()
- return nil
- }
- func (rows *rows) error() error {
- rows.mutex.RLock()
- defer rows.mutex.RUnlock()
- return rows.err
- }
- func (rows *rows) setError(err error) error {
- rows.mutex.Lock()
- rows.err = err
- rows.mutex.Unlock()
- return err
- }
- func (rows *rows) ColumnTypeNullable(idx int) (nullable, ok bool) {
- _, ok = rows.blockColumns[idx].(*column.Nullable)
- return ok, true
- }
- func (rows *rows) ColumnTypePrecisionScale(idx int) (precision, scale int64, ok bool) {
- decimalVal, ok := rows.blockColumns[idx].(*column.Decimal)
- if !ok {
- if nullable, nullOk := rows.blockColumns[idx].(*column.Nullable); nullOk {
- decimalVal, ok = nullable.GetColumn().(*column.Decimal)
- }
- }
- if ok {
- return int64(decimalVal.GetPrecision()), int64(decimalVal.GetScale()), ok
- }
- return 0, 0, false
- }
|