portmapping.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package guestman
  15. import (
  16. "context"
  17. "sync"
  18. "yunion.io/x/jsonutils"
  19. "yunion.io/x/log"
  20. "yunion.io/x/pkg/errors"
  21. "yunion.io/x/pkg/util/sets"
  22. "yunion.io/x/onecloud/pkg/apis/compute"
  23. "yunion.io/x/onecloud/pkg/hostman/options"
  24. "yunion.io/x/onecloud/pkg/httperrors"
  25. "yunion.io/x/onecloud/pkg/mcclient"
  26. "yunion.io/x/onecloud/pkg/mcclient/auth"
  27. computemod "yunion.io/x/onecloud/pkg/mcclient/modules/compute"
  28. "yunion.io/x/onecloud/pkg/util/netutils2/getport"
  29. )
  30. var (
  31. allocatePortLock sync.Mutex
  32. )
  33. type IPortMappingManager interface {
  34. AllocateGuestPortMappings(ctx context.Context, userCred mcclient.TokenCredential, guest GuestRuntimeInstance) error
  35. }
  36. type portMappingManager struct {
  37. manager *SGuestManager
  38. }
  39. func NewPortMappingManager(manager *SGuestManager) IPortMappingManager {
  40. return &portMappingManager{
  41. manager: manager,
  42. }
  43. }
  44. func (m *portMappingManager) GetGuestPortMappings(guest GuestRuntimeInstance) map[string]compute.GuestPortMappings {
  45. nics := guest.GetSourceDesc().Nics
  46. pms := make(map[string]compute.GuestPortMappings)
  47. for _, nic := range nics {
  48. if len(nic.PortMappings) == 0 {
  49. continue
  50. }
  51. pms[nic.NetId] = nic.PortMappings
  52. }
  53. return pms
  54. }
  55. func (m *portMappingManager) IsGuestHasPortMapping(guest GuestRuntimeInstance) bool {
  56. return len(m.GetGuestPortMappings(guest)) == 0
  57. }
  58. func (m *portMappingManager) AllocateGuestPortMappings(ctx context.Context, userCred mcclient.TokenCredential, guest GuestRuntimeInstance) error {
  59. allocatePortLock.Lock()
  60. defer allocatePortLock.Unlock()
  61. for idx, nic := range guest.GetDesc().Nics {
  62. if len(nic.PortMappings) == 0 {
  63. continue
  64. }
  65. newPms, err := m.allocatePortMappings(guest, nic.PortMappings)
  66. if err != nil {
  67. return errors.Wrapf(err, "allocateGuestPortMapping for nic %d: %s", idx, jsonutils.Marshal(nic.PortMappings))
  68. }
  69. // update allocated port mappings
  70. if err := m.setPortMappings(ctx, userCred, guest, idx, newPms); err != nil {
  71. return errors.Wrapf(err, "setPortMappings for nic %d", idx)
  72. }
  73. }
  74. return nil
  75. }
  76. func (m *portMappingManager) setPortMappings(ctx context.Context, userCred mcclient.TokenCredential, gst GuestRuntimeInstance, nicIdx int, pms compute.GuestPortMappings) error {
  77. // update desc
  78. desc := gst.GetDesc()
  79. nic := desc.Nics[nicIdx]
  80. nic.PortMappings = pms
  81. desc.Nics[nicIdx] = nic
  82. // update port mapping info to controller
  83. body := jsonutils.Marshal(map[string]interface{}{
  84. "port_mappings": pms,
  85. })
  86. session := auth.GetSession(ctx, userCred, options.HostOptions.Region)
  87. if _, err := computemod.Servernetworks.Update(session, gst.GetId(), nic.NetId, nil, body); err != nil {
  88. return errors.Wrapf(err, "update server %s network %s with port_mappings %s", gst.GetId(), nic.NetId, body.String())
  89. }
  90. // save desc
  91. gst.SetDesc(desc)
  92. return SaveDesc(gst, desc)
  93. }
  94. func (m *portMappingManager) getOtherGuests(gst GuestRuntimeInstance) []GuestRuntimeInstance {
  95. others := make([]GuestRuntimeInstance, 0)
  96. m.manager.Servers.Range(func(id, value interface{}) bool {
  97. if id == gst.GetId() {
  98. return true
  99. }
  100. ins := value.(GuestRuntimeInstance)
  101. others = append(others, ins)
  102. return true
  103. })
  104. return others
  105. }
  106. func (m *portMappingManager) getGuestFlattenPortMappings(guest GuestRuntimeInstance) compute.GuestPortMappings {
  107. ret := make([]*compute.GuestPortMapping, 0)
  108. pms := m.GetGuestPortMappings(guest)
  109. for _, pm := range pms {
  110. for _, p := range pm {
  111. ret = append(ret, p)
  112. }
  113. }
  114. return ret
  115. }
  116. func (m *portMappingManager) getOtherGuestsUsedPorts(gst GuestRuntimeInstance) (map[compute.GuestPortMappingProtocol]sets.Int, error) {
  117. others := m.getOtherGuests(gst)
  118. ret := make(map[compute.GuestPortMappingProtocol]sets.Int)
  119. for _, ins := range others {
  120. pms := m.getGuestFlattenPortMappings(ins)
  121. for _, pm := range pms {
  122. ps, ok := ret[pm.Protocol]
  123. if !ok {
  124. ps = sets.NewInt()
  125. }
  126. if pm.HostPort == nil {
  127. //return nil, errors.Errorf("guest (%s/%s) portmap %s has nil host port", ins.GetId(), ins.GetName(), jsonutils.Marshal(pm))
  128. log.Warningf("%s", errors.Errorf("guest (%s/%s) portmap %s has nil host port", ins.GetId(), ins.GetName(), jsonutils.Marshal(pm)))
  129. continue
  130. }
  131. ps.Insert(*pm.HostPort)
  132. ret[pm.Protocol] = ps
  133. }
  134. }
  135. return ret, nil
  136. }
  137. func (m *portMappingManager) allocatePortMappings(gst GuestRuntimeInstance, input compute.GuestPortMappings) (compute.GuestPortMappings, error) {
  138. result := make([]*compute.GuestPortMapping, len(input))
  139. allocPorts := make(map[compute.GuestPortMappingProtocol]sets.Int)
  140. // 检查是否有需要按规则分配的端口映射
  141. hasRuleMapping := false
  142. for _, pm := range input {
  143. if pm.Rule != nil && pm.Rule.FirstPortOffset != nil {
  144. hasRuleMapping = true
  145. break
  146. }
  147. }
  148. if hasRuleMapping {
  149. // 如果有规则映射,需要先找到第一个空闲端口,然后按偏移量分配
  150. return m.allocatePortMappingsWithRule(gst, input, allocPorts)
  151. }
  152. // 原有的分配逻辑
  153. for idx := range input {
  154. data := input[idx]
  155. if _, ok := allocPorts[data.Protocol]; !ok {
  156. allocPorts[data.Protocol] = sets.NewInt()
  157. }
  158. pm, err := m.allocatePortMapping(gst, data, allocPorts)
  159. if err != nil {
  160. return nil, errors.Wrapf(err, "get port mapping %s", jsonutils.Marshal(input[idx]))
  161. }
  162. result[idx] = pm
  163. allocPorts[data.Protocol].Insert(*pm.HostPort)
  164. }
  165. return result, nil
  166. }
  167. func (m *portMappingManager) allocatePortMappingsWithRule(gst GuestRuntimeInstance, input compute.GuestPortMappings, allocPorts map[compute.GuestPortMappingProtocol]sets.Int) (compute.GuestPortMappings, error) {
  168. result := make([]*compute.GuestPortMapping, len(input))
  169. // 按协议分组,分别处理
  170. indices := make([]*compute.GuestPortMapping, 0)
  171. for idx, pm := range input {
  172. if pm.Rule != nil && pm.Rule.FirstPortOffset != nil {
  173. indices = append(indices, input[idx])
  174. }
  175. }
  176. // 为每个协议组分配端口
  177. if err := m.allocateProtocolGroupWithRule(gst, input, result, indices, allocPorts); err != nil {
  178. return nil, errors.Wrapf(err, "allocate portmappings with rule: %s", jsonutils.Marshal(indices))
  179. }
  180. // 处理没有规则的端口映射
  181. for idx, pm := range input {
  182. if pm.Rule == nil || pm.Rule.FirstPortOffset == nil {
  183. if _, ok := allocPorts[pm.Protocol]; !ok {
  184. allocPorts[pm.Protocol] = sets.NewInt()
  185. }
  186. allocatedPm, err := m.allocatePortMapping(gst, pm, allocPorts)
  187. if err != nil {
  188. return nil, errors.Wrapf(err, "get port mapping %s", jsonutils.Marshal(pm))
  189. }
  190. result[idx] = allocatedPm
  191. allocPorts[pm.Protocol].Insert(*allocatedPm.HostPort)
  192. }
  193. }
  194. return result, nil
  195. }
  196. func (m *portMappingManager) allocateProtocolGroupWithRule(gst GuestRuntimeInstance, input compute.GuestPortMappings, result compute.GuestPortMappings, indices []*compute.GuestPortMapping, allocPorts map[compute.GuestPortMappingProtocol]sets.Int) error {
  197. // 获取其他虚拟机已使用的端口
  198. otherPorts, err := m.getOtherGuestsUsedPorts(gst)
  199. if err != nil {
  200. return errors.Wrap(err, "getOtherGuestsUsedPorts")
  201. }
  202. // 获取当前协议已分配的端口
  203. usedPorts := map[compute.GuestPortMappingProtocol]sets.Int{
  204. compute.GuestPortMappingProtocolTCP: sets.NewInt(),
  205. compute.GuestPortMappingProtocolUDP: sets.NewInt(),
  206. }
  207. for proto, ports := range otherPorts {
  208. usedPorts[proto].Insert(ports.List()...)
  209. }
  210. for proto, allocPortsSet := range allocPorts {
  211. if ports, ok := usedPorts[proto]; ok {
  212. ports.Insert(allocPortsSet.List()...)
  213. usedPorts[proto] = ports
  214. } else {
  215. usedPorts[proto] = sets.NewInt(allocPortsSet.List()...)
  216. }
  217. }
  218. // 确定端口范围(从 hostman options 读取)
  219. start := options.HostOptions.PortMappingRangeStart
  220. end := options.HostOptions.PortMappingRangeEnd
  221. // 尝试不同的 basePort,直到找到满足所有规则要求的端口
  222. success := false
  223. for basePort := start; basePort <= end; basePort++ {
  224. // 检查这个 basePort 是否能满足所有规则要求
  225. if m.canAllocateWithBasePort(basePort, input, indices, usedPorts) {
  226. // 分配端口
  227. if err := m.allocateWithBasePort(basePort, input, result, indices, usedPorts, allocPorts); err != nil {
  228. // 如果分配失败,继续尝试下一个 basePort
  229. continue
  230. }
  231. success = true
  232. break
  233. }
  234. }
  235. if !success {
  236. return errors.Errorf("cannot find suitable base port for protocol %s in range %d-%d", indices[0].Protocol, start, end)
  237. }
  238. return nil
  239. }
  240. func (m *portMappingManager) checkPortIsUsed(port int, protocol compute.GuestPortMappingProtocol, usedPorts map[compute.GuestPortMappingProtocol]sets.Int) bool {
  241. portProtocol := getport.TCP
  242. if protocol == compute.GuestPortMappingProtocolUDP {
  243. portProtocol = getport.UDP
  244. }
  245. if _, ok := usedPorts[protocol]; !ok {
  246. usedPorts[protocol] = sets.NewInt()
  247. }
  248. return usedPorts[protocol].Has(port) || getport.IsPortUsed(portProtocol, "", port)
  249. }
  250. func (m *portMappingManager) canAllocateWithBasePort(basePort int, input compute.GuestPortMappings, indices []*compute.GuestPortMapping, usedPorts map[compute.GuestPortMappingProtocol]sets.Int) bool {
  251. baseProtocol := indices[0].Protocol
  252. // 检查 basePort 本身是否可用
  253. if m.checkPortIsUsed(basePort, baseProtocol, usedPorts) {
  254. return false
  255. }
  256. // 检查所有设置了规则的端口是否都可用
  257. for _, pm := range indices {
  258. offset := *pm.Rule.FirstPortOffset
  259. targetPort := basePort + offset
  260. // 检查目标端口是否在配置的范围内
  261. if targetPort > options.HostOptions.PortMappingRangeEnd {
  262. return false
  263. }
  264. // 检查目标端口是否已被使用
  265. if m.checkPortIsUsed(targetPort, pm.Protocol, usedPorts) {
  266. return false
  267. }
  268. }
  269. return true
  270. }
  271. func (m *portMappingManager) allocateWithBasePort(basePort int, input compute.GuestPortMappings, result compute.GuestPortMappings, indices []*compute.GuestPortMapping, usedPorts, allocPorts map[compute.GuestPortMappingProtocol]sets.Int) error {
  272. // 分配所有设置了规则的端口
  273. for idx, _ := range indices {
  274. pm := input[idx]
  275. offset := *pm.Rule.FirstPortOffset
  276. targetPort := basePort + offset
  277. // 再次检查端口可用性(双重检查)
  278. if m.checkPortIsUsed(targetPort, pm.Protocol, usedPorts) {
  279. return errors.Errorf("port %d is not available for protocol %s", targetPort, pm.Protocol)
  280. }
  281. // 创建分配的端口映射
  282. runtimePm := &compute.GuestPortMapping{}
  283. if err := jsonutils.Marshal(pm).Unmarshal(runtimePm); err != nil {
  284. return errors.Wrap(err, "unmarshal to runtime port mapping")
  285. }
  286. runtimePm.HostPort = &targetPort
  287. if runtimePm.Port == -1 {
  288. runtimePm.Port = targetPort
  289. }
  290. result[idx] = runtimePm
  291. // 更新已使用端口集合
  292. usedPorts[pm.Protocol].Insert(targetPort)
  293. if _, ok := allocPorts[pm.Protocol]; !ok {
  294. allocPorts[pm.Protocol] = sets.NewInt()
  295. }
  296. allocPorts[pm.Protocol].Insert(targetPort)
  297. }
  298. return nil
  299. }
  300. func (m *portMappingManager) allocatePortMapping(gst GuestRuntimeInstance, pm *compute.GuestPortMapping, allocPorts map[compute.GuestPortMappingProtocol]sets.Int) (*compute.GuestPortMapping, error) {
  301. otherPorts, err := m.getOtherGuestsUsedPorts(gst)
  302. if err != nil {
  303. return nil, errors.Wrap(err, "getOtherPodsUsedPorts")
  304. }
  305. // copy to runtime port mapping
  306. runtimePm := &compute.GuestPortMapping{}
  307. if err := jsonutils.Marshal(pm).Unmarshal(runtimePm); err != nil {
  308. return nil, errors.Wrap(err, "unmarshal to runtime port mapping")
  309. }
  310. portProtocol := getport.TCP
  311. switch pm.Protocol {
  312. case compute.GuestPortMappingProtocolTCP:
  313. portProtocol = getport.TCP
  314. case compute.GuestPortMappingProtocolUDP:
  315. portProtocol = getport.UDP
  316. default:
  317. return nil, errors.Errorf("invalid protocol: %q", pm.Protocol)
  318. }
  319. if pm.HostPort != nil {
  320. runtimePm.HostPort = pm.HostPort
  321. if getport.IsPortUsed(portProtocol, "", *pm.HostPort) {
  322. return nil, httperrors.NewInputParameterError("host_port %d is used", *pm.HostPort)
  323. }
  324. usedPorts, ok := otherPorts[pm.Protocol]
  325. if ok {
  326. if usedPorts.Has(*pm.HostPort) {
  327. return nil, errors.Errorf("%s host_port %d is already used", pm.Protocol, *pm.HostPort)
  328. }
  329. }
  330. allocProtoPorts, ok := allocPorts[pm.Protocol]
  331. if ok {
  332. if allocProtoPorts.Has(*pm.HostPort) {
  333. return nil, errors.Errorf("%s host_port %d is already allocated", pm.Protocol, *pm.HostPort)
  334. }
  335. }
  336. if runtimePm.Port == -1 {
  337. runtimePm.Port = *pm.HostPort
  338. }
  339. return runtimePm, nil
  340. } else {
  341. start := options.HostOptions.PortMappingRangeStart
  342. end := options.HostOptions.PortMappingRangeEnd
  343. if pm.HostPortRange != nil {
  344. if pm.HostPortRange.Start > start {
  345. start = pm.HostPortRange.Start
  346. }
  347. if pm.HostPortRange.End < end {
  348. end = pm.HostPortRange.End
  349. }
  350. }
  351. otherPodPorts, ok := otherPorts[pm.Protocol]
  352. if !ok {
  353. otherPodPorts = sets.NewInt()
  354. }
  355. allocProtoPorts, ok := allocPorts[pm.Protocol]
  356. if ok {
  357. otherPodPorts.Insert(allocProtoPorts.List()...)
  358. }
  359. portResult, err := getport.GetPortByRangeBySets(portProtocol, start, end, otherPodPorts)
  360. if err != nil {
  361. return nil, errors.Wrapf(err, "listen %s port inside %d and %d", pm.Protocol, start, end)
  362. }
  363. runtimePm.HostPort = &portResult.Port
  364. if runtimePm.Port == -1 {
  365. runtimePm.Port = portResult.Port
  366. }
  367. return runtimePm, nil
  368. }
  369. }