peer-conn-msg-writer.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package torrent
  2. import (
  3. "bytes"
  4. "io"
  5. "time"
  6. "github.com/anacrolix/chansync"
  7. "github.com/anacrolix/log"
  8. "github.com/anacrolix/sync"
  9. pp "github.com/anacrolix/torrent/peer_protocol"
  10. )
  11. func (pc *PeerConn) initMessageWriter() {
  12. w := &pc.messageWriter
  13. *w = peerConnMsgWriter{
  14. fillWriteBuffer: func() {
  15. pc.locker().Lock()
  16. defer pc.locker().Unlock()
  17. if pc.closed.IsSet() {
  18. return
  19. }
  20. pc.fillWriteBuffer()
  21. },
  22. closed: &pc.closed,
  23. logger: pc.logger,
  24. w: pc.w,
  25. keepAlive: func() bool {
  26. pc.locker().RLock()
  27. defer pc.locker().RUnlock()
  28. return pc.useful()
  29. },
  30. writeBuffer: new(bytes.Buffer),
  31. }
  32. }
  33. func (pc *PeerConn) startMessageWriter() {
  34. pc.initMessageWriter()
  35. go pc.messageWriterRunner()
  36. }
  37. func (pc *PeerConn) messageWriterRunner() {
  38. defer pc.locker().Unlock()
  39. defer pc.close()
  40. defer pc.locker().Lock()
  41. pc.messageWriter.run(pc.t.cl.config.KeepAliveTimeout)
  42. }
  43. type peerConnMsgWriter struct {
  44. // Must not be called with the local mutex held, as it will call back into the write method.
  45. fillWriteBuffer func()
  46. closed *chansync.SetOnce
  47. logger log.Logger
  48. w io.Writer
  49. keepAlive func() bool
  50. mu sync.Mutex
  51. writeCond chansync.BroadcastCond
  52. // Pointer so we can swap with the "front buffer".
  53. writeBuffer *bytes.Buffer
  54. }
  55. // Routine that writes to the peer. Some of what to write is buffered by
  56. // activity elsewhere in the Client, and some is determined locally when the
  57. // connection is writable.
  58. func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) {
  59. lastWrite := time.Now()
  60. keepAliveTimer := time.NewTimer(keepAliveTimeout)
  61. frontBuf := new(bytes.Buffer)
  62. for {
  63. if cn.closed.IsSet() {
  64. return
  65. }
  66. cn.fillWriteBuffer()
  67. keepAlive := cn.keepAlive()
  68. cn.mu.Lock()
  69. if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && keepAlive {
  70. cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
  71. torrent.Add("written keepalives", 1)
  72. }
  73. if cn.writeBuffer.Len() == 0 {
  74. writeCond := cn.writeCond.Signaled()
  75. cn.mu.Unlock()
  76. select {
  77. case <-cn.closed.Done():
  78. case <-writeCond:
  79. case <-keepAliveTimer.C:
  80. }
  81. continue
  82. }
  83. // Flip the buffers.
  84. frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
  85. cn.mu.Unlock()
  86. if frontBuf.Len() == 0 {
  87. panic("expected non-empty front buffer")
  88. }
  89. var err error
  90. for frontBuf.Len() != 0 {
  91. // Limit write size for WebRTC. See https://github.com/pion/datachannel/issues/59.
  92. next := frontBuf.Next(1<<16 - 1)
  93. var n int
  94. n, err = cn.w.Write(next)
  95. if err == nil && n != len(next) {
  96. panic("expected full write")
  97. }
  98. if err != nil {
  99. break
  100. }
  101. }
  102. if err != nil {
  103. cn.logger.WithDefaultLevel(log.Debug).Printf("error writing: %v", err)
  104. return
  105. }
  106. lastWrite = time.Now()
  107. keepAliveTimer.Reset(keepAliveTimeout)
  108. }
  109. }
  110. func (cn *peerConnMsgWriter) writeToBuffer(msg pp.Message) (err error) {
  111. originalLen := cn.writeBuffer.Len()
  112. defer func() {
  113. if err != nil {
  114. // Since an error occurred during buffer write, revert buffer to its original state before the write.
  115. cn.writeBuffer.Truncate(originalLen)
  116. }
  117. }()
  118. return msg.WriteTo(cn.writeBuffer)
  119. }
  120. func (cn *peerConnMsgWriter) write(msg pp.Message) bool {
  121. cn.mu.Lock()
  122. defer cn.mu.Unlock()
  123. cn.writeToBuffer(msg)
  124. cn.writeCond.Broadcast()
  125. return !cn.writeBufferFull()
  126. }
  127. func (cn *peerConnMsgWriter) writeBufferFull() bool {
  128. return cn.writeBuffer.Len() >= writeBufferHighWaterLen
  129. }