tracker-client.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. package webtorrent
  2. import (
  3. "context"
  4. "crypto/rand"
  5. "encoding/json"
  6. "fmt"
  7. "net/http"
  8. "sync"
  9. "time"
  10. g "github.com/anacrolix/generics"
  11. "github.com/anacrolix/log"
  12. "github.com/gorilla/websocket"
  13. "github.com/pion/datachannel"
  14. "github.com/pion/webrtc/v3"
  15. "go.opentelemetry.io/otel/trace"
  16. "github.com/anacrolix/torrent/tracker"
  17. )
// TrackerClientStats holds counters describing a TrackerClient's activity. A copy is returned
// by TrackerClient.Stats.
type TrackerClientStats struct {
	// Dials counts websocket dial attempts; it is incremented once per doWebsocket call,
	// before the dial is known to have succeeded.
	Dials int64
	// NOTE(review): not updated in this part of the file; presumably counts inbound
	// (remote-offered) peer connections that became usable data channels — confirm.
	ConvertedInboundConns int64
	// NOTE(review): not updated in this part of the file; presumably counts outbound
	// (locally-offered) peer connections that became usable data channels — confirm.
	ConvertedOutboundConns int64
}
// TrackerClient is a WebTorrent tracker client: it keeps a websocket connection to a single
// tracker, announces WebRTC offers over it, and handles the offers and answers the tracker
// relays back.
type TrackerClient struct {
	// Url is the tracker address handed to Dialer.Dial.
	Url string
	// GetAnnounceRequest supplies the announce parameters (Uploaded, Downloaded, Left, Event)
	// for an infohash each time an announce is sent.
	GetAnnounceRequest func(_ tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error)
	// PeerId is our peer ID; it is sent in every announce and answer.
	PeerId [20]byte
	// NOTE(review): OnConn is not invoked in this part of the file; presumably it is called
	// when a data channel opens — confirm.
	OnConn onDataChannelOpen
	Logger log.Logger
	// Dialer is used to establish the websocket connection to the tracker.
	Dialer *websocket.Dialer

	// mu guards the unexported fields below and serializes writes to wsConn.
	mu sync.Mutex
	// cond uses mu as its Locker (set in Start). It is broadcast when wsConn is installed and
	// when the client is closed, waking writers blocked in writeMessage.
	cond           sync.Cond
	outboundOffers map[string]outboundOfferValue // OfferID to outboundOfferValue
	// wsConn is the most recently established tracker connection, or nil before the first
	// successful dial.
	wsConn *websocket.Conn
	// closed is set by Close and stops the run loop from redialing.
	closed bool
	stats  TrackerClientStats
	// pingTicker is created in Start with a 60 second period and triggers websocket pings.
	pingTicker *time.Ticker

	// WebsocketTrackerHttpHeader, if non-nil, is called before each dial to obtain the HTTP
	// header sent with the websocket handshake.
	WebsocketTrackerHttpHeader func() http.Header
	// NOTE(review): ICEServers is not read in this part of the file; presumably it configures
	// the peer connections created for offers and answers — confirm.
	ICEServers []webrtc.ICEServer
}
  41. func (me *TrackerClient) Stats() TrackerClientStats {
  42. me.mu.Lock()
  43. defer me.mu.Unlock()
  44. return me.stats
  45. }
  46. func (me *TrackerClient) peerIdBinary() string {
  47. return binaryToJsonString(me.PeerId[:])
  48. }
// outboundOffer pairs an offer ID with the state of the offer it identifies. It is the form
// in which offers are passed to announce before being stored in TrackerClient.outboundOffers.
type outboundOffer struct {
	// offerId is the offer ID string sent to the tracker; it is also the key under which the
	// value is stored in outboundOffers.
	offerId string
	outboundOfferValue
}
// outboundOfferValue represents an outstanding offer: one that has been announced to the
// tracker and has not yet been answered or closed.
type outboundOfferValue struct {
	// originalOffer is the session description that was sent in the announce.
	originalOffer webrtc.SessionDescription
	// peerConnection is the local connection the offer was created from; the remote answer is
	// applied to it in handleAnswer.
	peerConnection *wrappedPeerConnection
	// infoHash identifies the torrent the offer was announced for.
	infoHash [20]byte
	// dataChannel is the data channel created alongside the offer.
	dataChannel *webrtc.DataChannel
}
// DataChannelContext carries metadata about a data channel; it is the second argument of the
// onDataChannelOpen callback.
// NOTE(review): the fields are populated outside this part of the file; the descriptions below
// are inferred from names and should be confirmed.
type DataChannelContext struct {
	// Presumably the ID of the offer that led to this data channel.
	OfferId string
	// Presumably true when the offer originated locally rather than from the remote peer.
	LocalOffered bool
	// Presumably the infohash of the torrent the channel was negotiated for.
	InfoHash [20]byte
	// This is private as some methods might not be appropriate with data channel context.
	peerConnection *wrappedPeerConnection
	Span           trace.Span
	Context        context.Context
}
  69. func (me *DataChannelContext) GetSelectedIceCandidatePair() (*webrtc.ICECandidatePair, error) {
  70. return me.peerConnection.SCTP().Transport().ICETransport().GetSelectedCandidatePair()
  71. }
