compress_writer.go 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. // +build !clz4
  2. package binary
  3. import (
  4. "encoding/binary"
  5. "io"
  6. "github.com/ClickHouse/clickhouse-go/lib/cityhash102"
  7. "github.com/ClickHouse/clickhouse-go/lib/lz4"
  8. )
  9. type compressWriter struct {
  10. writer io.Writer
  11. // data uncompressed
  12. data []byte
  13. // data position
  14. pos int
  15. // data compressed
  16. zdata []byte
  17. }
  18. // NewCompressWriter wrap the io.Writer
  19. func NewCompressWriter(w io.Writer) *compressWriter {
  20. p := &compressWriter{writer: w}
  21. p.data = make([]byte, BlockMaxSize, BlockMaxSize)
  22. zlen := lz4.CompressBound(BlockMaxSize) + HeaderSize
  23. p.zdata = make([]byte, zlen, zlen)
  24. return p
  25. }
  26. func (cw *compressWriter) Write(buf []byte) (int, error) {
  27. var n int
  28. for len(buf) > 0 {
  29. // Accumulate the data to be compressed.
  30. m := copy(cw.data[cw.pos:], buf)
  31. cw.pos += m
  32. buf = buf[m:]
  33. if cw.pos == len(cw.data) {
  34. err := cw.Flush()
  35. if err != nil {
  36. return n, err
  37. }
  38. }
  39. n += m
  40. }
  41. return n, nil
  42. }
  43. func (cw *compressWriter) Flush() (err error) {
  44. if cw.pos == 0 {
  45. return
  46. }
  47. // write the headers
  48. compressedSize, err := lz4.Encode(cw.zdata[HeaderSize:], cw.data[:cw.pos])
  49. if err != nil {
  50. return err
  51. }
  52. compressedSize += CompressHeaderSize
  53. // fill the header, compressed_size_32 + uncompressed_size_32
  54. cw.zdata[16] = LZ4
  55. binary.LittleEndian.PutUint32(cw.zdata[17:], uint32(compressedSize))
  56. binary.LittleEndian.PutUint32(cw.zdata[21:], uint32(cw.pos))
  57. // fill the checksum
  58. checkSum := cityhash102.CityHash128(cw.zdata[16:], uint32(compressedSize))
  59. binary.LittleEndian.PutUint64(cw.zdata[0:], checkSum.Lower64())
  60. binary.LittleEndian.PutUint64(cw.zdata[8:], checkSum.Higher64())
  61. cw.writer.Write(cw.zdata[:compressedSize+ChecksumSize])
  62. if w, ok := cw.writer.(WriteFlusher); ok {
  63. err = w.Flush()
  64. }
  65. cw.pos = 0
  66. return
  67. }