test_002_ldap.py 9.82 KB
import pytest

from config import CLUSTER_INFO, CLUSTER_IPS, CLUSTER_LDAP, PUBLIC_IPS
from utils import make_request, make_ws_request, ssh_test, wait_on_job
from exceptions import JobTimeOut
from pytest_dependency import depends
from helpers import smb_connection
from samba import ntstatus, NTSTATUSError

SHARE_FUSE_PATH = f'CLUSTER:{CLUSTER_INFO["GLUSTER_VOLUME"]}/ds_smb_share_02'


@pytest.mark.parametrize('ip', CLUSTER_IPS)
@pytest.mark.dependency(name='VALID_SMB_BIND_IPS')
def test_001_validate_smb_bind_ips(ip, request):
    url = f'http://{ip}/api/v2.0/smb/bindip_choices'
    res = make_request('get', url)
    assert res.status_code == 200, res.text

    smb_ip_set = set(res.json().values())
    cluster_ip_set = set(PUBLIC_IPS)
    assert smb_ip_set == cluster_ip_set, res.text


@pytest.mark.parametrize('ip', CLUSTER_IPS)
@pytest.mark.dependency(name='DS_LDAP_NETWORK_CONFIGURED')
def test_002_validate_network_configuration(ip, request):
    depends(request, ['VALID_SMB_BIND_IPS'])

    url = f'http://{ip}/api/v2.0/network/configuration/'
    res = make_request('get', url)
    assert res.status_code == 200, res.text

    data = res.json()
    assert data['nameserver1'] == CLUSTER_INFO['DNS1']
    assert data['ipv4gateway'] == CLUSTER_INFO['DEFGW']


@pytest.mark.dependency(name="BOUND_LDAP")
def test_003_bind_ldap(request):
    depends(request, ['DS_LDAP_NETWORK_CONFIGURED'])

    payload = {
        "hostname": [CLUSTER_LDAP['HOSTNAME']],
        "basedn": CLUSTER_LDAP['BASEDN'],
        "binddn": CLUSTER_LDAP['BINDDN'],
        "bindpw": CLUSTER_LDAP['BINDPW'],
        "ssl": "ON",
        "enable": True
    }
    url = f'http://{CLUSTER_IPS[0]}/api/v2.0/ldap/'
    res = make_request('put', url, data=payload)
    assert res.status_code == 200, res.text

    try:
        status = wait_on_job(res.json()['job_id'], CLUSTER_IPS[0], 300)
    except JobTimeOut:
        assert False, JobTimeOut
    else:
        assert status['state'] == 'SUCCESS', status

    for ip in CLUSTER_IPS:
        payload = {
            'msg': 'method',
            'method': 'ldap.started',
        }
        res = make_ws_request(ip, payload)
        assert res.get('error') is None, res

        url = f'http://{ip}/api/v2.0/ldap/get_state'
        res = make_request('get', url)
        assert res.status_code == 200, f'ip: {ip}, res: {res.text}'
        assert res.json() == 'HEALTHY'


@pytest.mark.parametrize('ip', CLUSTER_IPS)
@pytest.mark.dependency(name="LDAP_ACCOUNTS_CONFIGURED")
def test_004_verify_ldap_accounts_present(ip, request):
    depends(request, ['BOUND_LDAP'])

    passwd = ssh_test(ip, "getent passwd")

    payload = {"username": CLUSTER_LDAP["TEST_USERNAME"]}
    url = f'http://{ip}/api/v2.0/user/get_user_obj/'
    res = make_request('post', url, data=payload)
    assert res.status_code == 200, passwd['output']

    payload = {"groupname": CLUSTER_LDAP["TEST_GROUPNAME"]}
    url = f'http://{ip}/api/v2.0/group/get_group_obj/'
    res = make_request('post', url, data=payload)
    assert res.status_code == 200, res.text


