array.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. package column
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "reflect"
  7. "strings"
  8. "time"
  9. "github.com/ClickHouse/clickhouse-go/lib/binary"
  10. )
  11. type columnDecoder func() (interface{}, error)
  12. var unsupportedArrayTypeErrTemp = "unsupported Array type '%s'"
  13. // If you add Nullable type, that can be used in Array(Nullable(T)) add this type to ../codegen/nullable_appender/main.go in structure values.Types.
  14. // Run code generation.
  15. //go:generate go run ../codegen/nullable_appender -package $GOPACKAGE -file nullable_appender.go
  16. type Array struct {
  17. base
  18. depth int
  19. column Column
  20. nullable bool
  21. }
  22. func (array *Array) Read(decoder *binary.Decoder, isNull bool) (interface{}, error) {
  23. return nil, fmt.Errorf("do not use Read method for Array(T) column")
  24. }
  25. func (array *Array) WriteNull(nulls, encoder *binary.Encoder, v interface{}) error {
  26. if array.nullable {
  27. column, ok := array.column.(*Nullable)
  28. if !ok {
  29. return fmt.Errorf("cannot convert to nullable type")
  30. }
  31. return column.WriteNull(nulls, encoder, v)
  32. }
  33. return fmt.Errorf("write null to not nullable array")
  34. }
  35. func (array *Array) Write(encoder *binary.Encoder, v interface{}) error {
  36. return array.column.Write(encoder, v)
  37. }
  38. func (array *Array) ReadArray(decoder *binary.Decoder, rows int) (_ []interface{}, err error) {
  39. var (
  40. offsets = make([][]uint64, array.depth)
  41. values = make([]interface{}, rows)
  42. )
  43. // Read offsets
  44. lastOffset := uint64(rows)
  45. for i := 0; i < array.depth; i++ {
  46. offset := make([]uint64, lastOffset)
  47. for j := uint64(0); j < lastOffset; j++ {
  48. if offset[j], err = decoder.UInt64(); err != nil {
  49. return nil, err
  50. }
  51. }
  52. offsets[i] = offset
  53. lastOffset = 0
  54. if len(offset) > 0 {
  55. lastOffset = offset[len(offset)-1]
  56. }
  57. }
  58. var cd columnDecoder
  59. switch column := array.column.(type) {
  60. case *Nullable:
  61. nullRows, err := column.ReadNull(decoder, int(lastOffset))
  62. if err != nil {
  63. return nil, err
  64. }
  65. cd = func(rows []interface{}) columnDecoder {
  66. i := 0
  67. return func() (interface{}, error) {
  68. if i > len(rows) {
  69. return nil, errors.New("not enough rows to return while parsing Null column")
  70. }
  71. ret := rows[i]
  72. i++
  73. return ret, nil
  74. }
  75. }(nullRows)
  76. case *Tuple:
  77. tupleRows, err := column.ReadTuple(decoder, int(lastOffset))
  78. if err != nil {
  79. return nil, err
  80. }
  81. // closure to return fully assembled tuple values as if they
  82. // were decoded one at a time
  83. cd = func(rows []interface{}) columnDecoder {
  84. i := 0
  85. return func() (interface{}, error) {
  86. if i > len(rows) {
  87. return nil, errors.New("not enough rows to return while parsing Tuple column")
  88. }
  89. ret := rows[i]
  90. i++
  91. return ret, nil
  92. }
  93. }(tupleRows)
  94. default:
  95. cd = func(decoder *binary.Decoder) columnDecoder {
  96. return func() (interface{}, error) { return array.column.Read(decoder, array.nullable) }
  97. }(decoder)
  98. }
  99. // Read values
  100. for i := 0; i < rows; i++ {
  101. if values[i], err = array.read(cd, offsets, uint64(i), 0); err != nil {
  102. return nil, err
  103. }
  104. }
  105. return values, nil
  106. }
  107. func (array *Array) read(readColumn columnDecoder, offsets [][]uint64, index uint64, level int) (interface{}, error) {
  108. end := offsets[level][index]
  109. start := uint64(0)
  110. if index > 0 {
  111. start = offsets[level][index-1]
  112. }
  113. scanT := array.column.ScanType()
  114. slice := reflect.MakeSlice(array.arrayType(level), 0, int(end-start))
  115. for i := start; i < end; i++ {
  116. var (
  117. value interface{}
  118. err error
  119. )
  120. if level == array.depth-1 {
  121. value, err = readColumn()
  122. } else {
  123. value, err = array.read(readColumn, offsets, i, level+1)
  124. }
  125. if err != nil {
  126. return nil, err
  127. }
  128. if array.nullable && level == array.depth-1 {
  129. f, ok := nullableAppender[scanT.String()]
  130. if !ok {
  131. return nil, fmt.Errorf(unsupportedArrayTypeErrTemp, scanT.String())
  132. }
  133. cSlice, err := f(value, slice)
  134. if err != nil {
  135. return nil, err
  136. }
  137. slice = cSlice
  138. } else {
  139. slice = reflect.Append(slice, reflect.ValueOf(value))
  140. }
  141. }
  142. return slice.Interface(), nil
  143. }
  144. func (array *Array) arrayType(level int) reflect.Type {
  145. t := array.column.ScanType()
  146. for i := 0; i < array.depth-level; i++ {
  147. t = reflect.SliceOf(t)
  148. }
  149. return t
  150. }
  151. func (array *Array) Depth() int {
  152. return array.depth
  153. }
  154. func parseArray(name, chType string, timezone *time.Location) (*Array, error) {
  155. if len(chType) < 11 {
  156. return nil, fmt.Errorf("invalid Array column type: %s", chType)
  157. }
  158. var (
  159. depth int
  160. columnType = chType
  161. )
  162. loop:
  163. for _, str := range strings.Split(chType, "Array(") {
  164. switch {
  165. case len(str) == 0:
  166. depth++
  167. default:
  168. chType = str[:len(str)-depth]
  169. break loop
  170. }
  171. }
  172. column, err := Factory(name, chType, timezone)
  173. if err != nil {
  174. return nil, fmt.Errorf("Array(T): %v", err)
  175. }
  176. var scanType interface{}
  177. switch t := column.ScanType(); t {
  178. case arrayBaseTypes[int8(0)]:
  179. scanType = []int8{}
  180. case arrayBaseTypes[int16(0)]:
  181. scanType = []int16{}
  182. case arrayBaseTypes[int32(0)]:
  183. scanType = []int32{}
  184. case arrayBaseTypes[int64(0)]:
  185. scanType = []int64{}
  186. case arrayBaseTypes[uint8(0)]:
  187. scanType = []uint8{}
  188. case arrayBaseTypes[uint16(0)]:
  189. scanType = []uint16{}
  190. case arrayBaseTypes[uint32(0)]:
  191. scanType = []uint32{}
  192. case arrayBaseTypes[uint64(0)]:
  193. scanType = []uint64{}
  194. case arrayBaseTypes[float32(0)]:
  195. scanType = []float32{}
  196. case arrayBaseTypes[float64(0)]:
  197. scanType = []float64{}
  198. case arrayBaseTypes[string("")]:
  199. scanType = []string{}
  200. case arrayBaseTypes[time.Time{}]:
  201. scanType = []time.Time{}
  202. case arrayBaseTypes[IPv4{}], arrayBaseTypes[IPv6{}]:
  203. scanType = []net.IP{}
  204. case reflect.ValueOf([]interface{}{}).Type():
  205. scanType = [][]interface{}{}
  206. //nullable
  207. case arrayBaseTypes[ptrInt8T]:
  208. scanType = []*int8{}
  209. case arrayBaseTypes[ptrInt16T]:
  210. scanType = []*int16{}
  211. case arrayBaseTypes[ptrInt32T]:
  212. scanType = []*int32{}
  213. case arrayBaseTypes[ptrInt64T]:
  214. scanType = []*int64{}
  215. case arrayBaseTypes[ptrUInt8T]:
  216. scanType = []*uint8{}
  217. case arrayBaseTypes[ptrUInt16T]:
  218. scanType = []*uint16{}
  219. case arrayBaseTypes[ptrUInt32T]:
  220. scanType = []*uint32{}
  221. case arrayBaseTypes[ptrUInt64T]:
  222. scanType = []*uint64{}
  223. case arrayBaseTypes[ptrFloat32]:
  224. scanType = []*float32{}
  225. case arrayBaseTypes[ptrFloat64]:
  226. scanType = []*float64{}
  227. case arrayBaseTypes[ptrString]:
  228. scanType = []*string{}
  229. case arrayBaseTypes[ptrTime]:
  230. scanType = []*time.Time{}
  231. case arrayBaseTypes[ptrIPv4], arrayBaseTypes[ptrIPv6]:
  232. scanType = []*net.IP{}
  233. default:
  234. return nil, fmt.Errorf(unsupportedArrayTypeErrTemp, column.ScanType().Name())
  235. }
  236. return &Array{
  237. base: base{
  238. name: name,
  239. chType: columnType,
  240. valueOf: reflect.ValueOf(scanType),
  241. },
  242. depth: depth,
  243. column: column,
  244. nullable: strings.HasPrefix(column.CHType(), "Nullable"),
  245. }, nil
  246. }