| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- package cos
- import (
- "fmt"
- "hash"
- "io"
- )
- type ProgressEventType int
- const (
- // 数据开始传输
- ProgressStartedEvent ProgressEventType = iota
- // 数据传输中
- ProgressDataEvent
- // 数据传输完成, 但不能表示对应API调用完成
- ProgressCompletedEvent
- // 只有在数据传输时发生错误才会返回
- ProgressFailedEvent
- )
- type ProgressEvent struct {
- EventType ProgressEventType
- RWBytes int64
- ConsumedBytes int64
- TotalBytes int64
- Err error
- }
- func newProgressEvent(eventType ProgressEventType, rwBytes, consumed, total int64, err ...error) *ProgressEvent {
- event := &ProgressEvent{
- EventType: eventType,
- RWBytes: rwBytes,
- ConsumedBytes: consumed,
- TotalBytes: total,
- }
- if len(err) > 0 {
- event.Err = err[0]
- }
- return event
- }
- // 用户自定义Listener需要实现该方法
- type ProgressListener interface {
- ProgressChangedCallback(event *ProgressEvent)
- }
- func progressCallback(listener ProgressListener, event *ProgressEvent) {
- if listener != nil && event != nil {
- listener.ProgressChangedCallback(event)
- }
- }
- type teeReader struct {
- reader io.Reader
- writer io.Writer
- consumedBytes int64
- totalBytes int64
- listener ProgressListener
- }
- func (r *teeReader) Read(p []byte) (int, error) {
- if r.consumedBytes == 0 {
- event := newProgressEvent(ProgressStartedEvent, 0, r.consumedBytes, r.totalBytes)
- progressCallback(r.listener, event)
- }
- n, err := r.reader.Read(p)
- if err != nil && err != io.EOF {
- event := newProgressEvent(ProgressFailedEvent, 0, r.consumedBytes, r.totalBytes, err)
- progressCallback(r.listener, event)
- }
- if n > 0 {
- r.consumedBytes += int64(n)
- if r.writer != nil {
- if n, err := r.writer.Write(p[:n]); err != nil {
- return n, err
- }
- }
- if r.listener != nil {
- event := newProgressEvent(ProgressDataEvent, int64(n), r.consumedBytes, r.totalBytes)
- progressCallback(r.listener, event)
- }
- }
- if err == io.EOF {
- event := newProgressEvent(ProgressCompletedEvent, int64(n), r.consumedBytes, r.totalBytes)
- progressCallback(r.listener, event)
- }
- return n, err
- }
- func (r *teeReader) Close() error {
- if rc, ok := r.reader.(io.ReadCloser); ok {
- return rc.Close()
- }
- return nil
- }
- func (r *teeReader) Size() int64 {
- return r.totalBytes
- }
- func (r *teeReader) Crc64() uint64 {
- if r.writer != nil {
- if th, ok := r.writer.(hash.Hash64); ok {
- return th.Sum64()
- }
- }
- return 0
- }
- func TeeReader(reader io.Reader, writer io.Writer, total int64, listener ProgressListener) *teeReader {
- return &teeReader{
- reader: reader,
- writer: writer,
- consumedBytes: 0,
- totalBytes: total,
- listener: listener,
- }
- }
- type FixedLengthReader interface {
- io.Reader
- Size() int64
- }
- type DefaultProgressListener struct {
- }
- func (l *DefaultProgressListener) ProgressChangedCallback(event *ProgressEvent) {
- switch event.EventType {
- case ProgressStartedEvent:
- fmt.Printf("Transfer Start [ConsumedBytes/TotalBytes: %d/%d]\n",
- event.ConsumedBytes, event.TotalBytes)
- case ProgressDataEvent:
- fmt.Printf("\rTransfer Data [ConsumedBytes/TotalBytes: %d/%d, %d%%]",
- event.ConsumedBytes, event.TotalBytes, event.ConsumedBytes*100/event.TotalBytes)
- case ProgressCompletedEvent:
- fmt.Printf("\nTransfer Complete [ConsumedBytes/TotalBytes: %d/%d]\n",
- event.ConsumedBytes, event.TotalBytes)
- case ProgressFailedEvent:
- fmt.Printf("\nTransfer Failed [ConsumedBytes/TotalBytes: %d/%d] [Err: %v]\n",
- event.ConsumedBytes, event.TotalBytes, event.Err)
- default:
- fmt.Printf("Progress Changed Error: unknown progress event type\n")
- }
- }
|