| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326 |
- package data
- import (
- "bytes"
- "database/sql/driver"
- "fmt"
- "io"
- "reflect"
- "strings"
- "github.com/ClickHouse/clickhouse-go/lib/binary"
- "github.com/ClickHouse/clickhouse-go/lib/column"
- )
// offset holds the array offsets recorded for a single column while rows are
// appended: the outer index is the array nesting level (level 1 is stored at
// index 0) and the inner slice is the running, cumulative element count of
// the slices seen at that level.
type offset [][]int
- type Block struct {
- Values [][]interface{}
- Columns []column.Column
- NumRows uint64
- NumColumns uint64
- offsets []offset
- buffers []*buffer
- info blockInfo
- }
- func (block *Block) Copy() *Block {
- return &Block{
- Columns: block.Columns,
- NumColumns: block.NumColumns,
- info: block.info,
- }
- }
- func (block *Block) ColumnNames() []string {
- names := make([]string, 0, len(block.Columns))
- for _, column := range block.Columns {
- names = append(names, column.Name())
- }
- return names
- }
- func (block *Block) Read(serverInfo *ServerInfo, decoder *binary.Decoder) (err error) {
- if serverInfo.Revision > 0 {
- if err = block.info.read(decoder); err != nil {
- return err
- }
- }
- if block.NumColumns, err = decoder.Uvarint(); err != nil {
- return err
- }
- if block.NumRows, err = decoder.Uvarint(); err != nil {
- return err
- }
- block.Values = make([][]interface{}, block.NumColumns)
- if block.NumRows > 10 {
- for i := 0; i < int(block.NumColumns); i++ {
- block.Values[i] = make([]interface{}, 0, block.NumRows)
- }
- }
- for i := 0; i < int(block.NumColumns); i++ {
- var (
- value interface{}
- columnName string
- columnType string
- )
- if columnName, err = decoder.String(); err != nil {
- return err
- }
- if columnType, err = decoder.String(); err != nil {
- return err
- }
- c, err := column.Factory(columnName, columnType, serverInfo.Timezone)
- if err != nil {
- return err
- }
- block.Columns = append(block.Columns, c)
- switch column := c.(type) {
- case *column.Array:
- if block.Values[i], err = column.ReadArray(decoder, int(block.NumRows)); err != nil {
- return err
- }
- case *column.Nullable:
- if block.Values[i], err = column.ReadNull(decoder, int(block.NumRows)); err != nil {
- return err
- }
- case *column.Tuple:
- if block.Values[i], err = column.ReadTuple(decoder, int(block.NumRows)); err != nil {
- return err
- }
- default:
- for row := 0; row < int(block.NumRows); row++ {
- if value, err = column.Read(decoder, false); err != nil {
- return err
- }
- block.Values[i] = append(block.Values[i], value)
- }
- }
- }
- return nil
- }
- func (block *Block) writeArray(col column.Column, value Value, num, level int) error {
- if level > col.Depth() {
- arrColumn, ok := col.(*column.Array)
- if ok && strings.Contains(col.CHType(), "Nullable") {
- return arrColumn.WriteNull(block.buffers[num].Offset, block.buffers[num].Column, value.Interface())
- }
- return col.Write(block.buffers[num].Column, value.Interface())
- }
- switch {
- case value.Kind() == reflect.Slice:
- if len(block.offsets[num]) < level {
- block.offsets[num] = append(block.offsets[num], []int{value.Len()})
- } else {
- block.offsets[num][level-1] = append(
- block.offsets[num][level-1],
- block.offsets[num][level-1][len(block.offsets[num][level-1])-1]+value.Len(),
- )
- }
- for i := 0; i < value.Len(); i++ {
- if err := block.writeArray(col, value.Index(i), num, level+1); err != nil {
- return err
- }
- }
- default:
- if err := col.Write(block.buffers[num].Column, value.Interface()); err != nil {
- return err
- }
- }
- return nil
- }
- func (block *Block) AppendRow(args []driver.Value) error {
- if len(block.Columns) != len(args) {
- return fmt.Errorf("block: expected %d arguments (columns: %s), got %d", len(block.Columns), strings.Join(block.ColumnNames(), ", "), len(args))
- }
- block.Reserve()
- {
- block.NumRows++
- }
- for num, c := range block.Columns {
- switch column := c.(type) {
- case *column.Array:
- if args[num] == nil {
- return fmt.Errorf("unsupported [nil] value is passed in argument %d, column is not Nullable", num)
- }
- value := reflect.ValueOf(args[num])
- if value.Kind() != reflect.Slice {
- return fmt.Errorf("unsupported Array(T) type [%T]", value.Interface())
- }
- if err := block.writeArray(c, newValue(value), num, 1); err != nil {
- return err
- }
- case *column.Nullable:
- if err := column.WriteNull(block.buffers[num].Offset, block.buffers[num].Column, args[num]); err != nil {
- return err
- }
- default:
- if err := column.Write(block.buffers[num].Column, args[num]); err != nil {
- return err
- }
- }
- }
- return nil
- }
- func (block *Block) Reserve() {
- if len(block.buffers) == 0 {
- block.buffers = make([]*buffer, len(block.Columns))
- block.offsets = make([]offset, len(block.Columns))
- for i := 0; i < len(block.Columns); i++ {
- var (
- offsetBuffer = new(bytes.Buffer)
- columnBuffer = new(bytes.Buffer)
- )
- block.buffers[i] = &buffer{
- Offset: binary.NewEncoder(offsetBuffer),
- Column: binary.NewEncoder(columnBuffer),
- offsetBuffer: offsetBuffer,
- columnBuffer: columnBuffer,
- }
- }
- }
- }
- func (block *Block) Reset() {
- block.NumRows = 0
- block.NumColumns = 0
- block.Values = block.Values[:0]
- block.Columns = block.Columns[:0]
- block.info.reset()
- for _, buffer := range block.buffers {
- buffer.reset()
- }
- {
- block.offsets = nil
- block.buffers = nil
- }
- }
- func (block *Block) Write(serverInfo *ServerInfo, encoder *binary.Encoder) error {
- if serverInfo.Revision > 0 {
- if err := block.info.write(encoder); err != nil {
- return err
- }
- }
- if err := encoder.Uvarint(block.NumColumns); err != nil {
- return err
- }
- encoder.Uvarint(block.NumRows)
- defer func() {
- block.NumRows = 0
- for i := range block.offsets {
- block.offsets[i] = offset{}
- }
- }()
- for i, column := range block.Columns {
- encoder.String(column.Name())
- encoder.String(column.CHType())
- if len(block.buffers) == len(block.Columns) {
- for _, offsets := range block.offsets[i] {
- for _, offset := range offsets {
- if err := encoder.UInt64(uint64(offset)); err != nil {
- return err
- }
- }
- }
- if _, err := block.buffers[i].WriteTo(encoder); err != nil {
- return err
- }
- }
- }
- return nil
- }
// blockInfo is the header that precedes the column and row counts of a block
// on the wire. Its fields are listed in the order they are read and written.
type blockInfo struct {
	// num1 is the first uvarint of the header as read from the wire. When
	// writing, the constant 1 is emitted in its place.
	// NOTE(review): presumably a protocol field marker — confirm.
	num1 uint64
	// isOverflows is the boolean that follows num1.
	isOverflows bool
	// num2 is the uvarint that follows isOverflows. When writing, the
	// constant 2 is emitted in its place.
	num2 uint64
	// bucketNum is the 32-bit integer that follows num2. A value of 0 is
	// replaced by -1 when the header is written.
	bucketNum int32
	// num3 is the final uvarint of the header. When writing, the constant 0
	// is emitted in its place.
	num3 uint64
}
- func (info *blockInfo) reset() {
- info.num1 = 0
- info.isOverflows = false
- info.num2 = 0
- info.bucketNum = 0
- info.num3 = 0
- }
- func (info *blockInfo) read(decoder *binary.Decoder) error {
- var err error
- if info.num1, err = decoder.Uvarint(); err != nil {
- return err
- }
- if info.isOverflows, err = decoder.Bool(); err != nil {
- return err
- }
- if info.num2, err = decoder.Uvarint(); err != nil {
- return err
- }
- if info.bucketNum, err = decoder.Int32(); err != nil {
- return err
- }
- if info.num3, err = decoder.Uvarint(); err != nil {
- return err
- }
- return nil
- }
- func (info *blockInfo) write(encoder *binary.Encoder) error {
- if err := encoder.Uvarint(1); err != nil {
- return err
- }
- if err := encoder.Bool(info.isOverflows); err != nil {
- return err
- }
- if err := encoder.Uvarint(2); err != nil {
- return err
- }
- if info.bucketNum == 0 {
- info.bucketNum = -1
- }
- if err := encoder.Int32(info.bucketNum); err != nil {
- return err
- }
- if err := encoder.Uvarint(0); err != nil {
- return err
- }
- return nil
- }
- type buffer struct {
- Offset *binary.Encoder
- Column *binary.Encoder
- offsetBuffer *bytes.Buffer
- columnBuffer *bytes.Buffer
- }
- func (buf *buffer) WriteTo(w io.Writer) (int64, error) {
- var size int64
- {
- ln, err := buf.offsetBuffer.WriteTo(w)
- if err != nil {
- return size, err
- }
- size += ln
- }
- {
- ln, err := buf.columnBuffer.WriteTo(w)
- if err != nil {
- return size, err
- }
- size += ln
- }
- return size, nil
- }
- func (buf *buffer) reset() {
- buf.offsetBuffer.Reset()
- buf.columnBuffer.Reset()
- }
|