Initial integration test suite.

Installs all requirements and then runs the testsuite in a VirtualBox VM.
looper
Ben Niemann 2017-10-12 17:12:28 +02:00
parent ab6b9659d2
commit 9df7dee0d6
5 changed files with 774 additions and 2 deletions

6
.gitignore vendored
View File

@ -1,4 +1,6 @@
__pycache__/
build/
ENV/
/build
/3rdparty/*/build
/vmtests
/ENV
*~

79
noisidev/runvmtests.py Normal file
View File

@ -0,0 +1,79 @@
#!/usr/bin/env python3
import argparse
import os
import os.path
import sys
from .vmtests import ubuntu_16_04
VMs = {}
def register_vm(vm):
assert vm.name not in VMs
VMs[vm.name] = vm
register_vm(ubuntu_16_04.Ubuntu_16_04())
class TestSettings(object):
def __init__(self, args):
self.branch = args.branch
self.source = args.source
self.clean_snapshot = args.clean_snapshot
self.shutdown = args.shutdown
def bool_arg(value):
if isinstance(value, bool):
return value
if isinstance(value, str):
if value.lower() in ('true', 'y', 'yes', 'on', '1'):
return True
if value.lower() in ('false', 'n', 'no', 'off', '0'):
return False
raise ValueError("Invalid value '%s'." % value)
raise TypeError("Invalid type '%s'." % type(value).__name__)
def main(argv):
argparser = argparse.ArgumentParser()
argparser.add_argument('--deletevm', type=str, default=None)
argparser.add_argument('--source', type=str, choices=['local', 'git'], default='local')
argparser.add_argument('--branch', type=str, default='master')
argparser.add_argument('--clean-snapshot', type=bool_arg, default=True)
argparser.add_argument('--shutdown', type=bool_arg, default=True)
args = argparser.parse_args(argv[1:])
if args.deletevm is not None:
try:
vm = VMs[args.deletevm]
except KeyError:
print("Invalid distribution name.")
return 1
vm.delete()
return 0
settings = TestSettings(args)
results = {}
for _, vm in sorted(VMs.items()):
results[vm.name] = vm.runtest(settings)
if not all(results.values()):
print()
print('-' * 96)
print("%d/%d tests FAILED." % (
sum(1 for success in results.values() if not success), len(results)))
for vm, success in sorted(results.items(), key=lambda i: i[0]):
print("%s... %s" % (vm, 'SUCCESS' if success else 'FAILED'))
return 1
return 0
if __name__ == '__main__':
sys.exit(main(sys.argv))

View File

View File

@ -0,0 +1,401 @@
#!/usr/bin/python3
import os
import os.path
import shutil
import subprocess
import sys
import paramiko
from . import vm
PRESEED_TEMPLATE = r'''
### Localization
d-i debian-installer/locale string en_US.UTF-8
d-i localechooser/supported-locales multiselect en_US.UTF-8, de_DE.UTF-8
d-i console-setup/ask_detect boolean false
d-i keyboard-configuration/xkb-keymap select {keyboard_layout}
### Network configuration
d-i netcfg/choose_interface select auto
d-i netcfg/hostname string {hostname}
d-i netcfg/get_hostname string {hostname}
d-i netcfg/get_domain string unnamed
d-i hw-detect/load_firmware boolean true
### Mirror settings
d-i mirror/country string manual
d-i mirror/http/hostname string archive.ubuntu.com
d-i mirror/http/directory string /ubuntu
d-i mirror/http/proxy string
### Account setup
d-i passwd/root-login boolean false
d-i passwd/make-user boolean true
d-i passwd/user-fullname string Tester
d-i passwd/username string {username}
d-i passwd/user-password password {password}
d-i passwd/user-password-again password {password}
d-i user-setup/allow-password-weak boolean true
d-i user-setup/encrypt-home boolean false
### Clock and time zone setup
d-i clock-setup/utc boolean true
d-i time/zone string {timezone}
d-i clock-setup/ntp boolean true
d-i clock-setup/ntp-server string ntp.ubuntu.com
### Partitioning
d-i preseed/early_command string umount /media || true
d-i partman-auto/method string lvm
d-i partman-auto-lvm/guided_size string max
d-i partman-lvm/device_remove_lvm boolean true
d-i partman-lvm/confirm boolean true
d-i partman-lvm/confirm_nooverwrite boolean true
d-i partman-auto-lvm/new_vg_name string main
d-i partman-md/device_remove_md boolean true
d-i partman-md/confirm boolean true
d-i partman-partitioning/confirm_write_new_label boolean true
d-i partman/choose_partition select finish
d-i partman/confirm boolean true
d-i partman/confirm_nooverwrite boolean true
d-i partman-basicmethods/method_only boolean false
### Disk layout
d-i partman-auto/expert_recipe string \
boot-root :: \
512 512 512 ext4 \
$primary{{ }} \
$bootable{{ }} \
method{{ format }} format{{ }} \
use_filesystem{{ }} filesystem{{ ext4 }} \
mountpoint{{ /boot }} \
. \
1024 102400000 1000000000 ext4 \
$lvmok{{ }} \
method{{ format }} format{{ }} \
use_filesystem{{ }} filesystem{{ ext4 }} \
mountpoint{{ / }} \
lv_name{{ root }} \
. \
200% 200% 200% linux-swap \
$lvmok{{ }} \
method{{ swap }} format{{ }} \
lv_name{{ swap }} \
.
### Base system installation
d-i base-installer/install-recommends boolean true
d-i base-installer/kernel/image string linux-generic
### Apt setup
d-i apt-setup/restricted boolean true
d-i apt-setup/universe boolean true
d-i apt-setup/backports boolean true
d-i apt-setup/use_mirror boolean false
d-i apt-setup/services-select multiselect security, updates
d-i apt-setup/security_host string security.ubuntu.com
d-i apt-setup/security_path string /ubuntu
### Package selection
d-i tasksel/first multiselect none
d-i pkgsel/include string openssh-server
d-i pkgsel/upgrade select full-upgrade
d-i pkgsel/update-policy select unattended-upgrades
d-i grub-installer/only_debian boolean true
d-i grub-installer/with_other_os boolean true
### Finishing up the installation
d-i debian-installer/splash boolean false
d-i cdrom-detect/eject boolean true
d-i preseed/late_command string \
cp /rc.local /target/etc/rc.local
### Shutdown machine
d-i finish-install/reboot_in_progress note
d-i debian-installer/exit/poweroff boolean true
'''
RC_LOCAL_SCRIPT = r'''#!/bin/bash -e
if [ ! -f /post-install-done ]; then
set -x
echo >/etc/sudoers.d/testuser "testuser ALL=(ALL) NOPASSWD:ALL"
apt-get -q -y install virtualbox-guest-utils
# Do not start the guest utils as part of normal system init,
# because there seems to be a race with the network config.
update-rc.d virtualbox-guest-utils remove
touch /post-install-done
poweroff
fi
/etc/init.d/virtualbox-guest-utils start
'''
WRAP_SCRIPT = r'''#!/bin/bash
set -x
sudo apt-get -q -y install git python3.5 python3.5-venv python3-setuptools xterm
touch test.log
xterm -e tail -f test.log &
XTERMPID=$!
./runtest.sh >>test.log 2>&1 && echo "SUCCESS" || echo "FAILED"
kill $XTERMPID
'''
TEST_SCRIPT = r'''#!/bin/bash
SOURCE="{settings.source}"
BRANCH="{settings.branch}"
set -e
set -x
rm -fr noisicaa/
if [ $SOURCE == git ]; then
git clone --branch=$BRANCH --single-branch https://github.com/odahoda/noisicaa
elif [ $SOURCE == local ]; then
mkdir noisicaa/
tar -x -z -Cnoisicaa/ -flocal.tar.gz
fi
cd noisicaa/
pyvenv-3.5 ENV
. ENV/bin/activate
sudo apt-get -q -y install $(cat requirements.ubuntu.pkgs | grep -vE '^\s*#' | grep -vE '^\s*$')
pip install -r requirements.txt
python3 setup.py build
bin/runtests --gdb=false
'''
class Ubuntu_16_04(vm.VM):
def __init__(self):
super().__init__(name='ubuntu-16.04')
self.iso_url = (
'http://archive.ubuntu.com/ubuntu/dists/xenial-updates/main/'
'installer-amd64/current/images/netboot/mini.iso')
def do_install(self):
orig_iso_path = os.path.join(self.vm_dir, 'installer-orig.iso')
iso_path = os.path.join(self.vm_dir, 'installer.iso')
if not os.path.isfile(orig_iso_path):
self.download_file(self.iso_url, orig_iso_path)
if not os.path.isfile(iso_path):
self.patch_iso(orig_iso_path, iso_path)
# Run once with installer from ISO
self.attach_iso(iso_path)
self.start_vm()
self.wait_for_state(self.POWEROFF, timeout=3600)
self.detach_iso()
# Run once again to execute port install script.
self.start_vm()
self.wait_for_state(self.POWEROFF, timeout=3600)
def run_cmd(self, cmd, **kwargs):
if isinstance(cmd, str):
cmd_str = cmd
else:
cmd_str = ' '.join(cmd)
print(cmd_str)
proc = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kwargs)
out = proc.stdout.read()
proc.wait()
if proc.returncode != 0:
print(out)
raise RuntimeError(
"Command '%s' failed with rc=%d" % (cmd_str, proc.returncode))
def unpack_iso(self, path):
iso_dir = os.path.join(self.vm_dir, 'tmpiso')
if os.path.isdir(iso_dir):
shutil.rmtree(iso_dir)
self.run_cmd(['7z', 'x', path, '-o' + iso_dir])
shutil.rmtree(os.path.join(iso_dir, '[BOOT]'))
self.run_cmd(
['gzip', '-d', 'initrd.gz'],
cwd=iso_dir)
return iso_dir
def generate_iso(self, iso_dir, path):
self.run_cmd(
['gzip', 'initrd'],
cwd=iso_dir)
self.run_cmd(
['mkisofs',
'-r',
'-V', 'ubuntu 16.04 netboot unattended',
'-cache-inodes', '-J', '-l',
'-b', 'isolinux.bin',
'-c', 'boot.cat',
'-no-emul-boot',
'-boot-load-size', '4',
'-boot-info-table',
'-input-charset', 'utf-8',
'-o', os.path.join(path + '.partial'),
'./'],
cwd=iso_dir,
)
os.rename(path + '.partial', path)
def get_xkb_layout(self):
return subprocess.check_output(
'. /etc/default/keyboard && echo $XKBLAYOUT',
shell=True).decode('ascii').strip()
def get_timezone(self):
with open('/etc/timezone', 'r') as fp:
return fp.readline().strip()
def create_preseed_cfg(self, iso_dir):
preseed = PRESEED_TEMPLATE.format(
hostname='noisicaa-test',
username='testuser',
password='123',
keyboard_layout=self.get_xkb_layout(),
timezone=self.get_timezone(),
)
with open(os.path.join(iso_dir, 'preseed.cfg'), 'w') as fp:
fp.write(preseed)
# Add to initrd.
self.run_cmd(
'echo "./preseed.cfg" | fakeroot cpio -o -H newc -A -F "./initrd"',
shell=True,
cwd=iso_dir)
with open(os.path.join(iso_dir, 'rc.local'), 'w') as fp:
fp.write(RC_LOCAL_SCRIPT)
os.chmod(os.path.join(iso_dir, 'rc.local'), 0o775)
# Add to initrd.
self.run_cmd(
'echo "./rc.local" | fakeroot cpio -o -H newc -A -F "./initrd"',
shell=True,
cwd=iso_dir)
def patch_grub_cfg(self, iso_dir):
path = os.path.join(iso_dir, 'boot', 'grub', 'grub.cfg')
with open(path, 'r') as fpin:
with open(path + '.new', 'w') as fpout:
for line in fpin:
if line.startswith('menuentry "Install"'):
fpout.write(r'''menuentry "Unattended Install" {
set gfxpayload=keep
linux /linux auto=true priority=critical preseed/file=/preseed.cfg --- quiet
initrd /initrd.gz
}
''')
fpout.write(line)
os.rename(path + '.new', path)
def patch_isolinux_cfg(self, iso_dir):
path = os.path.join(iso_dir, 'isolinux.cfg')
with open(path, 'r') as fpin:
with open(path + '.new', 'w') as fpout:
for line in fpin:
if line.startswith('timeout '):
fpout.write('timeout 100\n')
continue
fpout.write(line)
os.rename(path + '.new', path)
def patch_txt_cfg(self, iso_dir):
path = os.path.join(iso_dir, 'txt.cfg')
with open(path, 'r') as fpin:
with open(path + '.new', 'w') as fpout:
for line in fpin:
if line.startswith('default install'):
fpout.write(r'''default unattended
label unattended
menu label ^Unattended Install
menu default
kernel linux
append vga=788 initrd=initrd.gz auto=true priority=critical preseed/file=/preseed.cfg --- quiet
''')
continue
if line.strip().startswith('menu default'):
continue
fpout.write(line)
os.rename(path + '.new', path)
def patch_iso(self, orig_iso_path, patched_iso_path):
iso_dir = self.unpack_iso(orig_iso_path)
self.create_preseed_cfg(iso_dir)
self.patch_grub_cfg(iso_dir)
self.patch_isolinux_cfg(iso_dir)
self.patch_txt_cfg(iso_dir)
self.generate_iso(iso_dir, patched_iso_path)
def do_test(self, settings):
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy)
client.connect(self.get_ip(timeout=600), username='testuser', password='123')
with client.open_sftp() as sftp:
with sftp.open('runtest.sh', 'w') as fp:
fp.write(TEST_SCRIPT.format(settings=settings))
sftp.chmod('runtest.sh', 0o775)
with sftp.open('wrap.sh', 'w') as fp:
fp.write(WRAP_SCRIPT)
sftp.chmod('wrap.sh', 0o775)
if settings.source == 'local':
self.run_cmd(['git', 'config', 'core.quotepath', 'off'])
proc = subprocess.Popen(
['bash', '-c', 'tar -c -z -T<(git ls-tree --full-tree -r --name-only HEAD) -f-'],
cwd=vm.ROOT_DIR,
stdout=subprocess.PIPE)
with sftp.open('local.tar.gz', 'wb') as fp:
while True:
buf = proc.stdout.read(1024)
if not buf:
break
fp.write(buf)
proc.wait()
assert proc.returncode == 0
print("%s: Running test..." % self.name)
_, stdout, _ = client.exec_command('./wrap.sh')
out = stdout.read().strip()
if out == b'SUCCESS':
print(' OK')
return True
else:
print(' FAILED')
with client.open_sftp() as sftp:
with sftp.open('test.log', 'rb') as fp:
log = fp.read()
sys.stdout.buffer.write(log)
return False

290
noisidev/vmtests/vm.py Normal file
View File

@ -0,0 +1,290 @@
#!/usr/bin/python3
import os
import os.path
import re
import shutil
import subprocess
import sys
import tempfile
import time
import traceback
import unittest
import urllib.parse
import urllib.request
# import paramiko
ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
VM_BASE_DIR = os.path.join(ROOT_DIR, 'vmtests')
class VM(object):
# VM states
POWEROFF = 'poweroff'
RUNNING = 'running'
def __init__(self, name):
self.name = name
self.vm_dir = os.path.join(VM_BASE_DIR, self.name)
self.installed_sentinel = os.path.join(self.vm_dir, 'installed')
def get_ip(self, *, timeout):
state = self.get_state()
if state != self.RUNNING:
raise RuntimeError("VM not running (current state: '%s')" % state)
msg = False
deadline = time.time() + timeout
while time.time() < deadline:
ip = self.get_guest_property('/VirtualBox/GuestInfo/Net/0/V4/IP')
if ip is not None:
return ip
if not msg:
print("Waiting for IP address...")
msg = True
time.sleep(5)
raise TimeoutError("Timed out waiting for IP address.")
def is_registered(self):
for line in self.vboxmanage('list', 'vms', echo_cmd=False, get_output=True).splitlines():
m = re.match(r'^"([^"]+)" {[-0-9a-f]+}$', line.strip())
if m and m.group(1) == self.name:
return True
return False
def get_guest_property(self, prop):
value = None
for line in self.vboxmanage(
'guestproperty', 'get', self.name, prop,
echo_cmd=False, get_output=True).splitlines():
if line.startswith('Value:'):
value = line.split(':')[1].strip()
return value
def get_vm_info(self):
vm_info = {}
for line in self.vboxmanage(
'showvminfo', self.name, '--machinereadable',
echo_cmd=False, get_output=True).splitlines():
key, value = line.split('=', 1)
if value.startswith('"') and value.endswith('"'):
value = value.strip('"')
elif re.match(r'^\d+$', value):
value = int(value)
vm_info[key] = value
return vm_info
def get_state(self):
return self.get_vm_info()['VMState']
def start_vm(self):
self.vboxmanage(
'startvm', self.name,
'--type', 'gui'
)
def wait_for_state(self, state, *, timeout):
deadline = time.time() + timeout
while time.time() < deadline:
if self.get_state() == state:
return
time.sleep(5)
raise TimeoutError("State '%s' not reached (currently: '%s')" % (state, self.get_state()))
def download_file(self, url, path):
total_bytes = 0
with urllib.request.urlopen(url) as fp_in:
with open(path + '.partial', 'wb') as fp_out:
last_report = time.time()
try:
while True:
dat = fp_in.read(10240)
if not dat:
break
fp_out.write(dat)
total_bytes += len(dat)
if time.time() - last_report > 1:
sys.stderr.write(
'Downloading %s: %d bytes\r'
% (url, total_bytes))
sys.stderr.flush()
last_report = time.time()
finally:
sys.stderr.write('\033[K')
sys.stderr.flush()
os.rename(path + '.partial', path)
print('Downloaded %s: %d bytes' % (url, total_bytes))
def vboxmanage(self, *args, get_output=False, echo_cmd=True):
argv = ['vboxmanage']
argv.extend(args)
# Tweak $HOME to create a separate "VirtualBox installation", and we don't mess up the
# users VirtualBox config.
env = dict(**os.environ)
env['HOME'] = VM_BASE_DIR
if echo_cmd:
print(' '.join(argv))
if get_output:
proc = subprocess.run(argv, check=True, env=env, stdout=subprocess.PIPE)
return proc.stdout.decode('utf-8')
else:
subprocess.run(argv, check=True, env=env)
def get_default_if(self):
proc = subprocess.run(['route', '-n'], check=True, stdout=subprocess.PIPE)
for line in proc.stdout.decode('utf-8').splitlines():
if line.startswith('0.0.0.0'):
return line.split()[-1]
raise RuntimeError
def delete(self):
if self.is_registered():
self.vboxmanage('unregistervm', self.name)
if os.path.isdir(self.vm_dir):
print("rm -fr %s" % self.vm_dir)
shutil.rmtree(self.vm_dir)
def setup_vm(self):
vm_cfg_path = os.path.join(self.vm_dir, '%s.vbox' % self.name)
if not os.path.isfile(vm_cfg_path):
self.vboxmanage(
'createvm',
'--register',
'--name', self.name,
'--basefolder', VM_BASE_DIR,
)
self.vboxmanage(
'modifyvm', self.name,
'--memory', '1024', # 1G
'--cpus', '2',
'--nic1', 'bridged',
'--bridgeadapter1', self.get_default_if(),
'--audio', 'alsa',
)
self.vboxmanage(
'storagectl', self.name,
'--name', 'ctrl0',
'--add', 'ide',
'--controller', 'PIIX4',
'--portcount', '2',
'--bootable', 'on',
)
def setup_hd(self):
hd_path = os.path.join(self.vm_dir, 'disk.img')
if not os.path.isfile(hd_path):
self.vboxmanage(
'createmedium', 'disk',
'--filename', hd_path,
'--size', '10240', # 10G
)
self.vboxmanage(
'storageattach', self.name,
'--storagectl', 'ctrl0',
'--port', '0',
'--device', '0',
'--type', 'hdd',
'--medium', hd_path,
)
def attach_iso(self, path):
self.vboxmanage(
'storageattach', self.name,
'--storagectl', 'ctrl0',
'--port', '1',
'--device', '0',
'--type', 'dvddrive',
'--medium', path,
)
def detach_iso(self):
self.vboxmanage(
'storageattach', self.name,
'--storagectl', 'ctrl0',
'--port', '1',
'--device', '0',
'--type', 'dvddrive',
'--medium', 'emptydrive',
)
def install(self):
if not os.path.isfile(self.installed_sentinel):
print("Installing VM %s..." % self.name)
if not os.path.isdir(self.vm_dir):
os.makedirs(self.vm_dir)
self.setup_vm()
self.setup_hd()
self.do_install()
self.vboxmanage(
'snapshot', self.name,
'take', 'clean',
)
open(self.installed_sentinel, 'w').close()
def do_install(self):
raise NotImplementedError
def runtest(self, settings):
try:
self.install()
if settings.clean_snapshot:
self.vboxmanage(
'snapshot', self.name,
'restore', 'clean',
)
self.vboxmanage(
'startvm', self.name,
'--type', 'gui'
)
return self.do_test(settings)
except Exception: # pylint: disable=broad-except
traceback.print_exc()
return False
finally:
if settings.shutdown:
self.vboxmanage('controlvm', self.name, 'acpipowerbutton')
try:
self.wait_for_state('poweroff', timeout=300)
except TimeoutError:
self.vboxmanage('controlvm', self.name, 'poweroff')
def do_test(self):
raise NotImplementedError
class VMTest(unittest.TestCase):
def setUp(self):
self.temp_dir = tempfile.mkdtemp()
def cleanUp(self):
shutil.rmtree(self.temp_dir)
def test_download_file(self):
vm = VM(name='test')
path = os.path.join(self.temp_dir, 'file')
vm.download_file('http://www.google.com/', path)
self.assertTrue(os.path.isfile(path))
if __name__ == '__main__':
unittest.main()