utp_internal.cpp 106 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489
  1. /*
  2. * Copyright (c) 2010-2013 BitTorrent, Inc.
  3. *
  4. * Permission is hereby granted, free of charge, to any person obtaining a copy
  5. * of this software and associated documentation files (the "Software"), to deal
  6. * in the Software without restriction, including without limitation the rights
  7. * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  8. * copies of the Software, and to permit persons to whom the Software is
  9. * furnished to do so, subject to the following conditions:
  10. *
  11. * The above copyright notice and this permission notice shall be included in
  12. * all copies or substantial portions of the Software.
  13. *
  14. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  15. * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  16. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  17. * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  18. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  19. * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  20. * THE SOFTWARE.
  21. */
  22. #include <stdio.h>
  23. #include <assert.h>
  24. #include <string.h>
  25. #include <string.h>
  26. #include <stdlib.h>
  27. #include <errno.h>
  28. #include <limits.h> // for UINT_MAX
  29. #include <time.h>
  30. #include "utp_types.h"
  31. #include "utp_packedsockaddr.h"
  32. #include "utp_internal.h"
  33. #include "utp_hash.h"
  34. #define TIMEOUT_CHECK_INTERVAL 500
  35. // number of bytes to increase max window size by, per RTT. This is
  36. // scaled down linearly proportional to off_target. i.e. if all packets
  37. // in one window have 0 delay, window size will increase by this number.
  38. // Typically it's less. TCP increases one MSS per RTT, which is 1500
  39. #define MAX_CWND_INCREASE_BYTES_PER_RTT 3000
  40. #define CUR_DELAY_SIZE 3
  41. // experiments suggest that a clock skew of 10 ms per 325 seconds
  42. // is not impossible. Reset delay_base every 13 minutes. The clock
  43. // skew is dealt with by observing the delay base in the other
  44. // direction, and adjusting our own upwards if the opposite direction
  45. // delay base keeps going down
  46. #define DELAY_BASE_HISTORY 13
  47. #define MAX_WINDOW_DECAY 100 // ms
  48. #define REORDER_BUFFER_SIZE 32
  49. #define REORDER_BUFFER_MAX_SIZE 1024
  50. #define OUTGOING_BUFFER_MAX_SIZE 1024
  51. #define PACKET_SIZE 1435
  52. // this is the minimum max_window value. It can never drop below this
  53. #define MIN_WINDOW_SIZE 10
  54. // if we receive 4 or more duplicate acks, we resend the packet
  55. // that hasn't been acked yet
  56. #define DUPLICATE_ACKS_BEFORE_RESEND 3
  57. // Allow a reception window of at least 3 ack_nrs behind seq_nr
  58. // A non-SYN packet with an ack_nr difference greater than this is
  59. // considered suspicious and ignored
  60. #define ACK_NR_ALLOWED_WINDOW DUPLICATE_ACKS_BEFORE_RESEND
  61. #define RST_INFO_TIMEOUT 10000
  62. #define RST_INFO_LIMIT 1000
  63. // 29 seconds determined from measuring many home NAT devices
  64. #define KEEPALIVE_INTERVAL 29000
  65. #define SEQ_NR_MASK 0xFFFF
  66. #define ACK_NR_MASK 0xFFFF
  67. #define TIMESTAMP_MASK 0xFFFFFFFF
  68. #define DIV_ROUND_UP(num, denom) ((num + denom - 1) / denom)
  69. // The totals are derived from the following data:
  70. // 45: IPv6 address including embedded IPv4 address
  71. // 11: Scope Id
  72. // 2: Brackets around IPv6 address when port is present
  73. // 6: Port (including colon)
  74. // 1: Terminating null byte
  75. char addrbuf[65];
  76. #define addrfmt(x, s) x.fmt(s, sizeof(s))
  77. #if (defined(__SVR4) && defined(__sun))
  78. #pragma pack(1)
  79. #else
  80. #pragma pack(push,1)
  81. #endif
  82. // these packet sizes are including the uTP header wich
  83. // is either 20 or 23 bytes depending on version
  84. #define PACKET_SIZE_EMPTY_BUCKET 0
  85. #define PACKET_SIZE_EMPTY 23
  86. #define PACKET_SIZE_SMALL_BUCKET 1
  87. #define PACKET_SIZE_SMALL 373
  88. #define PACKET_SIZE_MID_BUCKET 2
  89. #define PACKET_SIZE_MID 723
  90. #define PACKET_SIZE_BIG_BUCKET 3
  91. #define PACKET_SIZE_BIG 1400
  92. #define PACKET_SIZE_HUGE_BUCKET 4
  93. struct PACKED_ATTRIBUTE PacketFormatV1 {
  94. // packet_type (4 high bits)
  95. // protocol version (4 low bits)
  96. byte ver_type;
  97. byte version() const { return ver_type & 0xf; }
  98. byte type() const { return ver_type >> 4; }
  99. void set_version(byte v) { ver_type = (ver_type & 0xf0) | (v & 0xf); }
  100. void set_type(byte t) { ver_type = (ver_type & 0xf) | (t << 4); }
  101. // Type of the first extension header
  102. byte ext;
  103. // connection ID
  104. uint16_big connid;
  105. uint32_big tv_usec;
  106. uint32_big reply_micro;
  107. // receive window size in bytes
  108. uint32_big windowsize;
  109. // Sequence number
  110. uint16_big seq_nr;
  111. // Acknowledgment number
  112. uint16_big ack_nr;
  113. };
// V1 header followed by one 4-byte extension payload — presumably the
// selective-ACK bitmask extension (NOTE(review): semantics of `acks`
// are not visible in this file; confirm against the uTP spec).
struct PACKED_ATTRIBUTE PacketFormatAckV1 {
	// the plain v1 header this extension follows
	PacketFormatV1 pf;
	// type of the next extension header
	byte ext_next;
	// length in bytes of this extension's payload
	byte ext_len;
	// fixed 4-byte extension payload
	byte acks[4];
};
  120. #if (defined(__SVR4) && defined(__sun))
  121. #pragma pack(0)
  122. #else
  123. #pragma pack(pop)
  124. #endif
// uTP packet types, carried in the high nibble of
// PacketFormatV1::ver_type (see type()/set_type()).
enum {
	ST_DATA = 0, // Data packet.
	ST_FIN = 1, // Finalize the connection. This is the last packet.
	ST_STATE = 2, // State packet. Used to transmit an ACK with no data.
	ST_RESET = 3, // Terminate connection forcefully.
	ST_SYN = 4, // Connect SYN
	ST_NUM_STATES, // used for bounds checking
};
// human-readable names for the packet-type enum above, indexed by the
// ST_* value; used for logging only
static const cstr flagnames[] = {
	"ST_DATA","ST_FIN","ST_STATE","ST_RESET","ST_SYN"
};
// Connection state machine for a UTPSocket. The names are echoed in
// the statenames[] table below, which must stay in sync with this enum.
enum CONN_STATE {
	CS_UNINITIALIZED = 0, // socket object exists but is not set up
	CS_IDLE,              // initialized, no connection in progress
	CS_SYN_SENT,          // outgoing connect: SYN sent, awaiting reply
	CS_SYN_RECV,          // incoming connect: SYN received
	CS_CONNECTED,         // connection established
	CS_CONNECTED_FULL,    // connected, send window currently full
	CS_RESET,             // connection was reset
	CS_DESTROY            // socket is pending destruction
};
  146. static const cstr statenames[] = {
  147. "UNINITIALIZED", "IDLE","SYN_SENT", "SYN_RECV", "CONNECTED","CONNECTED_FULL","DESTROY_DELAY","RESET","DESTROY"
  148. };
// One entry in the outgoing send queue. Instances are allocated with
// extra trailing storage so that `data` can hold the full packet;
// data[1] is the old C-style flexible-array-member idiom — this struct
// must stay last-member-extensible and must not be copied by value.
struct OutgoingPacket {
	// total number of bytes in `data` (header + payload)
	size_t length;
	// number of payload bytes (excluding the uTP header)
	size_t payload;
	uint64 time_sent; // microseconds
	// how many times this packet has been transmitted so far
	uint transmissions:31;
	// set when the packet has been marked for retransmission
	bool need_resend:1;
	// packet bytes; actual size is `length` (over-allocated)
	byte data[1];
};
  157. struct SizableCircularBuffer {
  158. // This is the mask. Since it's always a power of 2, adding 1 to this value will return the size.
  159. size_t mask;
  160. // This is the elements that the circular buffer points to
  161. void **elements;
  162. void *get(size_t i) const { assert(elements); return elements ? elements[i & mask] : NULL; }
  163. void put(size_t i, void *data) { assert(elements); elements[i&mask] = data; }
  164. void grow(size_t item, size_t index);
  165. void ensure_size(size_t item, size_t index) { if (index > mask) grow(item, index); }
  166. size_t size() { return mask + 1; }
  167. };
  168. // Item contains the element we want to make space for
  169. // index is the index in the list.
  170. void SizableCircularBuffer::grow(size_t item, size_t index)
  171. {
  172. // Figure out the new size.
  173. size_t size = mask + 1;
  174. do size *= 2; while (index >= size);
  175. // Allocate the new buffer
  176. void **buf = (void**)calloc(size, sizeof(void*));
  177. size--;
  178. // Copy elements from the old buffer to the new buffer
  179. for (size_t i = 0; i <= mask; i++) {
  180. buf[(item - index + i) & size] = get(item - index + i);
  181. }
  182. // Swap to the newly allocated buffer
  183. mask = size;
  184. free(elements);
  185. elements = buf;
  186. }
  187. // compare if lhs is less than rhs, taking wrapping
  188. // into account. if lhs is close to UINT_MAX and rhs
  189. // is close to 0, lhs is assumed to have wrapped and
  190. // considered smaller
  191. bool wrapping_compare_less(uint32 lhs, uint32 rhs, uint32 mask)
  192. {
  193. // distance walking from lhs to rhs, downwards
  194. const uint32 dist_down = (lhs - rhs) & mask;
  195. // distance walking from lhs to rhs, upwards
  196. const uint32 dist_up = (rhs - lhs) & mask;
  197. // if the distance walking up is shorter, lhs
  198. // is less than rhs. If the distance walking down
  199. // is shorter, then rhs is less than lhs
  200. return dist_up < dist_down;
  201. }
// Tracks one direction's delay measurements: a short history of recent
// queuing-delay samples plus a longer history of the "delay base" (the
// minimum delay seen, used as the zero point). All timestamp math is
// unsigned 32-bit and relies on wrap-around.
struct DelayHist {
	uint32 delay_base;

	// this is the history of delay samples,
	// normalized by using the delay_base. These
	// values are always greater than 0 and measures
	// the queuing delay in microseconds
	uint32 cur_delay_hist[CUR_DELAY_SIZE];
	size_t cur_delay_idx;

	// this is the history of delay_base. It's
	// a number that doesn't have an absolute meaning
	// only relative. It doesn't make sense to initialize
	// it to anything other than values relative to
	// what's been seen in the real world.
	uint32 delay_base_hist[DELAY_BASE_HISTORY];
	size_t delay_base_idx;
	// the time when we last stepped the delay_base_idx
	uint64 delay_base_time;
	bool delay_base_initialized;

