| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- package utp
- /*
- #include "utp.h"
- */
- import "C"
- import (
- "net"
- "reflect"
- "strings"
- "sync/atomic"
- "unsafe"
- "github.com/anacrolix/log"
- )
- type utpCallbackArguments C.utp_callback_arguments
- func (a *utpCallbackArguments) goContext() *utpContext {
- return (*utpContext)(a.context)
- }
- func (a *utpCallbackArguments) bufBytes() []byte {
- return *(*[]byte)(unsafe.Pointer(&reflect.SliceHeader{
- uintptr(unsafe.Pointer(a.buf)),
- int(a.len),
- int(a.len),
- }))
- }
- func (a *utpCallbackArguments) state() C.int {
- return *(*C.int)(unsafe.Pointer(&a.anon0))
- }
- func (a *utpCallbackArguments) error_code() C.int {
- return *(*C.int)(unsafe.Pointer(&a.anon0))
- }
- func (a *utpCallbackArguments) address() *C.struct_sockaddr {
- return *(**C.struct_sockaddr)(unsafe.Pointer(&a.anon0[0]))
- }
- func (a *utpCallbackArguments) addressLen() C.socklen_t {
- return *(*C.socklen_t)(unsafe.Pointer(&a.anon1[0]))
- }
- var sends int64
- //export sendtoCallback
- func sendtoCallback(a *utpCallbackArguments) (ret C.uint64) {
- s := getSocketForLibContext(a.goContext())
- b := a.bufBytes()
- var sendToUdpAddr net.UDPAddr
- if err := structSockaddrToUDPAddr(a.address(), &sendToUdpAddr); err != nil {
- panic(err)
- }
- newSends := atomic.AddInt64(&sends, 1)
- if logCallbacks {
- s.logger.Printf("sending %d bytes, %d packets", len(b), newSends)
- }
- expMap.Add("socket PacketConn writes", 1)
- n, err := s.pc.WriteTo(b, &sendToUdpAddr)
- c := s.conns[a.socket]
- if err != nil {
- expMap.Add("socket PacketConn write errors", 1)
- if c != nil && c.userOnError != nil {
- go c.userOnError(err)
- } else if c != nil &&
- (strings.Contains(err.Error(), "can't assign requested address") ||
- strings.Contains(err.Error(), "invalid argument")) {
- // Should be an bad argument or network configuration problem we
- // can't recover from.
- c.onError(err)
- } else if c != nil && strings.Contains(err.Error(), "operation not permitted") {
- // Rate-limited. Probably Linux. The implementation might try
- // again later.
- } else {
- s.logger.Levelf(log.Debug, "error sending packet: %v", err)
- }
- return
- }
- if n != len(b) {
- expMap.Add("socket PacketConn short writes", 1)
- s.logger.Printf("expected to send %d bytes but only sent %d", len(b), n)
- }
- return
- }
- //export errorCallback
- func errorCallback(a *utpCallbackArguments) C.uint64 {
- s := getSocketForLibContext(a.goContext())
- err := errorForCode(a.error_code())
- if logCallbacks {
- s.logger.Printf("error callback: socket %p: %s", a.socket, err)
- }
- libContextToSocket[a.goContext()].conns[a.socket].onError(err)
- return 0
- }
- //export logCallback
- func logCallback(a *utpCallbackArguments) C.uint64 {
- s := getSocketForLibContext(a.goContext())
- s.logger.Printf("libutp: %s", C.GoString((*C.char)(unsafe.Pointer(a.buf))))
- return 0
- }
- //export stateChangeCallback
- func stateChangeCallback(a *utpCallbackArguments) C.uint64 {
- s := libContextToSocket[a.goContext()]
- c := s.conns[a.socket]
- if logCallbacks {
- s.logger.Printf("state changed: conn %p: %s", c, libStateName(a.state()))
- }
- switch a.state() {
- case C.UTP_STATE_CONNECT:
- c.setConnected()
- // A dialled connection will not tell the remote it's ready until it
- // writes. If the dialer has no intention of writing, this will stall
- // everything. We do an empty write to get things rolling again. This
- // circumstance occurs when c1 in the RacyRead nettest is the dialer.
- C.utp_write(a.socket, nil, 0)
- case C.UTP_STATE_WRITABLE:
- c.cond.Broadcast()
- case C.UTP_STATE_EOF:
- c.setGotEOF()
- case C.UTP_STATE_DESTROYING:
- c.onDestroyed()
- s.onLibSocketDestroyed(a.socket)
- default:
- panic(a.state)
- }
- return 0
- }
- //export readCallback
- func readCallback(a *utpCallbackArguments) C.uint64 {
- s := libContextToSocket[a.goContext()]
- c := s.conns[a.socket]
- b := a.bufBytes()
- if logCallbacks {
- s.logger.Printf("read callback: conn %p: %d bytes", c, len(b))
- }
- if len(b) == 0 {
- panic("that will break the read drain invariant")
- }
- c.readBuf.Write(b)
- c.cond.Broadcast()
- return 0
- }
- //export acceptCallback
- func acceptCallback(a *utpCallbackArguments) C.uint64 {
- s := getSocketForLibContext(a.goContext())
- if logCallbacks {
- s.logger.Printf("accept callback: %#v", *a)
- }
- c := s.newConn(a.socket)
- c.setRemoteAddr()
- s.pushBacklog(c)
- return 0
- }
- //export getReadBufferSizeCallback
- func getReadBufferSizeCallback(a *utpCallbackArguments) (ret C.uint64) {
- s := libContextToSocket[a.goContext()]
- c := s.conns[a.socket]
- if c == nil {
- // socket hasn't been added to the Socket.conns yet. The read buffer
- // starts out empty, and the default implementation for this callback
- // returns 0, so we'll return that.
- return 0
- }
- ret = C.uint64(c.readBuf.Len())
- return
- }
- //export firewallCallback
- func firewallCallback(a *utpCallbackArguments) C.uint64 {
- s := getSocketForLibContext(a.goContext())
- if s.syncFirewallCallback != nil {
- var addr net.UDPAddr
- structSockaddrToUDPAddr(a.address(), &addr)
- if s.syncFirewallCallback(&addr) {
- return 1
- }
- } else if s.asyncBlock {
- return 1
- }
- return 0
- }
|