compress_reader.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. // +build !clz4
  2. package binary
  3. import (
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "github.com/ClickHouse/clickhouse-go/lib/lz4"
  8. )
  9. type compressReader struct {
  10. reader io.Reader
  11. // data uncompressed
  12. data []byte
  13. // data position
  14. pos int
  15. // data compressed
  16. zdata []byte
  17. // lz4 headers
  18. header []byte
  19. }
  20. // NewCompressReader wrap the io.Reader
  21. func NewCompressReader(r io.Reader) *compressReader {
  22. p := &compressReader{
  23. reader: r,
  24. header: make([]byte, HeaderSize),
  25. }
  26. p.data = make([]byte, BlockMaxSize, BlockMaxSize)
  27. zlen := lz4.CompressBound(BlockMaxSize) + HeaderSize
  28. p.zdata = make([]byte, zlen, zlen)
  29. p.pos = len(p.data)
  30. return p
  31. }
  32. func (cr *compressReader) Read(buf []byte) (n int, err error) {
  33. var bytesRead = 0
  34. n = len(buf)
  35. if cr.pos < len(cr.data) {
  36. copyedSize := copy(buf, cr.data[cr.pos:])
  37. bytesRead += copyedSize
  38. cr.pos += copyedSize
  39. }
  40. for bytesRead < n {
  41. if err = cr.readCompressedData(); err != nil {
  42. return bytesRead, err
  43. }
  44. copyedSize := copy(buf[bytesRead:], cr.data)
  45. bytesRead += copyedSize
  46. cr.pos = copyedSize
  47. }
  48. return n, nil
  49. }
  50. func (cr *compressReader) readCompressedData() (err error) {
  51. cr.pos = 0
  52. var n int
  53. n, err = cr.reader.Read(cr.header)
  54. if err != nil {
  55. return
  56. }
  57. if n != len(cr.header) {
  58. return fmt.Errorf("Lz4 decompression header EOF")
  59. }
  60. compressedSize := int(binary.LittleEndian.Uint32(cr.header[17:])) - 9
  61. decompressedSize := int(binary.LittleEndian.Uint32(cr.header[21:]))
  62. if compressedSize > cap(cr.zdata) {
  63. cr.zdata = make([]byte, compressedSize)
  64. }
  65. if decompressedSize > cap(cr.data) {
  66. cr.data = make([]byte, decompressedSize)
  67. }
  68. cr.zdata = cr.zdata[:compressedSize]
  69. cr.data = cr.data[:decompressedSize]
  70. // @TODO checksum
  71. if cr.header[16] == LZ4 {
  72. n, err = cr.reader.Read(cr.zdata)
  73. if err != nil {
  74. return
  75. }
  76. if n != len(cr.zdata) {
  77. return fmt.Errorf("Decompress read size not match")
  78. }
  79. _, err = lz4.Decode(cr.data, cr.zdata)
  80. if err != nil {
  81. return
  82. }
  83. } else {
  84. return fmt.Errorf("Unknown compression method: 0x%02x ", cr.header[16])
  85. }
  86. return nil
  87. }