// onDataChannelOpen is the callback type of TrackerClient.OnConn. It receives a data channel
// as a ReadWriteCloser together with its context.
// NOTE(review): the call site is outside this part of the file; presumably it fires once per
// opened data channel — confirm.
type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext)
  73. func (tc *TrackerClient) doWebsocket() error {
  74. metrics.Add("websocket dials", 1)
  75. tc.mu.Lock()
  76. tc.stats.Dials++
  77. tc.mu.Unlock()
  78. var header http.Header
  79. if tc.WebsocketTrackerHttpHeader != nil {
  80. header = tc.WebsocketTrackerHttpHeader()
  81. }
  82. c, _, err := tc.Dialer.Dial(tc.Url, header)
  83. if err != nil {
  84. return fmt.Errorf("dialing tracker: %w", err)
  85. }
  86. defer c.Close()
  87. tc.Logger.WithDefaultLevel(log.Info).Printf("connected")
  88. tc.mu.Lock()
  89. tc.wsConn = c
  90. tc.cond.Broadcast()
  91. tc.mu.Unlock()
  92. tc.announceOffers()
  93. closeChan := make(chan struct{})
  94. go func() {
  95. for {
  96. select {
  97. case <-tc.pingTicker.C:
  98. tc.mu.Lock()
  99. err := c.WriteMessage(websocket.PingMessage, []byte{})
  100. tc.mu.Unlock()
  101. if err != nil {
  102. return
  103. }
  104. case <-closeChan:
  105. return
  106. }
  107. }
  108. }()
  109. err = tc.trackerReadLoop(tc.wsConn)
  110. close(closeChan)
  111. tc.mu.Lock()
  112. c.Close()
  113. tc.mu.Unlock()
  114. return err
  115. }
  116. // Finishes initialization and spawns the run routine, calling onStop when it completes with the
  117. // result. We don't let the caller just spawn the runner directly, since then we can race against
  118. // .Close to finish initialization.
  119. func (tc *TrackerClient) Start(onStop func(error)) {
  120. tc.pingTicker = time.NewTicker(60 * time.Second)
  121. tc.cond.L = &tc.mu
  122. go func() {
  123. onStop(tc.run())
  124. }()
  125. }
  126. func (tc *TrackerClient) run() error {
  127. tc.mu.Lock()
  128. for !tc.closed {
  129. tc.mu.Unlock()
  130. err := tc.doWebsocket()
  131. level := log.Info
  132. tc.mu.Lock()
  133. if tc.closed {
  134. level = log.Debug
  135. }
  136. tc.mu.Unlock()
  137. tc.Logger.WithDefaultLevel(level).Printf("websocket instance ended: %v", err)
  138. time.Sleep(time.Minute)
  139. tc.mu.Lock()
  140. }
  141. tc.mu.Unlock()
  142. return nil
  143. }
  144. func (tc *TrackerClient) Close() error {
  145. tc.mu.Lock()
  146. tc.closed = true
  147. if tc.wsConn != nil {
  148. tc.wsConn.Close()
  149. }
  150. tc.closeUnusedOffers()
  151. tc.pingTicker.Stop()
  152. tc.mu.Unlock()
  153. tc.cond.Broadcast()
  154. return nil
  155. }
  156. func (tc *TrackerClient) announceOffers() {
  157. // tc.Announce grabs a lock on tc.outboundOffers. It also handles the case where outboundOffers
  158. // is nil. Take ownership of outboundOffers here.
  159. tc.mu.Lock()
  160. offers := tc.outboundOffers
  161. tc.outboundOffers = nil
  162. tc.mu.Unlock()
  163. if offers == nil {
  164. return
  165. }
  166. // Iterate over our locally-owned offers, close any existing "invalid" ones from before the
  167. // socket reconnected, reannounce the infohash, adding it back into the tc.outboundOffers.
  168. tc.Logger.WithDefaultLevel(log.Info).Printf("reannouncing %d infohashes after restart", len(offers))
  169. for _, offer := range offers {
  170. // TODO: Capture the errors? Are we even in a position to do anything with them?
  171. offer.peerConnection.Close()
  172. // Use goroutine here to allow read loop to start and ensure the buffer drains.
  173. go tc.Announce(tracker.Started, offer.infoHash)
  174. }
  175. }
  176. func (tc *TrackerClient) closeUnusedOffers() {
  177. for _, offer := range tc.outboundOffers {
  178. offer.peerConnection.Close()
  179. offer.dataChannel.Close()
  180. }
  181. tc.outboundOffers = nil
  182. }
  183. func (tc *TrackerClient) CloseOffersForInfohash(infoHash [20]byte) {
  184. tc.mu.Lock()
  185. defer tc.mu.Unlock()
  186. for key, offer := range tc.outboundOffers {
  187. if offer.infoHash == infoHash {
  188. offer.peerConnection.Close()
  189. delete(tc.outboundOffers, key)
  190. }
  191. }
  192. }
  193. func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte) error {
  194. metrics.Add("outbound announces", 1)
  195. if event == tracker.Stopped {
  196. return tc.announce(event, infoHash, nil)
  197. }
  198. var randOfferId [20]byte
  199. _, err := rand.Read(randOfferId[:])
  200. if err != nil {
  201. return fmt.Errorf("generating offer_id bytes: %w", err)
  202. }
  203. offerIDBinary := binaryToJsonString(randOfferId[:])
  204. pc, dc, offer, err := tc.newOffer(tc.Logger, offerIDBinary, infoHash)
  205. if err != nil {
  206. return fmt.Errorf("creating offer: %w", err)
  207. }
  208. tc.Logger.Levelf(log.Debug, "announcing offer")
  209. err = tc.announce(event, infoHash, []outboundOffer{
  210. {
  211. offerId: offerIDBinary,
  212. outboundOfferValue: outboundOfferValue{
  213. originalOffer: offer,
  214. peerConnection: pc,
  215. infoHash: infoHash,
  216. dataChannel: dc,
  217. },
  218. },
  219. })
  220. if err != nil {
  221. dc.Close()
  222. pc.Close()
  223. }
  224. return err
  225. }
  226. func (tc *TrackerClient) announce(event tracker.AnnounceEvent, infoHash [20]byte, offers []outboundOffer) error {
  227. request, err := tc.GetAnnounceRequest(event, infoHash)
  228. if err != nil {
  229. return fmt.Errorf("getting announce parameters: %w", err)
  230. }
  231. req := AnnounceRequest{
  232. Numwant: len(offers),
  233. Uploaded: request.Uploaded,
  234. Downloaded: request.Downloaded,
  235. Left: request.Left,
  236. Event: request.Event.String(),
  237. Action: "announce",
  238. InfoHash: binaryToJsonString(infoHash[:]),
  239. PeerID: tc.peerIdBinary(),
  240. }
  241. for _, offer := range offers {
  242. req.Offers = append(req.Offers, Offer{
  243. OfferID: offer.offerId,
  244. Offer: offer.originalOffer,
  245. })
  246. }
  247. data, err := json.Marshal(req)
  248. if err != nil {
  249. return fmt.Errorf("marshalling request: %w", err)
  250. }
  251. tc.mu.Lock()
  252. defer tc.mu.Unlock()
  253. err = tc.writeMessage(data)
  254. if err != nil {
  255. return fmt.Errorf("write AnnounceRequest: %w", err)
  256. }
  257. for _, offer := range offers {
  258. g.MakeMapIfNilAndSet(&tc.outboundOffers, offer.offerId, offer.outboundOfferValue)
  259. }
  260. return nil
  261. }
  262. func (tc *TrackerClient) writeMessage(data []byte) error {
  263. for tc.wsConn == nil {
  264. if tc.closed {
  265. return fmt.Errorf("%T closed", tc)
  266. }
  267. tc.cond.Wait()
  268. }
  269. return tc.wsConn.WriteMessage(websocket.TextMessage, data)
  270. }
  271. func (tc *TrackerClient) trackerReadLoop(tracker *websocket.Conn) error {
  272. for {
  273. _, message, err := tracker.ReadMessage()
  274. if err != nil {
  275. return fmt.Errorf("read message error: %w", err)
  276. }
  277. tc.Logger.Levelf(log.Debug, "received message: %q", message)
  278. var ar AnnounceResponse
  279. if err := json.Unmarshal(message, &ar); err != nil {
  280. tc.Logger.WithDefaultLevel(log.Warning).Printf("error unmarshalling announce response: %v", err)
  281. continue
  282. }
  283. switch {
  284. case ar.Offer != nil:
  285. ih, err := jsonStringToInfoHash(ar.InfoHash)
  286. if err != nil {
  287. tc.Logger.WithDefaultLevel(log.Warning).Printf("error decoding info_hash in offer: %v", err)
  288. break
  289. }
  290. err = tc.handleOffer(offerContext{
  291. SessDesc: *ar.Offer,
  292. Id: ar.OfferID,
  293. InfoHash: ih,
  294. }, ar.PeerID)
  295. if err != nil {
  296. tc.Logger.Levelf(log.Error, "handling offer for infohash %x: %v", ih, err)
  297. }
  298. case ar.Answer != nil:
  299. tc.handleAnswer(ar.OfferID, *ar.Answer)
  300. default:
  301. // wss://tracker.openwebtorrent.com appears to respond to an initial announces without
  302. // an offer or answer. I think that's fine. Let's check it at least contains an
  303. // infohash.
  304. _, err := jsonStringToInfoHash(ar.InfoHash)
  305. if err != nil {
  306. tc.Logger.Levelf(log.Warning, "unexpected announce response %q", message)
  307. }
  308. }
  309. }
  310. }
