1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
#!/usr/bin/env python3
import pytest
import sys
import os
from pytest_dependency import depends
# Add the current working directory to the import path before the project
# imports below (presumably that is where functions.py, auto_config.py and
# config.py live -- confirm against how the suite is launched).
apifolder = os.getcwd()
sys.path.append(apifolder)
from functions import fail, POST, GET, PUT, SSH_TEST, wait_on_job
from auto_config import ha, dev_test, hostname, user, password
# Comment out pytestmark to run this module during development testing
# with --dev-test.
pytestmark = pytest.mark.skipif(dev_test, reason='Skipping for test development testing')
# The Active Directory settings are optional: if config.py does not provide
# them, a skip marker is built instead of failing at import time.
# NOTE(review): ad_test is not applied to any test in the visible part of
# this file, so a test that uses the AD_* names would hit a NameError when
# config.py lacks them -- confirm whether the AD test should carry it.
try:
    from config import AD_DOMAIN, ADPASSWORD, ADUSERNAME, ADNameServer
except ImportError:
    Reason = 'ADNameServer AD_DOMAIN, ADPASSWORD, or/and ADUSERNAME are missing in config.py"'
    ad_test = pytest.mark.skip(reason=Reason)
# On HA runs that export "virtual_ip", take the address from the
# "controller1_ip" environment variable; otherwise use the ip from
# auto_config.
if ha and "virtual_ip" in os.environ:
    ip = os.environ["controller1_ip"]
else:
    from auto_config import ip
@pytest.fixture(scope='module')
def pool_data():
    """Provide one module-scoped dict that tests use to share pool records."""
    shared_pools = dict()
    return shared_pools
@pytest.fixture(scope='module')
def logs_data():
    """Provide one module-scoped dict that tests use to share log lines."""
    shared_logs = dict()
    return shared_logs
def test_01_verify_system_dataset_is_set_to_boot_pool():
    """Check that /systemdataset/ reports boot-pool as the system dataset pool."""
    response = GET("/systemdataset/")
    assert response.status_code == 200, response.text

    sysds = response.json()
    assert isinstance(sysds, dict), response.text
    expected_fields = (
        ('pool', 'boot-pool'),
        ('basename', 'boot-pool/.system'),
    )
    for field, expected in expected_fields:
        assert sysds[field] == expected, response.text
def test_02_verify_sysds_is_moved_after_first_pool_with_encrytion_is_created(request, pool_data):
    """Create a passphrase-encrypted pool and check where the system dataset is.

    The pool is a single-disk stripe named 'encrypted', built on the first
    disk returned by /disk/get_unused/. The created pool record is stored in
    ``pool_data['encrypted']`` for later tests.

    NOTE(review): despite the test name, the assertions expect the system
    dataset to remain on boot-pool after the encrypted pool is created.
    """
    unused_disks = POST('/disk/get_unused/').json()
    stripe_vdev = {'type': 'STRIPE', 'disks': [unused_disks[0]['name']]}
    create_payload = {
        'name': 'encrypted',
        'encryption': True,
        'encryption_options': {
            'algorithm': 'AES-128-CCM',
            'passphrase': 'my_pool_passphrase',
        },
        'topology': {
            'data': [stripe_vdev],
        },
    }

    create = POST('/pool/', create_payload)
    assert create.status_code == 200, create.text
    job = wait_on_job(create.json(), 240)
    assert job['state'] == 'SUCCESS', str(job['results'])
    pool_data['encrypted'] = job['results']['result']

    response = GET("/systemdataset/")
    assert response.status_code == 200, response.text
    sysds = response.json()
    assert isinstance(sysds, dict), response.text
    assert sysds['pool'] == 'boot-pool', response.text
    assert sysds['basename'] == 'boot-pool/.system', response.text
