Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions _cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -3076,6 +3076,8 @@ def __init__(self):
self.parser._add_version_option()

if __name__ == '__main__':
import multiprocessing
multiprocessing.freeze_support()
defaultencoding = 'utf-8'
if sys.getdefaultencoding() != defaultencoding:
try:
Expand Down
2 changes: 1 addition & 1 deletion _errno.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class InitDirFailedErrorMessage(object):
PERMISSION_DENIED = ': {path} permission denied .'


DOC_LINK = '<DOC_LINK>'
DOC_LINK = ''
DOC_LINK_MSG = 'See {}'.format(DOC_LINK if DOC_LINK else "https://www.oceanbase.com/product/ob-deployer/error-codes .")

# generic error code
Expand Down
12 changes: 9 additions & 3 deletions _plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,10 @@ def _new_func(
namespace_vars.update(kwargs)
if arg:
idx = 0
params = list(inspect2.signature(method).parameters.keys())[1:-2]
try:
params = list(inspect2.signature(method).parameters.keys())[1:-2]
except (ValueError, TypeError):
params = []
num = min(len(arg), len(params))
while idx < num:
key = params[idx]
Expand Down Expand Up @@ -871,6 +874,9 @@ def __init__(self, home_path, script_name=None, dev_mode=False, stdio=None):
super(PyScriptPluginLoader, self).__init__(home_path, dev_mode=dev_mode, stdio=stdio)

def _create_(self, script_name):
# Use an explicit namespace dict for exec() because Python 3.12+ (PEP 667)
# changed locals() to return a snapshot, so exec() results won't appear in locals().
_ns = {}
exec('''
class %s(PyScriptPlugin):

Expand All @@ -890,8 +896,8 @@ def %s(
repositories, components, clients, cluster_config, cmd,
options, stdio, *arg, **kwargs):
pass
''' % (self.PLUGIN_TYPE.value, script_name, script_name, self.PLUGIN_TYPE.value, self.PLUGIN_TYPE.value, script_name))
clz = locals()[self.PLUGIN_TYPE.value]
''' % (self.PLUGIN_TYPE.value, script_name, script_name, self.PLUGIN_TYPE.value, self.PLUGIN_TYPE.value, script_name), globals(), _ns)
clz = _ns[self.PLUGIN_TYPE.value]
setattr(sys.modules[__name__], self.PLUGIN_TYPE.value, clz)
clz.set_plugin_type(self.PLUGIN_TYPE)
return clz
Expand Down
6 changes: 5 additions & 1 deletion _stdio.py
Original file line number Diff line number Diff line change
Expand Up @@ -1150,7 +1150,11 @@ def decorated(func):
is_bond_method = True
_type = type(func)
func = func.__func__
all_parameters = inspect2.signature(func).parameters
try:
all_parameters = inspect2.signature(func).parameters
except (ValueError, TypeError):
# Builtin slot wrappers (e.g. object.__init__) can't be inspected on Python 3.13+
return _type(func) if is_bond_method else func
if "stdio" in all_parameters:
default_stdio_in_params = all_parameters["stdio"].default
if not isinstance(default_stdio_in_params, Parameter.empty):
Expand Down
7 changes: 5 additions & 2 deletions _workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ def __init__(self, home_path, workflow_name=None, dev_mode=False, stdio=None):
self.workflow_name = workflow_name

def _create_(self, workflow_name):
# Use an explicit namespace dict for exec() because Python 3.12+ (PEP 667)
# changed locals() to return a snapshot, so exec() results won't appear in locals().
_ns = {}
exec('''
class %s(PyScriptPlugin):

Expand All @@ -237,8 +240,8 @@ def %s(
repositories, components, clients, cluster_config, cmd,
options, stdio, *arg, **kwargs):
pass
''' % (self.PLUGIN_TYPE.value, workflow_name, workflow_name, self.PLUGIN_TYPE.value, self.PLUGIN_TYPE.value, workflow_name))
clz = locals()[self.PLUGIN_TYPE.value]
''' % (self.PLUGIN_TYPE.value, workflow_name, workflow_name, self.PLUGIN_TYPE.value, self.PLUGIN_TYPE.value, workflow_name), globals(), _ns)
clz = _ns[self.PLUGIN_TYPE.value]
setattr(sys.modules[__name__], self.PLUGIN_TYPE.value, clz)
clz.set_plugin_type(self.PLUGIN_TYPE)
return clz
Expand Down
1 change: 1 addition & 0 deletions plugins-requirements3.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ bcrypt==4.0.0
configparser>=5.2.0
urllib3==2.5.0
influxdb==5.3.2
PyYAML>=6.0
obshell
6 changes: 5 additions & 1 deletion plugins/general/0.1/install_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from __future__ import absolute_import, division, print_function