@pytest.mark.parametrize('ip', CLUSTER_IPS)
def test_005_validate_cached_ldap_accounts(ip, request):
    depends(request, ['LDAP_ACCOUNTS_CONFIGURED'])

    payload = {
        'query-filters': [["method", "=", "ldap.fill_cache"]],
        'query-options': {'order_by': ['-id']},
    }
    url = f'http://{ip}/api/v2.0/core/get_jobs'
    res = make_request('get', url, data=payload)
    assert res.status_code == 200, res.text

    try:
        status = wait_on_job(res.json()[0]['id'], ip, 300)
    except JobTimeOut:
        assert False, JobTimeOut
    else:
        assert status['state'] == 'SUCCESS', status

    payload = {
        'query-filters': [["local", "=", False]],
        'query-options': {'extra': {"additional_information": ['DS']}},
    }
    url = f'http://{ip}/api/v2.0/user'
    res = make_request('get', url, data=payload)
    assert res.status_code == 200, res.text
    assert len(res.json()) != 0, 'No cached users'

    url = f'http://{ip}/api/v2.0/group'
    res = make_request('get', url, data=payload)
    assert res.status_code == 200, res.text
    assert len(res.json()) != 0, 'No cached groups'


@pytest.mark.dependency(name="DS_LDAP_SMB_SHARE_CREATED")
def test_006_create_clustered_smb_share(request):
    depends(request, ['BOUND_LDAP'])
    global ds_smb_share_id
    global ds_wrk

    url = f'http://{CLUSTER_IPS[0]}/api/v2.0/filesystem/mkdir/'
    res = make_request('post', url, data=SHARE_FUSE_PATH)
    assert res.status_code == 200, res.text

    url = f'http://{CLUSTER_IPS[0]}/api/v2.0/user/get_user_obj/'
    payload = {"username": CLUSTER_LDAP["TEST_USERNAME"]}
    res = make_request('post', url, data=payload)
    assert res.status_code == 200, res.text
    user_obj = res.json()

    url = f'http://{CLUSTER_IPS[0]}/api/v2.0/filesystem/chown/'
    payload = {"path": SHARE_FUSE_PATH, "uid": user_obj["pw_uid"]}
    res = make_request('post', url, data=payload)
    assert res.status_code == 200, res.text

    try:
        status = wait_on_job(res.json(), CLUSTER_IPS[0], 300)
    except JobTimeOut:
        assert False, JobTimeOut
    else:
        assert status['state'] == 'SUCCESS', status

    url = f'http://{CLUSTER_IPS[0]}/api/v2.0/sharing/smb/'
    payload = {
        "comment": "LDAP clustered SMB share",
        "path": '/ds_smb_share_02',
        "name": "DS_CL_SMB2",
        "purpose": "NO_PRESET",
        "shadowcopy": False,
        "cluster_volname": CLUSTER_INFO["GLUSTER_VOLUME"]
    }

    res = make_request('post', url, data=payload)
    assert res.status_code == 200, res.text
    ds_smb_share_id = res.json()['id']

    url = f'http://{CLUSTER_IPS[0]}/api/v2.0/smb'
    res = make_request('get', url)
    assert res.status_code == 200, res.text
    ds_wrk = res.json()['workgroup']


@pytest.mark.parametrize('ip', PUBLIC_IPS)
def test_007_auth_known_failure_nosmb(ip, request):
    """
    Samba schema is disable. SMB auth should fail
    with STATUS_LOGON_FAILURE.
    """
    depends(request, ['DS_LDAP_SMB_SHARE_CREATED'])

    with pytest.raises(NTSTATUSError) as e:
        with smb_connection(
            host=ip,
            share="DS_CL_SMB2",
            username=CLUSTER_LDAP['TEST_USERNAME'],
            domain=ds_wrk,
            password=CLUSTER_LDAP['TEST_PASSWORD'],
            smb1=False
        ) as tcon:
            fd = tcon.create_file("testfile", "w")
            tcon.close(fd, True)

    assert e.value.args[0] == ntstatus.NT_STATUS_LOGON_FAILURE, e.value.args[1]


