Commit 50792314 authored by Andrew Walker's avatar Andrew Walker
Browse files

Remove test

parent 08e90245
Showing with 0 additions and 4927 deletions
+0 -4927
This diff is collapsed.
#!/usr/bin/env python3
# Author: Eric Turgeon
# License: BSD
import pytest
from pytest_dependency import depends
from functions import SSH_TEST
from auto_config import hostname, ip
from middlewared.test.integration.assets.directory_service import active_directory, ldap
from middlewared.test.integration.utils import call
try:
from config import AD_DOMAIN, ADPASSWORD, ADUSERNAME
except ImportError:
Reason = 'ADNameServer AD_DOMAIN, ADPASSWORD, or/and ADUSERNAME are missing in config.py"'
pytestmark = pytest.mark.skip(reason=Reason)
try:
from config import (
LDAPUSER,
LDAPPASSWORD
)
except ImportError:
Reason = 'LDAP* variable are not setup in config.py'
pytestmark = pytest.mark.skipif(True, reason=Reason)
@pytest.fixture(scope="function")
def do_ad_connection(request):
with active_directory() as ad:
yield ad
@pytest.fixture(scope="function")
def do_ldap_connection(request):
with ldap() as ldap_conn:
yield ldap_conn
def test_08_test_ssh_ad(do_ad_connection):
userobj = do_ad_connection['user_obj']
groupobj = call('group.get_group_obj', {'gid': userobj['pw_gid']})
call('ssh.update', {"password_login_groups": [groupobj['gr_name']]})
cmd = 'ls -la'
results = SSH_TEST(cmd, f'{ADUSERNAME}@{AD_DOMAIN}', ADPASSWORD, ip)
call('ssh.update', {"password_login_groups": []})
assert results['result'] is True, results
def test_09_test_ssh_ldap(do_ldap_connection):
userobj = call('user.get_user_obj', {'username': LDAPUSER})
groupobj = call('group.get_group_obj', {'gid': userobj['pw_gid']})
call('ssh.update', {"password_login_groups": [groupobj['gr_name']]})
cmd = 'ls -la'
results = SSH_TEST(cmd, LDAPUSER, LDAPPASSWORD, ip)
call('ssh.update', {"password_login_groups": []})
assert results['result'] is True, results
#!/usr/bin/env python3
import errno
import sys
import os
apifolder = os.getcwd()
sys.path.append(apifolder)
import pytest
from functions import GET, SSH_TEST, make_ws_request
from auto_config import ha, user, password
from pytest_dependency import depends
from middlewared.client import ClientException
from middlewared.test.integration.assets.account import unprivileged_user
from middlewared.test.integration.utils import call, client
if ha and "virtual_ip" in os.environ:
ip = os.environ["virtual_ip"]
else:
from auto_config import ip
@pytest.fixture(scope='module')
def readonly_admin():
# READONLY role implies FAILOVER_READ
with unprivileged_user(
username='failover_guy',
group_name='failover_admins',
privilege_name='FAILOVER_PRIV',
allowlist=[],
web_shell=False,
roles=['READONLY_ADMIN']
) as acct:
yield acct
@pytest.mark.dependency(name='hactl_install_dir')
def test_01_check_hactl_installed(request):
rv = SSH_TEST('which hactl', user, password, ip)
assert rv['stdout'].strip() == '/usr/local/sbin/hactl', rv['output']
@pytest.mark.dependency(name='hactl_status')
def test_02_check_hactl_status(request):
depends(request, ['hactl_install_dir'])
rv = SSH_TEST('hactl', user, password, ip)
output = rv['stdout'].strip()
if ha:
for i in ('Node status:', 'This node serial:', 'Other node serial:', 'Failover status:'):
assert i in output, output
else:
assert 'Not an HA node' in output, output
@pytest.mark.dependency(name='hactl_takeover')
def test_03_check_hactl_takeover(request):
# integration tests run against the master node (at least they should...)
depends(request, ['hactl_status'])
rv = SSH_TEST('hactl takeover', user, password, ip)
output = rv['stdout'].strip()
if ha:
assert 'This command can only be run on the standby node.' in output, output
else:
assert 'Not an HA node' in output, output
@pytest.mark.dependency(name='hactl_enable')
def test_04_check_hactl_enable(request):
# integration tests run against the master node (at least they should...)
depends(request, ['hactl_takeover'])
rv = SSH_TEST('hactl enable', user, password, ip)
output = rv['stdout'].strip()
if ha:
assert 'Failover already enabled.' in output, output
else:
assert 'Not an HA node' in output, output
def test_05_check_hactl_disable(request):
# integration tests run against the master node (at least they should...)
depends(request, ['hactl_enable'])
rv = SSH_TEST('hactl disable', user, password, ip)
output = rv['stdout'].strip()
if ha:
assert 'Failover disabled.' in output, output
rv = make_ws_request(ip, {'msg': 'method', 'method': 'failover.config', 'params': []})
assert isinstance(rv['result'], dict), rv['result']
assert rv['result']['disabled'] is True, rv['result']
rv = SSH_TEST('hactl enable', user, password, ip)
output = rv['stdout'].strip()
assert 'Failover enabled.' in output, output
rv = make_ws_request(ip, {'msg': 'method', 'method': 'failover.config', 'params': []})
assert isinstance(rv['result'], dict), rv['result']
assert rv['result']['disabled'] is False, rv['result']
else:
assert 'Not an HA node' in output, output
def test_06_test_failover_get_ips():
results = GET('/failover/get_ips', controller_a=ha)
assert results.status_code == 200, results.text
rv = results.json()
assert (isinstance(rv, list)), rv
if ha:
assert rv
if ha:
def test_07_failover_replicate():
old_ns = call('network.configuration.config')['nameserver3']
new_ns = '1.1.1.1'
try:
call('network.configuration.update', {'nameserver3': new_ns})
remote = call('failover.call_remote', 'network.configuration.config')
assert remote['nameserver3'] == new_ns
assert remote['state']['nameserver3'] == new_ns
finally:
call('network.configuration.update', {'nameserver3': old_ns})
remote = call('failover.call_remote', 'network.configuration.config')
assert remote['nameserver3'] == old_ns
assert remote['state']['nameserver3'] == old_ns
def test_08_readonly_ops(request, readonly_admin):
with client(auth=(readonly_admin.username, readonly_admin.password)) as c:
c.call('failover.config')
c.call('failover.node')
c.call('failover.upgrade_pending')
with pytest.raises(ClientException) as ce:
c.call('failover.call_remote', 'user.update')
assert ce.value.errno == errno.EACCES
import time
import sys
import os
apifolder = os.getcwd()
sys.path.append(apifolder)
import pytest
from middlewared.service_exception import CallError
from middlewared.test.integration.utils import call, ssh
@pytest.mark.flaky(reruns=5, reruns_delay=5) # Sometimes systemd unit state is erroneously reported as active
def test_non_silent_service_start_failure():
"""
This test for 2 conditions:
1. middleware raises CallError that isn't empty
2. each time a CallError is raised, the message
has a timestamp and that timestamp changes
with each failure
"""
with pytest.raises(CallError) as e:
call('service.start', 'ups', {'silent': False})
# Error looks like
"""
middlewared.service_exception.CallError: [EFAULT] Jan 10 08:49:14 systemd[1]: Starting Network UPS Tools - power device monitor and shutdown controller...
Jan 10 08:49:14 nut-monitor[3032658]: fopen /run/nut/upsmon.pid: No such file or directory
Jan 10 08:49:14 nut-monitor[3032658]: Unable to use old-style MONITOR line without a username
Jan 10 08:49:14 nut-monitor[3032658]: Convert it and add a username to upsd.users - see the documentation
Jan 10 08:49:14 nut-monitor[3032658]: Fatal error: unusable configuration
Jan 10 08:49:14 nut-monitor[3032658]: Network UPS Tools upsmon 2.7.4
Jan 10 08:49:14 systemd[1]: nut-monitor.service: Control process exited, code=exited, status=1/FAILURE
Jan 10 08:49:14 systemd[1]: nut-monitor.service: Failed with result 'exit-code'.
Jan 10 08:49:14 systemd[1]: Failed to start Network UPS Tools - power device monitor and shutdown controller.
"""
lines1 = e.value.errmsg.splitlines()
first_ts, len_lines1 = ' '.join(lines1.pop(0).split()[:3]), len(lines1)
assert any('nut-monitor[' in line for line in lines1), lines1
assert any('systemd[' in line for line in lines1), lines1
# make sure we don't trigger system StartLimitBurst threshold
# by removing this service from failed unit list (if it's there)
ssh('systemctl reset-failed nut-monitor')
# we have to sleep 1 second here or the timestamp will be the
# same as when we first tried to start the service which is
# what we're testing to make sure the message is up to date
# with reality
time.sleep(1)
with pytest.raises(CallError) as e:
call('service.start', 'ups', {'silent': False})
# Error looks like: (Notice timestamp change, which is what we verify
"""
middlewared.service_exception.CallError: [EFAULT] Jan 10 08:49:15 systemd[1]: Starting Network UPS Tools - power device monitor and shutdown controller...
Jan 10 08:49:15 nut-monitor[3032739]: fopen /run/nut/upsmon.pid: No such file or directory
Jan 10 08:49:15 nut-monitor[3032739]: Unable to use old-style MONITOR line without a username
Jan 10 08:49:15 nut-monitor[3032739]: Convert it and add a username to upsd.users - see the documentation
Jan 10 08:49:15 nut-monitor[3032739]: Fatal error: unusable configuration
Jan 10 08:49:15 nut-monitor[3032739]: Network UPS Tools upsmon 2.7.4
Jan 10 08:49:15 systemd[1]: nut-monitor.service: Control process exited, code=exited, status=1/FAILURE
Jan 10 08:49:15 systemd[1]: nut-monitor.service: Failed with result 'exit-code'.
Jan 10 08:49:15 systemd[1]: Failed to start Network UPS Tools - power device monitor and shutdown controller.
"""
lines2 = e.value.errmsg.splitlines()
second_ts, len_lines2 = ' '.join(lines2.pop(0).split()[:3]), len(lines2)
assert any('nut-monitor[' in line for line in lines2), lines2
assert any('systemd[' in line for line in lines2), lines2
# timestamp should change since we sleep(1)
assert first_ts != second_ts
# the error messages will differ slightly (different PID for upsmon) but the number
# of lines should be the same
assert len_lines1 == len_lines2
# Stop the service to avoid syslog spam
call('service.stop', 'ups')
#!/usr/bin/env python3
# Author: Eric Turgeon
# License: BSD
# Location for tests into REST API of FreeNAS
import sys
import os
apifolder = os.getcwd()
sys.path.append(apifolder)
from functions import DELETE, GET, POST
def delete_group_delete_users(delete_users):
results = POST("/user/", {
"username": "test",
"group_create": True,
"full_name": "Test",
"smb": False,
"password_disabled": True,
})
assert results.status_code == 200, results.text
user_id = results.json()
results = GET(f"/user/id/{user_id}")
assert results.status_code == 200, results.text
group_id = results.json()["group"]["id"]
results = DELETE(f"/group/id/{group_id}", {"delete_users": delete_users})
assert results.status_code == 200, results.text
return user_id, group_id
def test_01_delete_group_delete_users():
user_id, group_id = delete_group_delete_users(True)
results = GET(f"/user/id/{user_id}")
assert results.status_code == 404, results.text
def test_01_delete_group_no_delete_users():
user_id, group_id = delete_group_delete_users(False)
results = GET(f"/user/id/{user_id}")
assert results.status_code == 200, results.text
assert results.json()["group"]["bsdgrp_group"] in ["nogroup", "nobody"]
results = DELETE(f"/user/id/{user_id}")
assert results.status_code == 200, results.text
import pytest
import time
from pytest_dependency import depends
from functions import GET, PUT, wait_on_job
from auto_config import ha, pool_name, interface, ip
from middlewared.test.integration.utils import call
# Read all the test below only on non-HA
if not ha:
def test_01_get_kubernetes_bindip_choices():
results = GET('/kubernetes/bindip_choices/')
assert results.status_code == 200, results.text
assert isinstance(results.json(), dict), results.text
assert '0.0.0.0' in results.json(), results.text
assert ip in results.json(), results.text
@pytest.mark.dependency(name='setup_kubernetes')
def test_02_setup_kubernetes(request):
global payload
gateway = GET("/network/general/summary/").json()['default_routes'][0]
payload = {
'pool': pool_name,
'route_v4_interface': interface,
'route_v4_gateway': gateway,
'node_ip': ip
}
results = PUT('/kubernetes/', payload)
assert results.status_code == 200, results.text
job_id = results.json()
job_status = wait_on_job(job_id, 300)
assert job_status['state'] == 'SUCCESS', str(job_status['results'])
@pytest.mark.parametrize('data', ['pool', 'route_v4_interface', 'route_v4_gateway', 'node_ip'])
def test_03_verify_kubernetes(request, data):
depends(request, ["setup_kubernetes"])
results = GET('/kubernetes/')
assert results.status_code == 200, results.text
assert isinstance(results.json(), dict), results.text
assert results.json()[data] == payload[data], results.text
def test_04_get_kubernetes_node_ip(request):
depends(request, ["setup_kubernetes"])
results = GET('/kubernetes/node_ip/')
assert results.status_code == 200, results.text
assert isinstance(results.json(), str), results.text
assert results.json() == ip, results.text
def test_05_get_kubernetes_events(request):
depends(request, ["setup_kubernetes"])
results = GET('/kubernetes/events/')
assert results.status_code == 200, results.text
assert isinstance(results.json(), list), results.text
def test_06_kubernetes_config_acl(request):
depends(request, ['setup_kubernetes'])
acl_mode = call('filesystem.stat', '/etc/rancher/k3s/k3s.yaml')
netdata_group = call('group.query', [['group', '=', 'netdata']], {'get': True})
assert acl_mode['gid'] == netdata_group['gid']
assert (acl_mode['mode'] & 0o640) == 0o640
def test_07_kubernetes_pods_stats(request):
depends(request, ['setup_kubernetes'])
last_update = None
timeout = 150
while True:
time.sleep(5)
kube_system_pods = call(
'k8s.pod.query', [
['metadata.namespace', '=', 'kube-system']
], {'select': ['metadata.name', 'status.phase']}
)
k3s_metrics = call('netdata.get_all_metrics').get('k3s_stats.k3s_stats', {})
if len([pod for pod in kube_system_pods if pod['status']['phase'] == 'Running']) >= 3 and k3s_metrics:
# The 3 number here is to ensure that by the time we try to retrieve stats, some pods are running
# and netdata is able to collect some data
if not last_update:
last_update = k3s_metrics['last_updated']
if last_update and last_update != k3s_metrics['last_updated']:
break
if timeout <= 0:
pytest.fail('Time to setup kubernetes exceeded 150 seconds')
timeout -= 5
stats = call('chart.release.stats_internal', kube_system_pods)
assert any(
d[k] > 0 for d, k in (
(stats, 'memory'), (stats, 'cpu'), (stats['network'], 'incoming'), (stats['network'], 'outgoing')
)
), stats
This diff is collapsed.
import json
import os
import pytest
import sys
from pytest_dependency import depends
apifolder = os.getcwd()
sys.path.append(apifolder)
from functions import GET, POST, DELETE, SSH_TEST, wait_on_job
from auto_config import ha, artifacts, password, ip, pool_name
from middlewared.test.integration.utils import call, ssh
from middlewared.test.integration.assets.apps import chart_release
from middlewared.test.integration.assets.catalog import catalog
from middlewared.test.integration.assets.kubernetes import backup
from middlewared.test.integration.utils import file_exists_and_perms_check
backup_release_name = 'backupsyncthing'
# Read all the test below only on non-HA
if not ha:
@pytest.mark.dependency(name='plex_version')
def test_01_get_plex_version():
global plex_version
payload = {
"item_name": "plex",
"item_version_details": {
"catalog": "TRUENAS",
"train": 'charts'
}
}
results = POST('/catalog/get_item_details/', payload)
assert results.status_code == 200, results.text
assert isinstance(results.json(), dict), results.text
plex_version = results.json()['latest_version']
@pytest.mark.dependency(name='release_plex')
def test_02_create_plex_chart_release(request):
depends(request, ['setup_kubernetes', 'plex_version'], scope='session')
global plex_id
payload = {
'catalog': 'TRUENAS',
'item': 'plex',
'release_name': 'myplex',
'train': 'charts',
'version': plex_version
}
results = POST('/chart/release/', payload)
assert results.status_code == 200, results.text
assert isinstance(results.json(), int), results.text
job_status = wait_on_job(results.json(), 300)
assert job_status['state'] == 'SUCCESS', str(job_status['results'])
plex_id = job_status['results']['result']['id']
@pytest.mark.dependency(name='ix_app_backup')
def test_03_create_kubernetes_backup_chart_releases_for_ix_applications(request):
depends(request, ['release_plex'])
global backup_name
results = POST('/kubernetes/backup_chart_releases/')
assert results.status_code == 200, results.text
assert isinstance(results.json(), int), results.text
job_status = wait_on_job(results.json(), 300)
assert job_status['state'] == 'SUCCESS', str(job_status['results'])
backup_name = job_status['results']['result']
@pytest.mark.dependency(name='check_datasets_to_ignore')
def test_04_check_to_ignore_datasets_exist(request):
datasets_to_ignore = set(call('kubernetes.to_ignore_datasets_on_backup', call('kubernetes.config')['dataset']))
assert set(ds['id'] for ds in call(
'zfs.dataset.query', [['OR', [['id', '=', directory] for directory in datasets_to_ignore]]]
)) == datasets_to_ignore
def test_05_backup_chart_release(request):
depends(request, ['ix_app_backup', 'check_datasets_to_ignore'])
datasets_to_ignore = set(call('kubernetes.to_ignore_datasets_on_backup', call('kubernetes.config')['dataset']))
datasets = set(snap['dataset'] for snap in call('zfs.snapshot.query', [['id', 'rin', backup_name]]))
assert datasets_to_ignore.intersection(datasets) == set()
def test_06_get_ix_applications_kubernetes_backup(request):
depends(request, ['ix_app_backup'])
results = GET('/kubernetes/list_backups/')
assert results.status_code == 200, results.text
assert isinstance(results.json(), dict), results.text
assert backup_name in results.json(), results.text
@pytest.mark.dependency(name='ix_app_backup_restored')
def test_07_restore_ix_applications_kubernetes_backup(request):
depends(request, ['ix_app_backup'])
results = POST('/kubernetes/restore_backup/', backup_name)
assert results.status_code == 200, results.text
job_status = wait_on_job(results.json(), 300)
assert job_status['state'] == 'SUCCESS', str(job_status['results'])
def test_08_verify_plex_chart_release_still_exist(request):
depends(request, ['release_plex', 'ix_app_backup_restored'])
results = GET(f'/chart/release/id/{plex_id}/')
assert results.status_code == 200, results.text
assert isinstance(results.json(), dict), results.text
@pytest.mark.dependency(name='release_ipfs')
def test_09_create_ipfs_chart_release(request):
depends(request, ['setup_kubernetes'], scope='session')
global ipfs_id
payload = {
'catalog': 'TRUENAS',
'item': 'ipfs',
'release_name': 'ipfs',
'train': 'community'
}
results = POST('/chart/release/', payload)
assert results.status_code == 200, results.text
assert isinstance(results.json(), int), results.text
job_status = wait_on_job(results.json(), 300)
assert job_status['state'] == 'SUCCESS', str(job_status['results'])
ipfs_id = job_status['results']['result']['id']
@pytest.mark.dependency(name='my_app_backup')
def test_10_create_custom_name_kubernetes_chart_releases_backup(request):
depends(request, ['release_plex', 'release_ipfs'])
results = POST('/kubernetes/backup_chart_releases/', 'mybackup')
assert results.status_code == 200, results.text
assert isinstance(results.json(), int), results.text
job_status = wait_on_job(results.json(), 300)
assert job_status['state'] == 'SUCCESS', str(job_status['results'])
def test_11_backup_snapshot_name_validation(request):
depends(request, ['my_app_backup'])
results = POST('/kubernetes/backup_chart_releases/', 'mybackup')
assert results.status_code == 200, results.text
assert isinstance(results.json(), int), results.text
job_status = wait_on_job(results.json(), 300)
assert job_status['state'] == 'FAILED'
assert job_status['results']['error'] == "[EEXIST] 'ix-applications-backup-mybackup' snapshot already exists"
def test_12_get_custom_name_kubernetes_backup(request):
depends(request, ['my_app_backup'])
results = GET('/kubernetes/list_backups/')
assert results.status_code == 200, results.text
assert isinstance(results.json(), dict), results.text
assert 'mybackup' in results.json(), results.text
def test_13_restore_custom_name_kubernetes_backup(request):
depends(request, ['my_app_backup'])
results = POST('/kubernetes/restore_backup/', 'mybackup')
assert results.status_code == 200, results.text
job_status = wait_on_job(results.json(), 300)
assert job_status['state'] == 'SUCCESS', str(job_status['results'])
def test_14_verify_plex_and_ipfs_chart_release_still_exist(request):
depends(request, ['my_app_backup'])
results = GET(f'/chart/release/id/{plex_id}/')
assert results.status_code == 200, results.text
assert isinstance(results.json(), dict), results.text
results = GET(f'/chart/release/id/{ipfs_id}/')
assert results.status_code == 200, results.text
assert isinstance(results.json(), dict), results.text
@pytest.mark.dependency(name='my_second_backup')
def test_15_create_mysecondbackup_kubernetes_chart_releases_backup(request):
depends(request, ['release_plex', 'release_ipfs'])
results = POST('/kubernetes/backup_chart_releases/', 'mysecondbackup')
assert results.status_code == 200, results.text
assert isinstance(results.json(), int), results.text
job_status = wait_on_job(results.json(), 300)
assert job_status['state'] == 'SUCCESS', str(job_status['results'])
def test_16_delete_ipfs_chart_release(request):
depends(request, ['release_ipfs'])
results = DELETE(f'/chart/release/id/{ipfs_id}/')
assert results.status_code == 200, results.text
assert isinstance(results.json(), int), results.text
job_status = wait_on_job(results.json(), 300)
assert job_status['state'] == 'SUCCESS', str(job_status['results'])
def test_17_restore_custom_name_kubernetes_backup(request):
depends(request, ['my_second_backup'])
results = POST('/kubernetes/restore_backup/', 'mysecondbackup')
assert results.status_code == 200, results.text
job_status = wait_on_job(results.json(), 300)
assert job_status['state'] == 'SUCCESS', str(job_status['results'])
def test_18_verify_plex_chart_still_exist_and_ipfs_does_not_exist(request):
depends(request, ['my_app_backup'])
results = GET(f'/chart/release/id/{plex_id}/')
assert results.status_code == 200, results.text
assert isinstance(results.json(), dict), results.text
results = GET(f'/chart/release/id/{ipfs_id}/')
assert results.status_code == 404, results.text
assert isinstance(results.json(), dict), results.text
def test_19_delete_mybackup_kubernetes_backup(request):
depends(request, ['my_app_backup'])
results = POST('/kubernetes/delete_backup/', 'mybackup')
assert results.status_code == 200, results.text
assert results.json() is None, results.text
def test_20_delete_ix_applications_kubernetes_backup(request):
depends(request, ['ix_app_backup', 'ix_app_backup_restored'])
results = POST('/kubernetes/delete_backup/', backup_name)
assert results.status_code == 200, results.text
assert results.json() is None, results.text
@pytest.mark.dependency(name='k8s_snapshot_regression')
def test_21_recreate_mybackup_kubernetes_backup_for_snapshots_regression(request):
depends(request, ['my_app_backup'])
results = POST('/kubernetes/backup_chart_releases/', 'mybackup')
assert results.status_code == 200, results.text
assert isinstance(results.json(), int), results.text
job_status = wait_on_job(results.json(), 300)
assert job_status['state'] == 'SUCCESS', str(job_status['results'])
def test_22_delete_mybackup_kubernetes_backup(request):
depends(request, ['k8s_snapshot_regression'])
results = POST('/kubernetes/delete_backup/', 'mybackup')
assert results.status_code == 200, results.text
assert results.json() is None, results.text
def test_23_delete_mysecondbackup_kubernetes_backup(request):
depends(request, ['my_second_backup'])
results = POST('/kubernetes/delete_backup/', 'mysecondbackup')
assert results.status_code == 200, results.text
assert results.json() is None, results.text
def test_24_delete_plex_chart_release(request):
depends(request, ['release_plex'])
results = DELETE(f'/chart/release/id/{plex_id}/')
assert results.status_code == 200, results.text
assert isinstance(results.json(), int), results.text
job_status = wait_on_job(results.json(), 300)
assert job_status['state'] == 'SUCCESS', str(job_status['results'])
def test_25_get_k3s_logs():
results = SSH_TEST('journalctl --no-pager -u k3s', 'root', password, ip)
ks3_logs = open(f'{artifacts}/k3s-scale.log', 'w')
ks3_logs.writelines(results['output'])
ks3_logs.close()
def test_26_backup_structure():
def read_file_content(file_path: str) -> str:
return ssh(f'cat {file_path}')
with chart_release({
'catalog': 'TRUENAS',
'item': 'syncthing',
'release_name': backup_release_name,
'train': 'charts',
}, wait_until_active=True) as chart_release_info:
with backup() as backup_name:
app_info = call(
'chart.release.get_instance', chart_release_info['id'], {'extra': {'retrieve_resources': True}}
)
backup_path = os.path.join(
'/mnt', pool_name, 'ix-applications/backups', backup_name, app_info['id']
)
for f in ('namespace.yaml', 'workloads_replica_counts.json'):
test_path = os.path.join(backup_path, f)
assert file_exists_and_perms_check(test_path) is True, test_path
secrets_data = call(
'k8s.secret.query', [
['type', 'in', ['helm.sh/release.v1', 'Opaque']],
['metadata.namespace', '=', app_info['namespace']]
]
)
for secret in secrets_data:
secret_file_path = os.path.join(backup_path, 'secrets', secret['metadata']['name'])
assert file_exists_and_perms_check(secret_file_path) is True, secret_file_path
exported_secret = call('k8s.secret.export_to_yaml', secret['metadata']['name'])
assert read_file_content(secret_file_path) == exported_secret
assert read_file_content(os.path.join(backup_path, 'namespace.yaml')) == call(
'k8s.namespace.export_to_yaml', app_info['namespace']
)
assert json.loads(read_file_content(
os.path.join(backup_path, 'workloads_replica_counts.json')
)) == call('chart.release.get_replica_count_for_resources', app_info['resources'])
import contextlib
import pytest
from middlewared.test.integration.utils import call, ssh
from pytest_dependency import depends
from time import sleep
@contextlib.contextmanager
def official_chart_release(chart_name, release_name):
payload = {
'catalog': 'TRUENAS',
'item': chart_name,
'release_name': release_name,
'train': 'community',
}
chart_release = call('chart.release.create', payload, job=True)
try:
yield chart_release
finally:
call('chart.release.delete', release_name, job=True)
@contextlib.contextmanager
def get_chart_release_pods(release_name, timeout=90):
status = call('chart.release.pod_status', release_name)
time_spend = 0
while status.get('status') != 'ACTIVE':
if time_spend > timeout:
raise Exception('Time out chart release is not in running state')
sleep(6)
time_spend += 6
status = call('chart.release.pod_status', release_name)
# Give some time for the pods to actually propagate some logs
sleep(10)
chart_pods = call('chart.release.pod_logs_choices', release_name)
yield chart_pods
@pytest.mark.flaky(reruns=5, reruns_delay=5)
def test_get_chart_release_logs(request):
depends(request, ['setup_kubernetes'], scope='session')
release_name = 'test-logs'
with official_chart_release('tftpd-hpa', release_name) as chart_release:
with get_chart_release_pods(release_name, 300) as pods:
for pod_name, containers in pods.items():
for container in containers:
logs = call('k8s.pod.get_logs', pod_name, container, chart_release['namespace'])
assert logs != ''
def test_get_chart_exec_result(request):
depends(request, ['setup_kubernetes'], scope='session')
release_name = 'test-exec'
with official_chart_release('searxng', release_name) as chart_release:
with get_chart_release_pods(release_name, 300) as pods:
for pod_name, containers in pods.items():
for container in containers:
result = ssh(
f'k3s kubectl exec -n {chart_release["namespace"]} pods/{pod_name} -c {container} -- /bin/ls',
check=False)
assert result != ''
import ipaddress
import errno
import os
import sys
from time import sleep
import pytest
from pytest_dependency import depends
apifolder = os.getcwd()
sys.path.append(apifolder)
from auto_config import ip, ha
from functions import GET, POST, make_ws_request
from protocols import smb_connection, smb_share
from middlewared.service_exception import CallError, ValidationErrors
from middlewared.client.client import ValidationErrors as ClientValidationErrors
from middlewared.test.integration.assets.pool import dataset
from middlewared.test.integration.assets.privilege import privilege
from middlewared.test.integration.assets.directory_service import active_directory, override_nameservers
from middlewared.test.integration.utils import call, ssh, client
from middlewared.test.integration.assets.product import product_type
if ha and "hostname_virtual" in os.environ:
hostname = os.environ["hostname_virtual"]
else:
from auto_config import hostname
try:
from config import AD_DOMAIN, ADPASSWORD, ADUSERNAME
AD_USER = fr"AD02\{ADUSERNAME.lower()}"
except ImportError:
Reason = 'ADNameServer AD_DOMAIN, ADPASSWORD, or/and ADUSERNAME are missing in config.py"'
pytestmark = pytest.mark.skip(reason=Reason)
SMB_NAME = "TestADShare"
def remove_dns_entries(payload):
res = make_ws_request(ip, {
'msg': 'method',
'method': 'dns.nsupdate',
'params': [{'ops': payload}]
})
error = res.get('error')
assert error is None, str(error)
def cleanup_forward_zone():
res = make_ws_request(ip, {
'msg': 'method',
'method': 'dnsclient.forward_lookup',
'params': [{'names': [f'{hostname}.{AD_DOMAIN}']}]
})
error = res.get('error')
if error and error['trace']['class'] == 'NXDOMAIN':
# No entry, nothing to do
return
assert error is None, str(error)
ips_to_remove = [rdata['address'] for rdata in res['result']]
payload = []
for i in ips_to_remove:
addr = ipaddress.ip_address(i)
payload.append({
'command': 'DELETE',
'name': f'{hostname}.{AD_DOMAIN}.',
'address': str(addr),
'type': 'A' if addr.version == 4 else 'AAAA'
})
remove_dns_entries(payload)
def cleanup_reverse_zone():
res = make_ws_request(ip, {
'msg': 'method',
'method': 'activedirectory.ipaddresses_to_register',
'params': [
{'hostname': f'{hostname}.{AD_DOMAIN}.', 'bindip': []},
False
],
})
error = res.get('error')
assert error is None, str(error)
ptr_table = {f'{ipaddress.ip_address(i).reverse_pointer}.': i for i in res['result']}
res = make_ws_request(ip, {
'msg': 'method',
'method': 'dnsclient.reverse_lookup',
'params': [{'addresses': list(ptr_table.values())}],
})
error = res.get('error')
if error and error['trace']['class'] == 'NXDOMAIN':
# No entry, nothing to do
return
assert error is None, str(error)
payload = []
for host in res['result']:
reverse_pointer = host["name"]
assert reverse_pointer in ptr_table, str(ptr_table)
addr = ipaddress.ip_address(ptr_table[reverse_pointer])
payload.append({
'command': 'DELETE',
'name': host['target'],
'address': str(addr),
'type': 'A' if addr.version == 4 else 'AAAA'
})
remove_dns_entries(payload)
@pytest.fixture(scope="function")
def set_product_type(request):
with product_type():
yield
@pytest.fixture(scope="function")
def set_ad_nameserver(request):
with override_nameservers() as ns:
yield (request, ns)
def test_02_cleanup_nameserver(set_ad_nameserver):
results = POST("/activedirectory/domain_info/", AD_DOMAIN)
assert results.status_code == 200, results.text
domain_info = results.json()
res = make_ws_request(ip, {
'msg': 'method',
'method': 'kerberos.get_cred',
'params': [{
'dstype': 'DS_TYPE_ACTIVEDIRECTORY',
'conf': {
'bindname': ADUSERNAME,
'bindpw': ADPASSWORD,
'domainname': AD_DOMAIN,
}
}],
})
error = res.get('error')
assert error is None, str(error)
cred = res['result']
res = make_ws_request(ip, {
'msg': 'method',
'method': 'kerberos.do_kinit',
'params': [{
'krb5_cred': cred,
'kinit-options': {
'kdc_override': {
'domain': AD_DOMAIN.upper(),
'kdc': domain_info['KDC server']
},
}
}],
})
error = res.get('error')
assert error is None, str(error)
# Now that we have proper kinit as domain admin
# we can nuke stale DNS entries from orbit.
#
cleanup_forward_zone()
cleanup_reverse_zone()
def test_03_get_activedirectory_data(request):
global results
results = GET('/activedirectory/')
assert results.status_code == 200, results.text
def test_05_get_activedirectory_state(request):
results = GET('/activedirectory/get_state/')
assert results.status_code == 200, results.text
assert results.json() == 'DISABLED', results.text
def test_06_get_activedirectory_started_before_starting_activedirectory(request):
results = GET('/activedirectory/started/')
assert results.status_code == 200, results.text
assert results.json() is False, results.text
@pytest.mark.dependency(name="ad_works")
def test_07_enable_leave_activedirectory(request):
with pytest.raises(ValidationErrors):
# At this point we are not enterprise licensed
call("system.general.update", {"ds_auth": True})
short_name = None
with active_directory(dns_timeout=15) as ad:
short_name = ad['dc_info']['Pre-Win2k Domain']
# Make sure we can read our secrets.tdb file
secrets_has_domain = call('directoryservices.secrets.has_domain', short_name)
assert secrets_has_domain is True
# Check that our database has backup of this info written to it.
db_secrets = call('directoryservices.secrets.get_db_secrets')[f'{hostname.upper()}$']
assert f'SECRETS/MACHINE_PASSWORD/{short_name}' in db_secrets
# Last password change should be populated
passwd_change = call('directoryservices.get_last_password_change')
assert passwd_change['dbconfig'] is not None
assert passwd_change['secrets'] is not None
# We should be able tZZo change some parameters when joined to AD
call('activedirectory.update', {'domainname': AD_DOMAIN, 'verbose_logging': True}, job=True)
# Changing kerberos realm should raise ValidationError
with pytest.raises(ClientValidationErrors) as ve:
call('activedirectory.update', {'domainname': AD_DOMAIN, 'kerberos_realm': None}, job=True)
assert ve.value.errors[0].errmsg.startswith('Kerberos realm may not be altered')
# This should be caught by our catchall
with pytest.raises(ClientValidationErrors) as ve:
call('activedirectory.update', {'domainname': AD_DOMAIN, 'createcomputer': ''}, job=True)
assert ve.value.errors[0].errmsg.startswith('Parameter may not be changed')
# Verify that AD state is reported as healthy
assert call('activedirectory.get_state') == 'HEALTHY'
# Verify that `started` endpoint works correctly
assert call('activedirectory.started') is True
# Verify that idmapping is working
pw = ad['user_obj']
# Verify winbindd information
assert pw['sid_info'] is not None, str(ad)
assert not pw['sid_info']['sid'].startswith('S-1-22-1-'), str(ad)
assert pw['sid_info']['domain_information']['domain'] != 'LOCAL', str(ad)
assert pw['sid_info']['domain_information']['domain_sid'] is not None, str(ad)
assert pw['sid_info']['domain_information']['online'], str(ad)
assert pw['sid_info']['domain_information']['activedirectory'], str(ad)
res = make_ws_request(ip, {
'msg': 'method',
'method': 'dnsclient.forward_lookup',
'params': [{'names': [f'{hostname}.{AD_DOMAIN}']}],
})
error = res.get('error')
assert error is None, str(error)
assert len(res['result']) != 0
addresses = [x['address'] for x in res['result']]
assert ip in addresses
res = call('privilege.query', [['name', 'C=', AD_DOMAIN]], {'get': True})
assert res['ds_groups'][0]['name'].endswith('domain admins')
assert res['ds_groups'][0]['sid'].endswith('512')
assert res['allowlist'][0] == {'method': '*', 'resource': '*'}
assert call('activedirectory.get_state') == 'DISABLED'
secrets_has_domain = call('directoryservices.secrets.has_domain', short_name)
assert secrets_has_domain is False
results = POST("/user/get_user_obj/", {'username': AD_USER})
assert results.status_code != 200, results.text
results = GET('/activedirectory/started/')
assert results.status_code == 200, results.text
assert results.json() is False, results.text
res = make_ws_request(ip, {
'msg': 'method',
'method': 'privilege.query',
'params': [[['name', 'C=', AD_DOMAIN]]]
})
error = res.get('error')
assert error is None, str(error)
assert len(res['result']) == 0, str(res['result'])
def test_08_activedirectory_smb_ops(request):
depends(request, ["ad_works"], scope="session")
with active_directory(dns_timeout=15) as ad:
short_name = ad['dc_info']['Pre-Win2k Domain']
machine_password_key = f'SECRETS/MACHINE_PASSWORD/{short_name}'
running_pwd = call('directoryservices.secrets.dump')[machine_password_key]
db_pwd = call('directoryservices.secrets.get_db_secrets')[f'{hostname.upper()}$'][machine_password_key]
# We've joined and left AD already. Verify secrets still getting backed up correctly.
assert running_pwd == db_pwd
with dataset(
"ad_smb",
{'share_type': 'SMB'},
acl=[{
'tag': 'GROUP',
'id': ad['user_obj']['pw_uid'],
'perms': {'BASIC': 'FULL_CONTROL'},
'flags': {'BASIC': 'INHERIT'},
'type': 'ALLOW'
}]
) as ds:
results = POST("/service/restart/", {"service": "cifs"})
assert results.status_code == 200, results.text
with smb_share(f'/mnt/{ds}', {'name': SMB_NAME}):
with smb_connection(
host=ip,
share=SMB_NAME,
username=ADUSERNAME,
domain='AD02',
password=ADPASSWORD
) as c:
fd = c.create_file('testfile.txt', 'w')
c.write(fd, b'foo')
val = c.read(fd, 0, 3)
c.close(fd, True)
assert val == b'foo'
c.mkdir('testdir')
fd = c.create_file('testdir/testfile2.txt', 'w')
c.write(fd, b'foo2')
val = c.read(fd, 0, 4)
c.close(fd, True)
assert val == b'foo2'
c.rmdir('testdir')
with dataset(
"ad_datasets",
{'share_type': 'SMB'},
acl=[{
'tag': 'GROUP',
'id': ad['user_obj']['pw_uid'],
'perms': {'BASIC': 'FULL_CONTROL'},
'flags': {'BASIC': 'INHERIT'},
'type': 'ALLOW'
}]
) as ds:
with smb_share(f'/mnt/{ds}', {
'name': 'DATASETS',
'purpose': 'NO_PRESET',
'auxsmbconf': 'zfs_core:zfs_auto_create = true',
'path_suffix': '%D/%U'
}):
with smb_connection(
host=ip,
share='DATASETS',
username=ADUSERNAME,
domain='AD02',
password=ADPASSWORD
) as c:
fd = c.create_file('nested_test_file', "w")
c.write(fd, b'EXTERNAL_TEST')
c.close(fd)
results = POST('/filesystem/getacl/', {
'path': os.path.join(f'/mnt/{ds}', 'AD02', ADUSERNAME),
'simplified': True
})
assert results.status_code == 200, results.text
acl = results.json()
assert acl['trivial'] is False, str(acl)
with dataset(
"ad_home",
{'share_type': 'SMB'},
acl=[{
'tag': 'GROUP',
'id': ad['user_obj']['pw_uid'],
'perms': {'BASIC': 'FULL_CONTROL'},
'flags': {'BASIC': 'INHERIT'},
'type': 'ALLOW'
}]
) as ds:
results = POST("/service/restart/", {"service": "cifs"})
assert results.status_code == 200, results.text
with smb_share(f'/mnt/{ds}', {
'name': 'TEST_HOME',
'purpose': 'NO_PRESET',
'home': True,
}):
# must refresh idmap cache to get new homedir from NSS
# this means we may need a few seconds for winbindd
# service to settle down on slow systems (like our CI VMs)
sleep(5)
with smb_connection(
host=ip,
share='HOMES',
username=ADUSERNAME,
domain='AD02',
password=ADPASSWORD
) as c:
fd = c.create_file('homes_test_file', "w")
c.write(fd, b'EXTERNAL_TEST')
c.close(fd)
file_local_path = os.path.join(f'/mnt/{ds}', 'AD02', ADUSERNAME, 'homes_test_file')
results = POST('/filesystem/getacl/', {
'path': file_local_path,
'simplified': True
})
assert results.status_code == 200, results.text
acl = results.json()
assert acl['trivial'] is False, str(acl)
def test_10_account_privilege_authentication(request, set_product_type):
depends(request, ["ad_works"], scope="session")
with active_directory(dns_timeout=15):
call("system.general.update", {"ds_auth": True})
try:
# RID 513 is constant for "Domain Users"
domain_sid = call("idmap.domain_info", AD_DOMAIN.split(".")[0])['sid']
with privilege({
"name": "AD privilege",
"local_groups": [],
"ds_groups": [f"{domain_sid}-513"],
"allowlist": [{"method": "CALL", "resource": "system.info"}],
"web_shell": False,
}):
with client(auth=(f"limiteduser@{AD_DOMAIN}", ADPASSWORD)) as c:
methods = c.call("core.get_methods")
me = c.call("auth.me")
assert 'DIRECTORY_SERVICE' in me['account_attributes']
assert 'ACTIVE_DIRECTORY' in me['account_attributes']
assert "system.info" in methods
assert "pool.create" not in methods
# ADUSERNAME is member of domain admins and will have
# all privileges
with client(auth=(f"{ADUSERNAME}@{AD_DOMAIN}", ADPASSWORD)) as c:
methods = c.call("core.get_methods")
assert "pool.create" in methods
# Alternative formatting for user name <DOMAIN>\<username>.
# this should also work for auth
with client(auth=(AD_USER, ADPASSWORD)) as c:
methods = c.call("core.get_methods")
assert "pool.create" in methods
finally:
call("system.general.update", {"ds_auth": False})
def test_11_secrets_restore(request):
depends(request, ["ad_works"], scope="session")
with active_directory():
assert call('activedirectory.started') is True
ssh('rm /var/db/system/samba4/private/secrets.tdb')
call('service.restart', 'idmap')
with pytest.raises(CallError) as ce:
call('activedirectory.started')
# WBC_ERR_WINBIND_NOT_AVAILABLE gets converted to ENOTCONN
assert 'WBC_ERR_WINBIND_NOT_AVAILABLE' in ce.value.errmsg
call('directoryservices.secrets.restore')
call('service.restart', 'idmap')
call('activedirectory.started')
This diff is collapsed.
#!/usr/bin/env python3
# Author: Eric Turgeon
# License: BSD
# Location for tests into REST API of FreeNAS
import pytest
import sys
import os
import json
apifolder = os.getcwd()
sys.path.append(apifolder)
from functions import PUT, POST, GET, DELETE, SSH_TEST, wait_on_job
from auto_config import ip, hostname, password, user
from base64 import b64decode
from middlewared.test.integration.assets.directory_service import active_directory
from middlewared.test.integration.utils import call, ssh
from pytest_dependency import depends
from time import sleep
try:
from config import AD_DOMAIN, ADPASSWORD, ADUSERNAME, ADNameServer, AD_COMPUTER_OU
from config import (
LDAPBASEDN,
LDAPBINDDN,
LDAPBINDPASSWORD,
LDAPHOSTNAME
)
except ImportError:
Reason = 'ADNameServer AD_DOMAIN, ADPASSWORD, or/and ADUSERNAME are missing in config.py"'
pytestmark = pytest.mark.skip(reason=Reason)
BACKENDS = [
"AD",
"AUTORID",
"LDAP",
"NSS",
"RFC2307",
"TDB",
"RID",
]
BACKEND_OPTIONS = None
WORKGROUP = None
nameserver1 = None
nameserver2 = None
job_id = None
dom_id = None
@pytest.fixture(scope="module")
def do_ad_connection(request):
with active_directory(
AD_DOMAIN,
ADUSERNAME,
ADPASSWORD,
netbiosname=hostname,
createcomputer=AD_COMPUTER_OU,
) as ad:
yield (request, ad)
@pytest.mark.dependency(name="AD_IS_HEALTHY")
def test_03_enabling_activedirectory(do_ad_connection):
results = GET('/activedirectory/started/')
assert results.status_code == 200, results.text
def test_04_name_sid_resolution(request):
depends(request, ["AD_IS_HEALTHY"])
# get list of AD group gids for user from NSS
ad_acct = call('user.get_user_obj', {'username': f'{ADUSERNAME}@{AD_DOMAIN}', 'get_groups': True})
groups = set(ad_acct['grouplist'])
# convert list of gids into sids
sids = call('idmap.convert_unixids', [{'id_type': 'GROUP', 'id': x} for x in groups])
sidlist = set([x['sid'] for x in sids['mapped'].values()])
assert len(groups) == len(sidlist)
# convert sids back into unixids
unixids = call('idmap.convert_sids', list(sidlist))
assert set([x['id'] for x in unixids['mapped'].values()]) == groups
@pytest.mark.dependency(name="GATHERED_BACKEND_OPTIONS")
def test_07_get_idmap_backend_options(request):
"""
Create large set of SMB shares for testing registry.
"""
depends(request, ["AD_IS_HEALTHY"])
global BACKEND_OPTIONS
global WORKGROUP
results = GET("/idmap/backend_options")
assert results.status_code == 200, results.text
BACKEND_OPTIONS = results.json()
results = GET("/smb")
assert results.status_code == 200, results.text
WORKGROUP = results.json()['workgroup']
@pytest.mark.parametrize('backend', BACKENDS)
def test_08_test_backend_options(request, backend):
"""
Tests for backend options are performend against
the backend for the domain we're joined to
(DS_TYPE_ACTIVEDIRECTORY) so that auto-detection
works correctly. The three default idmap backends
DS_TYPE_ACTIVEDIRECTORY, DS_TYPE_LDAP,
DS_TYPE_DEFAULT_DOMAIN have hard-coded ids and
so we don't need to look them up.
"""
depends(request, ["GATHERED_BACKEND_OPTIONS"], scope="session")
opts = BACKEND_OPTIONS[backend]['parameters'].copy()
set_secret = False
payload = {
"name": "DS_TYPE_ACTIVEDIRECTORY",
"range_low": "1000000001",
"range_high": "2000000000",
"idmap_backend": backend,
"options": {}
}
payload3 = {"options": {}}
for k, v in opts.items():
"""
Populate garbage data where an opt is required.
This should get us past the first step of
switching to the backend before doing more
comprehensive tests.
"""
if v['required']:
payload["options"].update({k: "canary"})
if backend == 'RFC2307':
payload['options'].update({"ldap_server": "STANDALONE"})
if not payload['options']:
payload.pop('options')
sleep(5)
results = PUT("/idmap/id/1/", payload)
assert results.status_code == 200, f'payload: {payload}, results: {results.text}'
if backend == "AUTORID":
IDMAP_CFG = "idmap config * "
else:
IDMAP_CFG = f"idmap config {WORKGROUP} "
"""
Validate that backend was correctly set in smb.conf.
"""
cmd = f'midclt call smb.getparm "{IDMAP_CFG}: backend" GLOBAL'
results = SSH_TEST(cmd, user, password, ip)
assert results['result'] is True, results['output']
running_backend = results['stdout'].strip()
assert running_backend == backend.lower(), results['output']
if backend == "RID":
"""
sssd_compat generates a lower range based
on murmur3 hash of domain SID. Since we're validating
basic functionilty, checking that our range_low
changed is sufficient for now.
"""
payload2 = {"options": {"sssd_compat": True}}
results = PUT("/idmap/id/1/", payload2)
assert results.status_code == 200, results.text
out = results.json()
assert out['range_low'] != payload['range_low']
elif backend == "AUTORID":
"""
autorid is unique among the idmap backends because
its configuration replaces the default idmap backend
"idmap config *".
"""
payload3["options"] = {
"rangesize": 200000,
"readonly": True,
"ignore_builtin": True,
}
results = PUT("/idmap/id/1/", payload3)
assert results.status_code == 200, results.text
elif backend == "AD":
payload3["options"] = {
"schema_mode": "SFU",
"unix_primary_group": True,
"unix_nss_info": True,
}
results = PUT("/idmap/id/1/", payload3)
assert results.status_code == 200, results.text
elif backend == "LDAP":
payload3["options"] = {
"ldap_base_dn": LDAPBASEDN,
"ldap_user_dn": LDAPBINDDN,
"ldap_url": LDAPHOSTNAME,
"ldap_user_dn_password": LDAPBINDPASSWORD,
"ssl": "ON",
"readonly": True,
}
results = PUT("/idmap/id/1/", payload3)
assert results.status_code == 200, results.text
secret = payload3["options"].pop("ldap_user_dn_password")
set_secret = True
elif backend == "RFC2307":
payload3["options"] = {
"ldap_server": "STANDALONE",
"bind_path_user": LDAPBASEDN,
"bind_path_group": LDAPBASEDN,
"user_cn": True,
"ldap_domain": "",
"ldap_url": LDAPHOSTNAME,
"ldap_user_dn": LDAPBINDDN,
"ldap_user_dn_password": LDAPBINDPASSWORD,
"ssl": "ON",
"ldap_realm": True,
}
results = PUT("/idmap/id/1/", payload3)
assert results.status_code == 200, results.text
r = payload3["options"].pop("ldap_realm")
payload3["options"]["realm"] = r
secret = payload3["options"].pop("ldap_user_dn_password")
set_secret = True
for k, v in payload3['options'].items():
"""
At this point we should have added every supported option
for the current backend. Iterate through each option and verify
that it was written to samba's running configuration.
"""
if k in ['realm', 'ssl']:
continue
cmd = f'midclt call smb.getparm "{IDMAP_CFG}: {k}" GLOBAL'
results = SSH_TEST(cmd, user, password, ip)
assert results['result'] is True, results['output']
if k == 'ldap_url':
v = f'ldaps://{v}'
elif k == 'ldap_domain':
v = None
if v == 'STANDALONE':
v = 'stand-alone'
try:
res = json.loads(results['stdout'].strip())
assert res == v, f"{backend} - [{k}]: {res}"
except json.decoder.JSONDecodeError:
res = results['stdout'].strip()
if isinstance(v, bool):
v = str(v)
assert v.casefold() == res.casefold(), f"{backend} - [{k}]: {res}"
if set_secret:
"""
API calls that set an idmap secret should result in the
secret being written to secrets.tdb in Samba's private
directory. To check this, force a secrets db dump, check
for keys, then decode secret.
"""
idmap_secret = call('directoryservices.secrets.get_ldap_idmap_secret', WORKGROUP, LDAPBINDDN)
db_secrets = call('directoryservices.secrets.get_db_secrets')[f'{hostname.upper()}$']
# Check that our secret is written and stored in secrets backup correctly
assert idmap_secret == db_secrets[f"SECRETS/GENERIC/IDMAP_LDAP_{WORKGROUP}/{LDAPBINDDN}"]
decoded_sec = b64decode(idmap_secret).rstrip(b'\x00').decode()
assert secret == decoded_sec, idmap_secret
# Use net command via samba to rewrite secret and make sure it is same
ssh(f"net idmap set secret {WORKGROUP} '{secret}'")
new_idmap_secret = call('directoryservices.secrets.get_ldap_idmap_secret', WORKGROUP, LDAPBINDDN)
assert idmap_secret == new_idmap_secret
secrets_dump = call('directoryservices.secrets.dump')
assert secrets_dump == db_secrets
# reset idmap backend to RID to ensure that winbindd is running
payload = {
"name": "DS_TYPE_ACTIVEDIRECTORY",
"range_low": "1000000001",
"range_high": "2000000000",
"idmap_backend": 'RID',
"options": {}
}
sleep(5)
results = PUT("/idmap/id/1/", payload)
assert results.status_code == 200, results.text
def test_09_clear_idmap_cache(request):
depends(request, ["AD_IS_HEALTHY"])
results = GET("/idmap/clear_idmap_cache")
assert results.status_code == 200, results.text
job_id = results.json()
job_status = wait_on_job(job_id, 180)
assert job_status['state'] == 'SUCCESS', str(job_status['results'])
def test_10_idmap_overlap_fail(request):
"""
It should not be possible to set an idmap range for a new
domain that overlaps an existing one.
"""
depends(request, ["AD_IS_HEALTHY"])
payload = {
"name": "canary",
"range_low": "20000",
"range_high": "2000000000",
"idmap_backend": "RID",
"options": {}
}
results = POST("/idmap/", payload)
assert results.status_code == 422, results.text
def test_11_idmap_default_domain_name_change_fail(request):
"""
It should not be possible to change the name of a
default idmap domain.
"""
depends(request, ["AD_IS_HEALTHY"])
payload = {
"name": "canary",
"range_low": "1000000000",
"range_high": "2000000000",
"idmap_backend": "RID",
"options": {}
}
results = PUT("/idmap/id/1", payload)
assert results.status_code == 422, results.text
def test_12_idmap_low_high_range_inversion_fail(request):
"""
It should not be possible to set an idmap low range
that is greater than its high range.
"""
depends(request, ["AD_IS_HEALTHY"])
payload = {
"name": "canary",
"range_low": "2000000000",
"range_high": "1900000000",
"idmap_backend": "RID",
}
results = POST("/idmap/", payload)
assert results.status_code == 422, results.text
@pytest.mark.dependency(name="CREATED_NEW_DOMAIN")
def test_13_idmap_new_domain(request):
depends(request, ["AD_IS_HEALTHY"], scope="session")
global dom_id
cmd = 'midclt call idmap.get_next_idmap_range'
results = SSH_TEST(cmd, user, password, ip)
assert results['result'] is True, results['output']
low, high = json.loads(results['stdout'].strip())
payload = {
"name": "canary",
"range_low": low,
"range_high": high,
"idmap_backend": "RID",
"options": {},
}
results = POST("/idmap/", payload)
assert results.status_code == 200, results.text
dom_id = results.json()['id']
def test_14_idmap_new_domain_duplicate_fail(request):
"""
It should not be possible to create a new domain that
has a name conflict with an existing one.
"""
depends(request, ["AD_IS_HEALTHY"], scope="session")
cmd = 'midclt call idmap.get_next_idmap_range'
results = SSH_TEST(cmd, user, password, ip)
assert results['result'] is True, results['output']
low, high = json.loads(results['stdout'].strip())
payload = {
"name": "canary",
"range_low": low,
"range_high": high,
"idmap_backend": "RID",
}
results = POST("/idmap/", payload)
assert results.status_code == 422, results.text
def test_15_idmap_new_domain_autorid_fail(request):
"""
It should only be possible to set AUTORID on
default domain.
"""
depends(request, ["CREATED_NEW_DOMAIN"])
payload = {
"idmap_backend": "AUTORID",
}
results = PUT(f"/idmap/id/{dom_id}", payload)
assert results.status_code == 422, f"[update: {dom_id}]: {results.text}"
def test_16_idmap_delete_new_domain(request):
"""
It should only be possible to set AUTORID on
default domain.
"""
depends(request, ["CREATED_NEW_DOMAIN"])
results = DELETE(f"/idmap/id/{dom_id}")
assert results.status_code == 200, f"[delete: {dom_id}]: {results.text}"
from contextlib import contextmanager
import os
import sys
import pytest
from middlewared.test.integration.assets.pool import dataset
from middlewared.test.integration.utils import call
apifolder = os.getcwd()
sys.path.append(apifolder)
from middlewared.test.integration.assets.directory_service import active_directory, ldap
from auto_config import ip, hostname, password, pool_name, user, ha
from functions import GET, POST, PUT, SSH_TEST, make_ws_request, wait_on_job
from protocols import nfs_share, SSH_NFS
from pytest_dependency import depends
try:
from config import AD_DOMAIN, ADPASSWORD, ADUSERNAME, ADNameServer, AD_COMPUTER_OU
except ImportError:
Reason = 'ADNameServer AD_DOMAIN, ADPASSWORD, or/and ADUSERNAME are missing in config.py"'
pytestmark = pytest.mark.skip(reason=Reason)
pytestmark = pytest.mark.skip("LDAP KRB5 NFS tests disabled pending CI framework changes")
test_perms = {
"READ_DATA": True,
"WRITE_DATA": True,
"EXECUTE": True,
"APPEND_DATA": True,
"DELETE_CHILD": False,
"DELETE": True,
"READ_ATTRIBUTES": True,
"WRITE_ATTRIBUTES": True,
"READ_NAMED_ATTRS": True,
"WRITE_NAMED_ATTRS": True,
"READ_ACL": True,
"WRITE_ACL": True,
"WRITE_OWNER": True,
"SYNCHRONIZE": False,
}
test_flags = {
"FILE_INHERIT": True,
"DIRECTORY_INHERIT": True,
"INHERIT_ONLY": False,
"NO_PROPAGATE_INHERIT": False,
"INHERITED": False
}
@pytest.fixture(scope="module")
def kerberos_config(request):
# DNS in automation domain is often broken.
# Setting rdns helps to pass this
results = PUT("/kerberos/", {"libdefaults_aux": "rdns = false"})
assert results.status_code == 200, results.text
results = PUT("/nfs/", {"v4_krb": True})
assert results.status_code == 200, results.text
try:
yield (request, results.json())
finally:
results = POST('/service/stop/', {'service': 'nfs'})
assert results.status_code == 200, results.text
results = PUT("/nfs/", {"v4_krb": False})
assert results.status_code == 200, results.text
results = PUT("/kerberos/", {"libdefaults_aux": ""})
assert results.status_code == 200, results.text
@pytest.fixture(scope="module")
def do_ad_connection(request):
with active_directory(
AD_DOMAIN,
ADUSERNAME,
ADPASSWORD,
netbiosname=hostname,
createcomputer=AD_COMPUTER_OU,
) as ad:
yield (request, ad)
@contextmanager
def stop_activedirectory(request):
results = PUT("/activedirectory/", {"enable": False})
assert results.status_code == 200, results.text
job_id = results.json()
job_status = wait_on_job(job_id, 180)
assert job_status['state'] == 'SUCCESS', str(job_status['results'])
try:
yield results.json()
finally:
results = PUT("/activedirectory/", {"enable": True})
assert results.status_code == 200, results.text
job_id = results.json()
job_status = wait_on_job(job_id, 180)
assert job_status['state'] == 'SUCCESS', str(job_status['results'])
@pytest.fixture(scope="module")
def do_ldap_connection(request):
res = make_ws_request(ip, {
'msg': 'method',
'method': 'kerberos.keytab.kerberos_principal_choices',
'params': [],
})
error = res.get('error')
assert error is None, str(error)
kerberos_principal = res['result'][0]
results = GET("/kerberos/realm/")
assert results.status_code == 200, results.text
realm_id = results.json()[0]['id']
res = make_ws_request(ip, {
'msg': 'method',
'method': 'kerberos._klist_test',
'params': [],
})
error = res.get('error')
assert error is None, str(error)
assert res['result'] is True
results = POST("/activedirectory/domain_info/", AD_DOMAIN)
assert results.status_code == 200, results.text
domain_info = results.json()
with stop_activedirectory(request) as ad:
res = make_ws_request(ip, {
'msg': 'method',
'method': 'kerberos.get_cred',
'params': [{
'dstype': 'DS_TYPE_LDAP',
'conf': {
'kerberos_realm': realm_id,
'kerberos_principal': kerberos_principal,
}
}],
})
error = res.get('error')
assert error is None, str(error)
cred = res['result']
res = make_ws_request(ip, {
'msg': 'method',
'method': 'kerberos.do_kinit',
'params': [{
'krb5_cred': cred,
'kinit-options': {
'kdc_override': {
'domain': AD_DOMAIN.upper(),
'kdc': domain_info['KDC server']
},
}
}],
})
error = res.get('error')
assert error is None, str(error)
with ldap(
domain_info['Bind Path'],
'', '', f'{domain_info["LDAP server name"].upper()}.',
has_samba_schema=False,
ssl="OFF",
kerberos_realm=realm_id,
kerberos_principal=kerberos_principal,
validate_certificates=False,
enable=True
) as ldap_conn:
yield (request, ldap_conn)
@pytest.fixture(scope="module")
def setup_nfs_share(request):
results = POST("/user/get_user_obj/", {'username': f'{ADUSERNAME}@{AD_DOMAIN}'})
assert results.status_code == 200, results.text
target_uid = results.json()['pw_uid']
target_acl = [
{'tag': 'owner@', 'id': -1, 'perms': test_perms, 'flags': test_flags, 'type': 'ALLOW'},
{'tag': 'group@', 'id': -1, 'perms': test_perms, 'flags': test_flags, 'type': 'ALLOW'},
{'tag': 'everyone@', 'id': -1, 'perms': test_perms, 'flags': test_flags, 'type': 'ALLOW'},
{'tag': 'USER', 'id': target_uid, 'perms': test_perms, 'flags': test_flags, 'type': 'ALLOW'},
]
with dataset(
'NFSKRB5',
{'acltype': 'NFSV4'},
acl=target_acl
) as ds:
with nfs_share(f'/mnt/{ds}', options={
'comment': 'KRB Functional Test Share',
'security': ['KRB5', 'KRB5I', 'KRB5P'],
}) as share:
yield (request, {'share': share, 'uid': target_uid})
@pytest.mark.dependency(name="AD_CONFIGURED")
def test_02_enabling_activedirectory(do_ad_connection):
results = GET('/activedirectory/started/')
assert results.status_code == 200, results.text
assert results.json() is True, results.text
results = GET('/activedirectory/get_state/')
assert results.status_code == 200, results.text
assert results.json() == 'HEALTHY', results.text
def test_03_kerberos_nfs4_spn_add(kerberos_config):
depends(kerberos_config[0], ["AD_CONFIGURED"], scope="session")
assert kerberos_config[1]['v4_krb_enabled']
res = make_ws_request(ip, {
'msg': 'method',
'method': 'kerberos.keytab.has_nfs_principal',
'params': [],
})
error = res.get('error')
assert error is None, str(error)
assert res['result'] is False
res = make_ws_request(ip, {
'msg': 'method',
'method': 'nfs.add_principal',
'params': [{
'username': ADUSERNAME,
'password': ADPASSWORD
}],
})
error = res.get('error')
assert error is None, str(error)
assert res['result'] is True
res = make_ws_request(ip, {
'msg': 'method',
'method': 'kerberos.keytab.has_nfs_principal',
'params': [],
})
error = res.get('error')
assert error is None, str(error)
assert res['result'] is True
results = POST('/service/reload/', {'service': 'idmap'})
assert results.status_code == 200, results.text
results = POST('/service/restart/', {'service': 'ssh'})
assert results.status_code == 200, results.text
@pytest.mark.dependency(name="AD_LDAP_USER_CCACHE")
def test_05_kinit_as_ad_user(setup_nfs_share):
"""
Set up an NFS share and ensure that permissions are
set correctly to allow writes via out test user.
This test does kinit as our test user so that we have
kerberos ticket that we will use to verify NFS4 + KRB5
work correctly.
"""
depends(setup_nfs_share[0], ["AD_CONFIGURED"], scope="session")
kinit_opts = {'ccache': 'USER', 'ccache_uid': setup_nfs_share[1]['uid']}
res = make_ws_request(ip, {
'msg': 'method',
'method': 'kerberos.get_cred',
'params': [{
'dstype': 'DS_TYPE_ACTIVEDIRECTORY',
'conf': {
'domainname': AD_DOMAIN,
'bindname': ADUSERNAME,
'bindpw': ADPASSWORD,
}
}],
})
error = res.get('error')
assert error is None, str(error)
cred = res['result']
res = make_ws_request(ip, {
'msg': 'method',
'method': 'kerberos.do_kinit',
'params': [{
'krb5_cred': cred,
'kinit-options': kinit_opts
}],
})
res = make_ws_request(ip, {
'msg': 'method',
'method': 'kerberos._klist_test',
'params': [kinit_opts],
})
error = res.get('error')
assert error is None, str(error)
assert res['result'] is True
results = POST('/service/restart/', {'service': 'nfs'})
assert results.status_code == 200, results.text
if not ha:
"""
we skip this test for a myriad of profoundly complex reasons
on our HA pipeline. If you cherish your sanity, don't try to
understand, just accept and move on :)
"""
def test_06_krb5nfs_ops_with_ad(request):
my_fqdn = f'{hostname.strip()}.{AD_DOMAIN}'
res = make_ws_request(ip, {
'msg': 'method',
'method': 'dnsclient.forward_lookup',
'params': [{'names': [my_fqdn]}],
})
error = res.get('error')
assert error is None, str(error)
addresses = [rdata['address'] for rdata in res['result']]
assert ip in addresses
"""
The following creates a loopback mount using our kerberos
keytab (AD computer account) and then performs ops via SSH
using a limited AD account for which we generated a kerberos
ticket above. Due to the odd nature of this setup, the loopback
mount gets mapped as the guest account on the NFS server.
This is fine for our purposes as we're validating that
sec=krb5 works.
"""
userobj = call('user.get_user_obj', {'username': f'{ADUSERNAME}@{AD_DOMAIN}'})
groupobj = call('group.get_group_obj', {'gid': userobj['pw_gid']})
call('ssh.update', {"password_login_groups": [groupobj['gr_name']]})
with SSH_NFS(
my_fqdn,
f'/mnt/{pool_name}/NFSKRB5',
vers=4,
mount_user=user,
mount_password=password,
ip=ip,
kerberos=True,
user=ADUSERNAME,
password=ADPASSWORD,
) as n:
n.create('testfile')
n.mkdir('testdir')
contents = n.ls('.')
assert 'testdir' in contents
assert 'testfile' in contents
file_acl = n.getacl('testfile')
for idx, ace in enumerate(file_acl):
assert ace['perms'] == test_perms, str(ace)
dir_acl = n.getacl('testdir')
for idx, ace in enumerate(dir_acl):
assert ace['perms'] == test_perms, str(ace)
assert ace['flags'] == test_flags, str(ace)
n.unlink('testfile')
n.rmdir('testdir')
contents = n.ls('.')
assert 'testdir' not in contents
assert 'testfile' not in contents
@pytest.mark.dependency(name="SET_UP_AD_VIA_LDAP")
def test_07_setup_and_enabling_ldap(do_ldap_connection):
res = make_ws_request(ip, {
'msg': 'method',
'method': 'kerberos.stop',
'params': [],
})
error = res.get('error')
assert error is None, str(error)
res = make_ws_request(ip, {
'msg': 'method',
'method': 'kerberos.start',
'params': [],
})
error = res.get('error')
assert error is None, str(error)
res = make_ws_request(ip, {
'msg': 'method',
'method': 'kerberos._klist_test',
'params': [],
})
error = res.get('error')
assert error is None, str(error)
assert res['result'] is True
# Verify that our NFS kerberos principal is
# still present
res = make_ws_request(ip, {
'msg': 'method',
'method': 'kerberos.keytab.has_nfs_principal',
'params': [],
})
error = res.get('error')
assert error is None, str(error)
assert res['result'] is True
def test_08_verify_ldap_users(request):
depends(request, ["SET_UP_AD_VIA_LDAP"], scope="session")
results = GET('/user', payload={
'query-filters': [['local', '=', False]],
'query-options': {'extra': {"search_dscache": True}},
})
assert results.status_code == 200, results.text
assert len(results.json()) > 0, results.text
results = GET('/group', payload={
'query-filters': [['local', '=', False]],
'query-options': {'extra': {"search_dscache": True}},
})
assert results.status_code == 200, results.text
assert len(results.json()) > 0, results.text
#!/usr/bin/env python3
# License: BSD
import pytest
import sys
import os
apifolder = os.getcwd()
sys.path.append(apifolder)
from functions import PUT, POST, GET, SSH_TEST, wait_on_job
from auto_config import ip, hostname, password, user
from pytest_dependency import depends
from middlewared.test.integration.assets.account import user as create_user
from middlewared.test.integration.assets.directory_service import active_directory
from middlewared.test.integration.utils import call
try:
from config import AD_DOMAIN, ADPASSWORD, ADUSERNAME, ADNameServer, AD_COMPUTER_OU
except ImportError:
Reason = 'ADNameServer AD_DOMAIN, ADPASSWORD, or/and ADUSERNAME are missing in config.py"'
pytestmark = pytest.mark.skip(reason=Reason)
WINBIND_SEPARATOR = "\\"
@pytest.fixture(scope="module")
def do_ad_connection(request):
with active_directory(
AD_DOMAIN,
ADUSERNAME,
ADPASSWORD,
netbiosname=hostname,
createcomputer=AD_COMPUTER_OU,
) as ad:
yield (request, ad)
@pytest.mark.dependency(name="AD_IS_HEALTHY")
def test_03_enabling_activedirectory(do_ad_connection):
global WORKGROUP
results = GET('/activedirectory/started/')
assert results.status_code == 200, results.text
results = GET("/smb")
assert results.status_code == 200, results.text
WORKGROUP = results.json()['workgroup']
@pytest.mark.dependency(name="INITIAL_CACHE_FILL")
def test_06_wait_for_cache_fill(request):
"""
Local user/group cache fill is a backgrounded task.
Wait for it to successfully complete.
"""
depends(request, ["AD_IS_HEALTHY"])
results = GET(f'/core/get_jobs/?method=activedirectory.fill_cache')
job_status = wait_on_job(results.json()[-1]['id'], 180)
assert job_status['state'] == 'SUCCESS', str(job_status['results'])
@pytest.mark.dependency(name="AD_USERS_CACHED")
def test_07_check_for_ad_users(request):
"""
This test validates that we can query AD users using
filter-option {"extra": {"search_dscache": True}}
"""
depends(request, ["INITIAL_CACHE_FILL"], scope="session")
cmd = "wbinfo -u"
results = SSH_TEST(cmd, user, password, ip)
assert results['result'], str(results['output'])
wbinfo_entries = results['stdout'].splitlines()
results = GET('/user', payload={
'query-filters': [['local', '=', False]],
'query-options': {'extra': {"search_dscache": True}},
})
assert results.status_code == 200, results.text
assert len(results.json()) > 0, results.text
cache_names = [x['username'] for x in results.json()]
for entry in wbinfo_entries:
assert entry in cache_names, str(cache_names)
@pytest.mark.dependency(name="AD_GROUPS_CACHED")
def test_08_check_for_ad_groups(request):
"""
This test validates that we can query AD groups using
filter-option {"extra": {"search_dscache": True}}
"""
depends(request, ["INITIAL_CACHE_FILL"], scope="session")
cmd = "wbinfo -g"
results = SSH_TEST(cmd, user, password, ip)
assert results['result'], str(results['output'])
wbinfo_entries = results['stdout'].splitlines()
results = GET('/group', payload={
'query-filters': [['local', '=', False]],
'query-options': {'extra': {"search_dscache": True}},
})
assert results.status_code == 200, results.text
assert len(results.json()) > 0, results.text
cache_names = [x['name'] for x in results.json()]
for entry in wbinfo_entries:
assert entry in cache_names, str(cache_names)
@pytest.mark.dependency(name="REBUILD_AD_CACHE")
def test_09_check_directoryservices_cache_refresh(request):
"""
This test validates that middleware can successfully rebuild the
directory services cache from scratch using the public API.
This currently happens once per 24 hours. Result of failure here will
be lack of users/groups visible in webui.
"""
depends(request, ["AD_USERS_CACHED", "AD_GROUPS_CACHED"], scope="session")
rebuild_ok = False
"""
Cache resides in tdb files. Remove the files to clear cache.
"""
cmd = 'rm -f /root/tdb/persistent/*'
results = SSH_TEST(cmd, user, password, ip)
assert results['result'] is True, results['output']
"""
directoryservices.cache_refresh job causes us to rebuild / refresh
LDAP / AD users.
"""
results = GET('/directoryservices/cache_refresh/')
assert results.status_code == 200, results.text
if results.status_code == 200:
refresh_job = results.json()
job_status = wait_on_job(refresh_job, 180)
assert job_status['state'] == 'SUCCESS', str(job_status['results'])
if job_status['state'] == 'SUCCESS':
rebuild_ok = True
"""
Verify that the AD user / group cache was rebuilt successfully.
"""
if rebuild_ok:
results = GET('/group', payload={
'query-filters': [['local', '=', False]],
'query-options': {'extra': {"search_dscache": True}},
})
assert results.status_code == 200, results.text
assert len(results.json()) > 0, results.text
results = GET('/user', payload={
'query-filters': [['local', '=', False]],
'query-options': {'extra': {"search_dscache": True}},
})
assert results.status_code == 200, results.text
assert len(results.json()) > 0, results.text
@pytest.mark.dependency(name="LAZY_INITIALIZATION_BY_NAME")
def test_10_check_lazy_initialization_of_users_and_groups_by_name(request):
"""
When users explicitly search for a directory service or other user
by name or id we should hit pwd and grp modules and synthesize a
result if the user / group is not in the cache. This special behavior
only occurs when single filter of "name =" or "id =". So after the
initial query that should result in insertion, we add a second filter
to only hit the cache. Code paths are slightly different for lookups
by id or by name and so they are tested separately.
"""
depends(request, ["REBUILD_AD_CACHE"], scope="session")
global ad_user_id
global ad_domain_users_id
domain_prefix = f'{WORKGROUP.upper()}{WINBIND_SEPARATOR}'
ad_user = f'{domain_prefix}{ADUSERNAME.lower()}'
ad_group = f'{domain_prefix}domain users'
cmd = 'rm -f /root/tdb/persistent/*'
results = SSH_TEST(cmd, user, password, ip)
assert results['result'] is True, results['output']
if not results['result']:
return
results = GET('/user', payload={
'query-filters': [['username', '=', ad_user]],
'query-options': {'extra': {"search_dscache": True}},
})
assert results.status_code == 200, results.text
assert len(results.json()) > 0, results.text
if len(results.json()) == 0:
return
ad_user_id = results.json()[0]['uid']
assert results.json()[0]['username'] == ad_user, results.text
results = GET('/group', payload={
'query-filters': [['name', '=', ad_group]],
'query-options': {'extra': {"search_dscache": True}},
})
assert results.status_code == 200, results.text
assert len(results.json()) > 0, results.text
if len(results.json()) == 0:
return
ad_domain_users_id = results.json()[0]['gid']
assert results.json()[0]['name'] == ad_group, results.text
"""
The following two tests validate that cache insertion occured.
"""
results = GET('/user', payload={
'query-filters': [['username', '=', ad_user], ['local', '=', False]],
'query-options': {'extra': {"search_dscache": True}},
})
assert results.status_code == 200, results.text
assert len(results.json()) == 1, results.text
results = GET('/group', payload={
'query-filters': [['name', '=', ad_group], ['local', '=', False]],
'query-options': {'extra': {"search_dscache": True}},
})
assert results.status_code == 200, results.text
assert len(results.json()) == 1, results.text
@pytest.mark.dependency(name="LAZY_INITIALIZATION_BY_ID")
def test_11_check_lazy_initialization_of_users_and_groups_by_id(request):
"""
When users explicitly search for a directory service or other user
by name or id we should hit pwd and grp modules and synthesize a
result if the user / group is not in the cache. This special behavior
only occurs when single filter of "name =" or "id =". So after the
initial query that should result in insertion, we add a second filter
to only hit the cache. Code paths are slightly different for lookups
by id or by name and so they are tested separately.
"""
depends(request, ["LAZY_INITIALIZATION_BY_NAME"], scope="session")
cmd = 'rm -f /root/tdb/persistent/*'
results = SSH_TEST(cmd, user, password, ip)
assert results['result'] is True, results['output']
if not results['result']:
return
results = GET('/user', payload={
'query-filters': [['uid', '=', ad_user_id]],
'query-options': {'extra': {"search_dscache": True}},
})
assert results.status_code == 200, results.text
assert results.json()[0]['uid'] == ad_user_id, results.text
results = GET('/group', payload={
'query-filters': [['gid', '=', ad_domain_users_id]],
'query-options': {'extra': {"search_dscache": True}},
})
assert results.status_code == 200, results.text
assert results.json()[0]['gid'] == ad_domain_users_id, results.text
"""
The following two tests validate that cache insertion occured.
"""
results = GET('/user', payload={
'query-filters': [['uid', '=', ad_user_id], ['local', '=', False]],
'query-options': {'extra': {"search_dscache": True}},
})
assert results.status_code == 200, results.text
assert len(results.json()) == 1, results.text
results = GET('/group', payload={
'query-filters': [['gid', '=', ad_domain_users_id], ['local', '=', False]],
'query-options': {'extra': {"search_dscache": True}},
})
assert results.status_code == 200, results.text
assert len(results.json()) == 1, results.text
with create_user({
'username': 'canary',
'full_name': 'canary',
'group_create': True,
'password': 'canary',
'smb': True
}) as u:
user_data = call('user.translate_username', 'canary')
assert u['id'] == user_data['id'], str(user_data)
assert u['uid'] == user_data['uid'], str(user_data)
assert user_data['local'] is True, str(user_data)
#!/usr/bin/env python3
import pytest
import os
import sys
from pytest_dependency import depends
from time import sleep
apifolder = os.getcwd()
sys.path.append(apifolder)
from functions import GET, POST, SSH_TEST
from auto_config import ip, password, user, pool_name
from middlewared.test.integration.utils import call
def test_01_get_alert_list():
results = GET("/alert/list/")
assert results.status_code == 200, results.text
assert isinstance(results.json(), list), results.text
def test_02_get_alert_list_categories():
results = GET("/alert/list_categories/")
assert results.status_code == 200, results.text
assert isinstance(results.json(), list), results.text
assert results.json(), results.json()
def test_03_get_alert_list_policies():
results = GET("/alert/list_policies/")
assert results.status_code == 200, results.text
assert isinstance(results.json(), list), results.text
assert results.json(), results.json()
@pytest.mark.dependency(name='degrade_pool')
def test_04_degrading_a_pool_to_create_an_alert(request):
global gptid
get_pool = GET(f"/pool/?name={pool_name}").json()[0]
id_path = '/dev/disk/by-partuuid/'
gptid = get_pool['topology']['data'][0]['path'].replace(id_path, '')
cmd = f'zinject -d {gptid} -A fault {pool_name}'
results = SSH_TEST(cmd, user, password, ip)
assert results['result'] is True, results['output']
def test_05_verify_the_pool_is_degraded(request):
depends(request, ['degrade_pool'], scope="session")
cmd = f'zpool status {pool_name} | grep {gptid}'
results = SSH_TEST(cmd, user, password, ip)
assert results['result'] is True, results['output']
assert 'DEGRADED' in results['output'], results['output']
@pytest.mark.timeout(120)
def test_06_wait_for_the_alert_and_get_the_id(request):
depends(request, ["degrade_pool"], scope="session")
global alert_id
call("alert.process_alerts")
while True:
for line in GET("/alert/list/").json():
if (
line['source'] == 'VolumeStatus' and
line['args']['volume'] == pool_name and
line['args']['state'] == 'DEGRADED'
):
alert_id = line['id']
return
sleep(1)
def test_08_dimiss_the_alert(request):
depends(request, ["degrade_pool"], scope="session")
results = POST("/alert/dismiss/", alert_id)
assert results.status_code == 200, results.text
assert isinstance(results.json(), type(None)), results.text
def test_09_verify_the_alert_is_dismissed(request):
depends(request, ["degrade_pool"], scope="session")
results = GET("/alert/list/")
assert results.status_code == 200, results.text
assert isinstance(results.json(), list), results.text
for line in results.json():
if line['id'] == alert_id:
assert line['dismissed'] is True, results.text
break
def test_10_restore_the_alert(request):
depends(request, ["degrade_pool"], scope="session")
results = POST("/alert/restore/", alert_id)
assert results.status_code == 200, results.text
assert isinstance(results.json(), type(None)), results.text
def test_11_verify_the_alert_is_restored(request):
depends(request, ["degrade_pool"], scope="session")
results = GET(f"/alert/list/?id={alert_id}")
assert results.status_code == 200, results.text
assert isinstance(results.json(), list), results.text
for line in results.json():
if line['id'] == alert_id:
assert line['dismissed'] is False, results.text
break
def test_12_clear_the_pool_degradation(request):
depends(request, ["degrade_pool"], scope="session")
cmd = f'zpool clear {pool_name}'
results = SSH_TEST(cmd, user, password, ip)
assert results['result'] is True, results['output']
def test_13_verify_the_pool_is_not_degraded(request):
depends(request, ["degrade_pool"], scope="session")
cmd = f'zpool status {pool_name} | grep {gptid}'
results = SSH_TEST(cmd, user, password, ip)
assert results['result'] is True, results['output']
assert 'DEGRADED' not in results['output'], results['output']
@pytest.mark.timeout(120)
def test_14_wait_for_the_alert_to_disappear(request):
depends(request, ["degrade_pool"], scope="session")
while True:
if alert_id not in GET("/alert/list/").text:
assert True
break
sleep(1)
#!/usr/bin/env python3
import pytest
import os
import sys
apifolder = os.getcwd()
sys.path.append(apifolder)
from functions import GET, POST, PUT, DELETE
def test_01_get_alertservice():
results = GET("/alertservice/")
assert results.status_code == 200, results.text
assert isinstance(results.json(), list), results.text
def test_02_get_alertservice_list_types():
results = GET("/alertservice/list_types/")
assert results.status_code == 200, results.text
assert isinstance(results.json(), list), results.text
assert results.json(), results.text
def test_03_create_an_alertservice():
global alertservice_id, payload, results
payload = {
"name": "Critical Email Test",
"type": "Mail",
"attributes": {
"email": "eric.spam@ixsystems.com"
},
"level": "CRITICAL",
"enabled": True
}
results = POST("/alertservice/", payload)
assert results.status_code == 200, results.text
assert isinstance(results.json(), dict), results.text
alertservice_id = results.json()['id']
@pytest.mark.parametrize('data', ["name", "type", "attributes", "level", "enabled"])
def test_04_verify_the_alertservice_creation_results(data):
assert results.json()[data] == payload[data], results.text
def test_05_get_alertservice_with_id():
global results
results = GET(f"/alertservice/id/{alertservice_id}")
assert results.status_code == 200, results.text
assert isinstance(results.json(), dict), results.text
@pytest.mark.parametrize('data', ["name", "type", "attributes", "level", "enabled"])
def test_06_verify_the_id_alertservice_results(data):
assert results.json()[data] == payload[data], results.text
def test_07_change_config_to_alertservice_id():
global alertservice_id, payload, results
payload = {
"name": "Warning Email Test",
"type": "Mail",
"attributes": {
"email": "william.spam@ixsystems.com@"
},
"level": "WARNING",
"enabled": False
}
results = PUT(f"/alertservice/id/{alertservice_id}", payload)
assert results.status_code == 200, results.text
assert isinstance(results.json(), dict), results.text
@pytest.mark.parametrize('data', ["name", "type", "attributes", "level", "enabled"])
def test_08_verify_the_alertservice_changes_results(data):
assert results.json()[data] == payload[data], results.text
def test_09_get_alertservice_changes_with_id():
global results
results = GET(f"/alertservice/id/{alertservice_id}")
assert results.status_code == 200, results.text
assert isinstance(results.json(), dict), results.text
@pytest.mark.parametrize('data', ["name", "type", "attributes", "level", "enabled"])
def test_10_verify_the_id_alertservice_changes_results(data):
assert results.json()[data] == payload[data], results.text
def test_11_delete_alertservice():
results = DELETE(f"/alertservice/id/{alertservice_id}")
assert results.status_code == 200, results.text
def test_12_verify_alertservice_is_delete():
results = GET(f"/alertservice/id/{alertservice_id}")
assert results.status_code == 404, results.text
#!/usr/bin/env python3
# Author: Eric Turgeon
# License: BSD
import pytest
import sys
import os
from time import time, sleep
from pytest_dependency import depends
apifolder = os.getcwd()
sys.path.append(apifolder)
from functions import GET
@pytest.mark.dependency(name='BOOT_DISKS')
def test_01_get_boot_disks():
results = GET('/boot/get_disks/')
assert results.status_code == 200, results.text
disks = results.json()
assert isinstance(disks, list) is True, results.text
assert disks, results.text
@pytest.mark.dependency(name='BOOT_STATE')
def test_02_get_boot_state(request):
depends(request, ['BOOT_DISKS'])
results = GET('/boot/get_state/')
assert results.status_code == 200, results.text
assert isinstance(results.json(), dict) is True, results.text
global boot_state
boot_state = results.json()
@pytest.mark.dependency(name='BOOT_SCRUB')
def test_03_get_boot_scrub(request):
depends(request, ['BOOT_STATE'])
global JOB_ID
results = GET('/boot/scrub/')
assert results.status_code == 200, results.text
assert isinstance(results.json(), int) is True, results.text
JOB_ID = results.json()
def test_04_verify_boot_scrub_job(request):
depends(request, ['BOOT_SCRUB'])
stop_time = time() + 600
while True:
get_job = GET(f'/core/get_jobs/?id={JOB_ID}')
job_status = get_job.json()[0]
if job_status['state'] in ('RUNNING', 'WAITING'):
if stop_time <= time():
assert False, "Job Timeout\n\n" + get_job.text
break
sleep(5)
else:
assert job_status['state'] == 'SUCCESS', get_job.text
break
import sys
import os
from time import sleep
from unittest.mock import ANY
apifolder = os.getcwd()
sys.path.append(apifolder)
from functions import POST, DELETE, GET, PUT, wait_on_job
def test_01_get_the_activated_bootenv():
global active_be_id
results = GET('/bootenv/?activated=True')
assert results.status_code == 200, results.text
active_be_id = results.json()[0]['id']
def test_02_create_be_duplicate_name():
payload = {"name": active_be_id, "source": active_be_id}
results = POST("/bootenv/", payload)
assert results.status_code == 422, results.text
assert results.json() == {"bootenv_create.name": ANY}
def test_02_creating_a_new_boot_environment_from_the_active_boot_environment():
payload = {"name": "bootenv01", "source": active_be_id}
results = POST("/bootenv/", payload)
assert results.status_code == 200, results.text
sleep(1)
def test_03_look_new_bootenv_is_created():
assert len(GET('/bootenv?name=bootenv01').json()) == 1
def test_04_activate_bootenv01():
results = POST("/bootenv/id/bootenv01/activate/", None)
assert results.status_code == 200, results.text
# Update tests
def test_05_cloning_a_new_boot_environment():
payload = {"name": "bootenv02", "source": "bootenv01"}
results = POST("/bootenv/", payload)
assert results.status_code == 200, results.text
sleep(1)
def test_06_activate_bootenv02():
payload = None
results = POST("/bootenv/id/bootenv02/activate/", payload)
assert results.status_code == 200, results.text
def test_07_change_boot_environment_name():
payload = {"name": "bootenv03"}
results = PUT("/bootenv/id/bootenv01/", payload)
assert results.status_code == 200, results.text
def test_08_set_keep_attribute_true():
payload = {"keep": True}
results = POST("/bootenv/id/bootenv03/set_attribute/", payload)
assert results.status_code == 200, results.text
def test_09_activate_bootenv03():
payload = None
results = POST("/bootenv/id/bootenv03/activate/", payload)
assert results.status_code == 200, results.text
# Delete tests
def test_10_removing_a_boot_environment_02():
global job_id
results = DELETE("/bootenv/id/bootenv02/")
assert results.status_code == 200, results.text
job_id = results.json()
def test_11_verify_the_removing_be_job_is_successfull(request):
job_status = wait_on_job(job_id, 180)
assert job_status['state'] == 'SUCCESS', str(job_status['results'])
def test_12_set_keep_attribute_true():
payload = {"keep": False}
results = POST("/bootenv/id/bootenv03/set_attribute/", payload)
assert results.status_code == 200, results.text
def test_13_activate_default():
payload = None
results = POST(f"/bootenv/id/{active_be_id}/activate/", payload)
assert results.status_code == 200, results.text
def test_14_removing_a_boot_environment_03():
global job_id
results = DELETE("/bootenv/id/bootenv03/")
assert results.status_code == 200, results.text
job_id = results.json()
def test_15_verify_the_removing_be_job_is_successfull(request):
job_status = wait_on_job(job_id, 180)
assert job_status['state'] == 'SUCCESS', str(job_status['results'])
#!/usr/bin/env python3
# Author: Eric Turgeon
# License: BSD
import pytest
import re
import sys
import os
from time import sleep
apifolder = os.getcwd()
sys.path.append(apifolder)
from functions import GET, DELETE, POST
try:
from config import (
LDAPBASEDN,
LDAPBINDDN,
LDAPBINDPASSWORD,
LDAPHOSTNAME,
)
except ImportError:
Reason = 'LDAP* variable are not setup in config.py'
# comment pytestmark for development testing with --dev-test
pytestmark = pytest.mark.skipif(True, reason=Reason)
def test_01_get_certificate_query():
results = GET('/certificate/')
assert results.status_code == 200, results.text
assert isinstance(results.json(), list), results.text
def test_create_idmap_certificate():
global certificate_id, idmap_id
payload = {
'name': 'BOB',
'range_low': 1000,
'range_high': 2000,
'certificate': 1,
"idmap_backend": "RFC2307",
'options': {
"ldap_server": "STANDALONE",
"bind_path_user": LDAPBASEDN,
"bind_path_group": LDAPBASEDN,
"ldap_url": LDAPHOSTNAME,
"ldap_user_dn": LDAPBINDDN,
"ldap_user_dn_password": LDAPBINDPASSWORD,
"ssl": "ON",
"ldap_realm": False,
}
}
results = POST('/idmap/', payload)
assert results.status_code == 200, results.text
idmap_id = results.json()['id']
certificate_id = results.json()['certificate']['id']
def test_02_delete_used_certificate():
global job_id
results = DELETE(f'/certificate/id/{certificate_id}/', True)
assert results.status_code == 200, results.text
job_id = int(results.text)
def test_03_verify_certificate_delete_failed():
while True:
get_job = GET(f'/core/get_jobs/?id={job_id}')
assert get_job.status_code == 200, get_job.text
job_status = get_job.json()[0]
if job_status['state'] in ('RUNNING', 'WAITING'):
sleep(5)
else:
assert job_status['state'] == 'FAILED', get_job.text
assert bool(re.search(
r'Certificate is being used by following service.*IDMAP', job_status['error'], flags=re.DOTALL
)) is True, job_status['error']
break
def test_04_delete_idmap():
results = DELETE(f'/idmap/id/{idmap_id}/')
assert results.status_code == 200, results.text
#!/usr/bin/env python3
# Author: Eric Turgeon
# License: BSD
import pytest
import sys
import os
apifolder = os.getcwd()
sys.path.append(apifolder)
from functions import GET
def test_01_get_certificateauthority_query():
results = GET('/certificateauthority/')
assert results.status_code == 200, results.text
assert isinstance(results.json(), list), results.text
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment