chunk_queue.go 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package vnet
  2. import (
  3. "sync"
  4. )
  5. type chunkQueue struct {
  6. chunks []Chunk
  7. maxSize int // 0 or negative value: unlimited
  8. maxBytes int // 0 or negative value: unlimited
  9. currentBytes int
  10. mutex sync.RWMutex
  11. }
  12. func newChunkQueue(maxSize int, maxBytes int) *chunkQueue {
  13. return &chunkQueue{
  14. chunks: []Chunk{},
  15. maxSize: maxSize,
  16. maxBytes: maxBytes,
  17. currentBytes: 0,
  18. mutex: sync.RWMutex{},
  19. }
  20. }
  21. func (q *chunkQueue) push(c Chunk) bool {
  22. q.mutex.Lock()
  23. defer q.mutex.Unlock()
  24. if q.maxSize > 0 && len(q.chunks) >= q.maxSize {
  25. return false // dropped
  26. }
  27. if q.maxBytes > 0 && q.currentBytes+len(c.UserData()) >= q.maxBytes {
  28. return false
  29. }
  30. q.currentBytes += len(c.UserData())
  31. q.chunks = append(q.chunks, c)
  32. return true
  33. }
  34. func (q *chunkQueue) pop() (Chunk, bool) {
  35. q.mutex.Lock()
  36. defer q.mutex.Unlock()
  37. if len(q.chunks) == 0 {
  38. return nil, false
  39. }
  40. c := q.chunks[0]
  41. q.chunks = q.chunks[1:]
  42. q.currentBytes -= len(c.UserData())
  43. return c, true
  44. }
  45. func (q *chunkQueue) peek() Chunk {
  46. q.mutex.RLock()
  47. defer q.mutex.RUnlock()
  48. if len(q.chunks) == 0 {
  49. return nil
  50. }
  51. return q.chunks[0]
  52. }