block.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. package data
  2. import (
  3. "bytes"
  4. "database/sql/driver"
  5. "fmt"
  6. "io"
  7. "reflect"
  8. "strings"
  9. "github.com/ClickHouse/clickhouse-go/lib/binary"
  10. "github.com/ClickHouse/clickhouse-go/lib/column"
  11. )
  // offset stores the array offsets collected for one column: the outer index
  // is the nesting level (0-based) and the inner slice holds the running
  // offsets appended for that level by writeArray.
  type offset [][]int
  13. type Block struct {
  14. Values [][]interface{}
  15. Columns []column.Column
  16. NumRows uint64
  17. NumColumns uint64
  18. offsets []offset
  19. buffers []*buffer
  20. info blockInfo
  21. }
  22. func (block *Block) Copy() *Block {
  23. return &Block{
  24. Columns: block.Columns,
  25. NumColumns: block.NumColumns,
  26. info: block.info,
  27. }
  28. }
  29. func (block *Block) ColumnNames() []string {
  30. names := make([]string, 0, len(block.Columns))
  31. for _, column := range block.Columns {
  32. names = append(names, column.Name())
  33. }
  34. return names
  35. }
  36. func (block *Block) Read(serverInfo *ServerInfo, decoder *binary.Decoder) (err error) {
  37. if serverInfo.Revision > 0 {
  38. if err = block.info.read(decoder); err != nil {
  39. return err
  40. }
  41. }
  42. if block.NumColumns, err = decoder.Uvarint(); err != nil {
  43. return err
  44. }
  45. if block.NumRows, err = decoder.Uvarint(); err != nil {
  46. return err
  47. }
  48. block.Values = make([][]interface{}, block.NumColumns)
  49. if block.NumRows > 10 {
  50. for i := 0; i < int(block.NumColumns); i++ {
  51. block.Values[i] = make([]interface{}, 0, block.NumRows)
  52. }
  53. }
  54. for i := 0; i < int(block.NumColumns); i++ {
  55. var (
  56. value interface{}
  57. columnName string
  58. columnType string
  59. )
  60. if columnName, err = decoder.String(); err != nil {
  61. return err
  62. }
  63. if columnType, err = decoder.String(); err != nil {
  64. return err
  65. }
  66. c, err := column.Factory(columnName, columnType, serverInfo.Timezone)
  67. if err != nil {
  68. return err
  69. }
  70. block.Columns = append(block.Columns, c)
  71. switch column := c.(type) {
  72. case *column.Array:
  73. if block.Values[i], err = column.ReadArray(decoder, int(block.NumRows)); err != nil {
  74. return err
  75. }
  76. case *column.Nullable:
  77. if block.Values[i], err = column.ReadNull(decoder, int(block.NumRows)); err != nil {
  78. return err
  79. }
  80. case *column.Tuple:
  81. if block.Values[i], err = column.ReadTuple(decoder, int(block.NumRows)); err != nil {
  82. return err
  83. }
  84. default:
  85. for row := 0; row < int(block.NumRows); row++ {
  86. if value, err = column.Read(decoder, false); err != nil {
  87. return err
  88. }
  89. block.Values[i] = append(block.Values[i], value)
  90. }
  91. }
  92. }
  93. return nil
  94. }
  95. func (block *Block) writeArray(col column.Column, value Value, num, level int) error {
  96. if level > col.Depth() {
  97. arrColumn, ok := col.(*column.Array)
  98. if ok && strings.Contains(col.CHType(), "Nullable") {
  99. return arrColumn.WriteNull(block.buffers[num].Offset, block.buffers[num].Column, value.Interface())
  100. }
  101. return col.Write(block.buffers[num].Column, value.Interface())
  102. }
  103. switch {
  104. case value.Kind() == reflect.Slice:
  105. if len(block.offsets[num]) < level {
  106. block.offsets[num] = append(block.offsets[num], []int{value.Len()})
  107. } else {
  108. block.offsets[num][level-1] = append(
  109. block.offsets[num][level-1],
  110. block.offsets[num][level-1][len(block.offsets[num][level-1])-1]+value.Len(),
  111. )
  112. }
  113. for i := 0; i < value.Len(); i++ {
  114. if err := block.writeArray(col, value.Index(i), num, level+1); err != nil {
  115. return err
  116. }
  117. }
  118. default:
  119. if err := col.Write(block.buffers[num].Column, value.Interface()); err != nil {
  120. return err
  121. }
  122. }
  123. return nil
  124. }
  125. func (block *Block) AppendRow(args []driver.Value) error {
  126. if len(block.Columns) != len(args) {
  127. return fmt.Errorf("block: expected %d arguments (columns: %s), got %d", len(block.Columns), strings.Join(block.ColumnNames(), ", "), len(args))
  128. }
  129. block.Reserve()
  130. {
  131. block.NumRows++
  132. }
  133. for num, c := range block.Columns {
  134. switch column := c.(type) {
  135. case *column.Array:
  136. if args[num] == nil {
  137. return fmt.Errorf("unsupported [nil] value is passed in argument %d, column is not Nullable", num)
  138. }
  139. value := reflect.ValueOf(args[num])
  140. if value.Kind() != reflect.Slice {
  141. return fmt.Errorf("unsupported Array(T) type [%T]", value.Interface())
  142. }
  143. if err := block.writeArray(c, newValue(value), num, 1); err != nil {
  144. return err
  145. }
  146. case *column.Nullable:
  147. if err := column.WriteNull(block.buffers[num].Offset, block.buffers[num].Column, args[num]); err != nil {
  148. return err
  149. }
  150. default:
  151. if err := column.Write(block.buffers[num].Column, args[num]); err != nil {
  152. return err
  153. }
  154. }
  155. }
  156. return nil
  157. }
  158. func (block *Block) Reserve() {
  159. if len(block.buffers) == 0 {
  160. block.buffers = make([]*buffer, len(block.Columns))
  161. block.offsets = make([]offset, len(block.Columns))
  162. for i := 0; i < len(block.Columns); i++ {
  163. var (
  164. offsetBuffer = new(bytes.Buffer)
  165. columnBuffer = new(bytes.Buffer)
  166. )
  167. block.buffers[i] = &buffer{
  168. Offset: binary.NewEncoder(offsetBuffer),
  169. Column: binary.NewEncoder(columnBuffer),
  170. offsetBuffer: offsetBuffer,
  171. columnBuffer: columnBuffer,
  172. }
  173. }
  174. }
  175. }
  176. func (block *Block) Reset() {
  177. block.NumRows = 0
  178. block.NumColumns = 0
  179. block.Values = block.Values[:0]
  180. block.Columns = block.Columns[:0]
  181. block.info.reset()
  182. for _, buffer := range block.buffers {
  183. buffer.reset()
  184. }
  185. {
  186. block.offsets = nil
  187. block.buffers = nil
  188. }
  189. }
  190. func (block *Block) Write(serverInfo *ServerInfo, encoder *binary.Encoder) error {
  191. if serverInfo.Revision > 0 {
  192. if err := block.info.write(encoder); err != nil {
  193. return err
  194. }
  195. }
  196. if err := encoder.Uvarint(block.NumColumns); err != nil {
  197. return err
  198. }
  199. encoder.Uvarint(block.NumRows)
  200. defer func() {
  201. block.NumRows = 0
  202. for i := range block.offsets {
  203. block.offsets[i] = offset{}
  204. }
  205. }()
  206. for i, column := range block.Columns {
  207. encoder.String(column.Name())
  208. encoder.String(column.CHType())
  209. if len(block.buffers) == len(block.Columns) {
  210. for _, offsets := range block.offsets[i] {
  211. for _, offset := range offsets {
  212. if err := encoder.UInt64(uint64(offset)); err != nil {
  213. return err
  214. }
  215. }
  216. }
  217. if _, err := block.buffers[i].WriteTo(encoder); err != nil {
  218. return err
  219. }
  220. }
  221. }
  222. return nil
  223. }
  // blockInfo is the header exchanged ahead of the block payload when the
  // server revision is greater than zero.
  //
  // num1, num2 and num3 receive the Uvarint values that surround the two
  // payload fields on read; on write the constants 1, 2 and 0 are emitted in
  // those positions instead (presumably protocol field markers; confirm
  // against the ClickHouse native protocol).
  type blockInfo struct {
  	num1        uint64
  	isOverflows bool
  	num2        uint64
  	bucketNum   int32
  	num3        uint64
  }

  // reset restores every field of the header to its zero value.
  func (info *blockInfo) reset() {
  	*info = blockInfo{}
  }
  238. func (info *blockInfo) read(decoder *binary.Decoder) error {
  239. var err error
  240. if info.num1, err = decoder.Uvarint(); err != nil {
  241. return err
  242. }
  243. if info.isOverflows, err = decoder.Bool(); err != nil {
  244. return err
  245. }
  246. if info.num2, err = decoder.Uvarint(); err != nil {
  247. return err
  248. }
  249. if info.bucketNum, err = decoder.Int32(); err != nil {
  250. return err
  251. }
  252. if info.num3, err = decoder.Uvarint(); err != nil {
  253. return err
  254. }
  255. return nil
  256. }
  257. func (info *blockInfo) write(encoder *binary.Encoder) error {
  258. if err := encoder.Uvarint(1); err != nil {
  259. return err
  260. }
  261. if err := encoder.Bool(info.isOverflows); err != nil {
  262. return err
  263. }
  264. if err := encoder.Uvarint(2); err != nil {
  265. return err
  266. }
  267. if info.bucketNum == 0 {
  268. info.bucketNum = -1
  269. }
  270. if err := encoder.Int32(info.bucketNum); err != nil {
  271. return err
  272. }
  273. if err := encoder.Uvarint(0); err != nil {
  274. return err
  275. }
  276. return nil
  277. }
  278. type buffer struct {
  279. Offset *binary.Encoder
  280. Column *binary.Encoder
  281. offsetBuffer *bytes.Buffer
  282. columnBuffer *bytes.Buffer
  283. }
  284. func (buf *buffer) WriteTo(w io.Writer) (int64, error) {
  285. var size int64
  286. {
  287. ln, err := buf.offsetBuffer.WriteTo(w)
  288. if err != nil {
  289. return size, err
  290. }
  291. size += ln
  292. }
  293. {
  294. ln, err := buf.columnBuffer.WriteTo(w)
  295. if err != nil {
  296. return size, err
  297. }
  298. size += ln
  299. }
  300. return size, nil
  301. }
  302. func (buf *buffer) reset() {
  303. buf.offsetBuffer.Reset()
  304. buf.columnBuffer.Reset()
  305. }