import os
import platform
import re

from _plugin import InstallPlugin
Expand Down Expand Up @@ -158,7 +159,10 @@ def check_lib():
for file_item in check_file_map.values():
if file_item.type == InstallPlugin.FileItemType.BIN:
remote_file_path = os.path.join(remote_home_path, file_item.target_path)
ret = client.execute_command('ldd %s' % remote_file_path)
if platform.system() == 'Darwin':
ret = client.execute_command('otool -L %s' % remote_file_path)
else:
ret = client.execute_command('ldd %s' % remote_file_path)
libs = re.findall('(/?[\w+\-/]+\.\w+[\.\w]+)[\s\\n]*\=\>[\s\\n]*not found', ret.stdout)
if not libs:
libs = re.findall('(/?[\w+\-/]+\.\w+[\.\w]+)[\s\\n]*\=\>[\s\\n]*not found', ret.stderr)
Expand Down
45 changes: 34 additions & 11 deletions plugins/mysqltest/3.1.0/run_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
import os
import time
import shlex
import platform
import requests
import urllib
from subprocess import Popen, PIPE
from subprocess import Popen, PIPE, TimeoutExpired
from copy import deepcopy
from ssh import LocalClient
from tool import DirectoryUtil
Expand Down Expand Up @@ -380,20 +381,42 @@ def return_true(**kw):
LocalClient.execute_command('%s "alter system set _enable_static_typing_engine = %s;select sleep(2);"' % (exec_sql_cmd, opt['_enable_static_typing_engine']), stdio=stdio)

start_time = time.time()
cmd = 'timeout %s %s %s' % (case_timeout, mysqltest_bin, str(Arguments(opt)))
IS_DARWIN = platform.system() == 'Darwin'
if IS_DARWIN:
# macOS does not have GNU timeout; use Python subprocess timeout instead
cmd = '%s %s' % (mysqltest_bin, str(Arguments(opt)))
else:
cmd = 'timeout %s %s %s' % (case_timeout, mysqltest_bin, str(Arguments(opt)))
try:
stdio.verbose('local execute: %s ' % cmd)
p = Popen(shlex.split(cmd), env=test_env, stdout=PIPE, stderr=PIPE)
output, errput = p.communicate()
retcode = p.returncode
if retcode == 124:
output = ''
if 'source_limit' in opt and 'g.buffer' in opt['source_limit']:
errput = "%s secs out of soft limit (%s secs), sql may be hung, please check" % (opt['source_limit']['g.buffer'], case_timeout)
if IS_DARWIN:
try:
output, errput = p.communicate(timeout=case_timeout)
except TimeoutExpired:
p.kill()
output, errput = p.communicate()
retcode = 124 # mimic GNU timeout exit code
output = ''
if 'source_limit' in opt and 'g.buffer' in opt['source_limit']:
errput = "%s secs out of soft limit (%s secs), sql may be hung, please check" % (opt['source_limit']['g.buffer'], case_timeout)
else:
errput = "%s seconds timeout, sql may be hung, please check" % case_timeout
else:
errput = "%s seconds timeout, sql may be hung, please check" % case_timeout
elif isinstance(errput, bytes):
errput = errput.decode(errors='replace')
retcode = p.returncode
if isinstance(errput, bytes):
errput = errput.decode(errors='replace')
else:
output, errput = p.communicate()
retcode = p.returncode
if retcode == 124:
output = ''
if 'source_limit' in opt and 'g.buffer' in opt['source_limit']:
errput = "%s secs out of soft limit (%s secs), sql may be hung, please check" % (opt['source_limit']['g.buffer'], case_timeout)
else:
errput = "%s seconds timeout, sql may be hung, please check" % case_timeout
elif isinstance(errput, bytes):
errput = errput.decode(errors='replace')
except Exception as e:
errput = str(e)
output = ''
Expand Down
8 changes: 7 additions & 1 deletion plugins/seekdb/1.0.0/environment_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
from __future__ import absolute_import, division, print_function

