blob: 3539efaf99baa1f29a999bf3c10edc496eccc81e [file] [log] [blame]
# SPDX-License-Identifier: GPL-2.0
#
# Runs UML kernel, collects output, and handles errors.
#
# Copyright (C) 2019, Google LLC.
# Author: Felix Guo <felixguoxiuping@gmail.com>
# Author: Brendan Higgins <brendanhiggins@google.com>
import importlib.abc
import importlib.util
import logging
import subprocess
import os
import shlex
import shutil
import signal
import threading
from typing import Iterator, List, Optional, Tuple
import kunit_config
import kunit_parser
import qemu_config
KCONFIG_PATH = '.config'
KUNITCONFIG_PATH = '.kunitconfig'
OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig'
DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config'
BROKEN_ALLCONFIG_PATH = 'tools/testing/kunit/configs/broken_on_uml.config'
OUTFILE_PATH = 'test.log'
ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__))
QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs')
class ConfigError(Exception):
"""Represents an error trying to configure the Linux kernel."""
class BuildError(Exception):
"""Represents an error trying to build the Linux kernel."""
class LinuxSourceTreeOperations:
"""An abstraction over command line operations performed on a source tree."""
def __init__(self, linux_arch: str, cross_compile: Optional[str]):
self._linux_arch = linux_arch
self._cross_compile = cross_compile
def make_mrproper(self) -> None:
try:
subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
except OSError as e:
raise ConfigError('Could not call make command: ' + str(e))
except subprocess.CalledProcessError as e:
raise ConfigError(e.output.decode())
def make_arch_qemuconfig(self, base_kunitconfig: kunit_config.Kconfig) -> None:
pass
def make_allyesconfig(self, build_dir: str, make_options) -> None:
raise ConfigError('Only the "um" arch is supported for alltests')
def make_olddefconfig(self, build_dir: str, make_options) -> None:
command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, 'olddefconfig']
if self._cross_compile:
command += ['CROSS_COMPILE=' + self._cross_compile]
if make_options:
command.extend(make_options)
print('Populating config with:\n$', ' '.join(command))
try:
subprocess.check_output(command, stderr=subprocess.STDOUT)
except OSError as e:
raise ConfigError('Could not call make command: ' + str(e))
except subprocess.CalledProcessError as e:
raise ConfigError(e.output.decode())
def make(self, jobs, build_dir: str, make_options) -> None:
command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, '--jobs=' + str(jobs)]
if make_options:
command.extend(make_options)
if self._cross_compile:
command += ['CROSS_COMPILE=' + self._cross_compile]
print('Building with:\n$', ' '.join(command))
try:
proc = subprocess.Popen(command,
stderr=subprocess.PIPE,
stdout=subprocess.DEVNULL)
except OSError as e:
raise BuildError('Could not call execute make: ' + str(e))
except subprocess.CalledProcessError as e:
raise BuildError(e.output)
_, stderr = proc.communicate()
if proc.returncode != 0:
raise BuildError(stderr.decode())
if stderr: # likely only due to build warnings
print(stderr.decode())
def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
raise RuntimeError('not implemented!')
class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations):
def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]):
super().__init__(linux_arch=qemu_arch_params.linux_arch,
cross_compile=cross_compile)
self._kconfig = qemu_arch_params.kconfig
self._qemu_arch = qemu_arch_params.qemu_arch
self._kernel_path = qemu_arch_params.kernel_path
self._kernel_command_line = qemu_arch_params.kernel_command_line + ' kunit_shutdown=reboot'
self._extra_qemu_params = qemu_arch_params.extra_qemu_params
def make_arch_qemuconfig(self, base_kunitconfig: kunit_config.Kconfig) -> None:
kconfig = kunit_config.parse_from_string(self._kconfig)
base_kunitconfig.merge_in_entries(kconfig)
def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
kernel_path = os.path.join(build_dir, self._kernel_path)
qemu_command = ['qemu-system-' + self._qemu_arch,
'-nodefaults',
'-m', '1024',
'-kernel', kernel_path,
'-append', ' '.join(params + [self._kernel_command_line]),
'-no-reboot',
'-nographic',
'-serial', 'stdio'] + self._extra_qemu_params
# Note: shlex.join() does what we want, but requires python 3.8+.
print('Running tests with:\n$', ' '.join(shlex.quote(arg) for arg in qemu_command))
return subprocess.Popen(qemu_command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True, errors='backslashreplace')
class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
"""An abstraction over command line operations performed on a source tree."""
def __init__(self, cross_compile=None):
super().__init__(linux_arch='um', cross_compile=cross_compile)
def make_allyesconfig(self, build_dir: str, make_options) -> None:
kunit_parser.print_with_timestamp(
'Enabling all CONFIGs for UML...')
command = ['make', 'ARCH=um', 'O=' + build_dir, 'allyesconfig']
if make_options:
command.extend(make_options)
process = subprocess.Popen(
command,
stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT)
process.wait()
kunit_parser.print_with_timestamp(
'Disabling broken configs to run KUnit tests...')
with open(get_kconfig_path(build_dir), 'a') as config:
with open(BROKEN_ALLCONFIG_PATH, 'r') as disable:
config.write(disable.read())
kunit_parser.print_with_timestamp(
'Starting Kernel with all configs takes a few minutes...')
def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
"""Runs the Linux UML binary. Must be named 'linux'."""
linux_bin = os.path.join(build_dir, 'linux')
return subprocess.Popen([linux_bin] + params,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True, errors='backslashreplace')
def get_kconfig_path(build_dir: str) -> str:
return os.path.join(build_dir, KCONFIG_PATH)
def get_kunitconfig_path(build_dir: str) -> str:
return os.path.join(build_dir, KUNITCONFIG_PATH)
def get_old_kunitconfig_path(build_dir: str) -> str:
return os.path.join(build_dir, OLD_KUNITCONFIG_PATH)
def get_outfile_path(build_dir: str) -> str:
return os.path.join(build_dir, OUTFILE_PATH)
def get_source_tree_ops(arch: str, cross_compile: Optional[str]) -> LinuxSourceTreeOperations:
config_path = os.path.join(QEMU_CONFIGS_DIR, arch + '.py')
if arch == 'um':
return LinuxSourceTreeOperationsUml(cross_compile=cross_compile)
if os.path.isfile(config_path):
return get_source_tree_ops_from_qemu_config(config_path, cross_compile)[1]
options = [f[:-3] for f in os.listdir(QEMU_CONFIGS_DIR) if f.endswith('.py')]
raise ConfigError(arch + ' is not a valid arch, options are ' + str(sorted(options)))
def get_source_tree_ops_from_qemu_config(config_path: str,
cross_compile: Optional[str]) -> Tuple[
str, LinuxSourceTreeOperations]:
# The module name/path has very little to do with where the actual file
# exists (I learned this through experimentation and could not find it
# anywhere in the Python documentation).
#
# Bascially, we completely ignore the actual file location of the config
# we are loading and just tell Python that the module lives in the
# QEMU_CONFIGS_DIR for import purposes regardless of where it actually
# exists as a file.
module_path = '.' + os.path.join(os.path.basename(QEMU_CONFIGS_DIR), os.path.basename(config_path))
spec = importlib.util.spec_from_file_location(module_path, config_path)
assert spec is not None
config = importlib.util.module_from_spec(spec)
# See https://github.com/python/typeshed/pull/2626 for context.
assert isinstance(spec.loader, importlib.abc.Loader)
spec.loader.exec_module(config)
if not hasattr(config, 'QEMU_ARCH'):
raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path)
params: qemu_config.QemuArchParams = config.QEMU_ARCH # type: ignore
return params.linux_arch, LinuxSourceTreeOperationsQemu(
params, cross_compile=cross_compile)
class LinuxSourceTree:
"""Represents a Linux kernel source tree with KUnit tests."""
def __init__(
self,
build_dir: str,
load_config=True,
kunitconfig_path='',
kconfig_add: Optional[List[str]]=None,
arch=None,
cross_compile=None,
qemu_config_path=None) -> None:
signal.signal(signal.SIGINT, self.signal_handler)
if qemu_config_path:
self._arch, self._ops = get_source_tree_ops_from_qemu_config(
qemu_config_path, cross_compile)
else:
self._arch = 'um' if arch is None else arch
self._ops = get_source_tree_ops(self._arch, cross_compile)
if not load_config:
return
if kunitconfig_path:
if os.path.isdir(kunitconfig_path):
kunitconfig_path = os.path.join(kunitconfig_path, KUNITCONFIG_PATH)
if not os.path.exists(kunitconfig_path):
raise ConfigError(f'Specified kunitconfig ({kunitconfig_path}) does not exist')
else:
kunitconfig_path = get_kunitconfig_path(build_dir)
if not os.path.exists(kunitconfig_path):
shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, kunitconfig_path)
self._kconfig = kunit_config.parse_file(kunitconfig_path)
if kconfig_add:
kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add))
self._kconfig.merge_in_entries(kconfig)
def arch(self) -> str:
return self._arch
def clean(self) -> bool:
try:
self._ops.make_mrproper()
except ConfigError as e:
logging.error(e)
return False
return True
def validate_config(self, build_dir: str) -> bool:
kconfig_path = get_kconfig_path(build_dir)
validated_kconfig = kunit_config.parse_file(kconfig_path)
if self._kconfig.is_subset_of(validated_kconfig):
return True
invalid = self._kconfig.entries() - validated_kconfig.entries()
message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \
'This is probably due to unsatisfied dependencies.\n' \
'Missing: ' + ', '.join([str(e) for e in invalid])
if self._arch == 'um':
message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \
'on a different architecture with something like "--arch=x86_64".'
logging.error(message)
return False
def build_config(self, build_dir: str, make_options) -> bool:
kconfig_path = get_kconfig_path(build_dir)
if build_dir and not os.path.exists(build_dir):
os.mkdir(build_dir)
try:
self._ops.make_arch_qemuconfig(self._kconfig)
self._kconfig.write_to_file(kconfig_path)
self._ops.make_olddefconfig(build_dir, make_options)
except ConfigError as e:
logging.error(e)
return False
if not self.validate_config(build_dir):
return False
old_path = get_old_kunitconfig_path(build_dir)
if os.path.exists(old_path):
os.remove(old_path) # write_to_file appends to the file
self._kconfig.write_to_file(old_path)
return True
def _kunitconfig_changed(self, build_dir: str) -> bool:
old_path = get_old_kunitconfig_path(build_dir)
if not os.path.exists(old_path):
return True
old_kconfig = kunit_config.parse_file(old_path)
return old_kconfig.entries() != self._kconfig.entries()
def build_reconfig(self, build_dir: str, make_options) -> bool:
"""Creates a new .config if it is not a subset of the .kunitconfig."""
kconfig_path = get_kconfig_path(build_dir)
if not os.path.exists(kconfig_path):
print('Generating .config ...')
return self.build_config(build_dir, make_options)
existing_kconfig = kunit_config.parse_file(kconfig_path)
self._ops.make_arch_qemuconfig(self._kconfig)
if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir):
return True
print('Regenerating .config ...')
os.remove(kconfig_path)
return self.build_config(build_dir, make_options)
def build_kernel(self, alltests, jobs, build_dir: str, make_options) -> bool:
try:
if alltests:
self._ops.make_allyesconfig(build_dir, make_options)
self._ops.make_olddefconfig(build_dir, make_options)
self._ops.make(jobs, build_dir, make_options)
except (ConfigError, BuildError) as e:
logging.error(e)
return False
return self.validate_config(build_dir)
def run_kernel(self, args=None, build_dir='', filter_glob='', timeout=None) -> Iterator[str]:
if not args:
args = []
args.extend(['mem=1G', 'console=tty', 'kunit_shutdown=halt'])
if filter_glob:
args.append('kunit.filter_glob='+filter_glob)
process = self._ops.start(args, build_dir)
assert process.stdout is not None # tell mypy it's set
# Enforce the timeout in a background thread.
def _wait_proc():
try:
process.wait(timeout=timeout)
except Exception as e:
print(e)
process.terminate()
process.wait()
waiter = threading.Thread(target=_wait_proc)
waiter.start()
output = open(get_outfile_path(build_dir), 'w')
try:
# Tee the output to the file and to our caller in real time.
for line in process.stdout:
output.write(line)
yield line
# This runs even if our caller doesn't consume every line.
finally:
# Flush any leftover output to the file
output.write(process.stdout.read())
output.close()
process.stdout.close()
waiter.join()
subprocess.call(['stty', 'sane'])
def signal_handler(self, unused_sig, unused_frame) -> None:
logging.error('Build interruption occurred. Cleaning console.')
subprocess.call(['stty', 'sane'])