| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253 |
- #!/usr/bin/env python3
- # encoding: utf-8
- from __future__ import unicode_literals
- import json
- import re
- from getpass import getuser
- from .ssh import SSHClient
- from . import k8s
- from .k3s import is_using_k3s
- from .ocboot import GROUP_PRIMARY_MASTER_NODE, GROUP_MASTER_NODES, GROUP_WORKER_NODES
- from .color import GB
# Annotation key on the `default` onecloudclusters resource. It is read by
# OnecloudCluster.get_current_version() and written (via `kubectl annotate`)
# by OnecloudCluster.set_current_version().
A_OCBOOT_UPGRADE_CURRENT_VERSION = 'upgrade.ocboot.yunion.io/current-version'
def construct_cluster(primary_master_host, ssh_user, ssh_private_file, ssh_port):
    """Build an ``OnecloudCluster`` reached through SSH on the primary master.

    The four arguments are forwarded positionally, in this order, to
    ``SSHClient``; the resulting client is handed to ``OnecloudCluster``.

    Returns:
        The newly constructed ``OnecloudCluster`` instance.
    """
    return OnecloudCluster(
        SSHClient(primary_master_host, ssh_user, ssh_private_file, ssh_port)
    )
def json_trim(str_2_replace):
    """Strip everything before the first ``{`` and after the last ``}``.

    Used to recover a JSON object from command output that is surrounded by
    extra text. If the input holds no ``{`` or no ``}``, the whole string is
    consumed and an empty string is returned.

    Args:
        str_2_replace: Raw text that may embed a JSON object.

    Returns:
        The text with its non-JSON leading and trailing parts removed.
    """
    # First alternative eats the leading run of non-"{" characters, the
    # second eats the trailing run of non-"}" characters.
    return re.sub(r"^[^{]+|[^}]+$", "", str_2_replace, flags=re.DOTALL)