	// Reset all history to zero. current_ms seeds delay_base_time so
	// the first history rotation happens a full interval from now.
	void clear(uint64 current_ms)
	{
		delay_base_initialized = false;
		delay_base = 0;
		cur_delay_idx = 0;
		delay_base_idx = 0;
		delay_base_time = current_ms;
		for (size_t i = 0; i < CUR_DELAY_SIZE; i++) {
			cur_delay_hist[i] = 0;
		}
		for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
			delay_base_hist[i] = 0;
		}
	}

	// Shift the whole delay-base history upward by `offset`.
	void shift(const uint32 offset)
	{
		// the offset should never be "negative"
		// assert(offset < 0x10000000);

		// increase all of our base delays by this amount
		// this is used to take clock skew into account
		// by observing the other side's changes in its base_delay
		for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
			delay_base_hist[i] += offset;
		}
		delay_base += offset;
	}

	// Record one raw delay sample (a wrapping 32-bit microsecond
	// timestamp difference). current_ms is wall-clock milliseconds and
	// drives the periodic delay-base history rotation.
	void add_sample(const uint32 sample, uint64 current_ms)
	{
		// The two clocks (in the two peers) are assumed not to
		// progress at the exact same rate. They are assumed to be
		// drifting, which causes the delay samples to contain
		// a systematic error, either they are under-
		// estimated or over-estimated. This is why we update the
		// delay_base periodically, to adjust for this.
		// This means the values will keep drifting and eventually wrap.
		// We can cross the wrapping boundry in two directions, either
		// going up, crossing the highest value, or going down, crossing 0.
		// if the delay_base is close to the max value and sample actually
		// wrapped on the other end we would see something like this:
		// delay_base = 0xffffff00, sample = 0x00000400
		// sample - delay_base = 0x500 which is the correct difference
		// if the delay_base is instead close to 0, and we got an even lower
		// sample (that will eventually update the delay_base), we may see
		// something like this:
		// delay_base = 0x00000400, sample = 0xffffff00
		// sample - delay_base = 0xfffffb00
		// this needs to be interpreted as a negative number and the actual
		// recorded delay should be 0.
		// It is important that all arithmetic that assume wrapping
		// is done with unsigned intergers. Signed integers are not guaranteed
		// to wrap the way unsigned integers do. At least GCC takes advantage
		// of this relaxed rule and won't necessarily wrap signed ints.

		// remove the clock offset and propagation delay.
		// delay base is min of the sample and the current
		// delay base. This min-operation is subject to wrapping
		// and care needs to be taken to correctly choose the
		// true minimum.

		// specifically the problem case is when delay_base is very small
		// and sample is very large (because it wrapped past zero), sample
		// needs to be considered the smaller

		if (!delay_base_initialized) {
			// delay_base being 0 suggests that we haven't initialized
			// it or its history with any real measurements yet. Initialize
			// everything with this sample.
			for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
				// if we don't have a value, set it to the current sample
				delay_base_hist[i] = sample;
				continue; // NOTE(review): redundant — the loop body ends here anyway
			}
			delay_base = sample;
			delay_base_initialized = true;
		}

		// is sample lower than the current history slot? If so, update it
		if (wrapping_compare_less(sample, delay_base_hist[delay_base_idx], TIMESTAMP_MASK)) {
			// sample is smaller than the current delay_base_hist entry
			// update it
			delay_base_hist[delay_base_idx] = sample;
		}

		// is sample lower than delay_base? If so, update delay_base
		if (wrapping_compare_less(sample, delay_base, TIMESTAMP_MASK)) {
			// sample is smaller than the current delay_base
			// update it
			delay_base = sample;
		}

		// this operation may wrap, and is supposed to
		const uint32 delay = sample - delay_base;
		// sanity check. If this is triggered, something fishy is going on
		// it means the measured sample was greater than 32 seconds!
		//assert(delay < 0x2000000);

		cur_delay_hist[cur_delay_idx] = delay;
		cur_delay_idx = (cur_delay_idx + 1) % CUR_DELAY_SIZE;

		// rotate the delay-base history once every minute
		// (NOTE(review): earlier comments mention "two minutes"; the
		// code rotates every 60s across DELAY_BASE_HISTORY slots)
		if (current_ms - delay_base_time > 60 * 1000) {
			delay_base_time = current_ms;
			delay_base_idx = (delay_base_idx + 1) % DELAY_BASE_HISTORY;
			// clear up the new delay base history spot by initializing
			// it to the current sample, then update it
			delay_base_hist[delay_base_idx] = sample;
			delay_base = delay_base_hist[0];
			// Assign the lowest delay in the history window to delay_base
			for (size_t i = 0; i < DELAY_BASE_HISTORY; i++) {
				if (wrapping_compare_less(delay_base_hist[i], delay_base, TIMESTAMP_MASK))
					delay_base = delay_base_hist[i];
			}
		}
	}

	// Return the smallest queuing delay in the recent-sample history.
	uint32 get_value()
	{
		uint32 value = UINT_MAX;
		for (size_t i = 0; i < CUR_DELAY_SIZE; i++) {
			value = min<uint32>(cur_delay_hist[i], value);
		}
		// value could be UINT_MAX if we have no samples yet...
		return value;
	}
};
  335. struct UTPSocket {
  336. ~UTPSocket();
  337. PackedSockAddr addr;
  338. utp_context *ctx;
  339. int ida; //for ack socket list
  340. uint16 retransmit_count;
  341. uint16 reorder_count;
  342. byte duplicate_ack;
  343. // the number of packets in the send queue. Packets that haven't
  344. // yet been sent count as well as packets marked as needing resend
  345. // the oldest un-acked packet in the send queue is seq_nr - cur_window_packets
  346. uint16 cur_window_packets;
  347. // how much of the window is used, number of bytes in-flight
  348. // packets that have not yet been sent do not count, packets
  349. // that are marked as needing to be re-sent (due to a timeout)
  350. // don't count either
  351. size_t cur_window;
  352. // maximum window size, in bytes
  353. size_t max_window;
  354. // UTP_SNDBUF setting, in bytes
  355. size_t opt_sndbuf;
  356. // UTP_RCVBUF setting, in bytes
  357. size_t opt_rcvbuf;
  358. // this is the target delay, in microseconds
  359. // for this socket. defaults to 100000.
  360. size_t target_delay;
  361. // Is a FIN packet in the reassembly buffer?
  362. bool got_fin:1;
  363. // Have we reached the FIN?
  364. bool got_fin_reached:1;
  365. // Have we sent our FIN?
  366. bool fin_sent:1;
  367. // Has our fin been ACKed?
  368. bool fin_sent_acked:1;
  369. // Reading is disabled
  370. bool read_shutdown:1;
  371. // User called utp_close()
  372. bool close_requested:1;
  373. // Timeout procedure
  374. bool fast_timeout:1;
  375. // max receive window for other end, in bytes
  376. size_t max_window_user;
  377. CONN_STATE state;
  378. // TickCount when we last decayed window (wraps)
  379. int64 last_rwin_decay;
  380. // the sequence number of the FIN packet. This field is only set
  381. // when we have received a FIN, and the flag field has the FIN flag set.
  382. // it is used to know when it is safe to destroy the socket, we must have
  383. // received all packets up to this sequence number first.
  384. uint16 eof_pkt;
  385. // All sequence numbers up to including this have been properly received
  386. // by us
  387. uint16 ack_nr;
  388. // This is the sequence number for the next packet to be sent.
  389. uint16 seq_nr;
  390. uint16 timeout_seq_nr;
  391. // This is the sequence number of the next packet we're allowed to
  392. // do a fast resend with. This makes sure we only do a fast-resend
  393. // once per packet. We can resend the packet with this sequence number
  394. // or any later packet (with a higher sequence number).
  395. uint16 fast_resend_seq_nr;
  396. uint32 reply_micro;
  397. uint64 last_got_packet;
  398. uint64 last_sent_packet;
	uint64 last_measured_delay;

	// timestamp of the last time the cwnd was full
	// this is used to prevent the congestion window
	// from growing when we're not sending at capacity
	mutable uint64 last_maxed_out_window;

	// opaque user pointer (not interpreted in this file)
	void *userdata;

	// Round trip time (smoothed, milliseconds; updated in ack_packet())
	uint rtt;
	// Round trip time variance
	uint rtt_var;
	// Round trip timeout, ms: max(rtt + rtt_var * 4, 1000)
	uint rto;
	// history of RTT samples (fed from ack_packet())
	DelayHist rtt_hist;
	// current retransmit timeout, ms; doubled on each timeout (see check_timeouts())
	uint retransmit_timeout;
	// The RTO timer will timeout here.
	uint64 rto_timeout;
	// When the window size is set to zero, start this timer. It will send a new packet every 30secs.
	uint64 zerowindow_time;
	uint32 conn_seed;
	// Connection ID for packets I receive
	uint32 conn_id_recv;
	// Connection ID for packets I send
	uint32 conn_id_send;
	// Last rcv window we advertised, in bytes
	size_t last_rcv_win;
	// delay histories for the two directions
	// NOTE(review): which is which is not visible in this chunk — confirm
	DelayHist our_hist;
	DelayHist their_hist;

	// extension bytes from SYN packet
	byte extensions[8];

	// MTU Discovery
	// time when we should restart the MTU discovery
	uint64 mtu_discover_time;
	// ceiling and floor of binary search. last is the mtu size
	// we're currently using
	uint32 mtu_ceiling, mtu_floor, mtu_last;
	// we only ever have a single probe in flight at any given time.
	// this is the sequence number of that probe, and the size of
	// that packet (0 means no probe outstanding)
	uint32 mtu_probe_seq, mtu_probe_size;

	// this is the average delay samples, as compared to the initial
	// sample. It's averaged over 5 seconds
	int32 average_delay;
	// this is the sum of all the delay samples
	// we've made recently. The important distinction
	// of these samples is that they are all made compared
	// to the initial sample, this is to deal with
	// wrapping in a simple way.
	int64 current_delay_sum;
	// number of samples in current_delay_sum
	int current_delay_samples;
	// initialized to 0, set to the first raw delay sample
	// each sample that's added to current_delay_sum
	// is subtracted from the value first, to make it
	// a delay relative to this sample
	uint32 average_delay_base;
	// the next time we should add an average delay
	// sample into average_delay_hist
	uint64 average_sample_time;
	// the estimated clock drift between our computer
	// and the endpoint computer. The unit is microseconds
	// per 5 seconds
	int32 clock_drift;
	// just used for logging
	int32 clock_drift_raw;

	// receive and send buffers, indexed by sequence number
	SizableCircularBuffer inbuf, outbuf;

#ifdef _DEBUG
	// Public per-socket statistics, returned by utp_get_stats()
	utp_socket_stats _stats;
#endif

	// true if we're in slow-start (exponential growth) phase
	bool slow_start;
	// the slow-start threshold, in bytes
	size_t ssthresh;
	// Format a log line and hand it to the context's logger. The line is
	// prefixed with this socket's pointer, the remote address and the
	// receive-side connection ID. level is a UTP_LOG_* constant;
	// fmt/... are printf-style.
	void log(int level, char const *fmt, ...)
	{
		va_list va;
		char buf[4096], buf2[4096];

		// don't bother with vsnprintf() etc calls if we're not going to log.
		if (!ctx->would_log(level)) {
			return;
		}

		va_start(va, fmt);
		vsnprintf(buf, 4096, fmt, va);
		va_end(va);
		// belt-and-braces termination (vsnprintf already terminates)
		buf[4095] = '\0';

		snprintf(buf2, 4096, "%p %s %06u %s", this, addrfmt(addr, addrbuf), conn_id_recv, buf);
		buf2[4095] = '\0';

		ctx->log_unchecked(this, buf2);
	}
	// Queue this socket for a delayed ACK (adds it to ctx->ack_sockets).
	void schedule_ack();

	// called every time mtu_floor or mtu_ceiling are adjusted
	void mtu_search_update();
	// Restart MTU discovery: widen the binary-search range to its maximum.
	void mtu_reset();
  492. // Calculates the current receive window
	// Calculates the current receive window, in bytes: the configured
	// receive buffer size minus whatever is already buffered and unread.
	size_t get_rcv_window()
	{
		// Trim window down according to what's already in buffer.
		const size_t numbuf = utp_call_get_read_buffer_size(this->ctx, this);
		assert((int)numbuf >= 0);
		// clamp at zero rather than wrapping the unsigned subtraction
		return opt_rcvbuf > numbuf ? opt_rcvbuf - numbuf : 0;
	}
  500. // Test if we're ready to decay max_window
  501. // XXX this breaks when spaced by > INT_MAX/2, which is 49
  502. // days; the failure mode in that case is we do an extra decay
  503. // or fail to do one when we really shouldn't.
  504. bool can_decay_win(int64 msec) const
  505. {
  506. return (msec - last_rwin_decay) >= MAX_WINDOW_DECAY;
  507. }
	// If enough time has passed since the last decay, halve max_window
	// (see can_decay_win()); otherwise do nothing.
  509. void maybe_decay_win(uint64 current_ms)
  510. {
  511. if (can_decay_win(current_ms)) {
  512. // TCP uses 0.5
  513. max_window = (size_t)(max_window * .5);
  514. last_rwin_decay = current_ms;
  515. if (max_window < MIN_WINDOW_SIZE)
  516. max_window = MIN_WINDOW_SIZE;
  517. slow_start = false;
  518. ssthresh = max_window;
  519. }
  520. }
	// Size of the uTP version-1 header prepended to every packet, in bytes.
	size_t get_header_size() const
	{
		return sizeof(PacketFormatV1);
	}
	// Ask the application (via callback) for the UDP MTU towards this
	// socket's peer address.
	size_t get_udp_mtu()
	{
		socklen_t len;
		SOCKADDR_STORAGE sa = addr.get_sockaddr_storage(&len);
		return utp_call_get_udp_mtu(this->ctx, this, (const struct sockaddr *)&sa, len);
	}
	// Ask the application (via callback) for the per-packet UDP/IP
	// overhead, in bytes, towards this socket's peer address.
	size_t get_udp_overhead()
	{
		socklen_t len;
		SOCKADDR_STORAGE sa = addr.get_sockaddr_storage(&len);
		return utp_call_get_udp_overhead(this->ctx, this, (const struct sockaddr *)&sa, len);
	}
	// Total per-packet overhead: UDP/IP overhead plus the uTP header.
	size_t get_overhead()
	{
		return get_udp_overhead() + get_header_size();
	}
	// Stamp timestamps into an already-built packet and transmit it.
	void send_data(byte* b, size_t length, bandwidth_type_t type, uint32 flags = 0);
	// Send an ST_STATE packet: a plain ACK, or an EACK when packets are
	// held for reordering.
	void send_ack(bool synack = false);
	// Keep-alive: send an ACK for ack_nr - 1.
	void send_keep_alive();
	// Send an ST_RESET; static because no socket object may exist for the
	// address being reset.
	static void send_rst(utp_context *ctx,
		const PackedSockAddr &addr, uint32 conn_id_send,
		uint16 ack_nr, uint16 seq_nr);
	// (Re)transmit one queued packet, possibly using it as an MTU probe.
	void send_packet(OutgoingPacket *pkt);
	// True when the send window can't accept 'bytes' more payload
	// (bytes == -1 means "one full packet").
	bool is_full(int bytes = -1);
	// Send queued packets that are allowed out; returns true if the
	// window filled up before everything was sent.
	bool flush_packets();
	void write_outgoing_packet(size_t payload, uint flags, struct utp_iovec *iovec, size_t num_iovecs);

#ifdef _DEBUG
	void check_invariant();
#endif

	// Drive per-socket timers: retransmission timeout, zero-window reset,
	// keep-alives.
	void check_timeouts();
	// Ack one packet; returns 0 = acked, 1 = already acked, 2 = not sent yet.
	int ack_packet(uint16 seq);
	// Count bytes acked by an EACK bitmask, tracking the minimum RTT seen.
	size_t selective_ack_bytes(uint base, const byte* mask, byte len, int64& min_rtt);
	// Process an EACK bitmask: ack covered packets, queue resends for gaps.
	void selective_ack(uint base, const byte *mask, byte len);
	void apply_ccontrol(size_t bytes_acked, uint32 actual_delay, int64 min_rtt);
	size_t get_packet_size() const;
  560. };
  561. void removeSocketFromAckList(UTPSocket *conn)
  562. {
  563. if (conn->ida >= 0)
  564. {
  565. UTPSocket *last = conn->ctx->ack_sockets[conn->ctx->ack_sockets.GetCount() - 1];
  566. assert(last->ida < (int)(conn->ctx->ack_sockets.GetCount()));
  567. assert(conn->ctx->ack_sockets[last->ida] == last);
  568. last->ida = conn->ida;
  569. conn->ctx->ack_sockets[conn->ida] = last;
  570. conn->ida = -1;
  571. // Decrease the count
  572. conn->ctx->ack_sockets.SetCount(conn->ctx->ack_sockets.GetCount() - 1);
  573. }
  574. }
  575. static void utp_register_sent_packet(utp_context *ctx, size_t length)
  576. {
  577. if (length <= PACKET_SIZE_MID) {
  578. if (length <= PACKET_SIZE_EMPTY) {
  579. ctx->context_stats._nraw_send[PACKET_SIZE_EMPTY_BUCKET]++;
  580. } else if (length <= PACKET_SIZE_SMALL) {
  581. ctx->context_stats._nraw_send[PACKET_SIZE_SMALL_BUCKET]++;
  582. } else
  583. ctx->context_stats._nraw_send[PACKET_SIZE_MID_BUCKET]++;
  584. } else {
  585. if (length <= PACKET_SIZE_BIG) {
  586. ctx->context_stats._nraw_send[PACKET_SIZE_BIG_BUCKET]++;
  587. } else
  588. ctx->context_stats._nraw_send[PACKET_SIZE_HUGE_BUCKET]++;
  589. }
  590. }
// Serialize the destination address and hand the datagram to the
// application's sendto callback, recording size statistics first.
// socket may be NULL (e.g. when sending an RST for an unknown connection).
void send_to_addr(utp_context *ctx, utp_socket *socket, const byte *p, size_t len, const PackedSockAddr &addr, int flags = 0)
{
	socklen_t tolen;
	SOCKADDR_STORAGE to = addr.get_sockaddr_storage(&tolen);
	utp_register_sent_packet(ctx, len);
	utp_call_sendto(ctx, socket, p, len, (const struct sockaddr *)&to, tolen, flags);
}
// Add this socket to the context's delayed-ACK list, unless it is
// already there. ida caches the socket's index in that list (-1 = not
// listed); removeSocketFromAckList() undoes this.
void UTPSocket::schedule_ack()
{
	if (ida == -1){
		#if UTP_DEBUG_LOGGING
		log(UTP_LOG_DEBUG, "schedule_ack");
		#endif
		ida = ctx->ack_sockets.Append(this);
	} else {
		#if UTP_DEBUG_LOGGING
		log(UTP_LOG_DEBUG, "schedule_ack: already in list");
		#endif
	}
}
// Final transmit step for every outgoing packet: stamp the send
// timestamp and reply_micro into the (already built) header, account
// overhead statistics, hand the datagram to send_to_addr(), and cancel
// any pending delayed ACK (every packet carries an ack_nr already).
void UTPSocket::send_data(byte* b, size_t length, bandwidth_type_t type, uint32 flags)
{
	// time stamp this packet with local time, the stamp goes into
	// the header of every packet at the 8th byte for 8 bytes :
	// two integers, check packet.h for more
	uint64 time = utp_call_get_microseconds(ctx, this);

	PacketFormatV1* b1 = (PacketFormatV1*)b;
	b1->tv_usec = (uint32)time;
	b1->reply_micro = reply_micro;

	last_sent_packet = ctx->current_ms;

#ifdef _DEBUG
	_stats.nbytes_xmit += length;
	++_stats.nxmit;
#endif

	if (ctx->callbacks[UTP_ON_OVERHEAD_STATISTICS]) {
		size_t n;
		if (type == payload_bandwidth) {
			// if this packet carries payload, just
			// count the header as overhead
			type = header_overhead;
			n = get_overhead();
		} else {
			n = length + get_udp_overhead();
		}
		utp_call_on_overhead_statistics(ctx, this, true, n, type);
	}
#if UTP_DEBUG_LOGGING
	int flags2 = b1->type();
	uint16 seq_nr = b1->seq_nr;
	uint16 ack_nr = b1->ack_nr;
	log(UTP_LOG_DEBUG, "send %s len:%u id:%u timestamp:" I64u " reply_micro:%u flags:%s seq_nr:%u ack_nr:%u",
		addrfmt(addr, addrbuf), (uint)length, conn_id_send, time, reply_micro, flagnames[flags2],
		seq_nr, ack_nr);
#endif
	send_to_addr(ctx, this, b, length, addr, flags);
	// this packet acks everything we owe, so drop the delayed-ACK entry
	removeSocketFromAckList(this);
}
// Send an ST_STATE (ACK) packet. When packets are being held for
// reordering (reorder_count != 0) and we're not shutting down, attach a
// selective-ACK extension with a bitmask of out-of-order segments we
// already hold.
void UTPSocket::send_ack(bool synack)
{
	PacketFormatAckV1 pfa;
	zeromem(&pfa);

	size_t len;
	last_rcv_win = get_rcv_window();
	pfa.pf.set_version(1);
	pfa.pf.set_type(ST_STATE);
	pfa.pf.ext = 0;
	pfa.pf.connid = conn_id_send;
	pfa.pf.ack_nr = ack_nr;
	pfa.pf.seq_nr = seq_nr;
	pfa.pf.windowsize = (uint32)last_rcv_win;
	len = sizeof(PacketFormatV1);

	// we never need to send EACK for connections
	// that are shutting down
	if (reorder_count != 0 && !got_fin_reached) {
		// if reorder count > 0, send an EACK.
		// reorder count should always be 0
		// for synacks, so we should never get here
		// as a synack
		assert(!synack);
		pfa.pf.ext = 1;
		pfa.ext_next = 0;
		pfa.ext_len = 4;
		uint m = 0;

		// reorder count should only be non-zero
		// if the packet ack_nr + 1 has not yet
		// been received
		assert(inbuf.get(ack_nr + 1) == NULL);
		// only up to 30 bits (14+16) are generated even though 4 mask
		// bytes are sent; bit i covers sequence number ack_nr + i + 2
		size_t window = min<size_t>(14+16, inbuf.size());
		// Generate bit mask of segments received.
		for (size_t i = 0; i < window; i++) {
			if (inbuf.get(ack_nr + i + 2) != NULL) {
				m |= 1 << i;
				#if UTP_DEBUG_LOGGING
				log(UTP_LOG_DEBUG, "EACK packet [%u]", ack_nr + i + 2);
				#endif
			}
		}
		// little-endian serialization of the mask
		pfa.acks[0] = (byte)m;
		pfa.acks[1] = (byte)(m >> 8);
		pfa.acks[2] = (byte)(m >> 16);
		pfa.acks[3] = (byte)(m >> 24);
		// 2 bytes of extension header (ext_next, ext_len) + 4 mask bytes
		len += 4 + 2;
		#if UTP_DEBUG_LOGGING
		log(UTP_LOG_DEBUG, "Sending EACK %u [%u] bits:[%032b]", ack_nr, conn_id_send, m);
		#endif
	} else {
		#if UTP_DEBUG_LOGGING
		log(UTP_LOG_DEBUG, "Sending ACK %u [%u]", ack_nr, conn_id_send);
		#endif
	}

	send_data((byte*)&pfa, len, ack_overhead);
	// redundant (send_data already did this) but harmless
	removeSocketFromAckList(this);
}
// Send a keep-alive: an ACK deliberately one behind the real ack_nr so
// the peer doesn't treat it as acknowledging new data. ack_nr is
// restored immediately after sending.
void UTPSocket::send_keep_alive()
{
	ack_nr--;
	#if UTP_DEBUG_LOGGING
	log(UTP_LOG_DEBUG, "Sending KeepAlive ACK %u [%u]", ack_nr, conn_id_send);
	#endif
	send_ack();
	ack_nr++;
}
  713. void UTPSocket::send_rst(utp_context *ctx,
  714. const PackedSockAddr &addr, uint32 conn_id_send, uint16 ack_nr, uint16 seq_nr)
  715. {
  716. PacketFormatV1 pf1;
  717. zeromem(&pf1);
  718. size_t len;
  719. pf1.set_version(1);
  720. pf1.set_type(ST_RESET);
  721. pf1.ext = 0;
  722. pf1.connid = conn_id_send;
  723. pf1.ack_nr = ack_nr;
  724. pf1.seq_nr = seq_nr;
  725. pf1.windowsize = 0;
  726. len = sizeof(PacketFormatV1);
  727. // LOG_DEBUG("%s: Sending RST id:%u seq_nr:%u ack_nr:%u", addrfmt(addr, addrbuf), conn_id_send, seq_nr, ack_nr);
  728. // LOG_DEBUG("send %s len:%u id:%u", addrfmt(addr, addrbuf), (uint)len, conn_id_send);
  729. send_to_addr(ctx, NULL, (const byte*)&pf1, len, addr);
  730. }