def test_03_verify_sysds_cant_move_to_a_passphrase_encrypted_pool(request):
    """Ask to move the system dataset to 'encrypted' and expect the job to fail.

    After the failed job, /systemdataset/ must still report boot-pool.
    """
    move = PUT("/systemdataset/", {'pool': 'encrypted'})
    assert move.status_code == 200, move.text
    move_job_id = move.json()
    assert isinstance(move_job_id, int), move.text

    job = wait_on_job(move_job_id, 120)
    assert job['state'] == 'FAILED', str(job['results'])

    response = GET("/systemdataset/")
    assert response.status_code == 200, response.text
    sysds = response.json()
    assert isinstance(sysds, dict), response.text
    assert sysds['pool'] == 'boot-pool', response.text
    assert sysds['basename'] == 'boot-pool/.system', response.text
def test_04_verify_sysds_after_passphrase_encrypted_pool_is_deleted(request, pool_data):
    """Export and destroy the 'encrypted' pool, then check the system dataset.

    The pool id comes from the record stored in ``pool_data['encrypted']``.
    The system dataset is expected to still be on boot-pool afterwards.
    """
    pool_id = pool_data["encrypted"]["id"]
    export_options = {
        'cascade': True,
        'restart_services': True,
        'destroy': True
    }

    export = POST(f'/pool/id/{pool_id}/export/', export_options)
    assert export.status_code == 200, export.text
    job = wait_on_job(export.json(), 120)
    assert job['state'] == 'SUCCESS', str(job['results'])

    response = GET("/systemdataset/")
    assert response.status_code == 200, response.text
    sysds = response.json()
    assert isinstance(sysds, dict), response.text
    assert sysds['pool'] == 'boot-pool', response.text
    assert sysds['basename'] == 'boot-pool/.system', response.text
@pytest.mark.dependency(name="first_pool")
def test_05_verify_the_first_pool_created_become_sysds(request, pool_data):
    """Create the unencrypted pool 'first_pool' and check it hosts the system dataset.

    The pool is a single-disk stripe built on the first disk returned by
    /disk/get_unused/. The created pool record is stored in
    ``pool_data['first_pool']`` for later tests. Registered as the
    "first_pool" dependency that later tests rely on.
    """
    pool_disk = [POST('/disk/get_unused/').json()[0]['name']]
    payload = {
        "name": 'first_pool',
        "encryption": False,
        "topology": {
            "data": [
                {"type": "STRIPE", "disks": pool_disk}
            ],
        },
        "allow_duplicate_serials": True,
    }
    results = POST("/pool/", payload)
    assert results.status_code == 200, results.text
    job_id = results.json()
    job_status = wait_on_job(job_id, 180)
    # The job state is a string. The previous comparison used the bare name
    # SUCCESS, which is not defined in this module and raised NameError.
    if job_status['state'] != 'SUCCESS':
        fail(f'Failed to create first pool: {job_status["state"]}: {job_status["results"]}')
    pool_data['first_pool'] = job_status['results']['result']

    results = GET("/systemdataset/")
    assert results.status_code == 200, results.text
    assert isinstance(results.json(), dict), results.text
    assert results.json()['pool'] == 'first_pool', results.text
    assert results.json()['basename'] == 'first_pool/.system', results.text
@pytest.mark.dependency(name="second_pool")
def test_06_creating_a_second_pool_and_verify_it_doesnt_become_sysds(request, pool_data):
    """Create 'second_pool' and check the system dataset stays on 'first_pool'.

    Depends on "first_pool". The pool is a single-disk stripe built on the
    first disk returned by /disk/get_unused/. The created pool record is
    stored in ``pool_data['second_pool']``.
    """
    depends(request, ["first_pool"])

    unused_disks = POST('/disk/get_unused/').json()
    stripe_vdev = {"type": "STRIPE", "disks": [unused_disks[0]['name']]}
    create_payload = {
        "name": 'second_pool',
        "encryption": False,
        "topology": {
            "data": [stripe_vdev],
        },
        "allow_duplicate_serials": True,
    }

    create = POST("/pool/", create_payload)
    assert create.status_code == 200, create.text
    job = wait_on_job(create.json(), 180)
    assert job['state'] == 'SUCCESS', str(job['results'])
    pool_data['second_pool'] = job['results']['result']

    response = GET("/systemdataset/")
    assert response.status_code == 200, response.text
    sysds = response.json()
    assert isinstance(sysds, dict), response.text
    assert sysds['pool'] == 'first_pool', response.text
    assert sysds['basename'] == 'first_pool/.system', response.text
