| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 |
- // Unless explicitly stated otherwise all files in this repository are licensed
- // under the Apache License 2.0.
- // This product includes software developed at Datadog (https://www.datadoghq.com/).
- // Copyright 2021 Datadog, Inc.
- package store
- import (
- "errors"
- "sort"
- enc "github.com/DataDog/sketches-go/ddsketch/encoding"
- "github.com/DataDog/sketches-go/ddsketch/pb/sketchpb"
- )
- type SparseStore struct {
- counts map[int]float64
- }
- func NewSparseStore() *SparseStore {
- return &SparseStore{counts: make(map[int]float64)}
- }
- func (s *SparseStore) Add(index int) {
- s.counts[index]++
- }
- func (s *SparseStore) AddBin(bin Bin) {
- s.AddWithCount(bin.index, bin.count)
- }
- func (s *SparseStore) AddWithCount(index int, count float64) {
- if count == 0 {
- return
- }
- s.counts[index] += count
- }
- func (s *SparseStore) Bins() <-chan Bin {
- orderedBins := s.orderedBins()
- ch := make(chan Bin)
- go func() {
- defer close(ch)
- for _, bin := range orderedBins {
- ch <- bin
- }
- }()
- return ch
- }
- func (s *SparseStore) orderedBins() []Bin {
- bins := make([]Bin, 0, len(s.counts))
- for index, count := range s.counts {
- bins = append(bins, Bin{index: index, count: count})
- }
- sort.Slice(bins, func(i, j int) bool { return bins[i].index < bins[j].index })
- return bins
- }
- func (s *SparseStore) ForEach(f func(index int, count float64) (stop bool)) {
- for index, count := range s.counts {
- if f(index, count) {
- return
- }
- }
- }
- func (s *SparseStore) Copy() Store {
- countsCopy := make(map[int]float64)
- for index, count := range s.counts {
- countsCopy[index] = count
- }
- return &SparseStore{counts: countsCopy}
- }
- func (s *SparseStore) Clear() {
- for index := range s.counts {
- delete(s.counts, index)
- }
- }
- func (s *SparseStore) IsEmpty() bool {
- return len(s.counts) == 0
- }
- func (s *SparseStore) MaxIndex() (int, error) {
- if s.IsEmpty() {
- return 0, errUndefinedMaxIndex
- }
- maxIndex := minInt
- for index := range s.counts {
- if index > maxIndex {
- maxIndex = index
- }
- }
- return maxIndex, nil
- }
- func (s *SparseStore) MinIndex() (int, error) {
- if s.IsEmpty() {
- return 0, errUndefinedMinIndex
- }
- minIndex := maxInt
- for index := range s.counts {
- if index < minIndex {
- minIndex = index
- }
- }
- return minIndex, nil
- }
- func (s *SparseStore) TotalCount() float64 {
- totalCount := float64(0)
- for _, count := range s.counts {
- totalCount += count
- }
- return totalCount
- }
- func (s *SparseStore) KeyAtRank(rank float64) int {
- orderedBins := s.orderedBins()
- cumulCount := float64(0)
- for _, bin := range orderedBins {
- cumulCount += bin.count
- if cumulCount > rank {
- return bin.index
- }
- }
- maxIndex, err := s.MaxIndex()
- if err == nil {
- return maxIndex
- } else {
- // FIXME: make Store's KeyAtRank consistent with MinIndex and MaxIndex
- return 0
- }
- }
- func (s *SparseStore) MergeWith(store Store) {
- for bin := range store.Bins() {
- s.AddBin(bin)
- }
- }
- func (s *SparseStore) ToProto() *sketchpb.Store {
- binCounts := make(map[int32]float64)
- for index, count := range s.counts {
- binCounts[int32(index)] = count
- }
- return &sketchpb.Store{BinCounts: binCounts}
- }
- func (s *SparseStore) Reweight(w float64) error {
- if w <= 0 {
- return errors.New("can't reweight by a negative factor")
- }
- if w == 1 {
- return nil
- }
- for index := range s.counts {
- s.counts[index] *= w
- }
- return nil
- }
- func (s *SparseStore) Encode(b *[]byte, t enc.FlagType) {
- if s.IsEmpty() {
- return
- }
- enc.EncodeFlag(b, enc.NewFlag(t, enc.BinEncodingIndexDeltasAndCounts))
- enc.EncodeUvarint64(b, uint64(len(s.counts)))
- previousIndex := 0
- for index, count := range s.counts {
- enc.EncodeVarint64(b, int64(index-previousIndex))
- enc.EncodeVarfloat64(b, count)
- previousIndex = index
- }
- }
- func (s *SparseStore) DecodeAndMergeWith(b *[]byte, encodingMode enc.SubFlag) error {
- return DecodeAndMergeWith(s, b, encodingMode)
- }
- var _ Store = (*SparseStore)(nil)
|