// Transmit (or retransmit) one queued packet: account it against
// cur_window on its first send (or when it re-enters the window after a
// resend), refresh its ack_nr and send timestamp, and opportunistically
// use it as an MTU probe when the binary search is open.
void UTPSocket::send_packet(OutgoingPacket *pkt)
{
	// only count against the quota the first time we
	// send the packet. Don't enforce quota when closing
	// a socket. Only enforce the quota when we're sending
	// at slow rates (max window < packet size)

	//size_t max_send = min(max_window, opt_sndbuf, max_window_user);
	time_t cur_time = utp_call_get_milliseconds(this->ctx, this);

	if (pkt->transmissions == 0 || pkt->need_resend) {
		cur_window += pkt->payload;
	}

	pkt->need_resend = false;

	// every (re)send carries the current cumulative ack
	PacketFormatV1* p1 = (PacketFormatV1*)pkt->data;
	p1->ack_nr = ack_nr;
	pkt->time_sent = utp_call_get_microseconds(this->ctx, this);

	//socklen_t salen;
	//SOCKADDR_STORAGE sa = addr.get_sockaddr_storage(&salen);
	bool use_as_mtu_probe = false;

	// TODO: this is subject to nasty wrapping issues! Below as well
	if (mtu_discover_time < (uint64)cur_time) {
		// it's time to reset our MTU assumptions
		// and trigger a new search
		mtu_reset();
	}

	// don't use packets that are larger than mtu_ceiling
	// as probes, since they were probably used as probes
	// already and failed, now we need it to fragment
	// just to get it through
	// if seq_nr == 1, the probe would end up being 0
	// which is a magic number representing no-probe
	// that's why we don't send a probe for a packet with
	// sequence number 0
	if (mtu_floor < mtu_ceiling
		&& pkt->length > mtu_floor
		&& pkt->length <= mtu_ceiling
		&& mtu_probe_seq == 0
		&& seq_nr != 1
		&& pkt->transmissions == 0) {

		// we've already incremented seq_nr
		// for this packet
		mtu_probe_seq = (seq_nr - 1) & ACK_NR_MASK;
		mtu_probe_size = pkt->length;
		assert(pkt->length >= mtu_floor);
		assert(pkt->length <= mtu_ceiling);
		use_as_mtu_probe = true;
		log(UTP_LOG_MTU, "MTU [PROBE] floor:%d ceiling:%d current:%d"
			, mtu_floor, mtu_ceiling, mtu_probe_size);
	}

	pkt->transmissions++;
	// classify for bandwidth accounting; probes are sent with
	// don't-fragment so a loss tells us the size didn't fit
	send_data((byte*)pkt->data, pkt->length,
		(state == CS_SYN_SENT) ? connect_overhead
		: (pkt->transmissions == 1) ? payload_bandwidth
		: retransmit_overhead, use_as_mtu_probe ? UTP_UDP_DONTFRAG : 0);
}
  785. bool UTPSocket::is_full(int bytes)
  786. {
  787. size_t packet_size = get_packet_size();
  788. if (bytes < 0) bytes = packet_size;
  789. else if (bytes > (int)packet_size) bytes = (int)packet_size;
  790. size_t max_send = min(max_window, opt_sndbuf, max_window_user);
  791. // subtract one to save space for the FIN packet
  792. if (cur_window_packets >= OUTGOING_BUFFER_MAX_SIZE - 1) {
  793. #if UTP_DEBUG_LOGGING
  794. log(UTP_LOG_DEBUG, "is_full:false cur_window_packets:%d MAX:%d", cur_window_packets, OUTGOING_BUFFER_MAX_SIZE - 1);
  795. #endif
  796. last_maxed_out_window = ctx->current_ms;
  797. return true;
  798. }
  799. #if UTP_DEBUG_LOGGING
  800. log(UTP_LOG_DEBUG, "is_full:%s. cur_window:%u pkt:%u max:%u cur_window_packets:%u max_window:%u"
  801. , (cur_window + bytes > max_send) ? "true" : "false"
  802. , cur_window, bytes, max_send, cur_window_packets
  803. , max_window);
  804. #endif
  805. if (cur_window + bytes > max_send) {
  806. last_maxed_out_window = ctx->current_ms;
  807. return true;
  808. }
  809. return false;
  810. }
// Walk the in-flight window from oldest to newest and send every packet
// that is unsent or marked for resend, subject to the window quota and a
// Nagle check on the newest packet. Returns true if the window filled up
// before all eligible packets were sent.
bool UTPSocket::flush_packets()
{
	size_t packet_size = get_packet_size();

	// send packets that are waiting on the pacer to be sent
	// i has to be an unsigned 16 bit counter to wrap correctly
	// signed types are not guaranteed to wrap the way you expect
	for (uint16 i = seq_nr - cur_window_packets; i != seq_nr; ++i) {
		OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(i);
		// skip empty (acked) slots and packets already in flight that
		// are not marked for retransmission
		if (pkt == 0 || (pkt->transmissions > 0 && pkt->need_resend == false)) continue;
		// have we run out of quota?
		if (is_full()) return true;

		// Nagle check
		// don't send the last packet if we have one packet in-flight
		// and the current packet is still smaller than packet_size.
		if (i != ((seq_nr - 1) & ACK_NR_MASK) ||
			cur_window_packets == 1 ||
			pkt->payload >= packet_size) {
			send_packet(pkt);
		}
	}
	// everything eligible was sent without exhausting the window
	return false;
}
  833. // @payload: number of bytes to send
  834. // @flags: either ST_DATA, or ST_FIN
  835. // @iovec: base address of iovec array
  836. // @num_iovecs: number of iovecs in array
// Packetize 'payload' bytes from the iovec array into the outgoing
// buffer, coalescing into the newest unsent packet when it has room
// (Nagle-style), then try to flush. Runs at least once even for
// payload == 0 (that's how a zero-payload ST_FIN is queued).
void UTPSocket::write_outgoing_packet(size_t payload, uint flags, struct utp_iovec *iovec, size_t num_iovecs)
{
	// Setup initial timeout timer
	if (cur_window_packets == 0) {
		retransmit_timeout = rto;
		rto_timeout = ctx->current_ms + retransmit_timeout;
		assert(cur_window == 0);
	}

	size_t packet_size = get_packet_size();
	do {
		assert(cur_window_packets < OUTGOING_BUFFER_MAX_SIZE);
		assert(flags == ST_DATA || flags == ST_FIN);

		// bytes consumed from 'payload' in this iteration
		size_t added = 0;

		OutgoingPacket *pkt = NULL;

		if (cur_window_packets > 0) {
			// newest queued packet (seq_nr - 1), candidate for coalescing
			pkt = (OutgoingPacket*)outbuf.get(seq_nr - 1);
		}

		const size_t header_size = get_header_size();
		bool append = true;

		// if there's any room left in the last packet in the window
		// and it hasn't been sent yet, fill that frame first
		if (payload && pkt && !pkt->transmissions && pkt->payload < packet_size) {
			// Use the previous unsent packet
			added = min(payload + pkt->payload, max<size_t>(packet_size, pkt->payload)) - pkt->payload;
			// grow the packet in place; OutgoingPacket has a 1-byte
			// flexible tail (hence the "- 1")
			pkt = (OutgoingPacket*)realloc(pkt,
										   (sizeof(OutgoingPacket) - 1) +
										   header_size +
										   pkt->payload + added);
			outbuf.put(seq_nr - 1, pkt);
			append = false;
			assert(!pkt->need_resend);
		} else {
			// Create the packet to send.
			added = payload;
			pkt = (OutgoingPacket*)malloc((sizeof(OutgoingPacket) - 1) +
										  header_size +
										  added);
			pkt->payload = 0;
			pkt->transmissions = 0;
			pkt->need_resend = false;
		}

		if (added) {
			assert(flags == ST_DATA);
			// Fill it with data from the upper layer.
			unsigned char *p = pkt->data + header_size + pkt->payload;
			size_t needed = added;

			/*
			while (needed) {
				*p = *(char*)iovec[0].iov_base;
				p++;
				iovec[0].iov_base = (char *)iovec[0].iov_base + 1;
				needed--;
			}
			*/

			// consume the iovecs in order, advancing each one so the
			// caller's array reflects what has been taken
			for (size_t i = 0; i < num_iovecs && needed; i++) {
				if (iovec[i].iov_len == 0)
					continue;

				size_t num = min<size_t>(needed, iovec[i].iov_len);
				memcpy(p, iovec[i].iov_base, num);

				p += num;

				iovec[i].iov_len -= num;
				iovec[i].iov_base = (byte*)iovec[i].iov_base + num;	// iovec[i].iov_base += num, but without void* pointers
				needed -= num;
			}

			assert(needed == 0);
		}
		pkt->payload += added;
		pkt->length = header_size + pkt->payload;

		last_rcv_win = get_rcv_window();

		// (re)write the header; for a coalesced packet this refreshes
		// the advertised window and ack_nr too
		PacketFormatV1* p1 = (PacketFormatV1*)pkt->data;
		p1->set_version(1);
		p1->set_type(flags);
		p1->ext = 0;
		p1->connid = conn_id_send;
		p1->windowsize = (uint32)last_rcv_win;
		p1->ack_nr = ack_nr;

		if (append) {
			// Remember the message in the outgoing queue.
			outbuf.ensure_size(seq_nr, cur_window_packets);
			outbuf.put(seq_nr, pkt);
			p1->seq_nr = seq_nr;
			seq_nr++;
			cur_window_packets++;
		}

		payload -= added;

	} while (payload);

	flush_packets();
}
  925. #ifdef _DEBUG
// Debug-only consistency checks:
// 1) while packets are held for reordering, the next expected packet
//    (ack_nr + 1) must not already be in the receive buffer;
// 2) cur_window must equal the summed payload of all sent, un-acked
//    packets that are not marked for resend.
void UTPSocket::check_invariant()
{
	if (reorder_count > 0) {
		assert(inbuf.get(ack_nr + 1) == NULL);
	}

	size_t outstanding_bytes = 0;
	for (int i = 0; i < cur_window_packets; ++i) {
		OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - i - 1);
		if (pkt == 0 || pkt->transmissions == 0 || pkt->need_resend) continue;
		outstanding_bytes += pkt->payload;
	}
	assert(outstanding_bytes == cur_window);
}
  939. #endif
// Periodic timer processing for one socket: flush pending sends, handle
// the retransmission timeout (including MTU-probe loss, RTO backoff and
// congestion-window reset), re-open a peer-advertised zero window, mark
// the socket writable again, and send keep-alives.
void UTPSocket::check_timeouts()
{
	#ifdef _DEBUG
	check_invariant();
	#endif

	// this invariant should always be true
	assert(cur_window_packets == 0 || outbuf.get(seq_nr - cur_window_packets));

	#if UTP_DEBUG_LOGGING
	log(UTP_LOG_DEBUG, "CheckTimeouts timeout:%d max_window:%u cur_window:%u "
			 "state:%s cur_window_packets:%u",
			 (int)(rto_timeout - ctx->current_ms), (uint)max_window, (uint)cur_window,
			 statenames[state], cur_window_packets);
	#endif

	if (state != CS_DESTROY) flush_packets();

	switch (state) {
	case CS_SYN_SENT:
	case CS_SYN_RECV:
	case CS_CONNECTED_FULL:
	case CS_CONNECTED: {

		// Reset max window...
		// (the signed cast makes this a wrap-tolerant "time reached" test)
		if ((int)(ctx->current_ms - zerowindow_time) >= 0 && max_window_user == 0) {
			max_window_user = PACKET_SIZE;
		}

		if ((int)(ctx->current_ms - rto_timeout) >= 0
			&& rto_timeout > 0) {

			bool ignore_loss = false;

			if (cur_window_packets == 1
				&& ((seq_nr - 1) & ACK_NR_MASK) == mtu_probe_seq
				&& mtu_probe_seq != 0) {
				// we only had a single outstanding packet that timed out, and it was the probe
				mtu_ceiling = mtu_probe_size - 1;
				mtu_search_update();
				// this packet was most likely dropped because the packet size being
				// too big and not because congestion. To accelerate the binary search for
				// the MTU, resend immediately and don't reset the window size
				ignore_loss = true;
				log(UTP_LOG_MTU, "MTU [PROBE-TIMEOUT] floor:%d ceiling:%d current:%d"
					, mtu_floor, mtu_ceiling, mtu_last);
			}
			// we dropped the probe, clear these fields to
			// allow us to send a new one
			mtu_probe_seq = mtu_probe_size = 0;
			log(UTP_LOG_MTU, "MTU [TIMEOUT]");

			/*
			OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - cur_window_packets);

			// If there were a lot of retransmissions, force recomputation of round trip time
			if (pkt->transmissions >= 4)
				rtt = 0;
			*/

			// Increase RTO
			// (exponential backoff, unless this timeout was the MTU probe's fault)
			const uint new_timeout = ignore_loss ? retransmit_timeout : retransmit_timeout * 2;

			// They initiated the connection but failed to respond before the rto.
			// A malicious client can also spoof the destination address of a ST_SYN bringing us to this state.
			// Kill the connection and do not notify the upper layer
			if (state == CS_SYN_RECV) {
				state = CS_DESTROY;
				utp_call_on_error(ctx, this, UTP_ETIMEDOUT);
				return;
			}

			// We initiated the connection but the other side failed to respond before the rto
			if (retransmit_count >= 4 || (state == CS_SYN_SENT && retransmit_count >= 2)) {
				// 4 consecutive transmissions have timed out. Kill it. If we
				// haven't even connected yet, give up after only 2 consecutive
				// failed transmissions.
				if (close_requested)
					state = CS_DESTROY;
				else
					state = CS_RESET;
				utp_call_on_error(ctx, this, UTP_ETIMEDOUT);
				return;
			}

			retransmit_timeout = new_timeout;
			rto_timeout = ctx->current_ms + new_timeout;

			if (!ignore_loss) {
				// On Timeout
				duplicate_ack = 0;

				int packet_size = get_packet_size();

				if ((cur_window_packets == 0) && ((int)max_window > packet_size)) {
					// we don't have any packets in-flight, even though
					// we could. This implies that the connection is just
					// idling. No need to be aggressive about resetting the
					// congestion window. Just let it decay by a 3:rd.
					// don't set it any lower than the packet size though
					max_window = max(max_window * 2 / 3, size_t(packet_size));
				} else {
					// our delay was so high that our congestion window
					// was shrunk below one packet, preventing us from
					// sending anything for one time-out period. Now, reset
					// the congestion window to fit one packet, to start over
					// again
					max_window = packet_size;
					slow_start = true;
				}
			}

			// every packet should be considered lost
			// (remove them from cur_window; flagging need_resend lets
			// flush_packets() retransmit them)
			for (int i = 0; i < cur_window_packets; ++i) {
				OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - i - 1);
				if (pkt == 0 || pkt->transmissions == 0 || pkt->need_resend) continue;
				pkt->need_resend = true;
				assert(cur_window >= pkt->payload);
				cur_window -= pkt->payload;
			}

			if (cur_window_packets > 0) {
				retransmit_count++;
				// used in parse_log.py
				log(UTP_LOG_NORMAL, "Packet timeout. Resend. seq_nr:%u. timeout:%u "
					"max_window:%u cur_window_packets:%d"
					, seq_nr - cur_window_packets, retransmit_timeout
					, (uint)max_window, int(cur_window_packets));

				fast_timeout = true;
				timeout_seq_nr = seq_nr;

				// immediately retransmit the oldest outstanding packet
				OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - cur_window_packets);
				assert(pkt);

				// Re-send the packet.
				send_packet(pkt);
			}
		}

		// Mark the socket as writable. If the cwnd has grown, or if the number of
		// bytes in-flight is lower than cwnd, we need to make the socket writable again
		// in case it isn't
		if (state == CS_CONNECTED_FULL && !is_full()) {
			state = CS_CONNECTED;
			#if UTP_DEBUG_LOGGING
			log(UTP_LOG_DEBUG, "Socket writable. max_window:%u cur_window:%u packet_size:%u",
				(uint)max_window, (uint)cur_window, (uint)get_packet_size());
			#endif
			utp_call_on_state_change(this->ctx, this, UTP_STATE_WRITABLE);
		}

		if (state >= CS_CONNECTED && !fin_sent) {
			if ((int)(ctx->current_ms - last_sent_packet) >= KEEPALIVE_INTERVAL) {
				send_keep_alive();
			}
		}
		break;
	}

	// prevent warning
	case CS_UNINITIALIZED:
	case CS_IDLE:
	case CS_RESET:
	case CS_DESTROY:
		break;
	}
}
  1083. // this should be called every time we change mtu_floor or mtu_ceiling
  1084. void UTPSocket::mtu_search_update()
  1085. {
  1086. assert(mtu_floor <= mtu_ceiling);
  1087. // binary search
  1088. mtu_last = (mtu_floor + mtu_ceiling) / 2;
  1089. // enable a new probe to be sent
  1090. mtu_probe_seq = mtu_probe_size = 0;
  1091. // if the floor and ceiling are close enough, consider the
  1092. // MTU binary search complete. We set the current value
  1093. // to floor since that's the only size we know can go through
  1094. // also set the ceiling to floor to terminate the searching
  1095. if (mtu_ceiling - mtu_floor <= 16) {
  1096. mtu_last = mtu_floor;
  1097. log(UTP_LOG_MTU, "MTU [DONE] floor:%d ceiling:%d current:%d"
  1098. , mtu_floor, mtu_ceiling, mtu_last);
  1099. mtu_ceiling = mtu_floor;
  1100. assert(mtu_floor <= mtu_ceiling);
  1101. // Do another search in 30 minutes
  1102. mtu_discover_time = utp_call_get_milliseconds(this->ctx, this) + 30 * 60 * 1000;
  1103. }
  1104. }
  1105. void UTPSocket::mtu_reset()
  1106. {
  1107. mtu_ceiling = get_udp_mtu();
  1108. // Less would not pass TCP...
  1109. mtu_floor = 576;
  1110. log(UTP_LOG_MTU, "MTU [RESET] floor:%d ceiling:%d current:%d"
  1111. , mtu_floor, mtu_ceiling, mtu_last);
  1112. assert(mtu_floor <= mtu_ceiling);
  1113. mtu_discover_time = utp_call_get_milliseconds(this->ctx, this) + 30 * 60 * 1000;
  1114. }
// returns:
// 0: the packet was acked and removed from the send buffer
// 1: the packet had already been acked (or was never sent)
// 2: the packet has not been sent yet
// Acknowledge one outstanding packet: remove it from the send buffer,
// update the RTT/RTO estimators (only from packets sent exactly once,
// per Karn's rule), reset the retransmission timer and release the
// packet's bytes from cur_window.
int UTPSocket::ack_packet(uint16 seq)
{
	OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq);

	// the packet has already been acked (or not sent)
	if (pkt == NULL) {
		#if UTP_DEBUG_LOGGING
		log(UTP_LOG_DEBUG, "got ack for:%u (already acked, or never sent)", seq);
		#endif
		return 1;
	}

	// can't ack packets that haven't been sent yet!
	if (pkt->transmissions == 0) {
		#if UTP_DEBUG_LOGGING
		log(UTP_LOG_DEBUG, "got ack for:%u (never sent, pkt_size:%u need_resend:%u)",
			seq, (uint)pkt->payload, pkt->need_resend);
		#endif
		return 2;
	}

	#if UTP_DEBUG_LOGGING
	log(UTP_LOG_DEBUG, "got ack for:%u (pkt_size:%u need_resend:%u)",
		seq, (uint)pkt->payload, pkt->need_resend);
	#endif

	outbuf.put(seq, NULL);

	// if we never re-sent the packet, update the RTT estimate
	// (a retransmitted packet's ack is ambiguous — Karn's algorithm)
	if (pkt->transmissions == 1) {
		// Estimate the round trip time.
		const uint32 ertt = (uint32)((utp_call_get_microseconds(this->ctx, this) - pkt->time_sent) / 1000);
		if (rtt == 0) {
			// First round trip time sample
			rtt = ertt;
			rtt_var = ertt / 2;
			// sanity check. rtt should never be more than 6 seconds
			// assert(rtt < 6000);
		} else {
			// Compute new round trip times
			// (EWMA with gains 1/4 for variance and 1/8 for the mean)
			const int delta = (int)rtt - ertt;
			rtt_var = rtt_var + (int)(abs(delta) - rtt_var) / 4;
			rtt = rtt - rtt/8 + ertt/8;
			// sanity check. rtt should never be more than 6 seconds
			// assert(rtt < 6000);

			rtt_hist.add_sample(ertt, ctx->current_ms);
		}
		// RTO = rtt + 4 * rtt_var, with a 1 second lower bound
		rto = max<uint>(rtt + rtt_var * 4, 1000);
		#if UTP_DEBUG_LOGGING
		log(UTP_LOG_DEBUG, "rtt:%u avg:%u var:%u rto:%u",
			ertt, rtt, rtt_var, rto);
		#endif
	}
	retransmit_timeout = rto;
	rto_timeout = ctx->current_ms + rto;
	// if need_resend is set, this packet has already
	// been considered timed-out, and is not included in
	// the cur_window anymore
	if (!pkt->need_resend) {
		assert(cur_window >= pkt->payload);
		cur_window -= pkt->payload;
	}
	free(pkt);
	retransmit_count = 0;
	return 0;
}
  1180. // count the number of bytes that were acked by the EACK header