def test_07_verify_changes_to_sysds_are_forbidden_while_AD_is_running(request):
    """Join Active Directory and check that moving the system dataset fails.

    Steps:
      1. Remember the current nameserver1 and point it at ADNameServer.
      2. Enable Active Directory and wait for the join job to succeed.
      3. Request a move of the system dataset to 'second_pool' and expect
         the job to end in FAILED, with the dataset still on 'first_pool'.
      4. Leave the domain and restore the original nameserver1.

    Step 4 runs in ``finally`` blocks so that a failed assertion does not
    leave the host joined to AD or with a changed nameserver for the tests
    that follow. Depends on "second_pool".
    """
    depends(request, ["second_pool"])

    results = GET("/network/configuration/")
    assert results.status_code == 200, results.text
    nameserver1 = results.json()['nameserver1']

    results = PUT("/network/configuration/", {"nameserver1": ADNameServer})
    assert results.status_code == 200, results.text
    try:
        assert isinstance(results.json(), dict), results.text

        payload = {
            "bindpw": ADPASSWORD,
            "bindname": ADUSERNAME,
            "domainname": AD_DOMAIN,
            "netbiosname": hostname,
            "dns_timeout": 15,
            "verbose_logging": True,
            "enable": True
        }
        results = PUT("/activedirectory/", payload)
        assert results.status_code == 200, results.text
        job_status = wait_on_job(results.json()['job_id'], 180)
        assert job_status['state'] == 'SUCCESS', str(job_status['results'])

        # The join succeeded from here on, so the domain must be left again
        # regardless of how the checks below turn out.
        try:
            results = GET('/activedirectory/get_state/')
            assert results.status_code == 200, results.text
            assert results.json() == 'HEALTHY', results.text

            results = PUT("/systemdataset/", {'pool': 'second_pool'})
            assert results.status_code == 200, results.text
            assert isinstance(results.json(), int), results.text
            job_status = wait_on_job(results.json(), 120)
            assert job_status['state'] == 'FAILED', str(job_status['results'])

            results = GET("/systemdataset/")
            assert results.status_code == 200, results.text
            assert isinstance(results.json(), dict), results.text
            assert results.json()['pool'] == 'first_pool', results.text
            assert results.json()['basename'] == 'first_pool/.system', results.text
        finally:
            leave_payload = {
                "username": ADUSERNAME,
                "password": ADPASSWORD
            }
            results = POST("/activedirectory/leave/", leave_payload)
            assert results.status_code == 200, results.text
    finally:
        # Always put the original nameserver back once it has been changed.
        results = PUT("/network/configuration/", {"nameserver1": nameserver1})
        assert results.status_code == 200, results.text
def test_08_get_logs_before_moving_the_sysds_to_the_second_pool(logs_data):
    """Record the last line of /var/log/middlewared.log before the move.

    The line is stored in ``logs_data['middleware_log_4']`` so a later test
    can look for it in the log after the system dataset has been moved.
    """
    ssh = SSH_TEST("cat /var/log/middlewared.log", user, password, ip)
    assert ssh['result'] is True, str(ssh)
    log_lines = ssh['output'].splitlines()
    logs_data['middleware_log_4'] = log_lines[-1]
def test_09_move_sysds_to_second_pool(request):
    """Move the system dataset to 'second_pool' and confirm the new location.

    Depends on "second_pool".
    """
    depends(request, ["second_pool"])

    move = PUT("/systemdataset/", {'pool': 'second_pool'})
    assert move.status_code == 200, move.text
    move_job_id = move.json()
    assert isinstance(move_job_id, int), move.text

    job = wait_on_job(move_job_id, 120)
    assert job['state'] == 'SUCCESS', str(job['results'])

    response = GET("/systemdataset/")
    assert response.status_code == 200, response.text
    sysds = response.json()
    assert isinstance(sysds, dict), response.text
    assert sysds['pool'] == 'second_pool', response.text
    assert sysds['basename'] == 'second_pool/.system', response.text
