| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419 |
- // Copyright 2019 Yunion
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package guestman
- import (
- "context"
- "sync"
- "yunion.io/x/jsonutils"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/util/sets"
- "yunion.io/x/onecloud/pkg/apis/compute"
- "yunion.io/x/onecloud/pkg/hostman/options"
- "yunion.io/x/onecloud/pkg/httperrors"
- "yunion.io/x/onecloud/pkg/mcclient"
- "yunion.io/x/onecloud/pkg/mcclient/auth"
- computemod "yunion.io/x/onecloud/pkg/mcclient/modules/compute"
- "yunion.io/x/onecloud/pkg/util/netutils2/getport"
- )
- var (
- allocatePortLock sync.Mutex
- )
- type IPortMappingManager interface {
- AllocateGuestPortMappings(ctx context.Context, userCred mcclient.TokenCredential, guest GuestRuntimeInstance) error
- }
- type portMappingManager struct {
- manager *SGuestManager
- }
- func NewPortMappingManager(manager *SGuestManager) IPortMappingManager {
- return &portMappingManager{
- manager: manager,
- }
- }
- func (m *portMappingManager) GetGuestPortMappings(guest GuestRuntimeInstance) map[string]compute.GuestPortMappings {
- nics := guest.GetSourceDesc().Nics
- pms := make(map[string]compute.GuestPortMappings)
- for _, nic := range nics {
- if len(nic.PortMappings) == 0 {
- continue
- }
- pms[nic.NetId] = nic.PortMappings
- }
- return pms
- }
- func (m *portMappingManager) IsGuestHasPortMapping(guest GuestRuntimeInstance) bool {
- return len(m.GetGuestPortMappings(guest)) == 0
- }
- func (m *portMappingManager) AllocateGuestPortMappings(ctx context.Context, userCred mcclient.TokenCredential, guest GuestRuntimeInstance) error {
- allocatePortLock.Lock()
- defer allocatePortLock.Unlock()
- for idx, nic := range guest.GetDesc().Nics {
- if len(nic.PortMappings) == 0 {
- continue
- }
- newPms, err := m.allocatePortMappings(guest, nic.PortMappings)
- if err != nil {
- return errors.Wrapf(err, "allocateGuestPortMapping for nic %d: %s", idx, jsonutils.Marshal(nic.PortMappings))
- }
- // update allocated port mappings
- if err := m.setPortMappings(ctx, userCred, guest, idx, newPms); err != nil {
- return errors.Wrapf(err, "setPortMappings for nic %d", idx)
- }
- }
- return nil
- }
- func (m *portMappingManager) setPortMappings(ctx context.Context, userCred mcclient.TokenCredential, gst GuestRuntimeInstance, nicIdx int, pms compute.GuestPortMappings) error {
- // update desc
- desc := gst.GetDesc()
- nic := desc.Nics[nicIdx]
- nic.PortMappings = pms
- desc.Nics[nicIdx] = nic
- // update port mapping info to controller
- body := jsonutils.Marshal(map[string]interface{}{
- "port_mappings": pms,
- })
- session := auth.GetSession(ctx, userCred, options.HostOptions.Region)
- if _, err := computemod.Servernetworks.Update(session, gst.GetId(), nic.NetId, nil, body); err != nil {
- return errors.Wrapf(err, "update server %s network %s with port_mappings %s", gst.GetId(), nic.NetId, body.String())
- }
- // save desc
- gst.SetDesc(desc)
- return SaveDesc(gst, desc)
- }
- func (m *portMappingManager) getOtherGuests(gst GuestRuntimeInstance) []GuestRuntimeInstance {
- others := make([]GuestRuntimeInstance, 0)
- m.manager.Servers.Range(func(id, value interface{}) bool {
- if id == gst.GetId() {
- return true
- }
- ins := value.(GuestRuntimeInstance)
- others = append(others, ins)
- return true
- })
- return others
- }
- func (m *portMappingManager) getGuestFlattenPortMappings(guest GuestRuntimeInstance) compute.GuestPortMappings {
- ret := make([]*compute.GuestPortMapping, 0)
- pms := m.GetGuestPortMappings(guest)
- for _, pm := range pms {
- for _, p := range pm {
- ret = append(ret, p)
- }
- }
- return ret
- }
- func (m *portMappingManager) getOtherGuestsUsedPorts(gst GuestRuntimeInstance) (map[compute.GuestPortMappingProtocol]sets.Int, error) {
- others := m.getOtherGuests(gst)
- ret := make(map[compute.GuestPortMappingProtocol]sets.Int)
- for _, ins := range others {
- pms := m.getGuestFlattenPortMappings(ins)
- for _, pm := range pms {
- ps, ok := ret[pm.Protocol]
- if !ok {
- ps = sets.NewInt()
- }
- if pm.HostPort == nil {
- //return nil, errors.Errorf("guest (%s/%s) portmap %s has nil host port", ins.GetId(), ins.GetName(), jsonutils.Marshal(pm))
- log.Warningf("%s", errors.Errorf("guest (%s/%s) portmap %s has nil host port", ins.GetId(), ins.GetName(), jsonutils.Marshal(pm)))
- continue
- }
- ps.Insert(*pm.HostPort)
- ret[pm.Protocol] = ps
- }
- }
- return ret, nil
- }
- func (m *portMappingManager) allocatePortMappings(gst GuestRuntimeInstance, input compute.GuestPortMappings) (compute.GuestPortMappings, error) {
- result := make([]*compute.GuestPortMapping, len(input))
- allocPorts := make(map[compute.GuestPortMappingProtocol]sets.Int)
- // 检查是否有需要按规则分配的端口映射
- hasRuleMapping := false
- for _, pm := range input {
- if pm.Rule != nil && pm.Rule.FirstPortOffset != nil {
- hasRuleMapping = true
- break
- }
- }
- if hasRuleMapping {
- // 如果有规则映射,需要先找到第一个空闲端口,然后按偏移量分配
- return m.allocatePortMappingsWithRule(gst, input, allocPorts)
- }
- // 原有的分配逻辑
- for idx := range input {
- data := input[idx]
- if _, ok := allocPorts[data.Protocol]; !ok {
- allocPorts[data.Protocol] = sets.NewInt()
- }
- pm, err := m.allocatePortMapping(gst, data, allocPorts)
- if err != nil {
- return nil, errors.Wrapf(err, "get port mapping %s", jsonutils.Marshal(input[idx]))
- }
- result[idx] = pm
- allocPorts[data.Protocol].Insert(*pm.HostPort)
- }
- return result, nil
- }
- func (m *portMappingManager) allocatePortMappingsWithRule(gst GuestRuntimeInstance, input compute.GuestPortMappings, allocPorts map[compute.GuestPortMappingProtocol]sets.Int) (compute.GuestPortMappings, error) {
- result := make([]*compute.GuestPortMapping, len(input))
- // 按协议分组,分别处理
- indices := make([]*compute.GuestPortMapping, 0)
- for idx, pm := range input {
- if pm.Rule != nil && pm.Rule.FirstPortOffset != nil {
- indices = append(indices, input[idx])
- }
- }
- // 为每个协议组分配端口
- if err := m.allocateProtocolGroupWithRule(gst, input, result, indices, allocPorts); err != nil {
- return nil, errors.Wrapf(err, "allocate portmappings with rule: %s", jsonutils.Marshal(indices))
- }
- // 处理没有规则的端口映射
- for idx, pm := range input {
- if pm.Rule == nil || pm.Rule.FirstPortOffset == nil {
- if _, ok := allocPorts[pm.Protocol]; !ok {
- allocPorts[pm.Protocol] = sets.NewInt()
- }
- allocatedPm, err := m.allocatePortMapping(gst, pm, allocPorts)
- if err != nil {
- return nil, errors.Wrapf(err, "get port mapping %s", jsonutils.Marshal(pm))
- }
- result[idx] = allocatedPm
- allocPorts[pm.Protocol].Insert(*allocatedPm.HostPort)
- }
- }
- return result, nil
- }
- func (m *portMappingManager) allocateProtocolGroupWithRule(gst GuestRuntimeInstance, input compute.GuestPortMappings, result compute.GuestPortMappings, indices []*compute.GuestPortMapping, allocPorts map[compute.GuestPortMappingProtocol]sets.Int) error {
- // 获取其他虚拟机已使用的端口
- otherPorts, err := m.getOtherGuestsUsedPorts(gst)
- if err != nil {
- return errors.Wrap(err, "getOtherGuestsUsedPorts")
- }
- // 获取当前协议已分配的端口
- usedPorts := map[compute.GuestPortMappingProtocol]sets.Int{
- compute.GuestPortMappingProtocolTCP: sets.NewInt(),
- compute.GuestPortMappingProtocolUDP: sets.NewInt(),
- }
- for proto, ports := range otherPorts {
- usedPorts[proto].Insert(ports.List()...)
- }
- for proto, allocPortsSet := range allocPorts {
- if ports, ok := usedPorts[proto]; ok {
- ports.Insert(allocPortsSet.List()...)
- usedPorts[proto] = ports
- } else {
- usedPorts[proto] = sets.NewInt(allocPortsSet.List()...)
- }
- }
- // 确定端口范围(从 hostman options 读取)
- start := options.HostOptions.PortMappingRangeStart
- end := options.HostOptions.PortMappingRangeEnd
- // 尝试不同的 basePort,直到找到满足所有规则要求的端口
- success := false
- for basePort := start; basePort <= end; basePort++ {
- // 检查这个 basePort 是否能满足所有规则要求
- if m.canAllocateWithBasePort(basePort, input, indices, usedPorts) {
- // 分配端口
- if err := m.allocateWithBasePort(basePort, input, result, indices, usedPorts, allocPorts); err != nil {
- // 如果分配失败,继续尝试下一个 basePort
- continue
- }
- success = true
- break
- }
- }
- if !success {
- return errors.Errorf("cannot find suitable base port for protocol %s in range %d-%d", indices[0].Protocol, start, end)
- }
- return nil
- }
- func (m *portMappingManager) checkPortIsUsed(port int, protocol compute.GuestPortMappingProtocol, usedPorts map[compute.GuestPortMappingProtocol]sets.Int) bool {
- portProtocol := getport.TCP
- if protocol == compute.GuestPortMappingProtocolUDP {
- portProtocol = getport.UDP
- }
- if _, ok := usedPorts[protocol]; !ok {
- usedPorts[protocol] = sets.NewInt()
- }
- return usedPorts[protocol].Has(port) || getport.IsPortUsed(portProtocol, "", port)
- }
- func (m *portMappingManager) canAllocateWithBasePort(basePort int, input compute.GuestPortMappings, indices []*compute.GuestPortMapping, usedPorts map[compute.GuestPortMappingProtocol]sets.Int) bool {
- baseProtocol := indices[0].Protocol
- // 检查 basePort 本身是否可用
- if m.checkPortIsUsed(basePort, baseProtocol, usedPorts) {
- return false
- }
- // 检查所有设置了规则的端口是否都可用
- for _, pm := range indices {
- offset := *pm.Rule.FirstPortOffset
- targetPort := basePort + offset
- // 检查目标端口是否在配置的范围内
- if targetPort > options.HostOptions.PortMappingRangeEnd {
- return false
- }
- // 检查目标端口是否已被使用
- if m.checkPortIsUsed(targetPort, pm.Protocol, usedPorts) {
- return false
- }
- }
- return true
- }
- func (m *portMappingManager) allocateWithBasePort(basePort int, input compute.GuestPortMappings, result compute.GuestPortMappings, indices []*compute.GuestPortMapping, usedPorts, allocPorts map[compute.GuestPortMappingProtocol]sets.Int) error {
- // 分配所有设置了规则的端口
- for idx, _ := range indices {
- pm := input[idx]
- offset := *pm.Rule.FirstPortOffset
- targetPort := basePort + offset
- // 再次检查端口可用性(双重检查)
- if m.checkPortIsUsed(targetPort, pm.Protocol, usedPorts) {
- return errors.Errorf("port %d is not available for protocol %s", targetPort, pm.Protocol)
- }
- // 创建分配的端口映射
- runtimePm := &compute.GuestPortMapping{}
- if err := jsonutils.Marshal(pm).Unmarshal(runtimePm); err != nil {
- return errors.Wrap(err, "unmarshal to runtime port mapping")
- }
- runtimePm.HostPort = &targetPort
- if runtimePm.Port == -1 {
- runtimePm.Port = targetPort
- }
- result[idx] = runtimePm
- // 更新已使用端口集合
- usedPorts[pm.Protocol].Insert(targetPort)
- if _, ok := allocPorts[pm.Protocol]; !ok {
- allocPorts[pm.Protocol] = sets.NewInt()
- }
- allocPorts[pm.Protocol].Insert(targetPort)
- }
- return nil
- }
- func (m *portMappingManager) allocatePortMapping(gst GuestRuntimeInstance, pm *compute.GuestPortMapping, allocPorts map[compute.GuestPortMappingProtocol]sets.Int) (*compute.GuestPortMapping, error) {
- otherPorts, err := m.getOtherGuestsUsedPorts(gst)
- if err != nil {
- return nil, errors.Wrap(err, "getOtherPodsUsedPorts")
- }
- // copy to runtime port mapping
- runtimePm := &compute.GuestPortMapping{}
- if err := jsonutils.Marshal(pm).Unmarshal(runtimePm); err != nil {
- return nil, errors.Wrap(err, "unmarshal to runtime port mapping")
- }
- portProtocol := getport.TCP
- switch pm.Protocol {
- case compute.GuestPortMappingProtocolTCP:
- portProtocol = getport.TCP
- case compute.GuestPortMappingProtocolUDP:
- portProtocol = getport.UDP
- default:
- return nil, errors.Errorf("invalid protocol: %q", pm.Protocol)
- }
- if pm.HostPort != nil {
- runtimePm.HostPort = pm.HostPort
- if getport.IsPortUsed(portProtocol, "", *pm.HostPort) {
- return nil, httperrors.NewInputParameterError("host_port %d is used", *pm.HostPort)
- }
- usedPorts, ok := otherPorts[pm.Protocol]
- if ok {
- if usedPorts.Has(*pm.HostPort) {
- return nil, errors.Errorf("%s host_port %d is already used", pm.Protocol, *pm.HostPort)
- }
- }
- allocProtoPorts, ok := allocPorts[pm.Protocol]
- if ok {
- if allocProtoPorts.Has(*pm.HostPort) {
- return nil, errors.Errorf("%s host_port %d is already allocated", pm.Protocol, *pm.HostPort)
- }
- }
- if runtimePm.Port == -1 {
- runtimePm.Port = *pm.HostPort
- }
- return runtimePm, nil
- } else {
- start := options.HostOptions.PortMappingRangeStart
- end := options.HostOptions.PortMappingRangeEnd
- if pm.HostPortRange != nil {
- if pm.HostPortRange.Start > start {
- start = pm.HostPortRange.Start
- }
- if pm.HostPortRange.End < end {
- end = pm.HostPortRange.End
- }
- }
- otherPodPorts, ok := otherPorts[pm.Protocol]
- if !ok {
- otherPodPorts = sets.NewInt()
- }
- allocProtoPorts, ok := allocPorts[pm.Protocol]
- if ok {
- otherPodPorts.Insert(allocProtoPorts.List()...)
- }
- portResult, err := getport.GetPortByRangeBySets(portProtocol, start, end, otherPodPorts)
- if err != nil {
- return nil, errors.Wrapf(err, "listen %s port inside %d and %d", pm.Protocol, start, end)
- }
- runtimePm.HostPort = &portResult.Port
- if runtimePm.Port == -1 {
- runtimePm.Port = portResult.Port
- }
- return runtimePm, nil
- }
- }
|