// count the number of bytes that were acked by the EACK header
// Walks the mask from the highest bit down; bit b covers sequence
// number base + b. min_rtt is lowered to the smallest send-to-now gap
// observed among the acked packets (floored at 50 ms when the clock
// appears to have gone backwards).
size_t UTPSocket::selective_ack_bytes(uint base, const byte* mask, byte len, int64& min_rtt)
{
	if (cur_window_packets == 0) return 0;

	size_t acked_bytes = 0;
	int bits = len * 8;
	uint64 now = utp_call_get_microseconds(this->ctx, this);

	// NOTE(review): the first iteration runs with bits == len*8, so the
	// mask[bits>>3] read below touches one byte past the len-byte mask
	// when that sequence number is inside the window — confirm intended
	do {
		uint v = base + bits;

		// ignore bits that haven't been sent yet
		// see comment in UTPSocket::selective_ack
		if (((seq_nr - v - 1) & ACK_NR_MASK) >= (uint16)(cur_window_packets - 1))
			continue;

		// ignore bits that represents packets we haven't sent yet
		// or packets that have already been acked
		OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(v);
		if (!pkt || pkt->transmissions == 0)
			continue;

		// Count the number of segments that were successfully received past it.
		if (bits >= 0 && mask[bits>>3] & (1 << (bits & 7))) {
			assert((int)(pkt->payload) >= 0);
			acked_bytes += pkt->payload;
			if (pkt->time_sent < now)
				min_rtt = min<int64>(min_rtt, now - pkt->time_sent);
			else
				min_rtt = min<int64>(min_rtt, 50000);
			continue;
		}
	} while (--bits >= -1);
	return acked_bytes;
}
  1211. enum { MAX_EACK = 128 };