@pytest.mark.dependency(name="BOUND_LDAP_SMB")
def test_008_bind_ldap(request):
    depends(request, ['DS_LDAP_SMB_SHARE_CREATED', 'BOUND_LDAP'])

    payload = {
        "has_samba_schema": True,
    }
    url = f'http://{CLUSTER_IPS[0]}/api/v2.0/ldap/'
    res = make_request('put', url, data=payload)
    assert res.status_code == 200, res.text

    try:
        status = wait_on_job(res.json()['job_id'], CLUSTER_IPS[0], 300)
    except JobTimeOut:
        assert False, JobTimeOut
    else:
        assert status['state'] == 'SUCCESS', status


@pytest.mark.dependency(name="DS_LDAP_SMB_SHARE_IS_WRITABLE")
@pytest.mark.parametrize('ip', PUBLIC_IPS)
def test_009_share_is_writable_via_public_ips(ip, request):
    """
    This test verifies that the SMB share is writable once
    we enable a samba schema.
    """
    depends(request, ['DS_LDAP_SMB_SHARE_CREATED', 'BOUND_LDAP_SMB'])

    with smb_connection(
        host=ip,
        share="DS_CL_SMB2",
        username=CLUSTER_LDAP['TEST_USERNAME'],
        domain=ds_wrk,
        password=CLUSTER_LDAP['TEST_PASSWORD'],
        smb1=False
    ) as tcon:
        fd = tcon.create_file("testfile", "w")
        tcon.close(fd, True)


def test_010_xattrs_writable_via_smb(request):
    depends(request, ['DS_LDAP_SMB_SHARE_IS_WRITABLE'])

    with smb_connection(
        host=PUBLIC_IPS[0],
        share="DS_CL_SMB2",
        username=CLUSTER_LDAP['TEST_USERNAME'],
        domain=ds_wrk,
        password=CLUSTER_LDAP['TEST_PASSWORD'],
        smb1=False
    ) as tcon:
        fd = tcon.create_file("streamstestfile:smb2_stream", "w")
        tcon.write(fd, b'test1', 0)
        tcon.close(fd)

        fd2 = tcon.create_file("streamstestfile:smb2_stream", "w")
        contents = tcon.read(fd2, 0, 5)
        tcon.close(fd2)

    assert(contents.decode() == "test1")


def test_048_delete_clustered_smb_share(request):
    depends(request, ['DS_LDAP_SMB_SHARE_CREATED'])

    url = f'http://{CLUSTER_IPS[1]}/api/v2.0/sharing/smb/id/{ds_smb_share_id}'
    res = make_request('delete', url)
    assert res.status_code == 200, res.text


@pytest.mark.parametrize('ip', CLUSTER_IPS)
def test_049_verify_clustered_share_removed(ip, request):
    depends(request, ['DS_LDAP_SMB_SHARE_CREATED'])

    url = f'http://{ip}/api/v2.0/sharing/smb?id={ds_smb_share_id}'
    res = make_request('get', url)
    assert res.status_code == 200, res.text
    assert res.json() == [], res.text

    cmd = f'rm -rf /cluster/{CLUSTER_INFO["GLUSTER_VOLUME"]}/ds_smb_share_02'
    res = ssh_test(CLUSTER_IPS[0], cmd)
    assert res['result'], res['stderr']


def test_050_unbind_ldap(request):
    depends(request, ['BOUND_LDAP'])

    url = f'http://{CLUSTER_IPS[0]}/api/v2.0/ldap'
    payload = {
        "has_samba_schema": False,
        "enable": False,
    }
    res = make_request('put', url, data=payload)
    assert res.status_code == 200, res.text

    try:
        status = wait_on_job(res.json()['job_id'], CLUSTER_IPS[0], 300)
    except JobTimeOut:
        assert False, JobTimeOut
    else:
        assert status['state'] == 'SUCCESS', status

    for ip in CLUSTER_IPS:
        url = f'http://{ip}/api/v2.0/ldap/get_state'
        res = make_request('get', url)
        assert res.status_code == 200, f'ip: {ip}, res: {res.text}'
        assert res.json() == 'DISABLED'

        payload = {
            'msg': 'method',
            'method': 'ldap.started',
        }
        res = make_ws_request(ip, payload)
        assert res.get('error') is None, res