// offerContext bundles the details of an offer relayed to us by the tracker, as extracted from
// an AnnounceResponse in trackerReadLoop.
type offerContext struct {
	// SessDesc is the remote peer's offered session description.
	SessDesc webrtc.SessionDescription
	// Id is the offer ID from the tracker message; it is echoed back in our answer.
	Id string
	// InfoHash is the decoded infohash the offer was made for.
	InfoHash [20]byte
}
  316. func (tc *TrackerClient) handleOffer(
  317. offerContext offerContext,
  318. peerId string,
  319. ) error {
  320. peerConnection, answer, err := tc.newAnsweringPeerConnection(offerContext)
  321. if err != nil {
  322. return fmt.Errorf("creating answering peer connection: %w", err)
  323. }
  324. response := AnnounceResponse{
  325. Action: "announce",
  326. InfoHash: binaryToJsonString(offerContext.InfoHash[:]),
  327. PeerID: tc.peerIdBinary(),
  328. ToPeerID: peerId,
  329. Answer: &answer,
  330. OfferID: offerContext.Id,
  331. }
  332. data, err := json.Marshal(response)
  333. if err != nil {
  334. peerConnection.Close()
  335. return fmt.Errorf("marshalling response: %w", err)
  336. }
  337. tc.mu.Lock()
  338. defer tc.mu.Unlock()
  339. if err := tc.writeMessage(data); err != nil {
  340. peerConnection.Close()
  341. return fmt.Errorf("writing response: %w", err)
  342. }
  343. return nil
  344. }
  345. func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescription) {
  346. tc.mu.Lock()
  347. defer tc.mu.Unlock()
  348. offer, ok := tc.outboundOffers[offerId]
  349. if !ok {
  350. tc.Logger.WithDefaultLevel(log.Warning).Printf("could not find offer for id %+q", offerId)
  351. return
  352. }
  353. // tc.Logger.WithDefaultLevel(log.Debug).Printf("offer %q got answer %v", offerId, answer)
  354. metrics.Add("outbound offers answered", 1)
  355. err := offer.peerConnection.SetRemoteDescription(answer)
  356. if err != nil {
  357. err = fmt.Errorf("using outbound offer answer: %w", err)
  358. offer.peerConnection.span.RecordError(err)
  359. tc.Logger.LevelPrint(log.Error, err)
  360. return
  361. }
  362. delete(tc.outboundOffers, offerId)
  363. go tc.Announce(tracker.None, offer.infoHash)
  364. }