tbf.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package vnet
  2. import (
  3. "sync"
  4. "time"
  5. )
  6. const (
  7. // Bit is a single bit
  8. Bit = 1
  9. // KBit is a kilobit
  10. KBit = 1000 * Bit
  11. // MBit is a Megabit
  12. MBit = 1000 * KBit
  13. )
  14. // TokenBucketFilter implements a token bucket rate limit algorithm.
  15. type TokenBucketFilter struct {
  16. NIC
  17. currentTokensInBucket int
  18. c chan Chunk
  19. queue *chunkQueue
  20. queueSize int // in bytes
  21. mutex sync.Mutex
  22. rate int
  23. maxBurst int
  24. wg sync.WaitGroup
  25. done chan struct{}
  26. }
  27. // TBFOption is the option type to configure a TokenBucketFilter
  28. type TBFOption func(*TokenBucketFilter) TBFOption
  29. // TBFQueueSizeInBytes sets the max number of bytes waiting in the queue. Can
  30. // only be set in constructor before using the TBF.
  31. func TBFQueueSizeInBytes(bytes int) TBFOption {
  32. return func(t *TokenBucketFilter) TBFOption {
  33. prev := t.queueSize
  34. t.queueSize = bytes
  35. return TBFQueueSizeInBytes(prev)
  36. }
  37. }
  38. // TBFRate sets the bitrate of a TokenBucketFilter
  39. func TBFRate(rate int) TBFOption {
  40. return func(t *TokenBucketFilter) TBFOption {
  41. t.mutex.Lock()
  42. defer t.mutex.Unlock()
  43. previous := t.rate
  44. t.rate = rate
  45. return TBFRate(previous)
  46. }
  47. }
  48. // TBFMaxBurst sets the bucket size of the token bucket filter. This is the
  49. // maximum size that can instantly leave the filter, if the bucket is full.
  50. func TBFMaxBurst(size int) TBFOption {
  51. return func(t *TokenBucketFilter) TBFOption {
  52. t.mutex.Lock()
  53. defer t.mutex.Unlock()
  54. previous := t.maxBurst
  55. t.maxBurst = size
  56. return TBFMaxBurst(previous)
  57. }
  58. }
  59. // Set updates a setting on the token bucket filter
  60. func (t *TokenBucketFilter) Set(opts ...TBFOption) (previous TBFOption) {
  61. for _, opt := range opts {
  62. previous = opt(t)
  63. }
  64. return previous
  65. }
  66. // NewTokenBucketFilter creates and starts a new TokenBucketFilter
  67. func NewTokenBucketFilter(n NIC, opts ...TBFOption) (*TokenBucketFilter, error) {
  68. tbf := &TokenBucketFilter{
  69. NIC: n,
  70. currentTokensInBucket: 0,
  71. c: make(chan Chunk),
  72. queue: nil,
  73. queueSize: 50000,
  74. mutex: sync.Mutex{},
  75. rate: 1 * MBit,
  76. maxBurst: 2 * KBit,
  77. wg: sync.WaitGroup{},
  78. done: make(chan struct{}),
  79. }
  80. tbf.Set(opts...)
  81. tbf.queue = newChunkQueue(0, tbf.queueSize)
  82. tbf.wg.Add(1)
  83. go tbf.run()
  84. return tbf, nil
  85. }
  86. func (t *TokenBucketFilter) onInboundChunk(c Chunk) {
  87. t.c <- c
  88. }
  89. func (t *TokenBucketFilter) run() {
  90. defer t.wg.Done()
  91. ticker := time.NewTicker(1 * time.Millisecond)
  92. for {
  93. select {
  94. case <-t.done:
  95. ticker.Stop()
  96. t.drainQueue()
  97. return
  98. case <-ticker.C:
  99. t.mutex.Lock()
  100. if t.currentTokensInBucket < t.maxBurst {
  101. // add (bitrate * S) / 1000 converted to bytes (divide by 8) S
  102. // is the update interval in milliseconds
  103. t.currentTokensInBucket += (t.rate / 1000) / 8
  104. }
  105. t.mutex.Unlock()
  106. t.drainQueue()
  107. case chunk := <-t.c:
  108. t.queue.push(chunk)
  109. t.drainQueue()
  110. }
  111. }
  112. }
  113. func (t *TokenBucketFilter) drainQueue() {
  114. for {
  115. next := t.queue.peek()
  116. if next == nil {
  117. break
  118. }
  119. tokens := len(next.UserData())
  120. if t.currentTokensInBucket < tokens {
  121. break
  122. }
  123. t.queue.pop()
  124. t.NIC.onInboundChunk(next)
  125. t.currentTokensInBucket -= tokens
  126. }
  127. }
  128. // Close closes and stops the token bucket filter queue
  129. func (t *TokenBucketFilter) Close() error {
  130. close(t.done)
  131. t.wg.Wait()
  132. return nil
  133. }