// Process a selective-ack (EACK) extension. "base" is the sequence number
// the first bit of "mask" refers to; "mask" is a bitfield of "len" bytes
// where a set bit means that packet was received out of order. Set bits are
// acked via ack_packet(); unacked packets with enough acked packets after
// them are pushed onto a resend stack, of which at most 4 are resent.
void UTPSocket::selective_ack(uint base, const byte *mask, byte len)
{
	// nothing in flight: nothing to ack and nothing to resend
	if (cur_window_packets == 0) return;

	// the range is inclusive [0, 31] bits
	int bits = len * 8 - 1;

	// number of set (acked) bits seen so far while walking from the highest
	// sequence number downwards; this doubles as the duplicate-ack count
	int count = 0;

	// resends is a stack of sequence numbers we need to resend. Since we
	// iterate in reverse over the acked packets, at the end, the top packets
	// are the ones we want to resend
	int resends[MAX_EACK];
	int nr = 0;

#if UTP_DEBUG_LOGGING
	// render the ack bitfield (highest sequence number first) for the log
	char bitmask[1024] = {0};
	int counter = bits;
	for (int i = 0; i <= bits; ++i) {
		bool bit_set = counter >= 0 && mask[counter>>3] & (1 << (counter & 7));
		bitmask[i] = bit_set ? '1' : '0';
		--counter;
	}

	log(UTP_LOG_DEBUG, "Got EACK [%s] base:%u", bitmask, base);
#endif

	do {
		// we're iterating over the bits from higher sequence numbers
		// to lower (kind of in reverse order, which might not be very
		// intuitive)
		uint v = base + bits;

		// ignore bits that haven't been sent yet
		// and bits that fall below the ACKed sequence number
		// this can happen if an EACK message gets
		// reordered and arrives after a packet that ACKs up past
		// the base for this EACK message

		// this is essentially the same as:

		// if v >= seq_nr || v <= seq_nr - cur_window_packets

		// but it takes wrapping into account

		// if v == seq_nr the -1 will make it wrap. if v > seq_nr
		// it will also wrap (since it will fall further below 0)
		// and be > cur_window_packets.

		// if v == seq_nr - cur_window_packets, the result will be
		// seq_nr - (seq_nr - cur_window_packets) - 1
		// == seq_nr - seq_nr + cur_window_packets - 1
		// == cur_window_packets - 1 which will be caught by the
		// test. If v < seq_nr - cur_window_packets the result will
		// fall further outside of the cur_window_packets range.

		// sequence number space:
		//
		//     rejected <   accepted   > rejected
		// <============+--------------+============>
		//              ^              ^
		//              |              |
		//        (seq_nr-wnd)       seq_nr
		if (((seq_nr - v - 1) & ACK_NR_MASK) >= (uint16)(cur_window_packets - 1))
			continue;

		// this counts as a duplicate ack, even though we might have
		// received an ack for this packet previously (in another EACK
		// message for instance)
		bool bit_set = bits >= 0 && mask[bits>>3] & (1 << (bits & 7));

		// if this packet is acked, it counts towards the duplicate ack counter
		if (bit_set) count++;

		// ignore bits that represents packets we haven't sent yet
		// or packets that have already been acked
		OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(v);
		if (!pkt || pkt->transmissions == 0) {
#if UTP_DEBUG_LOGGING
			log(UTP_LOG_DEBUG, "skipping %u. pkt:%08x transmissions:%u %s",
				v, pkt, pkt?pkt->transmissions:0, pkt?"(not sent yet?)":"(already acked?)");
#endif
			continue;
		}

		// Count the number of segments that were successfully received past it.
		if (bit_set) {
			// the selective ack should never ACK the packet we're waiting for to decrement cur_window_packets
			assert((v & outbuf.mask) != ((seq_nr - cur_window_packets) & outbuf.mask));
			ack_packet(v);
			continue;
		}

		// Resend segments
		// if count is less than our re-send limit, we haven't seen enough
		// acked packets in front of this one to warrant a re-send.
		// if count == 0, we're still going through the tail of zeroes
		if (((v - fast_resend_seq_nr) & ACK_NR_MASK) <= OUTGOING_BUFFER_MAX_SIZE &&
			count >= DUPLICATE_ACKS_BEFORE_RESEND) {
			// resends is a stack, and we're mostly interested in the top of it
			// if we're full, just throw away the lower half
			if (nr >= MAX_EACK - 2) {
				memmove(resends, &resends[MAX_EACK/2], MAX_EACK/2 * sizeof(resends[0]));
				nr -= MAX_EACK / 2;
			}
			resends[nr++] = v;
#if UTP_DEBUG_LOGGING
			log(UTP_LOG_DEBUG, "no ack for %u", v);
#endif
		} else {
#if UTP_DEBUG_LOGGING
			log(UTP_LOG_DEBUG, "not resending %u count:%d dup_ack:%u fast_resend_seq_nr:%u",
				v, count, duplicate_ack, fast_resend_seq_nr);
#endif
		}
		// NOTE: the "continue"s above still run the --bits decrement,
		// since continue in a do/while jumps to the loop condition
	} while (--bits >= -1);

	// base - 1 is the first packet the EACK implicitly leaves unacked;
	// consider it for fast resend under the same duplicate-ack rule
	if (((base - 1 - fast_resend_seq_nr) & ACK_NR_MASK) <= OUTGOING_BUFFER_MAX_SIZE &&
		count >= DUPLICATE_ACKS_BEFORE_RESEND) {
		// if we get enough duplicate acks to start
		// resending, the first packet we should resend
		// is base-1
		resends[nr++] = (base - 1) & ACK_NR_MASK;
#if UTP_DEBUG_LOGGING
		log(UTP_LOG_DEBUG, "no ack for %u", (base - 1) & ACK_NR_MASK);
#endif
	} else {
#if UTP_DEBUG_LOGGING
		log(UTP_LOG_DEBUG, "not resending %u count:%d dup_ack:%u fast_resend_seq_nr:%u",
			base - 1, count, duplicate_ack, fast_resend_seq_nr);
#endif
	}

	bool back_off = false;
	int i = 0;
	// pop the resend stack: the most recently pushed entries are the
	// lowest sequence numbers, i.e. the oldest outstanding packets
	while (nr > 0) {
		uint v = resends[--nr];
		// don't consider the tail of 0:es to be lost packets
		// only unacked packets with acked packets after should
		// be considered lost
		OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(v);

		// this may be an old (re-ordered) packet, and some of the
		// packets in here may have been acked already. In which
		// case they will not be in the send queue anymore
		if (!pkt) continue;

		// used in parse_log.py
		log(UTP_LOG_NORMAL, "Packet %u lost. Resending", v);

		// On Loss
		back_off = true;

#ifdef _DEBUG
		++_stats.rexmit;
#endif

		send_packet(pkt);
		fast_resend_seq_nr = (v + 1) & ACK_NR_MASK;

		// Re-send max 4 packets.
		if (++i >= 4) break;
	}

	// a fast resend counts as a loss event for the congestion controller
	if (back_off)
		maybe_decay_win(ctx->current_ms);

	duplicate_ack = count;
}
// LEDBAT congestion controller. Adjusts max_window based on the one-way
// delay estimate derived from this ACK.
//   bytes_acked  - payload bytes covered by the ACK (must be > 0)
//   actual_delay - raw delay sample reported by the peer (microseconds)
//   min_rtt      - smallest RTT observed among the acked packets (microseconds);
//                  used as an upper bound on the one-way delay
void UTPSocket::apply_ccontrol(size_t bytes_acked, uint32 actual_delay, int64 min_rtt)
{
	// the delay can never be greater than the rtt. The min_rtt
	// variable is the RTT in microseconds
	assert(min_rtt >= 0);
	int32 our_delay = min<uint32>(our_hist.get_value(), uint32(min_rtt));
	assert(our_delay != INT_MAX);
	assert(our_delay >= 0);

	// report the delay sample to the user callback, in milliseconds
	utp_call_on_delay_sample(this->ctx, this, our_delay / 1000);

	// This test the connection under heavy load from foreground
	// traffic. Pretend that our delays are very high to force the
	// connection to use sub-packet size window sizes
	//our_delay *= 4;

	// target is microseconds; default to 100 ms if no target is configured
	int target = target_delay;
	if (target <= 0) target = 100000;

	// this is here to compensate for very large clock drift that affects
	// the congestion controller into giving certain endpoints an unfair
	// share of the bandwidth. We have an estimate of the clock drift
	// (clock_drift). The unit of this is microseconds per 5 seconds.
	// empirically, a reasonable cut-off appears to be about 200000
	// (which is pretty high). The main purpose is to compensate for
	// people trying to "cheat" uTP by making their clock run slower,
	// and this definitely catches that without any risk of false positives
	// if clock_drift < -200000 start applying a penalty delay proportional
	// to how far beyond -200000 the clock drift is
	int32 penalty = 0;
	if (clock_drift < -200000) {
		penalty = (-clock_drift - 200000) / 7;
		our_delay += penalty;
	}

	double off_target = target - our_delay;

	// this is the same as:
	//
	//    (min(off_target, target) / target) * (bytes_acked / max_window) * MAX_CWND_INCREASE_BYTES_PER_RTT
	//
	// so, it's scaling the max increase by the fraction of the window this ack represents, and the fraction
	// of the target delay the current delay represents.
	// The min() around off_target protects against crazy values of our_delay, which may happen when the
	// timestamps wraps, or by just having a malicious peer sending garbage. This caps the increase
	// of the window size to MAX_CWND_INCREASE_BYTES_PER_RTT per rtt.
	// as for large negative numbers, this direction is already capped at the min packet size further down
	// the min around the bytes_acked protects against the case where the window size was recently
	// shrunk and the number of acked bytes exceeds that. This is considered no more than one full
	// window, in order to keep the gain within sane boundaries.

	assert(bytes_acked > 0);
	double window_factor = (double)min(bytes_acked, max_window) / (double)max(max_window, bytes_acked);

	double delay_factor = off_target / target;
	double scaled_gain = MAX_CWND_INCREASE_BYTES_PER_RTT * window_factor * delay_factor;

	// since MAX_CWND_INCREASE_BYTES_PER_RTT is a cap on how much the window size (max_window)
	// may increase per RTT, we may not increase the window size more than that proportional
	// to the number of bytes that were acked, so that once one window has been acked (one rtt)
	// the increase limit is not exceeded
	// the +1. is to allow for floating point imprecision
	assert(scaled_gain <= 1. + MAX_CWND_INCREASE_BYTES_PER_RTT * (double)min(bytes_acked, max_window) / (double)max(max_window, bytes_acked));

	if (scaled_gain > 0 && ctx->current_ms - last_maxed_out_window > 1000) {
		// if it was more than 1 second since we tried to send a packet
		// and stopped because we hit the max window, we're most likely rate
		// limited (which prevents us from ever hitting the window size)
		// if this is the case, we cannot let the max_window grow indefinitely
		scaled_gain = 0;
	}

	// the LEDBAT window: max_window adjusted by the gain, floored at the minimum
	size_t ledbat_cwnd = (max_window + scaled_gain < MIN_WINDOW_SIZE) ? MIN_WINDOW_SIZE : (size_t)(max_window + scaled_gain);

	if (slow_start) {
		// in slow-start, grow by one packet per window's worth of acked bytes,
		// until we hit ssthresh or approach the target delay
		size_t ss_cwnd = (size_t)(max_window + window_factor*get_packet_size());
		if (ss_cwnd > ssthresh) {
			slow_start = false;
		} else if (our_delay > target*0.9) {
			// even if we're a little under the target delay, we conservatively
			// discontinue the slow start phase
			slow_start = false;
			ssthresh = max_window;
		} else {
			max_window = max(ss_cwnd, ledbat_cwnd);
		}
	} else {
		max_window = ledbat_cwnd;
	}

	// make sure that the congestion window is below max
	// make sure that we don't shrink our window too small
	max_window = clamp<size_t>(max_window, MIN_WINDOW_SIZE, opt_sndbuf);

	// used in parse_log.py
	log(UTP_LOG_NORMAL, "actual_delay:%u our_delay:%d their_delay:%u off_target:%d max_window:%u "
			"delay_base:%u delay_sum:%d target_delay:%d acked_bytes:%u cur_window:%u "
			"scaled_gain:%f rtt:%u rate:%u wnduser:%u rto:%u timeout:%d get_microseconds:" I64u " "
			"cur_window_packets:%u packet_size:%u their_delay_base:%u their_actual_delay:%u "
			"average_delay:%d clock_drift:%d clock_drift_raw:%d delay_penalty:%d current_delay_sum:" I64u
			"current_delay_samples:%d average_delay_base:%d last_maxed_out_window:" I64u " opt_sndbuf:%d "
			"current_ms:" I64u "",
			actual_delay, our_delay / 1000, their_hist.get_value() / 1000,
			int(off_target / 1000), uint(max_window), uint32(our_hist.delay_base),
			int((our_delay + their_hist.get_value()) / 1000), int(target / 1000), uint(bytes_acked),
			(uint)(cur_window - bytes_acked), (float)(scaled_gain), rtt,
			(uint)(max_window * 1000 / (rtt_hist.delay_base?rtt_hist.delay_base:50)),
			(uint)max_window_user, rto, (int)(rto_timeout - ctx->current_ms),
			utp_call_get_microseconds(this->ctx, this), cur_window_packets, (uint)get_packet_size(),
			their_hist.delay_base, their_hist.delay_base + their_hist.get_value(),
			average_delay, clock_drift, clock_drift_raw, penalty / 1000,
			current_delay_sum, current_delay_samples, average_delay_base,
			uint64(last_maxed_out_window), int(opt_sndbuf), uint64(ctx->current_ms));
}
  1454. static void utp_register_recv_packet(UTPSocket *conn, size_t len)
  1455. {
  1456. #ifdef _DEBUG
  1457. ++conn->_stats.nrecv;
  1458. conn->_stats.nbytes_recv += len;
  1459. #endif
  1460. if (len <= PACKET_SIZE_MID) {
  1461. if (len <= PACKET_SIZE_EMPTY) {
  1462. conn->ctx->context_stats._nraw_recv[PACKET_SIZE_EMPTY_BUCKET]++;
  1463. } else if (len <= PACKET_SIZE_SMALL) {
  1464. conn->ctx->context_stats._nraw_recv[PACKET_SIZE_SMALL_BUCKET]++;
  1465. } else
  1466. conn->ctx->context_stats._nraw_recv[PACKET_SIZE_MID_BUCKET]++;
  1467. } else {
  1468. if (len <= PACKET_SIZE_BIG) {
  1469. conn->ctx->context_stats._nraw_recv[PACKET_SIZE_BIG_BUCKET]++;
  1470. } else
  1471. conn->ctx->context_stats._nraw_recv[PACKET_SIZE_HUGE_BUCKET]++;
  1472. }
  1473. }
  1474. // returns the max number of bytes of payload the uTP
  1475. // connection is allowed to send
  1476. size_t UTPSocket::get_packet_size() const
  1477. {
  1478. int header_size = sizeof(PacketFormatV1);
  1479. size_t mtu = mtu_last ? mtu_last : mtu_ceiling;
  1480. return mtu - header_size;
  1481. }
  1482. // Process an incoming packet
  1483. // syn is true if this is the first packet received. It will cut off parsing
  1484. // as soon as the header is done
  1485. size_t utp_process_incoming(UTPSocket *conn, const byte *packet, size_t len, bool syn = false)
  1486. {
  1487. utp_register_recv_packet(conn, len);
  1488. conn->ctx->current_ms = utp_call_get_milliseconds(conn->ctx, conn);
  1489. const PacketFormatV1 *pf1 = (PacketFormatV1*)packet;
  1490. const byte *packet_end = packet + len;
  1491. uint16 pk_seq_nr = pf1->seq_nr;
  1492. uint16 pk_ack_nr = pf1->ack_nr;
  1493. uint8 pk_flags = pf1->type();
  1494. if (pk_flags >= ST_NUM_STATES) return 0;
  1495. #if UTP_DEBUG_LOGGING
  1496. conn->log(UTP_LOG_DEBUG, "Got %s. seq_nr:%u ack_nr:%u state:%s timestamp:" I64u " reply_micro:%u"
  1497. , flagnames[pk_flags], pk_seq_nr, pk_ack_nr, statenames[conn->state]
  1498. , uint64(pf1->tv_usec), (uint32)(pf1->reply_micro));
  1499. #endif
  1500. // mark receipt time
  1501. uint64 time = utp_call_get_microseconds(conn->ctx, conn);
  1502. // window packets size is used to calculate a minimum
  1503. // permissible range for received acks. connections with acks falling
  1504. // out of this range are dropped
  1505. const uint16 curr_window = max<uint16>(conn->cur_window_packets + ACK_NR_ALLOWED_WINDOW, ACK_NR_ALLOWED_WINDOW);
  1506. // ignore packets whose ack_nr is invalid. This would imply a spoofed address
  1507. // or a malicious attempt to attach the uTP implementation.
  1508. // acking a packet that hasn't been sent yet!
  1509. // SYN packets have an exception, since there are no previous packets
  1510. if ((pk_flags != ST_SYN || conn->state != CS_SYN_RECV) &&
  1511. (wrapping_compare_less(conn->seq_nr - 1, pk_ack_nr, ACK_NR_MASK)
  1512. || wrapping_compare_less(pk_ack_nr, conn->seq_nr - 1 - curr_window, ACK_NR_MASK))) {
  1513. #if UTP_DEBUG_LOGGING
  1514. conn->log(UTP_LOG_DEBUG, "Invalid ack_nr: %u. our seq_nr: %u last unacked: %u"
  1515. , pk_ack_nr, conn->seq_nr, (conn->seq_nr - conn->cur_window_packets) & ACK_NR_MASK);
  1516. #endif
  1517. return 0;
  1518. }
  1519. // RSTs are handled earlier, since the connid matches the send id not the recv id
  1520. assert(pk_flags != ST_RESET);
  1521. // TODO: maybe send a ST_RESET if we're in CS_RESET?
  1522. const byte *selack_ptr = NULL;
  1523. // Unpack UTP packet options
  1524. // Data pointer
  1525. const byte *data = (const byte*)pf1 + conn->get_header_size();
  1526. if (conn->get_header_size() > len) {
  1527. #if UTP_DEBUG_LOGGING
  1528. conn->log(UTP_LOG_DEBUG, "Invalid packet size (less than header size)");
  1529. #endif
  1530. return 0;
  1531. }
  1532. // Skip the extension headers
  1533. uint extension = pf1->ext;
  1534. if (extension != 0) {
  1535. do {
  1536. // Verify that the packet is valid.
  1537. data += 2;
  1538. if ((int)(packet_end - data) < 0 || (int)(packet_end - data) < data[-1]) {
  1539. #if UTP_DEBUG_LOGGING
  1540. conn->log(UTP_LOG_DEBUG, "Invalid len of extensions");
  1541. #endif
  1542. return 0;
  1543. }
  1544. switch(extension) {
  1545. case 1: // Selective Acknowledgment
  1546. selack_ptr = data;
  1547. break;
  1548. case 2: // extension bits
  1549. if (data[-1] != 8) {
  1550. #if UTP_DEBUG_LOGGING
  1551. conn->log(UTP_LOG_DEBUG, "Invalid len of extension bits header");
  1552. #endif
  1553. return 0;
  1554. }
  1555. memcpy(conn->extensions, data, 8);
  1556. #if UTP_DEBUG_LOGGING
  1557. conn->log(UTP_LOG_DEBUG, "got extension bits:%02x%02x%02x%02x%02x%02x%02x%02x",
  1558. conn->extensions[0], conn->extensions[1], conn->extensions[2], conn->extensions[3],
  1559. conn->extensions[4], conn->extensions[5], conn->extensions[6], conn->extensions[7]);
  1560. #endif
  1561. }
  1562. extension = data[-2];
  1563. data += data[-1];
  1564. } while (extension);
  1565. }
  1566. if (conn->state == CS_SYN_SENT) {
  1567. // if this is a syn-ack, initialize our ack_nr
  1568. // to match the sequence number we got from
  1569. // the other end
  1570. conn->ack_nr = (pk_seq_nr - 1) & SEQ_NR_MASK;
  1571. }
  1572. conn->last_got_packet = conn->ctx->current_ms;
  1573. if (syn) {
  1574. return 0;
  1575. }
  1576. // seqnr is the number of packets past the expected
  1577. // packet this is. ack_nr is the last acked, seq_nr is the
  1578. // current. Subtracring 1 makes 0 mean "this is the next
  1579. // expected packet".
  1580. const uint seqnr = (pk_seq_nr - conn->ack_nr - 1) & SEQ_NR_MASK;
  1581. // Getting an invalid sequence number?
  1582. if (seqnr >= REORDER_BUFFER_MAX_SIZE) {
  1583. if (seqnr >= (SEQ_NR_MASK + 1) - REORDER_BUFFER_MAX_SIZE && pk_flags != ST_STATE) {
  1584. conn->schedule_ack();
  1585. }
  1586. #if UTP_DEBUG_LOGGING
  1587. conn->log(UTP_LOG_DEBUG, " Got old Packet/Ack (%u/%u)=%u"
  1588. , pk_seq_nr, conn->ack_nr, seqnr);
  1589. #endif
  1590. return 0;
  1591. }
  1592. // Process acknowledgment
  1593. // acks is the number of packets that was acked
  1594. int acks = (pk_ack_nr - (conn->seq_nr - 1 - conn->cur_window_packets)) & ACK_NR_MASK;
  1595. // this happens when we receive an old ack nr
  1596. if (acks > conn->cur_window_packets) acks = 0;
  1597. // if we get the same ack_nr as in the last packet
  1598. // increase the duplicate_ack counter, otherwise reset
  1599. // it to 0.
  1600. // It's important to only count ACKs in ST_STATE packets. Any other
  1601. // packet (primarily ST_DATA) is likely to have been sent because of the
  1602. // other end having new outgoing data, not in response to incoming data.
  1603. // For instance, if we're receiving a steady stream of payload with no
  1604. // outgoing data, and we suddently have a few bytes of payload to send (say,
  1605. // a bittorrent HAVE message), we're very likely to see 3 duplicate ACKs
  1606. // immediately after sending our payload packet. This effectively disables
  1607. // the fast-resend on duplicate-ack logic for bi-directional connections
  1608. // (except in the case of a selective ACK). This is in line with BSD4.4 TCP
  1609. // implementation.
  1610. if (conn->cur_window_packets > 0) {
  1611. if (pk_ack_nr == ((conn->seq_nr - conn->cur_window_packets - 1) & ACK_NR_MASK)
  1612. && conn->cur_window_packets > 0
  1613. && pk_flags == ST_STATE) {
  1614. ++conn->duplicate_ack;
  1615. if (conn->duplicate_ack == DUPLICATE_ACKS_BEFORE_RESEND && conn->mtu_probe_seq) {
  1616. // It's likely that the probe was rejected due to its size, but we haven't got an
  1617. // ICMP report back yet
  1618. if (pk_ack_nr == ((conn->mtu_probe_seq - 1) & ACK_NR_MASK)) {
  1619. conn->mtu_ceiling = conn->mtu_probe_size - 1;
  1620. conn->mtu_search_update();
  1621. conn->log(UTP_LOG_MTU, "MTU [DUPACK] floor:%d ceiling:%d current:%d"
  1622. , conn->mtu_floor, conn->mtu_ceiling, conn->mtu_last);
  1623. } else {
  1624. // A non-probe was blocked before our probe.
  1625. // Can't conclude much, send a new probe
  1626. conn->mtu_probe_seq = conn->mtu_probe_size = 0;
  1627. }
  1628. }
  1629. } else {
  1630. conn->duplicate_ack = 0;
  1631. }
  1632. // TODO: if duplicate_ack == DUPLICATE_ACK_BEFORE_RESEND
  1633. // and fast_resend_seq_nr <= ack_nr + 1
  1634. // resend ack_nr + 1
  1635. // also call maybe_decay_win()
  1636. }
  1637. // figure out how many bytes were acked
  1638. size_t acked_bytes = 0;
  1639. // the minimum rtt of all acks
  1640. // this is the upper limit on the delay we get back
  1641. // from the other peer. Our delay cannot exceed
  1642. // the rtt of the packet. If it does, clamp it.
  1643. // this is done in apply_ledbat_ccontrol()
  1644. int64 min_rtt = INT64_MAX;
  1645. uint64 now = utp_call_get_microseconds(conn->ctx, conn);
  1646. for (int i = 0; i < acks; ++i) {
  1647. int seq = (conn->seq_nr - conn->cur_window_packets + i) & ACK_NR_MASK;
  1648. OutgoingPacket *pkt = (OutgoingPacket*)conn->outbuf.get(seq);
  1649. if (pkt == 0 || pkt->transmissions == 0) continue;
  1650. assert((int)(pkt->payload) >= 0);
  1651. acked_bytes += pkt->payload;
  1652. if (conn->mtu_probe_seq && seq == conn->mtu_probe_seq) {
  1653. conn->mtu_floor = conn->mtu_probe_size;
  1654. conn->mtu_search_update();
  1655. conn->log(UTP_LOG_MTU, "MTU [ACK] floor:%d ceiling:%d current:%d"
  1656. , conn->mtu_floor, conn->mtu_ceiling, conn->mtu_last);
  1657. }
  1658. // in case our clock is not monotonic
  1659. if (pkt->time_sent < now)
  1660. min_rtt = min<int64>(min_rtt, now - pkt->time_sent);
  1661. else
  1662. min_rtt = min<int64>(min_rtt, 50000);
  1663. }
  1664. // count bytes acked by EACK
  1665. if (selack_ptr != NULL) {
  1666. acked_bytes += conn->selective_ack_bytes((pk_ack_nr + 2) & ACK_NR_MASK,
  1667. selack_ptr, selack_ptr[-1], min_rtt);
  1668. }
  1669. #if UTP_DEBUG_LOGGING
  1670. conn->log(UTP_LOG_DEBUG, "acks:%d acked_bytes:%u seq_nr:%d cur_window:%u cur_window_packets:%u relative_seqnr:%u max_window:%u min_rtt:%u rtt:%u",
  1671. acks, (uint)acked_bytes, conn->seq_nr, (uint)conn->cur_window, conn->cur_window_packets,
  1672. seqnr, (uint)conn->max_window, (uint)(min_rtt / 1000), conn->rtt);
  1673. #endif
  1674. uint64 p = pf1->tv_usec;
  1675. conn->last_measured_delay = conn->ctx->current_ms;
  1676. // get delay in both directions
  1677. // record the delay to report back
  1678. const uint32 their_delay = (uint32)(p == 0 ? 0 : time - p);
  1679. conn->reply_micro = their_delay;
  1680. uint32 prev_delay_base = conn->their_hist.delay_base;
  1681. if (their_delay != 0) conn->their_hist.add_sample(their_delay, conn->ctx->current_ms);
  1682. // if their new delay base is less than their previous one
  1683. // we should shift our delay base in the other direction in order
  1684. // to take the clock skew into account
  1685. if (prev_delay_base != 0 &&
  1686. wrapping_compare_less(conn->their_hist.delay_base, prev_delay_base, TIMESTAMP_MASK)) {
  1687. // never adjust more than 10 milliseconds
  1688. if (prev_delay_base - conn->their_hist.delay_base <= 10000) {
  1689. conn->our_hist.shift(prev_delay_base - conn->their_hist.delay_base);
  1690. }
  1691. }
  1692. const uint32 actual_delay = (uint32(pf1->reply_micro)==INT_MAX?0:uint32(pf1->reply_micro));
  1693. // if the actual delay is 0, it means the other end
  1694. // hasn't received a sample from us yet, and doesn't
  1695. // know what it is. We can't update out history unless
  1696. // we have a true measured sample
  1697. if (actual_delay != 0) {
  1698. conn->our_hist.add_sample(actual_delay, conn->ctx->current_ms);
  1699. // this is keeping an average of the delay samples
  1700. // we've recevied within the last 5 seconds. We sum
  1701. // all the samples and increase the count in order to
  1702. // calculate the average every 5 seconds. The samples
  1703. // are based off of the average_delay_base to deal with
  1704. // wrapping counters.
  1705. if (conn->average_delay_base == 0) conn->average_delay_base = actual_delay;
  1706. int64 average_delay_sample = 0;
  1707. // distance walking from lhs to rhs, downwards
  1708. const uint32 dist_down = conn->average_delay_base - actual_delay;
  1709. // distance walking from lhs to rhs, upwards
  1710. const uint32 dist_up = actual_delay - conn->average_delay_base;
  1711. if (dist_down > dist_up) {
  1712. // assert(dist_up < INT_MAX / 4);
  1713. // average_delay_base < actual_delay, we should end up
  1714. // with a positive sample
  1715. average_delay_sample = dist_up;
  1716. } else {
  1717. // assert(-int64(dist_down) < INT_MAX / 4);
  1718. // average_delay_base >= actual_delay, we should end up
  1719. // with a negative sample
  1720. average_delay_sample = -int64(dist_down);
  1721. }
  1722. conn->current_delay_sum += average_delay_sample;
  1723. ++conn->current_delay_samples;
  1724. if (conn->ctx->current_ms > conn->average_sample_time) {
  1725. int32 prev_average_delay = conn->average_delay;
  1726. assert(conn->current_delay_sum / conn->current_delay_samples < INT_MAX);
  1727. assert(conn->current_delay_sum / conn->current_delay_samples > -INT_MAX);
  1728. // write the new average
  1729. conn->average_delay = (int32)(conn->current_delay_sum / conn->current_delay_samples);
  1730. // each slot represents 5 seconds
  1731. conn->average_sample_time += 5000;
  1732. conn->current_delay_sum = 0;
  1733. conn->current_delay_samples = 0;
  1734. // this makes things very confusing when logging the average delay
  1735. //#if !g_log_utp
  1736. // normalize the average samples
  1737. // since we're only interested in the slope
  1738. // of the curve formed by the average delay samples,
  1739. // we can cancel out the actual offset to make sure
  1740. // we won't have problems with wrapping.
  1741. int min_sample = min(prev_average_delay, conn->average_delay);
  1742. int max_sample = max(prev_average_delay, conn->average_delay);
  1743. // normalize around zero. Try to keep the min <= 0 and max >= 0
  1744. int adjust = 0;
  1745. if (min_sample > 0) {
  1746. // adjust all samples (and the baseline) down by min_sample
  1747. adjust = -min_sample;
  1748. } else if (max_sample < 0) {
  1749. // adjust all samples (and the baseline) up by -max_sample
  1750. adjust = -max_sample;
  1751. }
  1752. if (adjust) {
  1753. conn->average_delay_base -= adjust;
  1754. conn->average_delay += adjust;
  1755. prev_average_delay += adjust;
  1756. }
  1757. //#endif
  1758. // update the clock drift estimate
  1759. // the unit is microseconds per 5 seconds
  1760. // what we're doing is just calculating the average of the
  1761. // difference between each slot. Since each slot is 5 seconds
  1762. // and the timestamps unit are microseconds, we'll end up with
  1763. // the average slope across our history. If there is a consistent
  1764. // trend, it will show up in this value
  1765. //int64 slope = 0;
  1766. int32 drift = conn->average_delay - prev_average_delay;
  1767. // clock_drift is a rolling average
  1768. conn->clock_drift = (int64(conn->clock_drift) * 7 + drift) / 8;
  1769. conn->clock_drift_raw = drift;
  1770. }
  1771. }
  1772. // if our new delay base is less than our previous one
  1773. // we should shift the other end's delay base in the other
  1774. // direction in order to take the clock skew into account
  1775. // This is commented out because it creates bad interactions
  1776. // with our adjustment in the other direction. We don't really
  1777. // need our estimates of the other peer to be very accurate
  1778. // anyway. The problem with shifting here is that we're more
  1779. // likely shift it back later because of a low latency. This
  1780. // second shift back would cause us to shift our delay base
  1781. // which then get's into a death spiral of shifting delay bases
  1782. /* if (prev_delay_base != 0 &&
  1783. wrapping_compare_less(conn->our_hist.delay_base, prev_delay_base)) {
  1784. // never adjust more than 10 milliseconds
  1785. if (prev_delay_base - conn->our_hist.delay_base <= 10000) {
  1786. conn->their_hist.Shift(prev_delay_base - conn->our_hist.delay_base);
  1787. }
  1788. }
  1789. */
  1790. // if the delay estimate exceeds the RTT, adjust the base_delay to
  1791. // compensate
  1792. assert(min_rtt >= 0);
  1793. if (int64(conn->our_hist.get_value()) > min_rtt) {
  1794. conn->our_hist.shift((uint32)(conn->our_hist.get_value() - min_rtt));
  1795. }
  1796. // only apply the congestion controller on acks
  1797. // if we don't have a delay measurement, there's
  1798. // no point in invoking the congestion control
  1799. if (actual_delay != 0 && acked_bytes >= 1)
  1800. conn->apply_ccontrol(acked_bytes, actual_delay, min_rtt);
  1801. // sanity check, the other end should never ack packets
  1802. // past the point we've sent
  1803. if (acks <= conn->cur_window_packets) {
  1804. conn->max_window_user = pf1->windowsize;
  1805. // If max user window is set to 0, then we startup a timer
  1806. // That will reset it to 1 after 15 seconds.
  1807. if (conn->max_window_user == 0)
  1808. // Reset max_window_user to 1 every 15 seconds.
  1809. conn->zerowindow_time = conn->ctx->current_ms + 15000;
  1810. // Respond to connect message
  1811. // Switch to CONNECTED state.
  1812. // If this is an ack and we're in still handshaking
  1813. // transition over to the connected state.
  1814. // Incoming connection completion
  1815. if (pk_flags == ST_DATA && conn->state == CS_SYN_RECV) {
  1816. conn->state = CS_CONNECTED;
  1817. }
  1818. // Outgoing connection completion
  1819. if (pk_flags == ST_STATE && conn->state == CS_SYN_SENT) {
  1820. conn->state = CS_CONNECTED;
  1821. // If the user has defined the ON_CONNECT callback, use that to
  1822. // notify the user that the socket is now connected. If ON_CONNECT
  1823. // has not been defined, notify the user via ON_STATE_CHANGE.
  1824. if (conn->ctx->callbacks[UTP_ON_CONNECT])
  1825. utp_call_on_connect(conn->ctx, conn);
  1826. else
  1827. utp_call_on_state_change(conn->ctx, conn, UTP_STATE_CONNECT);
  1828. // We've sent a fin, and everything was ACKed (including the FIN).
  1829. // cur_window_packets == acks means that this packet acked all
  1830. // the remaining packets that were in-flight.
  1831. } else if (conn->fin_sent && conn->cur_window_packets == acks) {
  1832. conn->fin_sent_acked = true;
  1833. if (conn->close_requested) {
  1834. conn->state = CS_DESTROY;
  1835. }
  1836. }
  1837. // Update fast resend counter
  1838. if (wrapping_compare_less(conn->fast_resend_seq_nr
  1839. , (pk_ack_nr + 1) & ACK_NR_MASK, ACK_NR_MASK))
  1840. conn->fast_resend_seq_nr = (pk_ack_nr + 1) & ACK_NR_MASK;
  1841. #if UTP_DEBUG_LOGGING
  1842. conn->log(UTP_LOG_DEBUG, "fast_resend_seq_nr:%u", conn->fast_resend_seq_nr);
  1843. #endif
  1844. for (int i = 0; i < acks; ++i) {
  1845. int ack_status = conn->ack_packet(conn->seq_nr - conn->cur_window_packets);
  1846. // if ack_status is 0, the packet was acked.
  1847. // if acl_stauts is 1, it means that the packet had already been acked
  1848. // if it's 2, the packet has not been sent yet
  1849. // We need to break this loop in the latter case. This could potentially
  1850. // happen if we get an ack_nr that does not exceed what we have stuffed
  1851. // into the outgoing buffer, but does exceed what we have sent
  1852. if (ack_status == 2) {
  1853. #ifdef _DEBUG
  1854. OutgoingPacket* pkt = (OutgoingPacket*)conn->outbuf.get(conn->seq_nr - conn->cur_window_packets);
  1855. assert(pkt->transmissions == 0);
  1856. #endif
  1857. break;
  1858. }
  1859. conn->cur_window_packets--;
  1860. #if UTP_DEBUG_LOGGING
  1861. conn->log(UTP_LOG_DEBUG, "decementing cur_window_packets:%u", conn->cur_window_packets);
  1862. #endif
  1863. }
  1864. #ifdef _DEBUG
  1865. if (conn->cur_window_packets == 0)
  1866. assert(conn->cur_window == 0);
  1867. #endif
  1868. // packets in front of this may have been acked by a
  1869. // selective ack (EACK). Keep decreasing the window packet size
  1870. // until we hit a packet that is still waiting to be acked
  1871. // in the send queue
  1872. // this is especially likely to happen when the other end
  1873. // has the EACK send bug older versions of uTP had
  1874. while (conn->cur_window_packets > 0 && !conn->outbuf.get(conn->seq_nr - conn->cur_window_packets)) {
  1875. conn->cur_window_packets--;
  1876. #if UTP_DEBUG_LOGGING
  1877. conn->log(UTP_LOG_DEBUG, "decementing cur_window_packets:%u", conn->cur_window_packets);
  1878. #endif
  1879. }
  1880. #ifdef _DEBUG
  1881. if (conn->cur_window_packets == 0)
  1882. assert(conn->cur_window == 0);
  1883. #endif
  1884. // this invariant should always be true
  1885. assert(conn->cur_window_packets == 0 || conn->outbuf.get(conn->seq_nr - conn->cur_window_packets));
  1886. // flush Nagle
  1887. if (conn->cur_window_packets == 1) {
  1888. OutgoingPacket *pkt = (OutgoingPacket*)conn->outbuf.get(conn->seq_nr - 1);
  1889. // do we still have quota?
  1890. if (pkt->transmissions == 0) {
  1891. conn->send_packet(pkt);
  1892. }
  1893. }
  1894. // Fast timeout-retry
  1895. if (conn->fast_timeout) {
  1896. #if UTP_DEBUG_LOGGING
  1897. conn->log(UTP_LOG_DEBUG, "Fast timeout %u,%u,%u?", (uint)conn->cur_window, conn->seq_nr - conn->timeout_seq_nr, conn->timeout_seq_nr);
  1898. #endif
  1899. // if the fast_resend_seq_nr is not pointing to the oldest outstanding packet, it suggests that we've already
  1900. // resent the packet that timed out, and we should leave the fast-timeout mode.
  1901. if (((conn->seq_nr - conn->cur_window_packets) & ACK_NR_MASK) != conn->fast_resend_seq_nr) {
  1902. conn->fast_timeout = false;
  1903. } else {
  1904. // resend the oldest packet and increment fast_resend_seq_nr
  1905. // to not allow another fast resend on it again
  1906. OutgoingPacket *pkt = (OutgoingPacket*)conn->outbuf.get(conn->seq_nr - conn->cur_window_packets);
  1907. if (pkt && pkt->transmissions > 0) {
  1908. #if UTP_DEBUG_LOGGING
  1909. conn->log(UTP_LOG_DEBUG, "Packet %u fast timeout-retry.", conn->seq_nr - conn->cur_window_packets);
  1910. #endif
  1911. #ifdef _DEBUG
  1912. ++conn->_stats.fastrexmit;
  1913. #endif
  1914. conn->fast_resend_seq_nr++;
  1915. conn->send_packet(pkt);
  1916. }
  1917. }
  1918. }
  1919. }
  1920. // Process selective acknowledgent
  1921. if (selack_ptr != NULL) {
  1922. conn->selective_ack(pk_ack_nr + 2, selack_ptr, selack_ptr[-1]);
  1923. }
  1924. // this invariant should always be true
  1925. assert(conn->cur_window_packets == 0 || conn->outbuf.get(conn->seq_nr - conn->cur_window_packets));
  1926. #if UTP_DEBUG_LOGGING
  1927. conn->log(UTP_LOG_DEBUG, "acks:%d acked_bytes:%u seq_nr:%u cur_window:%u cur_window_packets:%u ",
  1928. acks, (uint)acked_bytes, conn->seq_nr, (uint)conn->cur_window, conn->cur_window_packets);
  1929. #endif
  1930. // In case the ack dropped the current window below
  1931. // the max_window size, Mark the socket as writable
  1932. if (conn->state == CS_CONNECTED_FULL && !conn->is_full()) {
  1933. conn->state = CS_CONNECTED;
  1934. #if UTP_DEBUG_LOGGING
  1935. conn->log(UTP_LOG_DEBUG, "Socket writable. max_window:%u cur_window:%u packet_size:%u",
  1936. (uint)conn->max_window, (uint)conn->cur_window, (uint)conn->get_packet_size());
  1937. #endif
  1938. utp_call_on_state_change(conn->ctx, conn, UTP_STATE_WRITABLE);
  1939. }
  1940. if (pk_flags == ST_STATE) {
  1941. // This is a state packet only.
  1942. return 0;
  1943. }
  1944. // The connection is not in a state that can accept data?
  1945. if (conn->state != CS_CONNECTED &&
  1946. conn->state != CS_CONNECTED_FULL) {
  1947. return 0;
  1948. }
  1949. // Is this a finalize packet?
  1950. if (pk_flags == ST_FIN && !conn->got_fin) {
  1951. #if UTP_DEBUG_LOGGING
  1952. conn->log(UTP_LOG_DEBUG, "Got FIN eof_pkt:%u", pk_seq_nr);
  1953. #endif
  1954. conn->got_fin = true;
  1955. conn->eof_pkt = pk_seq_nr;
  1956. // at this point, it is possible for the
  1957. // other end to have sent packets with
  1958. // sequence numbers higher than seq_nr.
  1959. // if this is the case, our reorder_count
  1960. // is out of sync. This case is dealt with
  1961. // when we re-order and hit the eof_pkt.
  1962. // we'll just ignore any packets with
  1963. // sequence numbers past this
  1964. }
  1965. // Getting an in-order packet?
  1966. if (seqnr == 0) {
  1967. size_t count = packet_end - data;
  1968. if (count > 0 && !conn->read_shutdown) {
  1969. #if UTP_DEBUG_LOGGING
  1970. conn->log(UTP_LOG_DEBUG, "Got Data len:%u (rb:%u)", (uint)count, (uint)utp_call_get_read_buffer_size(conn->ctx, conn));
  1971. #endif
  1972. // Post bytes to the upper layer
  1973. utp_call_on_read(conn->ctx, conn, data, count);
  1974. }
  1975. conn->ack_nr++;
  1976. // Check if the next packet has been received too, but waiting
  1977. // in the reorder buffer.
  1978. for (;;) {
  1979. if (!conn->got_fin_reached && conn->got_fin && conn->eof_pkt == conn->ack_nr) {
  1980. conn->got_fin_reached = true;
  1981. conn->rto_timeout = conn->ctx->current_ms + min<uint>(conn->rto * 3, 60);
  1982. #if UTP_DEBUG_LOGGING
  1983. conn->log(UTP_LOG_DEBUG, "Posting EOF");
  1984. #endif
  1985. utp_call_on_state_change(conn->ctx, conn, UTP_STATE_EOF);
  1986. // if the other end wants to close, ack
  1987. conn->send_ack();
  1988. // reorder_count is not necessarily 0 at this point.
  1989. // even though it is most of the time, the other end
  1990. // may have sent packets with higher sequence numbers
  1991. // than what later end up being eof_pkt
  1992. // since we have received all packets up to eof_pkt
  1993. // just ignore the ones after it.
  1994. conn->reorder_count = 0;
  1995. }
  1996. // Quick get-out in case there is nothing to reorder
  1997. if (conn->reorder_count == 0)
  1998. break;
  1999. // Check if there are additional buffers in the reorder buffers
  2000. // that need delivery.
  2001. byte *p = (byte*)conn->inbuf.get(conn->ack_nr+1);
  2002. if (p == NULL)
  2003. break;
  2004. conn->inbuf.put(conn->ack_nr+1, NULL);
  2005. count = *(uint*)p;
  2006. if (count > 0 && !conn->read_shutdown) {
  2007. // Pass the bytes to the upper layer
  2008. utp_call_on_read(conn->ctx, conn, p + sizeof(uint), count);
  2009. }
  2010. conn->ack_nr++;
  2011. // Free the element from the reorder buffer
  2012. free(p);
  2013. assert(conn->reorder_count > 0);
  2014. conn->reorder_count--;
  2015. }
  2016. conn->schedule_ack();
  2017. } else {
  2018. // Getting an out of order packet.
  2019. // The packet needs to be remembered and rearranged later.
  2020. // if we have received a FIN packet, and the EOF-sequence number
  2021. // is lower than the sequence number of the packet we just received
  2022. // something is wrong.
  2023. if (conn->got_fin && pk_seq_nr > conn->eof_pkt) {
  2024. #if UTP_DEBUG_LOGGING
  2025. conn->log(UTP_LOG_DEBUG, "Got an invalid packet sequence number, past EOF "
  2026. "reorder_count:%u len:%u (rb:%u)",
  2027. conn->reorder_count, (uint)(packet_end - data), (uint)utp_call_get_read_buffer_size(conn->ctx, conn));
  2028. #endif
  2029. return 0;
  2030. }
  2031. // if the sequence number is entirely off the expected
  2032. // one, just drop it. We can't allocate buffer space in
  2033. // the inbuf entirely based on untrusted input
  2034. if (seqnr > 0x3ff) {
  2035. #if UTP_DEBUG_LOGGING
  2036. conn->log(UTP_LOG_DEBUG, "0x%08x: Got an invalid packet sequence number, too far off "
  2037. "reorder_count:%u len:%u (rb:%u)",
  2038. conn->reorder_count, (uint)(packet_end - data), (uint)utp_call_get_read_buffer_size(conn->ctx, conn));
  2039. #endif
  2040. return 0;
  2041. }
  2042. // we need to grow the circle buffer before we
  2043. // check if the packet is already in here, so that
  2044. // we don't end up looking at an older packet (since
  2045. // the indices wraps around).
  2046. conn->inbuf.ensure_size(pk_seq_nr + 1, seqnr + 1);
  2047. // Has this packet already been received? (i.e. a duplicate)
  2048. // If that is the case, just discard it.
  2049. if (conn->inbuf.get(pk_seq_nr) != NULL) {
  2050. #ifdef _DEBUG
  2051. ++conn->_stats.nduprecv;
  2052. #endif
  2053. return 0;
  2054. }
  2055. // Allocate memory to fit the packet that needs to re-ordered
  2056. byte *mem = (byte*)malloc((packet_end - data) + sizeof(uint));
  2057. *(uint*)mem = (uint)(packet_end - data);
  2058. memcpy(mem + sizeof(uint), data, packet_end - data);
  2059. // Insert into reorder buffer and increment the count
  2060. // of # of packets to be reordered.
  2061. // we add one to seqnr in order to leave the last
  2062. // entry empty, that way the assert in send_ack
  2063. // is valid. we have to add one to seqnr too, in order
  2064. // to make the circular buffer grow around the correct
  2065. // point (which is conn->ack_nr + 1).
  2066. assert(conn->inbuf.get(pk_seq_nr) == NULL);
  2067. assert((pk_seq_nr & conn->inbuf.mask) != ((conn->ack_nr+1) & conn->inbuf.mask));
  2068. conn->inbuf.put(pk_seq_nr, mem);
  2069. conn->reorder_count++;
  2070. #if UTP_DEBUG_LOGGING
  2071. conn->log(UTP_LOG_DEBUG, "0x%08x: Got out of order data reorder_count:%u len:%u (rb:%u)",
  2072. conn->reorder_count, (uint)(packet_end - data), (uint)utp_call_get_read_buffer_size(conn->ctx, conn));
  2073. #endif
  2074. conn->schedule_ack();
  2075. }
  2076. return (size_t)(packet_end - data);
  2077. }
  2078. inline byte UTP_Version(PacketFormatV1 const* pf)
  2079. {
  2080. return (pf->type() < ST_NUM_STATES && pf->ext < 3 ? pf->version() : 0);
  2081. }
  2082. UTPSocket::~UTPSocket()
  2083. {
  2084. #if UTP_DEBUG_LOGGING
  2085. log(UTP_LOG_DEBUG, "Killing socket");
  2086. #endif
  2087. utp_call_on_state_change(ctx, this, UTP_STATE_DESTROYING);
  2088. if (ctx->last_utp_socket == this) {
  2089. ctx->last_utp_socket = NULL;
  2090. }
  2091. // Remove object from the global hash table
  2092. UTPSocketKeyData* kd = ctx->utp_sockets->Delete(UTPSocketKey(addr, conn_id_recv));
  2093. assert(kd);
  2094. // remove the socket from ack_sockets if it was there also
  2095. removeSocketFromAckList(this);
  2096. // Free all memory occupied by the socket object.
  2097. for (size_t i = 0; i <= inbuf.mask; i++) {
  2098. free(inbuf.elements[i]);
  2099. }
  2100. for (size_t i = 0; i <= outbuf.mask; i++) {
  2101. free(outbuf.elements[i]);
  2102. }
  2103. // TODO: The circular buffer should have a destructor
  2104. free(inbuf.elements);
  2105. free(outbuf.elements);
  2106. }
  2107. void UTP_FreeAll(struct UTPSocketHT *utp_sockets) {
  2108. utp_hash_iterator_t it;
  2109. UTPSocketKeyData* keyData;
  2110. while ((keyData = utp_sockets->Iterate(it))) {
  2111. delete keyData->socket;
  2112. }
  2113. }