import os
import platform
import re

import _errno as err
from _arch import getBaseArch
from _rpm import Version
from tool import get_port_socket_inode, contains_duplicate_nodes

IS_DARWIN = platform.system() == 'Darwin'


def environment_check(plugin_context, work_dir_empty_check=True, generate_configs={}, *args, **kwargs):
cluster_config = plugin_context.cluster_config
Expand Down Expand Up @@ -140,7 +143,10 @@ def environment_check(plugin_context, work_dir_empty_check=True, generate_config

basearch = getBaseArch()
stdio.verbose("basearch: %s" % basearch)
if 'x86' in basearch and len(re.findall(r'(^avx\s+)|(\s+avx\s+)|(\s+avx$)', client.execute_command('lscpu | grep avx').stdout)) == 0:
if IS_DARWIN:
# macOS does not have lscpu; Apple Silicon natively supports atomics and AVX is N/A.
stdio.verbose('Skip CPU instruction set check on macOS')
elif 'x86' in basearch and len(re.findall(r'(^avx\s+)|(\s+avx\s+)|(\s+avx$)', client.execute_command('lscpu | grep avx').stdout)) == 0:
if not (Version('4.2.5.6') <= repository.version < Version('4.3.0.0') or Version('4.3.5.4') <= repository.version < Version('4.4.0.0') or Version('4.4.1.0') <= repository.version):
critical(server, 'cpu', err.EC_CPU_NOT_SUPPORT_INSTRUCTION_SET.format(server=server, instruction_set='avx'), [err.SUG_CHANGE_SERVER.format()])
elif ('arm' in basearch or 'aarch' in basearch) and len(re.findall(r'(^atomics\s+)|(\s+atomics\s+)|(\s+atomics$)', client.execute_command('lscpu | grep atomics').stdout)) == 0 and 'nonlse' not in repository.release:
Expand Down
67 changes: 48 additions & 19 deletions plugins/seekdb/1.0.0/generate_general_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@

from __future__ import absolute_import, division, print_function

import platform
import re
import os

from _types import Capacity
from _errno import EC_OBSERVER_NOT_ENOUGH_MEMORY_ALAILABLE, EC_OBSERVER_NOT_ENOUGH_MEMORY_CACHED, EC_OBSERVER_GET_MEMINFO_FAIL
import _errno as err

IS_DARWIN = platform.system() == 'Darwin'


def get_system_memory(memory_limit):
if memory_limit < 12 << 30:
Expand Down Expand Up @@ -102,22 +105,44 @@ def generate_general_config(plugin_context, generate_config_mini=False, auto_dep
min_pool_memory = server_config['__min_full_resource_pool_memory']
min_memory = max(system_memory, MIN_MEMORY)
if ip not in ip_server_memory_info:
ret = client.execute_command('cat /proc/meminfo')
if ret:
ip_server_memory_info[ip] = server_memory_stats = {}
memory_key_map = {
'MemTotal': 'total',
'MemFree': 'free',
'MemAvailable': 'available',
'Buffers': 'buffers',
'Cached': 'cached'
}
for key in memory_key_map:
server_memory_stats[memory_key_map[key]] = 0
for k, v in re.findall('(\w+)\s*:\s*(\d+\s*\w+)', ret.stdout):
if k in memory_key_map:
key = memory_key_map[k]
server_memory_stats[key] = Capacity(str(v)).bytes
if IS_DARWIN:
ret = client.execute_command('sysctl hw.memsize')
if ret:
try:
total_mem = int(re.findall(r'hw\.memsize:\s*(\d+)', ret.stdout)[0])
vm_ret = client.execute_command('vm_stat')
page_size = 16384
ps_match = re.search(r'page size of (\d+) bytes', vm_ret.stdout) if vm_ret else None
if ps_match:
page_size = int(ps_match.group(1))
free_pages = int(re.findall(r'Pages free:\s+(\d+)', vm_ret.stdout)[0]) if vm_ret else 0
inactive_pages = int(re.findall(r'Pages inactive:\s+(\d+)', vm_ret.stdout)[0]) if vm_ret else 0
ip_server_memory_info[ip] = server_memory_stats = {
'total': total_mem,
'free': free_pages * page_size,
'available': (free_pages + inactive_pages) * page_size,
'buffers': 0,
'cached': inactive_pages * page_size
}
except Exception:
stdio.exception('Failed to parse macOS memory info')
else:
ret = client.execute_command('cat /proc/meminfo')
if ret:
ip_server_memory_info[ip] = server_memory_stats = {}
memory_key_map = {
'MemTotal': 'total',
'MemFree': 'free',
'MemAvailable': 'available',
'Buffers': 'buffers',
'Cached': 'cached'
}
for key in memory_key_map:
server_memory_stats[memory_key_map[key]] = 0
for k, v in re.findall('(\w+)\s*:\s*(\d+\s*\w+)', ret.stdout):
if k in memory_key_map:
key = memory_key_map[k]
server_memory_stats[key] = Capacity(str(v)).bytes

if user_server_config.get('memory_limit_percentage'):
if ip in ip_server_memory_info:
Expand Down Expand Up @@ -180,7 +205,10 @@ def generate_general_config(plugin_context, generate_config_mini=False, auto_dep

# cpu
if not server_config.get('cpu_count'):
ret = client.execute_command("grep -e 'processor\s*:' /proc/cpuinfo | wc -l")
if IS_DARWIN:
ret = client.execute_command("sysctl -n hw.ncpu")
else:
ret = client.execute_command("grep -e 'processor\s*:' /proc/cpuinfo | wc -l")
if ret and ret.stdout.strip().isdigit():
cpu_num = int(ret.stdout)
server_config['cpu_count'] = max(MIN_CPU_COUNT, int(cpu_num - 2))
Expand All @@ -196,7 +224,8 @@ def generate_general_config(plugin_context, generate_config_mini=False, auto_dep
log_disk_size = Capacity(server_config.get('log_disk_size', 0)).bytes
if not server_config.get('datafile_size') or not server_config.get('log_disk_size'):
disk = {'/': 0}
ret = client.execute_command('df --block-size=1024')
df_cmd = 'df -Pk' if IS_DARWIN else 'df --block-size=1024'
ret = client.execute_command(df_cmd)
if ret:
for total, used, avail, puse, path in re.findall('(\d+)\s+(\d+)\s+(\d+)\s+(\d+%)\s+(.+)', ret.stdout):
disk[path] = {
Expand All @@ -206,7 +235,7 @@ def generate_general_config(plugin_context, generate_config_mini=False, auto_dep
}
for include_dir in dirs.values():
while include_dir not in disk:
ret = client.execute_command('df --block-size=1024 %s' % include_dir)
ret = client.execute_command('%s %s' % (df_cmd, include_dir))
if ret:
for total, used, avail, puse, path in re.findall('(\d+)\s+(\d+)\s+(\d+)\s+(\d+%)\s+(.+)', ret.stdout):
disk[path] = {
Expand Down
2 changes: 1 addition & 1 deletion plugins/seekdb/1.0.0/health_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def health_check(plugin_context, *args, **kwargs):
remote_pid_path = '%s/run/seekdb.pid' % home_path
stdio.verbose('%s program health check' % server)
remote_pid = client.execute_command('cat %s' % remote_pid_path).stdout.strip()
if remote_pid and client.execute_command('ls /proc/%s' % remote_pid):
if remote_pid and client.execute_command('ps -p %s' % remote_pid):
stdio.verbose('%s seekdb[pid: %s] started', server, remote_pid)
else:
failed.append(EC_OBSERVER_FAIL_TO_START.format(server=server))
Expand Down
9 changes: 7 additions & 2 deletions plugins/seekdb/1.0.0/init_pre.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@

from __future__ import absolute_import, division, print_function

import platform

IS_DARWIN = platform.system() == 'Darwin'


def init_pre(plugin_context, *args, **kwargs):
data_dir_same_redo_dir_keys = ['home_path', 'data_dir', 'clog_dir', 'slog_dir']
Expand Down Expand Up @@ -48,11 +52,12 @@ def rm_meta(client, home_path, critical, EC_FAIL_TO_INIT_PATH, server, InitDirFa
def same_disk_check(stdio, client, server_config, critical, EC_FAIL_TO_INIT_PATH, server, *args, **kwargs):
stdio.verbose("check slog dir in the same disk with data dir")
slog_disk = data_disk = None
ret = client.execute_command("df --block-size=1024 %s | awk 'NR == 2 { print $1 }'" % server_config['slog_dir'])
df_cmd = 'df -Pk' if IS_DARWIN else 'df --block-size=1024'
ret = client.execute_command("%s %s | awk 'NR == 2 { print $1 }'" % (df_cmd, server_config['slog_dir']))
if ret:
slog_disk = ret.stdout.strip()
stdio.verbose('slog disk is {}'.format(slog_disk))
ret = client.execute_command("df --block-size=1024 %s | awk 'NR == 2 { print $1 }'" % server_config['data_dir'])
ret = client.execute_command("%s %s | awk 'NR == 2 { print $1 }'" % (df_cmd, server_config['data_dir']))
if ret:
data_disk = ret.stdout.strip()
stdio.verbose('data disk is {}'.format(data_disk))
Expand Down
9 changes: 8 additions & 1 deletion plugins/seekdb/1.0.0/parameter_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
from __future__ import absolute_import, division, print_function

import os
import platform

import _errno as err
from _types import Capacity

IS_DARWIN = platform.system() == 'Darwin'


def parameter_check(plugin_context, generate_configs={}, *args, **kwargs):
cluster_config = plugin_context.cluster_config
Expand Down Expand Up @@ -109,7 +112,11 @@ def parameter_check(plugin_context, generate_configs={}, *args, **kwargs):

devname = server_config.get('devname')
if devname:
if not client.execute_command("grep -e '^ *%s:' /proc/net/dev" % devname):
if IS_DARWIN:
devname_check_cmd = "ifconfig %s 2>/dev/null" % devname
else:
devname_check_cmd = "grep -e '^ *%s:' /proc/net/dev" % devname
if not client.execute_command(devname_check_cmd):
suggest = err.SUG_NO_SUCH_NET_DEVIC.format(ip=ip)
suggest.auto_fix = 'devname' not in global_generate_config and 'devname' not in server_generate_config
critical(server, 'net', err.EC_NO_SUCH_NET_DEVICE.format(server=server, devname=devname), suggests=[suggest])
Expand Down
Loading