class OnecloudCluster(object):
    """The ``default`` onecloudclusters resource plus the cluster's nodes.

    Everything is fetched by running ``kubectl`` (prefixed with ``k3s`` when
    the cluster uses k3s) through the supplied SSH client.

    Attributes set by ``__init__``:
        ssh_client: client used to run remote commands.
        cluster: ``k8s.Resource`` wrapping the onecloudclusters JSON.
        k8s_nodes: every node as a ``k8s.Node``.
        primary_master_node: the master node with the smallest
            ``creationTimestamp()``.
        master_nodes: the remaining master nodes (primary excluded).
        worker_nodes: nodes for which ``is_master()`` is false.
    """

    # A registry host written as a dotted-quad address followed by the port
    # separator, e.g. "10.0.0.1:5000". Compiled once instead of per call.
    _IPADDR_REG = re.compile(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:')

    def use_sudo(self):
        """Return True when the local user (``getuser()``) is not ``root``."""
        return getuser() != 'root'

    def is_using_k3s(self):
        """Return whether the cluster uses k3s, probing at most once.

        The module-level ``is_using_k3s`` helper receives the SSH client, so
        it presumably runs a remote probe (NOTE(review): confirm); its answer
        is cached on the instance so later commands reuse it.
        """
        # getattr keeps this usable on instances built without __init__.
        cached = getattr(self, '_using_k3s', None)
        if cached is None:
            cached = is_using_k3s(self.ssh_client, self.use_sudo())
            self._using_k3s = cached
        return cached

    def _kubectl(self, args):
        """Return the kubectl command line for *args*.

        The prefix is ``k3s`` on k3s clusters and empty otherwise; the
        separating space is kept in both cases so the emitted command text
        is unchanged from the previous inline f-strings.
        """
        k3s_cmd_placeholder = 'k3s' if self.is_using_k3s() else ''
        return f'{k3s_cmd_placeholder} kubectl {args}'

    def __init__(self, ssh_client):
        """Load the onecloudclusters resource and the node list over SSH.

        Args:
            ssh_client: object exposing ``exec_command(cmd, use_sudo)`` and
                ``get_host()``.

        Raises:
            ValueError: if the command output is not JSON even after
                stripping text around the outermost braces.
            IndexError: if the cluster reports no master node.
        """
        self.ssh_client = ssh_client
        self._using_k3s = None
        cmd = self._kubectl('-n onecloud get onecloudclusters default -o json')
        ret = ssh_client.exec_command(cmd, self.use_sudo())
        print(GB(cmd))
        try:
            cluster = json.loads(ret)
        except ValueError:
            # The output may carry extra text around the JSON document;
            # report it, then retry on the part between the outer braces.
            print("load json return error: %s" % ret)
            cluster = json.loads(json_trim(ret))
        self.cluster = k8s.Resource(cluster)
        self.k8s_nodes = None
        self.primary_master_node = None
        self.master_nodes = None
        self.worker_nodes = None
        self._construct_nodes()

    def get_cluster_controlplane_host(self):
        """Return ``spec.loadBalancerEndpoint`` or, if unset, the SSH host."""
        endpoint = self.get_spec().get('loadBalancerEndpoint')
        if endpoint:
            return endpoint
        return self.ssh_client.get_host()

    def get_primary_master_node_ip(self):
        """Return ``get_ip()`` of the primary master node."""
        return self.primary_master_node.get_ip()

    def get_metadata(self):
        """Return the metadata of the wrapped cluster resource."""
        return self.cluster.get_metadata()

    def get_annotations(self):
        """Return the annotations of the wrapped cluster resource."""
        return self.cluster.get_annotations()

    def get_spec(self):
        """Return the spec of the wrapped cluster resource."""
        return self.cluster.get_spec()

    def get_image_repository(self):
        """Return ``spec.imageRepository`` (``None`` when absent)."""
        return self.get_spec().get('imageRepository')

    def get_repository(self):
        """Return the registry part of the image repository.

        Returns:
            A ``(repo, is_ip_with_port)`` tuple: ``repo`` is the text before
            the first ``/`` of ``spec.imageRepository``; the flag is True
            when ``repo`` starts with a dotted-quad address followed by
            ``:``.

        Raises:
            Exception: if the spec has no ``imageRepository``.
        """
        img_repo = self.get_image_repository()
        if img_repo is None:
            raise Exception("Not found imageRepository from cluster spec")
        repo = img_repo.split('/', 1)[0]
        return (repo, self._IPADDR_REG.match(repo) is not None)

    def get_current_version(self):
        """Return the version annotation, falling back to ``spec.version``."""
        version = self.get_annotations().get(A_OCBOOT_UPGRADE_CURRENT_VERSION, None)
        if version:
            return version
        return self.get_spec().get('version')

    def _construct_nodes(self):
        """Fetch all nodes and split them into primary/master/worker sets."""
        cmd = self._kubectl('get nodes -o json')
        print(GB(cmd))
        k8s_nodes = json.loads(
            self.ssh_client.exec_command(cmd, self.use_sudo())).get('items')
        self.k8s_nodes = [k8s.Node(obj) for obj in k8s_nodes]
        self.master_nodes = [node for node in self.k8s_nodes if node.is_master()]
        self.worker_nodes = [node for node in self.k8s_nodes if not node.is_master()]
        # Also removes the primary from self.master_nodes (see below).
        self.primary_master_node = self.find_primary_master_node(self.master_nodes)

    def find_primary_master_node(self, master_nodes):
        """Pick, remove and return the earliest-created master node.

        *master_nodes* is modified in place: it is sorted by
        ``creationTimestamp()`` and its first element is removed, so after
        the call it holds only the non-primary masters.

        Raises:
            IndexError: if *master_nodes* is empty.
        """
        if not master_nodes:
            raise IndexError(
                "No master node found, cannot determine the primary master node")
        master_nodes.sort(key=lambda node: node.creationTimestamp())
        return master_nodes.pop(0)

    def find_node_by_ip_or_hostname(self, ip_hostname):
        """Return the first node whose hostname or IP equals *ip_hostname*.

        Returns ``None`` when no node matches.
        """
        for node in self.k8s_nodes:
            if ip_hostname in (node.get_hostname(), node.get_ip()):
                return node
        return None

    def generate_playbook_inventory(self, bastion_host=None, master_port=22, node_port=22):
        """Render an Ansible inventory covering every node of the cluster.

        Args:
            bastion_host: optional bastion attached to every host through
                ``with_bastion``.
            master_port: SSH port for the primary master and master hosts.
            node_port: SSH port for the worker hosts.

        Returns:
            The inventory text produced by
            ``AnsibleInventory.generate_content()``.
        """
        inventory = AnsibleInventory()

        def add_i(node):
            if bastion_host:
                node.with_bastion(bastion_host)
            inventory.add(node)

        add_i(AnsiblePrimaryMasterHost(self.primary_master_node, port=master_port))
        for node in self.master_nodes:
            add_i(AnsibleMasterHost(node, port=master_port))
        for node in self.worker_nodes:
            add_i(AnsibleWorkerHost(node, port=node_port))
        return inventory.generate_content()

    def set_current_version(self, version):
        """Record *version* in the cluster's current-version annotation.

        Runs ``kubectl annotate --overwrite=true`` on the ``default``
        onecloudclusters resource through the SSH client.
        """
        cmd = self._kubectl(
            '-n onecloud annotate --overwrite=true onecloudclusters default '
            f'{A_OCBOOT_UPGRADE_CURRENT_VERSION}={version}')
        print(GB(cmd))
        self.ssh_client.exec_command(cmd, self.use_sudo())
class AnsibleInventory(object):
    """Collects hosts by role and renders them as inventory text.

    Hosts are kept in four places: ``all_hosts`` (every host added), the
    single ``primary_master_host``, ``master_hosts`` and ``worker_hosts``.
    Within each list a hostname is stored only once.
    """

    def __init__(self):
        self.all_hosts = []
        self.primary_master_host = None
        self.master_hosts = []
        self.worker_hosts = []

    def _append(self, hosts, host):
        """Append *host* to *hosts* unless its hostname is already there."""
        already_known = any(
            host.get_hostname() == known.get_hostname() for known in hosts
        )
        if not already_known:
            hosts.append(host)

    def _add(self, host):
        """Register one host in ``all_hosts`` and in its role's slot.

        Raises:
            Exception: if ``host.get_role()`` is not one of the three
                known group names.
        """
        self._append(self.all_hosts, host)
        role = host.get_role()
        if role == GROUP_PRIMARY_MASTER_NODE:
            # Only one primary is kept; a later one replaces the earlier.
            self.primary_master_host = host
            return
        if role == GROUP_MASTER_NODES:
            self._append(self.master_hosts, host)
            return
        if role == GROUP_WORKER_NODES:
            self._append(self.worker_hosts, host)
            return
        raise Exception("Unsupported role %s" % role)

    def add(self, *hosts):
        """Register every host given as a positional argument."""
        for host in hosts:
            self._add(host)

    def generate_content(self):
        """Return the inventory as newline-joined text.

        The ``[all]`` section lists each host's full ``get_content()``
        line; the three role sections that follow list hostnames only.
        """
        lines = ['[all]']
        lines += [host.get_content() for host in self.all_hosts]
        lines += [
            '[%s]' % GROUP_PRIMARY_MASTER_NODE,
            self.primary_master_host.get_hostname(),
        ]
        lines.append('[%s]' % GROUP_MASTER_NODES)
        lines += [host.get_hostname() for host in self.master_hosts]
        lines.append('[%s]' % GROUP_WORKER_NODES)
        lines += [host.get_hostname() for host in self.worker_hosts]
        return '\n'.join(lines)
class ansibleHost(object):
    """One host line of an Ansible inventory.

    Args:
        node: object exposing ``get_hostname()`` and ``get_ip()``; both are
            read once at construction time.
        role: inventory group name the host belongs to.
        user: SSH user. ``None`` (the default) means the current local user
            as reported by ``getuser()``, looked up when the host is
            created rather than when this module is imported.
        port: SSH port.
    """

    def __init__(self, node, role, user=None, port=22):
        self.hostname = node.get_hostname()
        self.ip = node.get_ip()
        self.role = role
        # Resolve lazily so a failing user lookup cannot break the import.
        self.user = getuser() if user is None else user
        self.bastion_host = None
        self.port = port

    def get_hostname(self):
        """Return the hostname captured from the node."""
        return self.hostname

    def get_ip(self):
        """Return the IP captured from the node."""
        return self.ip

    def get_role(self):
        """Return the inventory group name given at construction."""
        return self.role

    def with_bastion(self, bastion_host):
        """Attach a bastion host and return ``self`` for chaining.

        *bastion_host* must expose ``to_option()``; it is called by
        ``get_content()``.
        """
        self.bastion_host = bastion_host
        return self

    def get_content(self):
        """Return this host's inventory line.

        Both the legacy ``ansible_ssh_*`` and the current ``ansible_*``
        variable names are emitted. ``ansible_become=yes`` is added for any
        user other than ``root``, and ``ansible_ssh_common_args`` when a
        bastion host is attached.
        """
        fields = [
            f'{self.hostname}',
            f'ansible_ssh_host={self.ip}',
            f'ansible_host={self.ip}',
            f'ansible_ssh_user={self.user}',
            f'ansible_user={self.user}',
            f'ansible_ssh_port={self.port}',
            f'ansible_port={self.port}',
        ]
        if self.user != 'root':
            fields.append('ansible_become=yes')
        if self.bastion_host:
            fields.append(
                "ansible_ssh_common_args='%s'" % self.bastion_host.to_option())
        return ' '.join(fields)
class AnsiblePrimaryMasterHost(ansibleHost):
    """Inventory host carrying the ``GROUP_PRIMARY_MASTER_NODE`` role."""

    def __init__(self, node, user=None, port=22):
        """Create the host entry for *node*.

        ``user=None`` means the current local user, looked up with
        ``getuser()`` at call time instead of at import time.
        """
        if user is None:
            user = getuser()
        super(AnsiblePrimaryMasterHost, self).__init__(
            node, GROUP_PRIMARY_MASTER_NODE, user, port)
class AnsibleMasterHost(ansibleHost):
    """Inventory host carrying the ``GROUP_MASTER_NODES`` role."""

    def __init__(self, node, user=None, port=22):
        """Create the host entry for *node*.

        ``user=None`` means the current local user, looked up with
        ``getuser()`` at call time instead of at import time.
        """
        if user is None:
            user = getuser()
        super(AnsibleMasterHost, self).__init__(
            node, GROUP_MASTER_NODES, user, port)
class AnsibleWorkerHost(ansibleHost):
    """Inventory host carrying the ``GROUP_WORKER_NODES`` role."""

    def __init__(self, node, user=None, port=22):
        """Create the host entry for *node*.

        ``user=None`` means the current local user, looked up with
        ``getuser()`` at call time instead of at import time.
        """
        if user is None:
            user = getuser()
        super(AnsibleWorkerHost, self).__init__(
            node, GROUP_WORKER_NODES, user, port)
|