// Common initialization shared by the connect path (utp_connect) and the
// accept path (utp_process_udp). Binds the socket to the remote address,
// assigns connection ids, seeds all timestamps from a fresh clock reading,
// and registers the socket in the context's hash table.
//
// @conn:          socket created by utp_create_socket()
// @addr/@addrlen: remote endpoint
// @need_seed_gen: when true (connect path), generate a fresh 16-bit seed
//                 that is not already in use for this address and offset
//                 both conn ids by it; when false (accept path), the caller
//                 supplies the final ids via conn_seed/conn_id_*
// @conn_seed:     connection seed (overwritten when need_seed_gen is true)
// @conn_id_recv:  base id for packets we receive (offset by the seed)
// @conn_id_send:  base id for packets we send (offset by the seed)
void utp_initialize_socket(	utp_socket *conn,
							const struct sockaddr *addr,
							socklen_t addrlen,
							bool need_seed_gen,
							uint32 conn_seed,
							uint32 conn_id_recv,
							uint32 conn_id_send)
{
	PackedSockAddr psaddr = PackedSockAddr((const SOCKADDR_STORAGE*)addr, addrlen);

	if (need_seed_gen) {
		// keep drawing until we find a seed whose recv id is not already
		// registered for this remote address
		do {
			conn_seed = utp_call_get_random(conn->ctx, conn);
			// we identify v1 and higher by setting the first two bytes to 0x0001
			conn_seed &= 0xffff;
		} while (conn->ctx->utp_sockets->Lookup(UTPSocketKey(psaddr, conn_seed)));

		conn_id_recv += conn_seed;
		conn_id_send += conn_seed;
	}

	conn->state = CS_IDLE;
	conn->conn_seed = conn_seed;
	conn->conn_id_recv = conn_id_recv;
	conn->conn_id_send = conn_id_send;
	conn->addr = psaddr;

	// refresh current_ms first: every timestamp below is derived from it
	conn->ctx->current_ms = utp_call_get_milliseconds(conn->ctx, NULL);
	conn->last_got_packet = conn->ctx->current_ms;
	conn->last_sent_packet = conn->ctx->current_ms;
	// large offset pushes the first delay measurement far into the future
	// (presumably to suppress early samples — TODO confirm against
	// the delay-measurement code)
	conn->last_measured_delay = conn->ctx->current_ms + 0x70000000;
	conn->average_sample_time = conn->ctx->current_ms + 5000;
	conn->last_rwin_decay = conn->ctx->current_ms - MAX_WINDOW_DECAY;

	conn->our_hist.clear(conn->ctx->current_ms);
	conn->their_hist.clear(conn->ctx->current_ms);
	conn->rtt_hist.clear(conn->ctx->current_ms);

	// initialize MTU floor and ceiling
	conn->mtu_reset();
	conn->mtu_last = conn->mtu_ceiling;

	// register under (addr, conn_id_recv) — must happen after both fields
	// are finalized above
	conn->ctx->utp_sockets->Add(UTPSocketKey(conn->addr, conn->conn_id_recv))->socket = conn;

	// we need to fit one packet in the window when we start the connection
	conn->max_window = conn->get_packet_size();

#if UTP_DEBUG_LOGGING
	conn->log(UTP_LOG_DEBUG, "UTP socket initialized");
#endif
}
// Allocate a new uTP socket in CS_UNINITIALIZED state and reset every field
// to its default. The socket is not usable until utp_initialize_socket()
// runs (via utp_connect() or an incoming SYN in utp_process_udp()).
// Returns NULL if ctx is NULL.
utp_socket* utp_create_socket(utp_context *ctx)
{
	assert(ctx);
	if (!ctx) return NULL;

	UTPSocket *conn = new UTPSocket; // TODO: UTPSocket should have a constructor

	conn->state = CS_UNINITIALIZED;
	conn->ctx = ctx;
	conn->userdata = NULL;

	// receive-side bookkeeping
	conn->reorder_count = 0;
	conn->duplicate_ack = 0;
	conn->timeout_seq_nr = 0;
	conn->last_rcv_win = 0;

	// shutdown / teardown flags
	conn->got_fin = false;
	conn->got_fin_reached = false;
	conn->fin_sent = false;
	conn->fin_sent_acked = false;
	conn->read_shutdown = false;
	conn->close_requested = false;
	conn->fast_timeout = false;

	// RTT / timeout state
	conn->rtt = 0;
	conn->retransmit_timeout = 0;
	conn->rto_timeout = 0;
	conn->zerowindow_time = 0;

	// delay-measurement state
	conn->average_delay = 0;
	conn->current_delay_samples = 0;

	conn->cur_window = 0;
	conn->eof_pkt = 0;
	conn->last_maxed_out_window = 0;

	// MTU discovery state
	conn->mtu_probe_seq = 0;
	conn->mtu_probe_size = 0;

	conn->current_delay_sum = 0;
	conn->average_delay_base = 0;
	conn->retransmit_count = 0;

	// initial RTO of 3s with a generous variance before any samples exist
	conn->rto = 3000;
	conn->rtt_var = 800;

	// sequence numbers; utp_connect()/utp_process_udp() randomize seq_nr later
	conn->seq_nr = 1;
	conn->ack_nr = 0;
	conn->max_window_user = 255 * PACKET_SIZE;
	conn->cur_window_packets = 0;
	conn->fast_resend_seq_nr = conn->seq_nr;

	// per-socket options inherited from the context
	conn->target_delay = ctx->target_delay;
	conn->reply_micro = 0;
	conn->opt_sndbuf = ctx->opt_sndbuf;
	conn->opt_rcvbuf = ctx->opt_rcvbuf;

	// congestion control starts in slow-start
	conn->slow_start = true;
	conn->ssthresh = conn->opt_sndbuf;
	conn->clock_drift = 0;
	conn->clock_drift_raw = 0;

	// both circular buffers start with 16 slots (mask = size - 1)
	conn->outbuf.mask = 15;
	conn->inbuf.mask = 15;
	conn->outbuf.elements = (void**)calloc(16, sizeof(void*));
	conn->inbuf.elements = (void**)calloc(16, sizeof(void*));
	conn->ida = -1;	// set the index of every new socket in ack_sockets to
					// -1, which also means it is not in ack_sockets yet

	memset(conn->extensions, 0, sizeof(conn->extensions));

#ifdef _DEBUG
	memset(&conn->_stats, 0, sizeof(utp_socket_stats));
#endif

	return conn;
}
  2216. int utp_context_set_option(utp_context *ctx, int opt, int val)
  2217. {
  2218. assert(ctx);
  2219. if (!ctx) return -1;
  2220. switch (opt) {
  2221. case UTP_LOG_NORMAL:
  2222. ctx->log_normal = val ? true : false;
  2223. return 0;
  2224. case UTP_LOG_MTU:
  2225. ctx->log_mtu = val ? true : false;
  2226. return 0;
  2227. case UTP_LOG_DEBUG:
  2228. ctx->log_debug = val ? true : false;
  2229. return 0;
  2230. case UTP_TARGET_DELAY:
  2231. ctx->target_delay = val;
  2232. return 0;
  2233. case UTP_SNDBUF:
  2234. assert(val >= 1);
  2235. ctx->opt_sndbuf = val;
  2236. return 0;
  2237. case UTP_RCVBUF:
  2238. assert(val >= 1);
  2239. ctx->opt_rcvbuf = val;
  2240. return 0;
  2241. }
  2242. return -1;
  2243. }
  2244. int utp_context_get_option(utp_context *ctx, int opt)
  2245. {
  2246. assert(ctx);
  2247. if (!ctx) return -1;
  2248. switch (opt) {
  2249. case UTP_LOG_NORMAL: return ctx->log_normal ? 1 : 0;
  2250. case UTP_LOG_MTU: return ctx->log_mtu ? 1 : 0;
  2251. case UTP_LOG_DEBUG: return ctx->log_debug ? 1 : 0;
  2252. case UTP_TARGET_DELAY: return ctx->target_delay;
  2253. case UTP_SNDBUF: return ctx->opt_sndbuf;
  2254. case UTP_RCVBUF: return ctx->opt_rcvbuf;
  2255. }
  2256. return -1;
  2257. }
  2258. int utp_setsockopt(UTPSocket* conn, int opt, int val)
  2259. {
  2260. assert(conn);
  2261. if (!conn) return -1;
  2262. switch (opt) {
  2263. case UTP_SNDBUF:
  2264. assert(val >= 1);
  2265. conn->opt_sndbuf = val;
  2266. return 0;
  2267. case UTP_RCVBUF:
  2268. assert(val >= 1);
  2269. conn->opt_rcvbuf = val;
  2270. return 0;
  2271. case UTP_TARGET_DELAY:
  2272. conn->target_delay = val;
  2273. return 0;
  2274. }
  2275. return -1;
  2276. }
  2277. int utp_getsockopt(UTPSocket* conn, int opt)
  2278. {
  2279. assert(conn);
  2280. if (!conn) return -1;
  2281. switch (opt) {
  2282. case UTP_SNDBUF: return conn->opt_sndbuf;
  2283. case UTP_RCVBUF: return conn->opt_rcvbuf;
  2284. case UTP_TARGET_DELAY: return conn->target_delay;
  2285. }
  2286. return -1;
  2287. }
  2288. // Try to connect to a specified host.
  2289. int utp_connect(utp_socket *conn, const struct sockaddr *to, socklen_t tolen)
  2290. {
  2291. assert(conn);
  2292. if (!conn) return -1;
  2293. assert(conn->state == CS_UNINITIALIZED);
  2294. if (conn->state != CS_UNINITIALIZED) {
  2295. conn->state = CS_DESTROY;
  2296. return -1;
  2297. }
  2298. utp_initialize_socket(conn, to, tolen, true, 0, 0, 1);
  2299. assert(conn->cur_window_packets == 0);
  2300. assert(conn->outbuf.get(conn->seq_nr) == NULL);
  2301. assert(sizeof(PacketFormatV1) == 20);
  2302. conn->state = CS_SYN_SENT;
  2303. conn->ctx->current_ms = utp_call_get_milliseconds(conn->ctx, conn);
  2304. // Create and send a connect message
  2305. // used in parse_log.py
  2306. conn->log(UTP_LOG_NORMAL, "UTP_Connect conn_seed:%u packet_size:%u (B) "
  2307. "target_delay:%u (ms) delay_history:%u "
  2308. "delay_base_history:%u (minutes)",
  2309. conn->conn_seed, PACKET_SIZE, conn->target_delay / 1000,
  2310. CUR_DELAY_SIZE, DELAY_BASE_HISTORY);
  2311. // Setup initial timeout timer.
  2312. conn->retransmit_timeout = 3000;
  2313. conn->rto_timeout = conn->ctx->current_ms + conn->retransmit_timeout;
  2314. conn->last_rcv_win = conn->get_rcv_window();
  2315. // if you need compatibiltiy with 1.8.1, use this. it increases attackability though.
  2316. //conn->seq_nr = 1;
  2317. conn->seq_nr = utp_call_get_random(conn->ctx, conn);
  2318. // Create the connect packet.
  2319. const size_t header_size = sizeof(PacketFormatV1);
  2320. OutgoingPacket *pkt = (OutgoingPacket*)malloc(sizeof(OutgoingPacket) - 1 + header_size);
  2321. PacketFormatV1* p1 = (PacketFormatV1*)pkt->data;
  2322. memset(p1, 0, header_size);
  2323. // SYN packets are special, and have the receive ID in the connid field,
  2324. // instead of conn_id_send.
  2325. p1->set_version(1);
  2326. p1->set_type(ST_SYN);
  2327. p1->ext = 0;
  2328. p1->connid = conn->conn_id_recv;
  2329. p1->windowsize = (uint32)conn->last_rcv_win;
  2330. p1->seq_nr = conn->seq_nr;
  2331. pkt->transmissions = 0;
  2332. pkt->length = header_size;
  2333. pkt->payload = 0;
  2334. /*
  2335. #if UTP_DEBUG_LOGGING
  2336. conn->log(UTP_LOG_DEBUG, "Sending connect %s [%u].",
  2337. addrfmt(conn->addr, addrbuf), conn_seed);
  2338. #endif
  2339. */
  2340. // Remember the message in the outgoing queue.
  2341. conn->outbuf.ensure_size(conn->seq_nr, conn->cur_window_packets);
  2342. conn->outbuf.put(conn->seq_nr, pkt);
  2343. conn->seq_nr++;
  2344. conn->cur_window_packets++;
  2345. #if UTP_DEBUG_LOGGING
  2346. conn->log(UTP_LOG_DEBUG, "incrementing cur_window_packets:%u", conn->cur_window_packets);
  2347. #endif
  2348. conn->send_packet(pkt);
  2349. return 0;
  2350. }
