router.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620
  1. package vnet
  2. import (
  3. "errors"
  4. "fmt"
  5. "math/rand"
  6. "net"
  7. "strings"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "github.com/pion/logging"
  12. )
  13. const (
  14. defaultRouterQueueSize = 0 // unlimited
  15. )
  16. var (
  17. errInvalidLocalIPinStaticIPs = errors.New("invalid local IP in StaticIPs")
  18. errLocalIPBeyondStaticIPsSubset = errors.New("mapped in StaticIPs is beyond subnet")
  19. errLocalIPNoStaticsIPsAssociated = errors.New("all StaticIPs must have associated local IPs")
  20. errRouterAlreadyStarted = errors.New("router already started")
  21. errRouterAlreadyStopped = errors.New("router already stopped")
  22. errStaticIPisBeyondSubnet = errors.New("static IP is beyond subnet")
  23. errAddressSpaceExhausted = errors.New("address space exhausted")
  24. errNoIPAddrEth0 = errors.New("no IP address is assigned for eth0")
  25. )
  26. // Generate a unique router name
  27. var assignRouterName = func() func() string { //nolint:gochecknoglobals
  28. var routerIDCtr uint64
  29. return func() string {
  30. n := atomic.AddUint64(&routerIDCtr, 1)
  31. return fmt.Sprintf("router%d", n)
  32. }
  33. }()
  34. // RouterConfig ...
  35. type RouterConfig struct {
  36. // Name of router. If not specified, a unique name will be assigned.
  37. Name string
  38. // CIDR notation, like "192.0.2.0/24"
  39. CIDR string
  40. // StaticIPs is an array of static IP addresses to be assigned for this router.
  41. // If no static IP address is given, the router will automatically assign
  42. // an IP address.
  43. // This will be ignored if this router is the root.
  44. StaticIPs []string
  45. // StaticIP is deprecated. Use StaticIPs.
  46. StaticIP string
  47. // Internal queue size
  48. QueueSize int
  49. // Effective only when this router has a parent router
  50. NATType *NATType
  51. // Minimum Delay
  52. MinDelay time.Duration
  53. // Max Jitter
  54. MaxJitter time.Duration
  55. // Logger factory
  56. LoggerFactory logging.LoggerFactory
  57. }
  58. // NIC is a nework inerface controller that interfaces Router
  59. type NIC interface {
  60. getInterface(ifName string) (*Interface, error)
  61. onInboundChunk(c Chunk)
  62. getStaticIPs() []net.IP
  63. setRouter(r *Router) error
  64. }
  65. // ChunkFilter is a handler users can add to filter chunks.
  66. // If the filter returns false, the packet will be dropped.
  67. type ChunkFilter func(c Chunk) bool
  68. // Router ...
  69. type Router struct {
  70. name string // read-only
  71. interfaces []*Interface // read-only
  72. ipv4Net *net.IPNet // read-only
  73. staticIPs []net.IP // read-only
  74. staticLocalIPs map[string]net.IP // read-only,
  75. lastID byte // requires mutex [x], used to assign the last digit of IPv4 address
  76. queue *chunkQueue // read-only
  77. parent *Router // read-only
  78. children []*Router // read-only
  79. natType *NATType // read-only
  80. nat *networkAddressTranslator // read-only
  81. nics map[string]NIC // read-only
  82. stopFunc func() // requires mutex [x]
  83. resolver *resolver // read-only
  84. chunkFilters []ChunkFilter // requires mutex [x]
  85. minDelay time.Duration // requires mutex [x]
  86. maxJitter time.Duration // requires mutex [x]
  87. mutex sync.RWMutex // thread-safe
  88. pushCh chan struct{} // writer requires mutex
  89. loggerFactory logging.LoggerFactory // read-only
  90. log logging.LeveledLogger // read-only
  91. }
  92. // NewRouter ...
  93. func NewRouter(config *RouterConfig) (*Router, error) {
  94. loggerFactory := config.LoggerFactory
  95. log := loggerFactory.NewLogger("vnet")
  96. _, ipv4Net, err := net.ParseCIDR(config.CIDR)
  97. if err != nil {
  98. return nil, err
  99. }
  100. queueSize := defaultRouterQueueSize
  101. if config.QueueSize > 0 {
  102. queueSize = config.QueueSize
  103. }
  104. // set up network interface, lo0
  105. lo0 := NewInterface(net.Interface{
  106. Index: 1,
  107. MTU: 16384,
  108. Name: lo0String,
  109. HardwareAddr: nil,
  110. Flags: net.FlagUp | net.FlagLoopback | net.FlagMulticast,
  111. })
  112. lo0.AddAddr(&net.IPAddr{IP: net.ParseIP("127.0.0.1"), Zone: ""})
  113. // set up network interface, eth0
  114. eth0 := NewInterface(net.Interface{
  115. Index: 2,
  116. MTU: 1500,
  117. Name: "eth0",
  118. HardwareAddr: newMACAddress(),
  119. Flags: net.FlagUp | net.FlagMulticast,
  120. })
  121. // local host name resolver
  122. resolver := newResolver(&resolverConfig{
  123. LoggerFactory: config.LoggerFactory,
  124. })
  125. name := config.Name
  126. if len(name) == 0 {
  127. name = assignRouterName()
  128. }
  129. var staticIPs []net.IP
  130. staticLocalIPs := map[string]net.IP{}
  131. for _, ipStr := range config.StaticIPs {
  132. ipPair := strings.Split(ipStr, "/")
  133. if ip := net.ParseIP(ipPair[0]); ip != nil {
  134. if len(ipPair) > 1 {
  135. locIP := net.ParseIP(ipPair[1])
  136. if locIP == nil {
  137. return nil, errInvalidLocalIPinStaticIPs
  138. }
  139. if !ipv4Net.Contains(locIP) {
  140. return nil, fmt.Errorf("local IP %s %w", locIP.String(), errLocalIPBeyondStaticIPsSubset)
  141. }
  142. staticLocalIPs[ip.String()] = locIP
  143. }
  144. staticIPs = append(staticIPs, ip)
  145. }
  146. }
  147. if len(config.StaticIP) > 0 {
  148. log.Warn("StaticIP is deprecated. Use StaticIPs instead")
  149. if ip := net.ParseIP(config.StaticIP); ip != nil {
  150. staticIPs = append(staticIPs, ip)
  151. }
  152. }
  153. if nStaticLocal := len(staticLocalIPs); nStaticLocal > 0 {
  154. if nStaticLocal != len(staticIPs) {
  155. return nil, errLocalIPNoStaticsIPsAssociated
  156. }
  157. }
  158. return &Router{
  159. name: name,
  160. interfaces: []*Interface{lo0, eth0},
  161. ipv4Net: ipv4Net,
  162. staticIPs: staticIPs,
  163. staticLocalIPs: staticLocalIPs,
  164. queue: newChunkQueue(queueSize, 0),
  165. natType: config.NATType,
  166. nics: map[string]NIC{},
  167. resolver: resolver,
  168. minDelay: config.MinDelay,
  169. maxJitter: config.MaxJitter,
  170. pushCh: make(chan struct{}, 1),
  171. loggerFactory: loggerFactory,
  172. log: log,
  173. }, nil
  174. }
  175. // caller must hold the mutex
  176. func (r *Router) getInterfaces() ([]*Interface, error) {
  177. if len(r.interfaces) == 0 {
  178. return nil, fmt.Errorf("%w is available", errNoInterface)
  179. }
  180. return r.interfaces, nil
  181. }
  182. func (r *Router) getInterface(ifName string) (*Interface, error) {
  183. r.mutex.RLock()
  184. defer r.mutex.RUnlock()
  185. ifs, err := r.getInterfaces()
  186. if err != nil {
  187. return nil, err
  188. }
  189. for _, ifc := range ifs {
  190. if ifc.Name == ifName {
  191. return ifc, nil
  192. }
  193. }
  194. return nil, fmt.Errorf("interface %s %w", ifName, errNotFound)
  195. }
  196. // Start ...
  197. func (r *Router) Start() error {
  198. r.mutex.Lock()
  199. defer r.mutex.Unlock()
  200. if r.stopFunc != nil {
  201. return errRouterAlreadyStarted
  202. }
  203. cancelCh := make(chan struct{})
  204. go func() {
  205. loop:
  206. for {
  207. d, err := r.processChunks()
  208. if err != nil {
  209. r.log.Errorf("[%s] %s", r.name, err.Error())
  210. break
  211. }
  212. if d <= 0 {
  213. select {
  214. case <-r.pushCh:
  215. case <-cancelCh:
  216. break loop
  217. }
  218. } else {
  219. t := time.NewTimer(d)
  220. select {
  221. case <-t.C:
  222. case <-cancelCh:
  223. break loop
  224. }
  225. }
  226. }
  227. }()
  228. r.stopFunc = func() {
  229. close(cancelCh)
  230. }
  231. for _, child := range r.children {
  232. if err := child.Start(); err != nil {
  233. return err
  234. }
  235. }
  236. return nil
  237. }
  238. // Stop ...
  239. func (r *Router) Stop() error {
  240. r.mutex.Lock()
  241. defer r.mutex.Unlock()
  242. if r.stopFunc == nil {
  243. return errRouterAlreadyStopped
  244. }
  245. for _, router := range r.children {
  246. r.mutex.Unlock()
  247. err := router.Stop()
  248. r.mutex.Lock()
  249. if err != nil {
  250. return err
  251. }
  252. }
  253. r.stopFunc()
  254. r.stopFunc = nil
  255. return nil
  256. }
  257. // caller must hold the mutex
  258. func (r *Router) addNIC(nic NIC) error {
  259. ifc, err := nic.getInterface("eth0")
  260. if err != nil {
  261. return err
  262. }
  263. var ips []net.IP
  264. if ips = nic.getStaticIPs(); len(ips) == 0 {
  265. // assign an IP address
  266. ip, err2 := r.assignIPAddress()
  267. if err2 != nil {
  268. return err2
  269. }
  270. ips = append(ips, ip)
  271. }
  272. for _, ip := range ips {
  273. if !r.ipv4Net.Contains(ip) {
  274. return fmt.Errorf("%w: %s", errStaticIPisBeyondSubnet, r.ipv4Net.String())
  275. }
  276. ifc.AddAddr(&net.IPNet{
  277. IP: ip,
  278. Mask: r.ipv4Net.Mask,
  279. })
  280. r.nics[ip.String()] = nic
  281. }
  282. if err = nic.setRouter(r); err != nil {
  283. return err
  284. }
  285. return nil
  286. }
  287. // AddRouter adds a chile Router.
  288. func (r *Router) AddRouter(router *Router) error {
  289. r.mutex.Lock()
  290. defer r.mutex.Unlock()
  291. // Router is a NIC. Add it as a NIC so that packets are routed to this child
  292. // router.
  293. err := r.addNIC(router)
  294. if err != nil {
  295. return err
  296. }
  297. if err = router.setRouter(r); err != nil {
  298. return err
  299. }
  300. r.children = append(r.children, router)
  301. return nil
  302. }
  303. // AddChildRouter is like AddRouter, but does not add the child routers NIC to
  304. // the parent. This has to be done manually by calling AddNet, which allows to
  305. // use a wrapper around the subrouters NIC.
  306. // AddNet MUST be called before AddChildRouter.
  307. func (r *Router) AddChildRouter(router *Router) error {
  308. r.mutex.Lock()
  309. defer r.mutex.Unlock()
  310. if err := router.setRouter(r); err != nil {
  311. return err
  312. }
  313. r.children = append(r.children, router)
  314. return nil
  315. }
  316. // AddNet ...
  317. func (r *Router) AddNet(nic NIC) error {
  318. r.mutex.Lock()
  319. defer r.mutex.Unlock()
  320. return r.addNIC(nic)
  321. }
  322. // AddHost adds a mapping of hostname and an IP address to the local resolver.
  323. func (r *Router) AddHost(hostName string, ipAddr string) error {
  324. return r.resolver.addHost(hostName, ipAddr)
  325. }
  326. // AddChunkFilter adds a filter for chunks traversing this router.
  327. // You may add more than one filter. The filters are called in the order of this method call.
  328. // If a chunk is dropped by a filter, subsequent filter will not receive the chunk.
  329. func (r *Router) AddChunkFilter(filter ChunkFilter) {
  330. r.mutex.Lock()
  331. defer r.mutex.Unlock()
  332. r.chunkFilters = append(r.chunkFilters, filter)
  333. }
  334. // caller should hold the mutex
  335. func (r *Router) assignIPAddress() (net.IP, error) {
  336. // See: https://stackoverflow.com/questions/14915188/ip-address-ending-with-zero
  337. if r.lastID == 0xfe {
  338. return nil, errAddressSpaceExhausted
  339. }
  340. ip := make(net.IP, 4)
  341. copy(ip, r.ipv4Net.IP[:3])
  342. r.lastID++
  343. ip[3] = r.lastID
  344. return ip, nil
  345. }
  346. func (r *Router) push(c Chunk) {
  347. r.mutex.Lock()
  348. defer r.mutex.Unlock()
  349. r.log.Debugf("[%s] route %s", r.name, c.String())
  350. if r.stopFunc != nil {
  351. c.setTimestamp()
  352. if r.queue.push(c) {
  353. select {
  354. case r.pushCh <- struct{}{}:
  355. default:
  356. }
  357. } else {
  358. r.log.Warnf("[%s] queue was full. dropped a chunk", r.name)
  359. }
  360. }
  361. }
  362. func (r *Router) processChunks() (time.Duration, error) {
  363. r.mutex.Lock()
  364. defer r.mutex.Unlock()
  365. // Introduce jitter by delaying the processing of chunks.
  366. if r.maxJitter > 0 {
  367. jitter := time.Duration(rand.Int63n(int64(r.maxJitter))) //nolint:gosec
  368. time.Sleep(jitter)
  369. }
  370. // cutOff
  371. // v min delay
  372. // |<--->|
  373. // +------------:--
  374. // |OOOOOOXXXXX : --> time
  375. // +------------:--
  376. // |<--->| now
  377. // due
  378. enteredAt := time.Now()
  379. cutOff := enteredAt.Add(-r.minDelay)
  380. var d time.Duration // the next sleep duration
  381. for {
  382. d = 0
  383. c := r.queue.peek()
  384. if c == nil {
  385. break // no more chunk in the queue
  386. }
  387. // check timestamp to find if the chunk is due
  388. if c.getTimestamp().After(cutOff) {
  389. // There is one or more chunk in the queue but none of them are due.
  390. // Calculate the next sleep duration here.
  391. nextExpire := c.getTimestamp().Add(r.minDelay)
  392. d = nextExpire.Sub(enteredAt)
  393. break
  394. }
  395. var ok bool
  396. if c, ok = r.queue.pop(); !ok {
  397. break // no more chunk in the queue
  398. }
  399. blocked := false
  400. for i := 0; i < len(r.chunkFilters); i++ {
  401. filter := r.chunkFilters[i]
  402. if !filter(c) {
  403. blocked = true
  404. break
  405. }
  406. }
  407. if blocked {
  408. continue // discard
  409. }
  410. dstIP := c.getDestinationIP()
  411. // check if the desination is in our subnet
  412. if r.ipv4Net.Contains(dstIP) {
  413. // search for the destination NIC
  414. var nic NIC
  415. if nic, ok = r.nics[dstIP.String()]; !ok {
  416. // NIC not found. drop it.
  417. r.log.Debugf("[%s] %s unreachable", r.name, c.String())
  418. continue
  419. }
  420. // found the NIC, forward the chunk to the NIC.
  421. // call to NIC must unlock mutex
  422. r.mutex.Unlock()
  423. nic.onInboundChunk(c)
  424. r.mutex.Lock()
  425. continue
  426. }
  427. // the destination is outside of this subnet
  428. // is this WAN?
  429. if r.parent == nil {
  430. // this WAN. No route for this chunk
  431. r.log.Debugf("[%s] no route found for %s", r.name, c.String())
  432. continue
  433. }
  434. // Pass it to the parent via NAT
  435. toParent, err := r.nat.translateOutbound(c)
  436. if err != nil {
  437. return 0, err
  438. }
  439. if toParent == nil {
  440. continue
  441. }
  442. //nolint:godox
  443. /* FIXME: this implementation would introduce a duplicate packet!
  444. if r.nat.natType.Hairpining {
  445. hairpinned, err := r.nat.translateInbound(toParent)
  446. if err != nil {
  447. r.log.Warnf("[%s] %s", r.name, err.Error())
  448. } else {
  449. go func() {
  450. r.push(hairpinned)
  451. }()
  452. }
  453. }
  454. */
  455. // call to parent router mutex unlock mutex
  456. r.mutex.Unlock()
  457. r.parent.push(toParent)
  458. r.mutex.Lock()
  459. }
  460. return d, nil
  461. }
  462. // caller must hold the mutex
  463. func (r *Router) setRouter(parent *Router) error {
  464. r.parent = parent
  465. r.resolver.setParent(parent.resolver)
  466. // when this method is called, one or more IP address has already been assigned by
  467. // the parent router.
  468. ifc, err := r.getInterface("eth0")
  469. if err != nil {
  470. return err
  471. }
  472. if len(ifc.addrs) == 0 {
  473. return errNoIPAddrEth0
  474. }
  475. mappedIPs := []net.IP{}
  476. localIPs := []net.IP{}
  477. for _, ifcAddr := range ifc.addrs {
  478. var ip net.IP
  479. switch addr := ifcAddr.(type) {
  480. case *net.IPNet:
  481. ip = addr.IP
  482. case *net.IPAddr: // Do we really need this case?
  483. ip = addr.IP
  484. default:
  485. }
  486. if ip == nil {
  487. continue
  488. }
  489. mappedIPs = append(mappedIPs, ip)
  490. if locIP := r.staticLocalIPs[ip.String()]; locIP != nil {
  491. localIPs = append(localIPs, locIP)
  492. }
  493. }
  494. // Set up NAT here
  495. if r.natType == nil {
  496. r.natType = &NATType{
  497. MappingBehavior: EndpointIndependent,
  498. FilteringBehavior: EndpointAddrPortDependent,
  499. Hairpining: false,
  500. PortPreservation: false,
  501. MappingLifeTime: 30 * time.Second,
  502. }
  503. }
  504. r.nat, err = newNAT(&natConfig{
  505. name: r.name,
  506. natType: *r.natType,
  507. mappedIPs: mappedIPs,
  508. localIPs: localIPs,
  509. loggerFactory: r.loggerFactory,
  510. })
  511. if err != nil {
  512. return err
  513. }
  514. return nil
  515. }
  516. func (r *Router) onInboundChunk(c Chunk) {
  517. fromParent, err := r.nat.translateInbound(c)
  518. if err != nil {
  519. r.log.Warnf("[%s] %s", r.name, err.Error())
  520. return
  521. }
  522. r.push(fromParent)
  523. }
  524. func (r *Router) getStaticIPs() []net.IP {
  525. return r.staticIPs
  526. }