reader.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. /*
  2. Copyright (c) 2014-2015 VMware, Inc. All Rights Reserved.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package progress
  14. import (
  15. "container/list"
  16. "context"
  17. "fmt"
  18. "io"
  19. "sync/atomic"
  20. "time"
  21. )
  22. type readerReport struct {
  23. pos int64 // Keep first to ensure 64-bit alignment
  24. size int64 // Keep first to ensure 64-bit alignment
  25. bps *uint64 // Keep first to ensure 64-bit alignment
  26. t time.Time
  27. err error
  28. }
  29. func (r readerReport) Percentage() float32 {
  30. if r.size <= 0 {
  31. return 0
  32. }
  33. return 100.0 * float32(r.pos) / float32(r.size)
  34. }
  35. func (r readerReport) Detail() string {
  36. const (
  37. KiB = 1024
  38. MiB = 1024 * KiB
  39. GiB = 1024 * MiB
  40. )
  41. // Use the reader's bps field, so this report returns an up-to-date number.
  42. //
  43. // For example: if there hasn't been progress for the last 5 seconds, the
  44. // most recent report should return "0B/s".
  45. //
  46. bps := atomic.LoadUint64(r.bps)
  47. switch {
  48. case bps >= GiB:
  49. return fmt.Sprintf("%.1fGiB/s", float32(bps)/float32(GiB))
  50. case bps >= MiB:
  51. return fmt.Sprintf("%.1fMiB/s", float32(bps)/float32(MiB))
  52. case bps >= KiB:
  53. return fmt.Sprintf("%.1fKiB/s", float32(bps)/float32(KiB))
  54. default:
  55. return fmt.Sprintf("%dB/s", bps)
  56. }
  57. }
  58. func (p readerReport) Error() error {
  59. return p.err
  60. }
  61. // reader wraps an io.Reader and sends a progress report over a channel for
  62. // every read it handles.
  63. type reader struct {
  64. r io.Reader
  65. pos int64
  66. size int64
  67. bps uint64
  68. ch chan<- Report
  69. ctx context.Context
  70. }
  71. func NewReader(ctx context.Context, s Sinker, r io.Reader, size int64) *reader {
  72. pr := reader{
  73. r: r,
  74. ctx: ctx,
  75. size: size,
  76. }
  77. // Reports must be sent downstream and to the bps computation loop.
  78. pr.ch = Tee(s, newBpsLoop(&pr.bps)).Sink()
  79. return &pr
  80. }
  81. // Read calls the Read function on the underlying io.Reader. Additionally,
  82. // every read causes a progress report to be sent to the progress reader's
  83. // underlying channel.
  84. func (r *reader) Read(b []byte) (int, error) {
  85. n, err := r.r.Read(b)
  86. r.pos += int64(n)
  87. if err != nil && err != io.EOF {
  88. return n, err
  89. }
  90. q := readerReport{
  91. t: time.Now(),
  92. pos: r.pos,
  93. size: r.size,
  94. bps: &r.bps,
  95. }
  96. select {
  97. case r.ch <- q:
  98. case <-r.ctx.Done():
  99. }
  100. return n, err
  101. }
  102. // Done marks the progress reader as done, optionally including an error in the
  103. // progress report. After sending it, the underlying channel is closed.
  104. func (r *reader) Done(err error) {
  105. q := readerReport{
  106. t: time.Now(),
  107. pos: r.pos,
  108. size: r.size,
  109. bps: &r.bps,
  110. err: err,
  111. }
  112. select {
  113. case r.ch <- q:
  114. close(r.ch)
  115. case <-r.ctx.Done():
  116. }
  117. }
  118. // newBpsLoop returns a sink that monitors and stores throughput.
  119. func newBpsLoop(dst *uint64) SinkFunc {
  120. fn := func() chan<- Report {
  121. sink := make(chan Report)
  122. go bpsLoop(sink, dst)
  123. return sink
  124. }
  125. return fn
  126. }
  127. func bpsLoop(ch <-chan Report, dst *uint64) {
  128. l := list.New()
  129. for {
  130. var tch <-chan time.Time
  131. // Setup timer for front of list to become stale.
  132. if e := l.Front(); e != nil {
  133. dt := time.Second - time.Since(e.Value.(readerReport).t)
  134. tch = time.After(dt)
  135. }
  136. select {
  137. case q, ok := <-ch:
  138. if !ok {
  139. return
  140. }
  141. l.PushBack(q)
  142. case <-tch:
  143. l.Remove(l.Front())
  144. }
  145. // Compute new bps
  146. if l.Len() == 0 {
  147. atomic.StoreUint64(dst, 0)
  148. } else {
  149. f := l.Front().Value.(readerReport)
  150. b := l.Back().Value.(readerReport)
  151. atomic.StoreUint64(dst, uint64(b.pos-f.pos))
  152. }
  153. }
  154. }