// Entry point for every received UDP datagram. Classifies the payload and
// dispatches it: RST handling, data/state packets for known connections,
// RST generation for unknown non-SYN traffic, and SYN acceptance.
// Returns 1 if the UDP payload was recognized as a UTP packet, or 0 if it was not
int utp_process_udp(utp_context *ctx, const byte *buffer, size_t len, const struct sockaddr *to, socklen_t tolen)
{
	assert(ctx);
	if (!ctx) return 0;

	assert(buffer);
	if (!buffer) return 0;

	assert(to);
	if (!to) return 0;

	const PackedSockAddr addr((const SOCKADDR_STORAGE*)to, tolen);

	// runt packet: cannot even hold a v1 header — not uTP
	if (len < sizeof(PacketFormatV1)) {
#if UTP_DEBUG_LOGGING
		ctx->log(UTP_LOG_DEBUG, NULL, "recv %s len:%u too small", addrfmt(addr, addrbuf), (uint)len);
#endif
		return 0;
	}

	const PacketFormatV1 *pf1 = (PacketFormatV1*)buffer;
	// UTP_Version returns 0 for malformed type/extension fields too
	const byte version = UTP_Version(pf1);
	const uint32 id = uint32(pf1->connid);

	if (version != 1) {
#if UTP_DEBUG_LOGGING
		ctx->log(UTP_LOG_DEBUG, NULL, "recv %s len:%u version:%u unsupported version", addrfmt(addr, addrbuf), (uint)len, version);
#endif
		return 0;
	}

#if UTP_DEBUG_LOGGING
	ctx->log(UTP_LOG_DEBUG, NULL, "recv %s len:%u id:%u", addrfmt(addr, addrbuf), (uint)len, id);
	ctx->log(UTP_LOG_DEBUG, NULL, "recv id:%u seq_nr:%u ack_nr:%u", id, (uint)pf1->seq_nr, (uint)pf1->ack_nr);
#endif

	const byte flags = pf1->type();

	if (flags == ST_RESET) {
		// id is either our recv id or our send id
		// if it's our send id, and we initiated the connection, our recv id is id + 1
		// if it's our send id, and we did not initiate the connection, our recv id is id - 1
		// we have to check every case
		// (note: the Lookup calls below assign keyData as a side effect)
		UTPSocketKeyData* keyData;
		if ( (keyData = ctx->utp_sockets->Lookup(UTPSocketKey(addr, id))) ||
			((keyData = ctx->utp_sockets->Lookup(UTPSocketKey(addr, id + 1))) && keyData->socket->conn_id_send == id) ||
			((keyData = ctx->utp_sockets->Lookup(UTPSocketKey(addr, id - 1))) && keyData->socket->conn_id_send == id))
		{
			UTPSocket* conn = keyData->socket;

#if UTP_DEBUG_LOGGING
			ctx->log(UTP_LOG_DEBUG, NULL, "recv RST for existing connection");
#endif

			// if the application already asked to close, skip straight to
			// destruction; otherwise surface the reset
			if (conn->close_requested)
				conn->state = CS_DESTROY;
			else
				conn->state = CS_RESET;

			utp_call_on_overhead_statistics(conn->ctx, conn, false, len + conn->get_udp_overhead(), close_overhead);
			// NOTE(review): state was just overwritten above, so this
			// comparison against CS_SYN_SENT always reports ECONNRESET —
			// looks questionable, confirm intended behavior upstream
			const int err = (conn->state == CS_SYN_SENT) ? UTP_ECONNREFUSED : UTP_ECONNRESET;
			utp_call_on_error(conn->ctx, conn, err);
		}
		else {
#if UTP_DEBUG_LOGGING
			ctx->log(UTP_LOG_DEBUG, NULL, "recv RST for unknown connection");
#endif
		}
		return 1;
	}
	else if (flags != ST_SYN) {
		UTPSocket* conn = NULL;

		// one-entry cache: most traffic belongs to the socket we saw last
		if (ctx->last_utp_socket && ctx->last_utp_socket->addr == addr && ctx->last_utp_socket->conn_id_recv == id) {
			conn = ctx->last_utp_socket;
		} else {
			UTPSocketKeyData* keyData = ctx->utp_sockets->Lookup(UTPSocketKey(addr, id));
			if (keyData) {
				conn = keyData->socket;
				ctx->last_utp_socket = conn;
			}
		}

		if (conn) {
#if UTP_DEBUG_LOGGING
			ctx->log(UTP_LOG_DEBUG, NULL, "recv processing");
#endif
			const size_t read = utp_process_incoming(conn, buffer, len);
			utp_call_on_overhead_statistics(conn->ctx, conn, false, (len - read) + conn->get_udp_overhead(), header_overhead);
			return 1;
		}
	}

	// We have not found a matching utp_socket, and this isn't a SYN. Reject it.
	const uint32 seq_nr = pf1->seq_nr;
	if (flags != ST_SYN) {
		ctx->current_ms = utp_call_get_milliseconds(ctx, NULL);

		// if we already sent an RST for this (addr, id, seq) tuple, just
		// refresh its timestamp instead of sending another
		for (size_t i = 0; i < ctx->rst_info.GetCount(); i++) {
			if ((ctx->rst_info[i].connid == id) &&
				(ctx->rst_info[i].addr == addr) &&
				(ctx->rst_info[i].ack_nr == seq_nr))
			{
				ctx->rst_info[i].timestamp = ctx->current_ms;
#if UTP_DEBUG_LOGGING
				ctx->log(UTP_LOG_DEBUG, NULL, "recv not sending RST to non-SYN (stored)");
#endif
				return 1;
			}
		}

		// cap the RST bookkeeping so untrusted traffic can't grow it unboundedly
		if (ctx->rst_info.GetCount() > RST_INFO_LIMIT) {
#if UTP_DEBUG_LOGGING
			ctx->log(UTP_LOG_DEBUG, NULL, "recv not sending RST to non-SYN (limit at %u stored)", (uint)ctx->rst_info.GetCount());
#endif
			return 1;
		}

#if UTP_DEBUG_LOGGING
		ctx->log(UTP_LOG_DEBUG, NULL, "recv send RST to non-SYN (%u stored)", (uint)ctx->rst_info.GetCount());
#endif

		// remember that we responded, then send the RST
		RST_Info &r = ctx->rst_info.Append();
		r.addr = addr;
		r.connid = id;
		r.ack_nr = seq_nr;
		r.timestamp = ctx->current_ms;

		UTPSocket::send_rst(ctx, addr, id, seq_nr, utp_call_get_random(ctx, NULL));
		return 1;
	}

	// From here on the packet is a SYN: accept it if the application opted in.
	if (ctx->callbacks[UTP_ON_ACCEPT]) {
#if UTP_DEBUG_LOGGING
		ctx->log(UTP_LOG_DEBUG, NULL, "Incoming connection from %s", addrfmt(addr, addrbuf));
#endif
		// duplicate SYN for a connection we already accepted (our recv id
		// for an accepted connection is id + 1)
		UTPSocketKeyData* keyData = ctx->utp_sockets->Lookup(UTPSocketKey(addr, id + 1));
		if (keyData) {
#if UTP_DEBUG_LOGGING
			ctx->log(UTP_LOG_DEBUG, NULL, "rejected incoming connection, connection already exists");
#endif
			return 1;
		}

		// hard cap on the number of simultaneous sockets
		if (ctx->utp_sockets->GetCount() > 3000) {
#if UTP_DEBUG_LOGGING
			ctx->log(UTP_LOG_DEBUG, NULL, "rejected incoming connection, too many uTP sockets %d", ctx->utp_sockets->GetCount());
#endif
			return 1;
		}

		// true means yes, block connection. false means no, don't block.
		if (utp_call_on_firewall(ctx, to, tolen)) {
#if UTP_DEBUG_LOGGING
			ctx->log(UTP_LOG_DEBUG, NULL, "rejected incoming connection, firewall callback returned true");
#endif
			return 1;
		}

		// Create a new UTP socket to handle this new connection
		UTPSocket *conn = utp_create_socket(ctx);
		utp_initialize_socket(conn, to, tolen, false, id, id+1, id);
		conn->ack_nr = seq_nr;
		conn->seq_nr = utp_call_get_random(ctx, NULL);
		conn->fast_resend_seq_nr = conn->seq_nr;
		conn->state = CS_SYN_RECV;

		const size_t read = utp_process_incoming(conn, buffer, len, true);

#if UTP_DEBUG_LOGGING
		ctx->log(UTP_LOG_DEBUG, NULL, "recv send connect ACK");
#endif
		conn->send_ack(true);

		utp_call_on_accept(ctx, conn, to, tolen);

		// we report overhead after on_accept(), because the callbacks are setup now
		utp_call_on_overhead_statistics(conn->ctx, conn, false, (len - read) + conn->get_udp_overhead(), header_overhead); // SYN
		utp_call_on_overhead_statistics(conn->ctx, conn, true, conn->get_overhead(), ack_overhead); // SYNACK
	}
	else {
#if UTP_DEBUG_LOGGING
		ctx->log(UTP_LOG_DEBUG, NULL, "rejected incoming connection, UTP_ON_ACCEPT callback not set");
#endif
	}

	return 1;
}
  2511. // Called by utp_process_icmp_fragmentation() and utp_process_icmp_error() below
  2512. static UTPSocket* parse_icmp_payload(utp_context *ctx, const byte *buffer, size_t len, const struct sockaddr *to, socklen_t tolen)
  2513. {
  2514. assert(ctx);
  2515. if (!ctx) return NULL;
  2516. assert(buffer);
  2517. if (!buffer) return NULL;
  2518. assert(to);
  2519. if (!to) return NULL;
  2520. const PackedSockAddr addr((const SOCKADDR_STORAGE*)to, tolen);
  2521. // ICMP packets are only required to quote the first 8 bytes of the layer4
  2522. // payload. The UDP payload is 8 bytes, and the UTP header is another 20
  2523. // bytes. So, in order to find the entire UTP header, we need the ICMP
  2524. // packet to quote 28 bytes.
  2525. if (len < sizeof(PacketFormatV1)) {
  2526. #if UTP_DEBUG_LOGGING
  2527. ctx->log(UTP_LOG_DEBUG, NULL, "Ignoring ICMP from %s: runt length %d", addrfmt(addr, addrbuf), len);
  2528. #endif
  2529. return NULL;
  2530. }
  2531. const PacketFormatV1 *pf = (PacketFormatV1*)buffer;
  2532. const byte version = UTP_Version(pf);
  2533. const uint32 id = uint32(pf->connid);
  2534. if (version != 1) {
  2535. #if UTP_DEBUG_LOGGING
  2536. ctx->log(UTP_LOG_DEBUG, NULL, "Ignoring ICMP from %s: not UTP version 1", addrfmt(addr, addrbuf));
  2537. #endif
  2538. return NULL;
  2539. }
  2540. UTPSocketKeyData* keyData;
  2541. if ( (keyData = ctx->utp_sockets->Lookup(UTPSocketKey(addr, id))) ||
  2542. ((keyData = ctx->utp_sockets->Lookup(UTPSocketKey(addr, id + 1))) && keyData->socket->conn_id_send == id) ||
  2543. ((keyData = ctx->utp_sockets->Lookup(UTPSocketKey(addr, id - 1))) && keyData->socket->conn_id_send == id))
  2544. {
  2545. return keyData->socket;
  2546. }
  2547. #if UTP_DEBUG_LOGGING
  2548. ctx->log(UTP_LOG_DEBUG, NULL, "Ignoring ICMP from %s: No matching connection found for id %u", addrfmt(addr, addrbuf), id);
  2549. #endif
  2550. return NULL;
  2551. }
  2552. // Should be called when an ICMP Type 3, Code 4 packet (fragmentation needed) is received, to adjust the MTU
  2553. //
  2554. // Returns 1 if the UDP payload (delivered in the ICMP packet) was recognized as a UTP packet, or 0 if it was not
  2555. //
  2556. // @ctx: utp_context
  2557. // @buf: Contents of the original UDP payload, which the ICMP packet quoted. *Not* the ICMP packet itself.
  2558. // @len: buffer length
  2559. // @to: destination address of the original UDP pakcet
  2560. // @tolen: address length
  2561. // @next_hop_mtu:
  2562. int utp_process_icmp_fragmentation(utp_context *ctx, const byte* buffer, size_t len, const struct sockaddr *to, socklen_t tolen, uint16 next_hop_mtu)
  2563. {
  2564. UTPSocket* conn = parse_icmp_payload(ctx, buffer, len, to, tolen);
  2565. if (!conn) return 0;
  2566. // Constrain the next_hop_mtu to sane values. It might not be initialized or sent properly
  2567. if (next_hop_mtu >= 576 && next_hop_mtu < 0x2000) {
  2568. conn->mtu_ceiling = min<uint32>(next_hop_mtu, conn->mtu_ceiling);
  2569. conn->mtu_search_update();
  2570. // this is something of a speecial case, where we don't set mtu_last
  2571. // to the value in between the floor and the ceiling. We can update the
  2572. // floor, because there might be more network segments after the one
  2573. // that sent this ICMP with smaller MTUs. But we want to test this
  2574. // MTU size first. If the next probe gets through, mtu_floor is updated
  2575. conn->mtu_last = conn->mtu_ceiling;
  2576. } else {
  2577. // Otherwise, binary search. At this point we don't actually know
  2578. // what size the packet that failed was, and apparently we can't
  2579. // trust the next hop mtu either. It seems reasonably conservative
  2580. // to just lower the ceiling. This should not happen on working networks
  2581. // anyway.
  2582. conn->mtu_ceiling = (conn->mtu_floor + conn->mtu_ceiling) / 2;
  2583. conn->mtu_search_update();
  2584. }
  2585. conn->log(UTP_LOG_MTU, "MTU [ICMP] floor:%d ceiling:%d current:%d", conn->mtu_floor, conn->mtu_ceiling, conn->mtu_last);
  2586. return 1;
  2587. }
  2588. // Should be called when an ICMP message is received that should tear down the connection.
  2589. //
  2590. // Returns 1 if the UDP payload (delivered in the ICMP packet) was recognized as a UTP packet, or 0 if it was not
  2591. //
  2592. // @ctx: utp_context
  2593. // @buf: Contents of the original UDP payload, which the ICMP packet quoted. *Not* the ICMP packet itself.
  2594. // @len: buffer length
  2595. // @to: destination address of the original UDP pakcet
  2596. // @tolen: address length
  2597. int utp_process_icmp_error(utp_context *ctx, const byte *buffer, size_t len, const struct sockaddr *to, socklen_t tolen)
  2598. {
  2599. UTPSocket* conn = parse_icmp_payload(ctx, buffer, len, to, tolen);
  2600. if (!conn) return 0;
  2601. const int err = (conn->state == CS_SYN_SENT) ? UTP_ECONNREFUSED : UTP_ECONNRESET;
  2602. const PackedSockAddr addr((const SOCKADDR_STORAGE*)to, tolen);
  2603. switch(conn->state) {
  2604. // Don't pass on errors for idle/closed connections
  2605. case CS_IDLE:
  2606. #if UTP_DEBUG_LOGGING
  2607. ctx->log(UTP_LOG_DEBUG, NULL, "ICMP from %s in state CS_IDLE, ignoring", addrfmt(addr, addrbuf));
  2608. #endif
  2609. return 1;
  2610. default:
  2611. if (conn->close_requested) {
  2612. #if UTP_DEBUG_LOGGING
  2613. ctx->log(UTP_LOG_DEBUG, NULL, "ICMP from %s after close, setting state to CS_DESTROY and causing error %d", addrfmt(addr, addrbuf), err);
  2614. #endif
  2615. conn->state = CS_DESTROY;
  2616. } else {
  2617. #if UTP_DEBUG_LOGGING
  2618. ctx->log(UTP_LOG_DEBUG, NULL, "ICMP from %s, setting state to CS_RESET and causing error %d", addrfmt(addr, addrbuf), err);
  2619. #endif
  2620. conn->state = CS_RESET;
  2621. }
  2622. break;
  2623. }
  2624. utp_call_on_error(conn->ctx, conn, err);
  2625. return 1;
  2626. }
  2627. // Write bytes to the UTP socket. Returns the number of bytes written.
  2628. // 0 indicates the socket is no longer writable, -1 indicates an error
  2629. ssize_t utp_writev(utp_socket *conn, struct utp_iovec *iovec_input, size_t num_iovecs)
  2630. {
  2631. static utp_iovec iovec[UTP_IOV_MAX];
  2632. assert(conn);
  2633. if (!conn) return -1;
  2634. assert(iovec_input);
  2635. if (!iovec_input) return -1;
  2636. assert(num_iovecs);
  2637. if (!num_iovecs) return -1;
  2638. if (num_iovecs > UTP_IOV_MAX)
  2639. num_iovecs = UTP_IOV_MAX;
  2640. memcpy(iovec, iovec_input, sizeof(struct utp_iovec)*num_iovecs);
  2641. size_t bytes = 0;
  2642. size_t sent = 0;
  2643. for (size_t i = 0; i < num_iovecs; i++)
  2644. bytes += iovec[i].iov_len;
  2645. #if UTP_DEBUG_LOGGING
  2646. size_t param = bytes;
  2647. #endif
  2648. if (conn->state != CS_CONNECTED) {
  2649. #if UTP_DEBUG_LOGGING
  2650. conn->log(UTP_LOG_DEBUG, "UTP_Write %u bytes = false (not CS_CONNECTED)", (uint)bytes);
  2651. #endif
  2652. return 0;
  2653. }
  2654. if (conn->fin_sent) {
  2655. #if UTP_DEBUG_LOGGING
  2656. conn->log(UTP_LOG_DEBUG, "UTP_Write %u bytes = false (fin_sent already)", (uint)bytes);
  2657. #endif
  2658. return 0;
  2659. }
  2660. conn->ctx->current_ms = utp_call_get_milliseconds(conn->ctx, conn);
  2661. // don't send unless it will all fit in the window
  2662. size_t packet_size = conn->get_packet_size();
  2663. size_t num_to_send = min<size_t>(bytes, packet_size);
  2664. while (!conn->is_full(num_to_send)) {
  2665. // Send an outgoing packet.
  2666. // Also add it to the outgoing of packets that have been sent but not ACKed.
  2667. bytes -= num_to_send;
  2668. sent += num_to_send;
  2669. #if UTP_DEBUG_LOGGING
  2670. conn->log(UTP_LOG_DEBUG, "Sending packet. seq_nr:%u ack_nr:%u wnd:%u/%u/%u rcv_win:%u size:%u cur_window_packets:%u",
  2671. conn->seq_nr, conn->ack_nr,
  2672. (uint)(conn->cur_window + num_to_send),
  2673. (uint)conn->max_window, (uint)conn->max_window_user,
  2674. (uint)conn->last_rcv_win, num_to_send,
  2675. conn->cur_window_packets);
  2676. #endif
  2677. conn->write_outgoing_packet(num_to_send, ST_DATA, iovec, num_iovecs);
  2678. num_to_send = min<size_t>(bytes, packet_size);
  2679. if (num_to_send == 0) {
  2680. #if UTP_DEBUG_LOGGING
  2681. conn->log(UTP_LOG_DEBUG, "UTP_Write %u bytes = true", (uint)param);
  2682. #endif
  2683. return sent;
  2684. }
  2685. }
  2686. bool full = conn->is_full();
  2687. if (full) {
  2688. // mark the socket as not being writable.
  2689. conn->state = CS_CONNECTED_FULL;
  2690. }
  2691. #if UTP_DEBUG_LOGGING
  2692. conn->log(UTP_LOG_DEBUG, "UTP_Write %u bytes = %s", (uint)bytes, full ? "false" : "true");
  2693. #endif
  2694. // returns whether or not the socket is still writable
  2695. // if the congestion window is not full, we can still write to it
  2696. //return !full;
  2697. return sent;
  2698. }
  2699. void utp_read_drained(utp_socket *conn)
  2700. {
  2701. assert(conn);
  2702. if (!conn) return;
  2703. assert(conn->state != CS_UNINITIALIZED);
  2704. if (conn->state == CS_UNINITIALIZED) return;
  2705. const size_t rcvwin = conn->get_rcv_window();
  2706. if (rcvwin > conn->last_rcv_win) {
  2707. // If last window was 0 send ACK immediately, otherwise should set timer
  2708. if (conn->last_rcv_win == 0) {
  2709. conn->send_ack();
  2710. } else {
  2711. conn->ctx->current_ms = utp_call_get_milliseconds(conn->ctx, conn);
  2712. conn->schedule_ack();
  2713. }
  2714. }
  2715. }
  2716. // Should be called each time the UDP socket is drained
  2717. void utp_issue_deferred_acks(utp_context *ctx)
  2718. {
  2719. assert(ctx);
  2720. if (!ctx) return;
  2721. for (size_t i = 0; i < ctx->ack_sockets.GetCount(); i++) {
  2722. UTPSocket *conn = ctx->ack_sockets[i];
  2723. conn->send_ack();
  2724. i--;
  2725. }
  2726. }
  2727. // Should be called every 500ms
  2728. void utp_check_timeouts(utp_context *ctx)
  2729. {
  2730. assert(ctx);
  2731. if (!ctx) return;
  2732. ctx->current_ms = utp_call_get_milliseconds(ctx, NULL);
  2733. if (ctx->current_ms - ctx->last_check < TIMEOUT_CHECK_INTERVAL)
  2734. return;
  2735. ctx->last_check = ctx->current_ms;
  2736. for (size_t i = 0; i < ctx->rst_info.GetCount(); i++) {
  2737. if ((int)(ctx->current_ms - ctx->rst_info[i].timestamp) >= RST_INFO_TIMEOUT) {
  2738. ctx->rst_info.MoveUpLast(i);
  2739. i--;
  2740. }
  2741. }
  2742. if (ctx->rst_info.GetCount() != ctx->rst_info.GetAlloc()) {
  2743. ctx->rst_info.Compact();
  2744. }
  2745. utp_hash_iterator_t it;
  2746. UTPSocketKeyData* keyData;
  2747. while ((keyData = ctx->utp_sockets->Iterate(it))) {
  2748. UTPSocket *conn = keyData->socket;
  2749. conn->check_timeouts();
  2750. // Check if the object was deleted
  2751. if (conn->state == CS_DESTROY) {
  2752. #if UTP_DEBUG_LOGGING
  2753. conn->log(UTP_LOG_DEBUG, "Destroying");
  2754. #endif
  2755. delete conn;
  2756. }
  2757. }
  2758. }
  2759. int utp_getpeername(utp_socket *conn, struct sockaddr *addr, socklen_t *addrlen)
  2760. {
  2761. assert(addr);
  2762. if (!addr) return -1;
  2763. assert(addrlen);
  2764. if (!addrlen) return -1;
  2765. assert(conn);
  2766. if (!conn) return -1;
  2767. assert(conn->state != CS_UNINITIALIZED);
  2768. if (conn->state == CS_UNINITIALIZED) return -1;
  2769. socklen_t len;
  2770. const SOCKADDR_STORAGE sa = conn->addr.get_sockaddr_storage(&len);
  2771. *addrlen = min(len, *addrlen);
  2772. memcpy(addr, &sa, *addrlen);
  2773. return 0;
  2774. }
  2775. int utp_get_delays(UTPSocket *conn, uint32 *ours, uint32 *theirs, uint32 *age)
  2776. {
  2777. assert(conn);
  2778. if (!conn) return -1;
  2779. assert(conn->state != CS_UNINITIALIZED);
  2780. if (conn->state == CS_UNINITIALIZED) {
  2781. if (ours) *ours = 0;
  2782. if (theirs) *theirs = 0;
  2783. if (age) *age = 0;
  2784. return -1;
  2785. }
  2786. if (ours) *ours = conn->our_hist.get_value();
  2787. if (theirs) *theirs = conn->their_hist.get_value();
  2788. if (age) *age = (uint32)(conn->ctx->current_ms - conn->last_measured_delay);
  2789. return 0;
  2790. }
