callbacks.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package utp
  2. /*
  3. #include "utp.h"
  4. */
  5. import "C"
  6. import (
  7. "net"
  8. "reflect"
  9. "strings"
  10. "sync/atomic"
  11. "unsafe"
  12. "github.com/anacrolix/log"
  13. )
  14. type utpCallbackArguments C.utp_callback_arguments
  15. func (a *utpCallbackArguments) goContext() *utpContext {
  16. return (*utpContext)(a.context)
  17. }
  18. func (a *utpCallbackArguments) bufBytes() []byte {
  19. return *(*[]byte)(unsafe.Pointer(&reflect.SliceHeader{
  20. uintptr(unsafe.Pointer(a.buf)),
  21. int(a.len),
  22. int(a.len),
  23. }))
  24. }
  25. func (a *utpCallbackArguments) state() C.int {
  26. return *(*C.int)(unsafe.Pointer(&a.anon0))
  27. }
  28. func (a *utpCallbackArguments) error_code() C.int {
  29. return *(*C.int)(unsafe.Pointer(&a.anon0))
  30. }
  31. func (a *utpCallbackArguments) address() *C.struct_sockaddr {
  32. return *(**C.struct_sockaddr)(unsafe.Pointer(&a.anon0[0]))
  33. }
  34. func (a *utpCallbackArguments) addressLen() C.socklen_t {
  35. return *(*C.socklen_t)(unsafe.Pointer(&a.anon1[0]))
  36. }
  37. var sends int64
  38. //export sendtoCallback
  39. func sendtoCallback(a *utpCallbackArguments) (ret C.uint64) {
  40. s := getSocketForLibContext(a.goContext())
  41. b := a.bufBytes()
  42. var sendToUdpAddr net.UDPAddr
  43. if err := structSockaddrToUDPAddr(a.address(), &sendToUdpAddr); err != nil {
  44. panic(err)
  45. }
  46. newSends := atomic.AddInt64(&sends, 1)
  47. if logCallbacks {
  48. s.logger.Printf("sending %d bytes, %d packets", len(b), newSends)
  49. }
  50. expMap.Add("socket PacketConn writes", 1)
  51. n, err := s.pc.WriteTo(b, &sendToUdpAddr)
  52. c := s.conns[a.socket]
  53. if err != nil {
  54. expMap.Add("socket PacketConn write errors", 1)
  55. if c != nil && c.userOnError != nil {
  56. go c.userOnError(err)
  57. } else if c != nil &&
  58. (strings.Contains(err.Error(), "can't assign requested address") ||
  59. strings.Contains(err.Error(), "invalid argument")) {
  60. // Should be an bad argument or network configuration problem we
  61. // can't recover from.
  62. c.onError(err)
  63. } else if c != nil && strings.Contains(err.Error(), "operation not permitted") {
  64. // Rate-limited. Probably Linux. The implementation might try
  65. // again later.
  66. } else {
  67. s.logger.Levelf(log.Debug, "error sending packet: %v", err)
  68. }
  69. return
  70. }
  71. if n != len(b) {
  72. expMap.Add("socket PacketConn short writes", 1)
  73. s.logger.Printf("expected to send %d bytes but only sent %d", len(b), n)
  74. }
  75. return
  76. }
  77. //export errorCallback
  78. func errorCallback(a *utpCallbackArguments) C.uint64 {
  79. s := getSocketForLibContext(a.goContext())
  80. err := errorForCode(a.error_code())
  81. if logCallbacks {
  82. s.logger.Printf("error callback: socket %p: %s", a.socket, err)
  83. }
  84. libContextToSocket[a.goContext()].conns[a.socket].onError(err)
  85. return 0
  86. }
  87. //export logCallback
  88. func logCallback(a *utpCallbackArguments) C.uint64 {
  89. s := getSocketForLibContext(a.goContext())
  90. s.logger.Printf("libutp: %s", C.GoString((*C.char)(unsafe.Pointer(a.buf))))
  91. return 0
  92. }
  93. //export stateChangeCallback
  94. func stateChangeCallback(a *utpCallbackArguments) C.uint64 {
  95. s := libContextToSocket[a.goContext()]
  96. c := s.conns[a.socket]
  97. if logCallbacks {
  98. s.logger.Printf("state changed: conn %p: %s", c, libStateName(a.state()))
  99. }
  100. switch a.state() {
  101. case C.UTP_STATE_CONNECT:
  102. c.setConnected()
  103. // A dialled connection will not tell the remote it's ready until it
  104. // writes. If the dialer has no intention of writing, this will stall
  105. // everything. We do an empty write to get things rolling again. This
  106. // circumstance occurs when c1 in the RacyRead nettest is the dialer.
  107. C.utp_write(a.socket, nil, 0)
  108. case C.UTP_STATE_WRITABLE:
  109. c.cond.Broadcast()
  110. case C.UTP_STATE_EOF:
  111. c.setGotEOF()
  112. case C.UTP_STATE_DESTROYING:
  113. c.onDestroyed()
  114. s.onLibSocketDestroyed(a.socket)
  115. default:
  116. panic(a.state)
  117. }
  118. return 0
  119. }
  120. //export readCallback
  121. func readCallback(a *utpCallbackArguments) C.uint64 {
  122. s := libContextToSocket[a.goContext()]
  123. c := s.conns[a.socket]
  124. b := a.bufBytes()
  125. if logCallbacks {
  126. s.logger.Printf("read callback: conn %p: %d bytes", c, len(b))
  127. }
  128. if len(b) == 0 {
  129. panic("that will break the read drain invariant")
  130. }
  131. c.readBuf.Write(b)
  132. c.cond.Broadcast()
  133. return 0
  134. }
  135. //export acceptCallback
  136. func acceptCallback(a *utpCallbackArguments) C.uint64 {
  137. s := getSocketForLibContext(a.goContext())
  138. if logCallbacks {
  139. s.logger.Printf("accept callback: %#v", *a)
  140. }
  141. c := s.newConn(a.socket)
  142. c.setRemoteAddr()
  143. s.pushBacklog(c)
  144. return 0
  145. }
  146. //export getReadBufferSizeCallback
  147. func getReadBufferSizeCallback(a *utpCallbackArguments) (ret C.uint64) {
  148. s := libContextToSocket[a.goContext()]
  149. c := s.conns[a.socket]
  150. if c == nil {
  151. // socket hasn't been added to the Socket.conns yet. The read buffer
  152. // starts out empty, and the default implementation for this callback
  153. // returns 0, so we'll return that.
  154. return 0
  155. }
  156. ret = C.uint64(c.readBuf.Len())
  157. return
  158. }
  159. //export firewallCallback
  160. func firewallCallback(a *utpCallbackArguments) C.uint64 {
  161. s := getSocketForLibContext(a.goContext())
  162. if s.syncFirewallCallback != nil {
  163. var addr net.UDPAddr
  164. structSockaddrToUDPAddr(a.address(), &addr)
  165. if s.syncFirewallCallback(&addr) {
  166. return 1
  167. }
  168. } else if s.asyncBlock {
  169. return 1
  170. }
  171. return 0
  172. }