cluster.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. #!/usr/bin/env python3
  2. # encoding: utf-8
  3. from __future__ import unicode_literals
  4. import json
  5. import re
  6. from getpass import getuser
  7. from .ssh import SSHClient
  8. from . import k8s
  9. from .k3s import is_using_k3s
  10. from .ocboot import GROUP_PRIMARY_MASTER_NODE, GROUP_MASTER_NODES, GROUP_WORKER_NODES
  11. from .color import GB
# Annotation key on the `default` onecloudclusters resource. It is written by
# OnecloudCluster.set_current_version() and read back (in preference to the
# spec's `version` field) by OnecloudCluster.get_current_version().
A_OCBOOT_UPGRADE_CURRENT_VERSION = 'upgrade.ocboot.yunion.io/current-version'
  13. def construct_cluster(primary_master_host, ssh_user, ssh_private_file, ssh_port):
  14. cli = SSHClient(
  15. primary_master_host,
  16. ssh_user,
  17. ssh_private_file,
  18. ssh_port,
  19. )
  20. cluster = OnecloudCluster(cli)
  21. return cluster
  22. def json_trim(str_2_replace):
  23. regex = re.compile(r"^[^{]+|[^}]+$", re.DOTALL)
  24. return regex.sub("", str_2_replace)
  25. class OnecloudCluster(object):
  26. def use_sudo(self):
  27. return getuser() != 'root'
  28. def is_using_k3s(self):
  29. return is_using_k3s(self.ssh_client, self.use_sudo())
  30. def __init__(self, ssh_client):
  31. self.ssh_client = ssh_client
  32. k3s_cmd_placeholder = 'k3s' if self.is_using_k3s() else ''
  33. ret = ssh_client.exec_command(
  34. f'{k3s_cmd_placeholder} kubectl -n onecloud get onecloudclusters default -o json', self.use_sudo())
  35. print(GB(f'{k3s_cmd_placeholder} kubectl -n onecloud get onecloudclusters default -o json'))
  36. try:
  37. cluster = json.loads(ret)
  38. except ValueError:
  39. print("load json return error: %s" % ret)
  40. ret = json_trim(ret)
  41. cluster = json.loads(ret)
  42. self.cluster = k8s.Resource(cluster)
  43. self.k8s_nodes = None
  44. self.primary_master_node = None
  45. self.master_nodes = None
  46. self.worker_nodes = None
  47. self._construct_nodes()
  48. def get_cluster_controlplane_host(self):
  49. endpoint = self.get_spec().get('loadBalancerEndpoint')
  50. if endpoint:
  51. return endpoint
  52. return self.ssh_client.get_host()
  53. def get_primary_master_node_ip(self):
  54. return self.primary_master_node.get_ip()
  55. def get_metadata(self):
  56. return self.cluster.get_metadata()
  57. def get_annotations(self):
  58. return self.cluster.get_annotations()
  59. def get_spec(self):
  60. return self.cluster.get_spec()
  61. def get_image_repository(self):
  62. img_repo = self.get_spec().get('imageRepository')
  63. return img_repo
  64. def get_repository(self):
  65. img_repo = self.get_image_repository()
  66. if img_repo is None:
  67. raise Exception("Not found imageRepository from cluster spec")
  68. import re
  69. IPADDR_REG_PATTERN = r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:'
  70. IPADDR_REG = re.compile(IPADDR_REG_PATTERN)
  71. parts = img_repo.split('/')
  72. repo = parts[0]
  73. return (repo, IPADDR_REG.match(repo) is not None)
  74. def get_current_version(self):
  75. version = self.get_annotations().get(A_OCBOOT_UPGRADE_CURRENT_VERSION, None)
  76. if version:
  77. return version
  78. return self.get_spec().get('version')
  79. def _construct_nodes(self):
  80. k3s_cmd_placeholder = 'k3s' if self.is_using_k3s() else ''
  81. print(GB(f'{k3s_cmd_placeholder} kubectl get nodes -o json'))
  82. k8s_nodes = json.loads(self.ssh_client.exec_command(f'{k3s_cmd_placeholder} kubectl get nodes -o json', self.use_sudo())).get('items')
  83. self.k8s_nodes = [k8s.Node(obj) for obj in k8s_nodes]
  84. self.master_nodes = [node for node in self.k8s_nodes if node.is_master()]
  85. self.worker_nodes = [node for node in self.k8s_nodes if not node.is_master()]
  86. self.primary_master_node = self.find_primary_master_node(self.master_nodes)
  87. def find_primary_master_node(self, master_nodes):
  88. master_nodes.sort(key=lambda node: node.creationTimestamp())
  89. p_m_node = master_nodes[0]
  90. master_nodes.remove(p_m_node)
  91. return p_m_node
  92. def find_node_by_ip_or_hostname(self, ip_hostname):
  93. k = ip_hostname
  94. for node in self.k8s_nodes:
  95. if node.get_hostname() == k or node.get_ip() == k:
  96. return node
  97. return None
  98. def generate_playbook_inventory(self, bastion_host=None, master_port=22, node_port=22):
  99. inventory = AnsibleInventory()
  100. def add_i(node):
  101. if bastion_host:
  102. node.with_bastion(bastion_host)
  103. inventory.add(node)
  104. add_i(AnsiblePrimaryMasterHost(self.primary_master_node, port=master_port))
  105. for node in self.master_nodes:
  106. add_i(AnsibleMasterHost(node, port=master_port))
  107. for node in self.worker_nodes:
  108. add_i(AnsibleWorkerHost(node, port=node_port))
  109. return inventory.generate_content()
  110. def set_current_version(self, version):
  111. k3s_cmd_placeholder = 'k3s' if self.is_using_k3s() else ''
  112. cmd = f'{k3s_cmd_placeholder} kubectl -n onecloud annotate --overwrite=true onecloudclusters default {A_OCBOOT_UPGRADE_CURRENT_VERSION}={version}'
  113. print(GB(cmd))
  114. self.ssh_client.exec_command(cmd, self.use_sudo())
  115. class AnsibleInventory(object):
  116. def __init__(self):
  117. self.all_hosts = []
  118. self.primary_master_host = None
  119. self.master_hosts = []
  120. self.worker_hosts = []
  121. def _append(self, hosts, host):
  122. for a_host in hosts:
  123. if host.get_hostname() == a_host.get_hostname():
  124. return
  125. hosts.append(host)
  126. def _add(self, host):
  127. self._append(self.all_hosts, host)
  128. role = host.get_role()
  129. if role == GROUP_PRIMARY_MASTER_NODE:
  130. self.primary_master_host = host
  131. elif role == GROUP_MASTER_NODES:
  132. self._append(self.master_hosts, host)
  133. elif role == GROUP_WORKER_NODES:
  134. self._append(self.worker_hosts, host)
  135. else:
  136. raise Exception("Unsupported role %s" % role)
  137. def add(self, *hosts):
  138. for host in hosts:
  139. self._add(host)
  140. def generate_content(self):
  141. ret = ['[all]']
  142. ret.extend([host.get_content() for host in self.all_hosts])
  143. ret.append('[%s]' % GROUP_PRIMARY_MASTER_NODE)
  144. ret.append(self.primary_master_host.get_hostname())
  145. ret.append('[%s]' % GROUP_MASTER_NODES)
  146. ret.extend([host.get_hostname() for host in self.master_hosts])
  147. ret.append('[%s]' % GROUP_WORKER_NODES)
  148. ret.extend([host.get_hostname() for host in self.worker_hosts])
  149. return '\n'.join(ret)
  150. class ansibleHost(object):
  151. def __init__(self, node, role, user=getuser(), port=22):
  152. self.hostname = node.get_hostname()
  153. self.ip = node.get_ip()
  154. self.role = role
  155. self.user = user
  156. self.bastion_host = None
  157. self.port = port
  158. def get_hostname(self):
  159. return self.hostname
  160. def get_ip(self):
  161. return self.ip
  162. def get_role(self):
  163. return self.role
  164. def with_bastion(self, bastion_host):
  165. self.bastion_host = bastion_host
  166. return self
  167. def get_content(self):
  168. config = '%s ansible_ssh_host=%s ansible_host=%s ansible_ssh_user=%s ansible_user=%s ansible_ssh_port=%s ansible_port=%s' % (
  169. self.hostname,
  170. self.ip,
  171. self.ip,
  172. self.user,
  173. self.user,
  174. self.port,
  175. self.port)
  176. if self.user != 'root':
  177. config += " ansible_become=yes"
  178. if self.bastion_host:
  179. config += " ansible_ssh_common_args='%s'" % self.bastion_host.to_option()
  180. return config
  181. class AnsiblePrimaryMasterHost(ansibleHost):
  182. def __init__(self, node, user=getuser(), port=22):
  183. super(AnsiblePrimaryMasterHost, self).__init__(
  184. node, GROUP_PRIMARY_MASTER_NODE, user, port)
  185. class AnsibleMasterHost(ansibleHost):
  186. def __init__(self, node, user=getuser(), port=22):
  187. super(AnsibleMasterHost, self).__init__(
  188. node, GROUP_MASTER_NODES, user, port)
  189. class AnsibleWorkerHost(ansibleHost):
  190. def __init__(self, node, user=getuser(), port=22):
  191. super(AnsibleWorkerHost, self).__init__(
  192. node, GROUP_WORKER_NODES, user, port)