// Close the UTP socket.
// It is not valid for the upper layer to refer to socket after it is closed.
// Data will keep to try being delivered after the close.
void utp_close(UTPSocket *conn)
{
	assert(conn);
	if (!conn) return;
	assert(conn->state != CS_UNINITIALIZED
		&& conn->state != CS_DESTROY);

	#if UTP_DEBUG_LOGGING
	conn->log(UTP_LOG_DEBUG, "UTP_Close in state:%s", statenames[conn->state]);
	#endif

	switch(conn->state) {
	case CS_CONNECTED:
	case CS_CONNECTED_FULL:
		// Established connection: stop the read side and begin a graceful
		// shutdown by sending our FIN (at most once). If the FIN was
		// already sent AND acked, nothing is outstanding — destroy now.
		// A FIN that is sent but not yet acked leaves the socket alive so
		// queued data/FIN can still be delivered (see top-of-function doc).
		conn->read_shutdown = true;
		conn->close_requested = true;
		if (!conn->fin_sent) {
			conn->fin_sent = true;
			conn->write_outgoing_packet(0, ST_FIN, NULL, 0);
		} else if (conn->fin_sent_acked) {
			conn->state = CS_DESTROY;
		}
		break;

	case CS_SYN_SENT:
		// Pending connect: arm a short retransmit-timeout before the
		// fall-through below marks the socket for destruction.
		// NOTE(review): the cap of 60 is added to a millisecond clock and
		// looks like it was meant to be seconds — confirm intent upstream.
		conn->rto_timeout = utp_call_get_milliseconds(conn->ctx, conn) + min<uint>(conn->rto * 2, 60);
		// fall through
	case CS_SYN_RECV:
		// fall through
	default:
		// Any other state has nothing in flight worth preserving.
		conn->state = CS_DESTROY;
		break;
	}

	#if UTP_DEBUG_LOGGING
	conn->log(UTP_LOG_DEBUG, "UTP_Close end in state:%s", statenames[conn->state]);
	#endif
}
  2828. void utp_shutdown(UTPSocket *conn, int how)
  2829. {
  2830. assert(conn);
  2831. if (!conn) return;
  2832. assert(conn->state != CS_UNINITIALIZED
  2833. && conn->state != CS_DESTROY);
  2834. #if UTP_DEBUG_LOGGING
  2835. conn->log(UTP_LOG_DEBUG, "UTP_shutdown(%d) in state:%s", how, statenames[conn->state]);
  2836. #endif
  2837. if (how != SHUT_WR) {
  2838. conn->read_shutdown = true;
  2839. }
  2840. if (how != SHUT_RD) {
  2841. switch(conn->state) {
  2842. case CS_CONNECTED:
  2843. case CS_CONNECTED_FULL:
  2844. if (!conn->fin_sent) {
  2845. conn->fin_sent = true;
  2846. conn->write_outgoing_packet(0, ST_FIN, NULL, 0);
  2847. }
  2848. break;
  2849. case CS_SYN_SENT:
  2850. conn->rto_timeout = utp_call_get_milliseconds(conn->ctx, conn) + min<uint>(conn->rto * 2, 60);
  2851. default:
  2852. break;
  2853. }
  2854. }
  2855. }
  2856. utp_context* utp_get_context(utp_socket *socket) {
  2857. assert(socket);
  2858. return socket ? socket->ctx : NULL;
  2859. }
  2860. void* utp_set_userdata(utp_socket *socket, void *userdata) {
  2861. assert(socket);
  2862. if (socket) socket->userdata = userdata;
  2863. return socket ? socket->userdata : NULL;
  2864. }
  2865. void* utp_get_userdata(utp_socket *socket) {
  2866. assert(socket);
  2867. return socket ? socket->userdata : NULL;
  2868. }
  2869. void struct_utp_context::log(int level, utp_socket *socket, char const *fmt, ...)
  2870. {
  2871. if (!would_log(level)) {
  2872. return;
  2873. }
  2874. va_list va;
  2875. va_start(va, fmt);
  2876. log_unchecked(socket, fmt, va);
  2877. va_end(va);
  2878. }
  2879. void struct_utp_context::log_unchecked(utp_socket *socket, char const *fmt, ...)
  2880. {
  2881. va_list va;
  2882. char buf[4096];
  2883. va_start(va, fmt);
  2884. vsnprintf(buf, 4096, fmt, va);
  2885. buf[4095] = '\0';
  2886. va_end(va);
  2887. utp_call_log(this, socket, (const byte *)buf);
  2888. }
  2889. inline bool struct_utp_context::would_log(int level)
  2890. {
  2891. if (level == UTP_LOG_NORMAL) return log_normal;
  2892. if (level == UTP_LOG_MTU) return log_mtu;
  2893. if (level == UTP_LOG_DEBUG) return log_debug;
  2894. return true;
  2895. }
  2896. utp_socket_stats* utp_get_stats(utp_socket *socket)
  2897. {
  2898. #ifdef _DEBUG
  2899. assert(socket);
  2900. if (!socket) return NULL;
  2901. socket->_stats.mtu_guess = socket->mtu_last ? socket->mtu_last : socket->mtu_ceiling;
  2902. return &socket->_stats;
  2903. #else
  2904. return NULL;
  2905. #endif
  2906. }