def test_10_verify_logs_after_sysds_is_moved_to_second_pool(logs_data):
    """Check that the middleware log kept its old content and gained new lines.

    The line saved in ``logs_data['middleware_log_4']`` must still appear in
    /var/log/middlewared.log, and the current last line (saved as
    ``logs_data['middleware_log_5']``) must differ from it.
    """
    ssh = SSH_TEST("cat /var/log/middlewared.log", user, password, ip)
    assert ssh['result'] is True, str(ssh)

    log_text = ssh['output']
    newest_line = log_text.splitlines()[-1]
    logs_data['middleware_log_5'] = newest_line

    earlier_line = logs_data['middleware_log_4']
    assert earlier_line in log_text, str(log_text)
    assert earlier_line != newest_line
def test_11_verify_sysds_can_be_moved_while_services_are_running(request):
    """Start every service, move the system dataset to 'first_pool', then stop them.

    Every service listed by /service is started before the move. Afterwards
    all of them except 'ssh' are stopped again. Depends on "second_pool".
    """
    depends(request, ["second_pool"])

    # Keying by service name drops duplicates while keeping the listing order.
    service_names = list({entry['service']: entry for entry in GET('/service').json()})
    for name in service_names:
        started = POST("/service/start/", {"service": name})
        assert started.status_code == 200, started.text

    move = PUT("/systemdataset/", {'pool': 'first_pool'})
    assert move.status_code == 200, move.text
    move_job_id = move.json()
    assert isinstance(move_job_id, int), move.text
    job = wait_on_job(move_job_id, 120)
    assert job['state'] == 'SUCCESS', str(job['results'])

    response = GET("/systemdataset/")
    assert response.status_code == 200, response.text
    sysds = response.json()
    assert isinstance(sysds, dict), response.text
    assert sysds['pool'] == 'first_pool', response.text
    assert sysds['basename'] == 'first_pool/.system', response.text

    for name in service_names:
        # ssh is left running (presumably because this suite drives the
        # host through SSH_TEST -- confirm).
        if name == 'ssh':
            continue
        stopped = POST("/service/stop/", {"service": name})
        assert stopped.status_code == 200, stopped.text
def test_12_delete_second_pool_and_verify_sysds_is_moved_to_first_pool(request, pool_data):
    """Export and destroy 'second_pool', then expect the system dataset on 'first_pool'.

    The pool id comes from the record stored in ``pool_data['second_pool']``.
    """
    pool_id = pool_data["second_pool"]["id"]
    export_options = {
        'cascade': True,
        'restart_services': True,
        'destroy': True
    }

    export = POST(f'/pool/id/{pool_id}/export/', export_options)
    assert export.status_code == 200, export.text
    job = wait_on_job(export.json(), 120)
    assert job['state'] == 'SUCCESS', str(job['results'])

    response = GET("/systemdataset/")
    assert response.status_code == 200, response.text
    sysds = response.json()
    assert isinstance(sysds, dict), response.text
    assert sysds['pool'] == 'first_pool', response.text
    assert sysds['basename'] == 'first_pool/.system', response.text
def test_13_delete_first_pool_and_verify_sysds_moved_to_the_boot_pool(request, pool_data):
    """Export and destroy 'first_pool', then expect the system dataset on boot-pool.

    The pool id comes from the record stored in ``pool_data['first_pool']``.
    """
    pool_id = pool_data["first_pool"]["id"]
    export_options = {
        'cascade': True,
        'restart_services': True,
        'destroy': True
    }

    export = POST(f'/pool/id/{pool_id}/export/', export_options)
    assert export.status_code == 200, export.text
    job = wait_on_job(export.json(), 120)
    assert job['state'] == 'SUCCESS', str(job['results'])

    response = GET("/systemdataset/")
    assert response.status_code == 200, response.text
    sysds = response.json()
    assert isinstance(sysds, dict), response.text
    assert sysds['pool'] == 'boot-pool', response.text
    assert sysds['basename'] == 'boot-pool/.system', response.text