#!/usr/bin/env python@PY_VERSION@
# Copyright (C) 2013-2016 Red Hat, Inc.
# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development LP
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; If not, see .
#
# Author: tasleson
# Joe Handzik
# Gris Ge
import lsm
import functools
import time
import random
import string
import traceback
import unittest
import argparse
try:
# Python 3.8 required change
from collections.abc import Sequence
except ImportError:
from collections import Sequence
import atexit
import sys
import os
import tempfile
from lsm import LsmError, ErrorNumber
from lsm import Capabilities as Cap
results = {}
stats = {}
MIN_POOL_SIZE = 4096
MIN_OBJECT_SIZE = 512
def _rs(l):
return ''.join(random.choice(string.ascii_uppercase) for _ in range(l))
# Lets create a random prefix so we can run multiple copies at the same time
# for hardware that will allow it.
g_prefix = _rs(4)
def rs(component, l=4):
"""
Generate a random string
"""
rp = _rs(l)
if component is not None:
return '%s_%s_%s' % (g_prefix, component, rp)
return rp
# If you print anything during execution make sure it's to stderr as the
# automated tests are making the assumption that test messages go to stderr and
# execution information gets written to stdout as yaml
def print_stderr(msg):
sys.stderr.write(msg)
sys.stderr.flush()
def mb_in_bytes(mib):
return 1024 * 1024 * mib
def record_result(method):
def recorder(*args, **kwargs):
try:
result = method(*args, **kwargs)
results[method.__name__] = dict(rc=True, msg=None)
return result
except Exception as e:
results[method.__name__] = dict(rc=False,
stack_trace=traceback.format_exc(),
msg=str(e))
return recorder
def update_stats(method_name, duration, number_results):
if method_name in stats:
stats[method_name]["count"] += 1
else:
stats[method_name] = dict(count=1, total_time=0, number_items=0)
stats[method_name]["total_time"] += duration
if number_results > 0:
stats[method_name]["number_items"] += number_results
def r_fcpn():
"""
Generate a random 16 character hex number
"""
rnd_fcpn = '%016x' % random.randrange(2 ** 64)
return ':'.join(rnd_fcpn[i:i + 2] for i in range(0, len(rnd_fcpn), 2))
def r_iqn():
# RFC 3722, only lower case characters are allowed in IQN
return 'iqn.1994-05.com.domain:01.' + rs(None, 6).lower()
class Duration(object):
def __init__(self):
self.start = 0
self.end = 0
def __enter__(self):
self.start = time.time()
return self
def __exit__(self, *ignore):
self.end = time.time()
def amount(self):
return self.end - self.start
def supported(cap, capability):
for c in capability:
if not cap.supported(c):
return False
return True
class TestProxy(object):
# Errors that we are forcing to occur
not_logging = [lsm.ErrorNumber.NO_SUPPORT,
lsm.ErrorNumber.NAME_CONFLICT,
lsm.ErrorNumber.NO_STATE_CHANGE,
lsm.ErrorNumber.IS_MASKED,
lsm.ErrorNumber.HAS_CHILD_DEPENDENCY,
lsm.ErrorNumber.EXISTS_INITIATOR]
# Hash of all calls that can be async
async_calls = {'volume_create': (str, lsm.Volume),
'volume_resize': (str, lsm.Volume),
'volume_replicate': (str, lsm.Volume),
'volume_replicate_range': (str,),
'volume_delete': (str,),
'volume_child_dependency_rm': (str,),
'fs_delete': (str,),
'fs_resize': (str, lsm.FileSystem),
'fs_create': (str, lsm.FileSystem),
'fs_clone': (str, lsm.FileSystem),
'fs_file_clone': (str,),
'fs_snapshot_create': (str, lsm.FsSnapshot),
'fs_snapshot_delete': (str,),
'fs_snapshot_restore': (str,),
'fs_child_dependency_rm': (str,)}
# The constructor.
# @param self The object self
# @param obj The object instance to wrap
def __init__(self, obj=None):
"""
Constructor which takes an object to wrap.
"""
self.o = obj
# Called each time an attribute is requested of the object
# @param self The object self
# @param name Name of the attribute being accessed
# @return The result of the method
def __getattr__(self, name):
"""
Called each time an attribute is requested of the object
"""
if hasattr(self.o, name):
return functools.partial(self.present, name)
else:
raise AttributeError("No such method %s" % name)
@staticmethod
def log_result(method, v):
if method not in results:
results[method] = []
results[method].append(v)
# Method which is called to invoke the actual method of interest.
#
# The intentions of this method is this:
# - Invoke the method just like it normally would without this
# so signature in & out is identical
# - Collect results of the method call
# - Collect stats on the execution time of call
#
# @param self The object self
# @param _proxy_method_name Method to invoke
# @param args Arguments
# @param kwargs Keyword arguments
# @return The result of the method invocation
def present(self, _proxy_method_name, *args, **kwargs):
"""
Method which is called to invoke the actual method of interest.
"""
job_possible = _proxy_method_name in TestProxy.async_calls
# Timer block
with Duration() as method_time:
try:
rc = getattr(self.o, _proxy_method_name)(*args, **kwargs)
TestProxy.log_result(_proxy_method_name,
dict(rc=True, stack_trace=None, msg=None))
except lsm.LsmError as le:
# We are forcing some types of error, for these we won't log
# but will allow the test case asserts to check to make sure
# we actually got them.
if le.code not in self.not_logging:
TestProxy.log_result(
_proxy_method_name,
dict(rc=False,
stack_trace=traceback.format_exc(),
msg=str(le)))
raise
# If the job can do async, we will block looping on it.
if job_possible and rc is not None:
# Note: Some return a single unicode or None,
# others return a tuple (job, object)
if type(rc) != tuple and type(rc) != list:
rc = (rc, None)
rc = self.wait_for_it(_proxy_method_name, *rc)
# Fix up return value to match what it would normally be
if job_possible:
if 2 == len(TestProxy.async_calls[_proxy_method_name]):
rc = (None, rc)
# We don't care about time per operation when there is only one
# possible.
if not job_possible and isinstance(rc, Sequence) and len(rc) > 2:
num_results = len(rc)
else:
num_results = 0
update_stats(_proxy_method_name, method_time.amount(), num_results)
return rc
def wait_for_it(self, msg, job, item):
if not job:
return item
else:
while True:
(s, percent, i) = self.job_status(job)
if s == lsm.JobStatus.INPROGRESS:
time.sleep(0.25)
elif s == lsm.JobStatus.COMPLETE:
self.job_free(job)
return i
else:
raise Exception(msg + " job error code= " + str(s))
def check_type(value, *expected):
assert type(value) in expected, "type expected (%s), type actual (%s)" % \
(str(type(value)), str(expected))
class TestPlugin(unittest.TestCase):
"""
Anything that starts with test_ will be run as a separate unit test with
the setUp and tearDown methods called before and after respectively
"""
URI = 'sim://'
PASSWORD = None
@staticmethod
def _min_size():
return mb_in_bytes(MIN_OBJECT_SIZE)
def setUp(self):
for skip_test_case in TestPlugin.SKIP_TEST_CASES:
if self.id().endswith(skip_test_case):
self.skipTest("Tested has been skiped as requested")
self.c = TestProxy(lsm.Client(TestPlugin.URI, TestPlugin.PASSWORD))
self.systems = self.c.systems()
self.pools = self.c.pools()
self.pool_by_sys_id = {}
for s in self.systems:
self.pool_by_sys_id[s.id] = [p for p in self.pools if
p.system_id == s.id]
# TODO Store what exists, so that we don't remove it
def _get_pool_by_usage(self, system_id, element_type,
unsupported_features=0):
largest_free = 0
rc = None
for p in self.pool_by_sys_id[system_id]:
# If the pool matches our criteria and min size we will consider
# it, but we will select the one with the most free space for
# testing and one that support volume expansion
if p.element_type & element_type and \
p.free_space > mb_in_bytes(MIN_POOL_SIZE) and \
(not p.unsupported_actions & unsupported_features):
if p.free_space > largest_free:
largest_free = p.free_space
rc = p
return rc
def _clean_up(self):
# Note: We make best effort to clean things up, thus we are ignoring
# exceptions in this code, depending on what's failing in the plugin
# we may not be able to remove something we created
# noinspection PyBroadException
try:
for s in self.systems:
cap = self.c.capabilities(s)
# Remove any access groups we created, removing any mappings
# first.
if supported(cap, [Cap.ACCESS_GROUP_DELETE,
Cap.VOLUMES_ACCESSIBLE_BY_ACCESS_GROUP]):
for ag in self.c.access_groups():
if g_prefix in ag.name and s.id == ag.system_id:
try:
# Make sure it doesn't have any mappings
mapped_volume = self.c.\
volumes_accessible_by_access_group(ag)
# Remove any mappings
for vol in mapped_volume:
self.c.volume_unmask(ag, vol)
# Lastly, remove the access group
self.c.access_group_delete(ag)
except LsmError as le:
print_stderr("[WARNING] error when "
"removing ag %s\n" % str(le))
pass
# Remove any volumes we created
if supported(cap, [Cap.VOLUME_DELETE]):
for v in self.c.volumes():
if g_prefix in v.name and s.id == v.system_id:
# Check to see if this volume is participating in
# an access group, if it is we will remove the
# volume from it, but we will not delete the access
# group as we likely didn't create it
access_groups = self.c.\
access_groups_granted_to_volume(v)
for ag in access_groups:
try:
self.c.volume_unmask(ag, v)
except LsmError as le:
print_stderr(
"[WARNING] error when unmasking "
"volume %s\n" % str(le))
pass
try:
self.c.volume_delete(v)
except LsmError as le:
print_stderr("[WARNING] error when removing "
"volume %s\n" % str(le))
pass
# Remove any fs exports we created
if supported(cap, [Cap.FS, Cap.EXPORTS, Cap.EXPORT_REMOVE]):
# Get all the FS
fs_list = self.c.fs()
for e_fs in self.c.exports():
# Get the fs object that is exported
fs = [x for x in fs_list if x.id == e_fs.fs_id][0]
# Make sure we are un-exporting a FS we created
if g_prefix in fs.name and s.id == fs.system_id:
try:
print_stderr("Export remove: %s" % str(e_fs))
self.c.export_remove(e_fs)
except LsmError as le:
print_stderr("[WARNING] error when removing "
"export %s\n" % str(le))
pass
# Remove any fs we created
if supported(cap, [Cap.FS, Cap.FS_DELETE]):
for f in self.c.fs():
if g_prefix in f.name and s.id == f.system_id:
if supported(cap, [Cap.FS_CHILD_DEPENDENCY,
Cap.FS_CHILD_DEPENDENCY_RM]):
if self.c.fs_child_dependency(f, None):
self.c.fs_child_dependency_rm(f, None)
try:
self.c.fs_delete(f)
except LsmError as le:
print_stderr("[WARNING] error when removing "
"fs %s\n" % str(le))
pass
except Exception:
msg = str(traceback.format_exc())
print_stderr("[WARNING] exception in _clean_up %s\n" % msg)
pass
def tearDown(self):
self._clean_up()
self.c.close()
def test_plugin_info(self):
(desc, version) = self.c.plugin_info()
self.assertTrue(desc is not None and len(desc) > 0)
self.assertTrue(version is not None and len(version) > 0)
def test_fw_version_get(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.SYS_FW_VERSION_GET]):
fw_ver = s.fw_version
self.assertTrue(fw_ver is not None and len(fw_ver) > 0,
"Firmware version retrieval failed")
def test_sys_mode_get(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.SYS_MODE_GET]):
sys_mode = s.mode
self.assertTrue(sys_mode is not None,
"System mode retrieval failed")
def test_sys_read_cache_pct_get(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.SYS_READ_CACHE_PCT_GET]):
read_pct = s.read_cache_pct
self.assertTrue(read_pct is not None,
"Read cache percentage retrieval failed")
def test_system_read_cache_pct_update(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.SYS_READ_CACHE_PCT_UPDATE]):
cache_status = self.c.system_read_cache_pct_update(s, 100)
self.assertTrue(cache_status is None,
"system_read_cache_pct_update failed")
def test_timeout(self):
tmo = 40000
self.c.time_out_set(tmo)
self.assertEqual(self.c.time_out_get(), tmo)
def test_systems_list(self):
self.c.systems()
def test_pools_list(self):
self.c.pools()
def _find_or_create_volumes(self):
"""
Find existing volumes, if not found, try to create one.
Return (volumes, flag_created)
If 'flag_created' is True, then returned volumes is newly created.
"""
volumes = self.c.volumes()
flag_created = False
if len(self.c.volumes()) == 0:
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.VOLUME_CREATE, Cap.VOLUME_DELETE]):
self._volume_create(s.id)
flag_created = True
break
volumes = self.c.volumes()
return volumes, flag_created
def test_volume_list(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.VOLUMES]):
(volumes, flag_created) = self._find_or_create_volumes()
self.assertTrue(
len(volumes) > 0,
"We need at least 1 volume to test")
if flag_created:
self._volume_delete(volumes[0])
def test_volume_vpd83(self):
# You cannot test for vpd83 if the device doesn't support volumes
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.VOLUMES]):
(volumes, flag_created) = self._find_or_create_volumes()
self.assertTrue(
len(volumes) > 0, "We need at least 1 volume to test")
for v in volumes:
# Don't test this on HDS as VPD is empty
if '10.1.134.227' not in TestPlugin.URI:
self.assertTrue(
lsm.Volume.vpd83_verify(v.vpd83),
"VPD is not as expected '%s' for volume id: '%s'" %
(v.vpd83, v.id))
if flag_created:
self._volume_delete(volumes[0])
def test_disks_list(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.DISKS]):
disks = self.c.disks()
self.assertTrue(len(disks) > 0,
"We need at least 1 disk to test")
if supported(cap, [Cap.DISK_VPD83_GET]):
try:
list(disk.vpd83 for disk in disks)
list(disk.rpm for disk in disks)
list(disk.link_type for disk in disks)
except LsmError as lsm_err:
if lsm_err.code != ErrorNumber.NO_SUPPORT:
raise
def test_disk_location_get(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.DISK_LOCATION]):
for disk in self.c.disks():
disk_location = disk.location
self.assertTrue(disk_location is not None and
len(disk_location) > 0,
"Disk location retrieval failed")
def _volume_create(self, system_id,
element_type=lsm.Pool.ELEMENT_TYPE_VOLUME,
unsupported_features=0):
if system_id in self.pool_by_sys_id:
p = self._get_pool_by_usage(system_id, element_type,
unsupported_features)
self.assertTrue(p is not None, "Unable to find a suitable pool")
if p:
vol_size = self._min_size()
vol = self.c.volume_create(p, rs('v'), vol_size,
lsm.Volume.PROVISION_DEFAULT)[1]
self.assertTrue(self._volume_exists(vol.id), p.id)
self.assertTrue(vol.pool_id == p.id)
self.assertTrue(vol.system_id == p.system_id)
return vol, p
def _fs_create(self, system_id):
if system_id in self.pool_by_sys_id:
fs = None
pool = self._get_pool_by_usage(system_id,
lsm.Pool.ELEMENT_TYPE_FS)
self.assertTrue(pool is not None, "Unable to find a suitable pool "
"for fs creation")
if pool is not None:
fs_size = self._min_size()
fs = self.c.fs_create(pool, rs('fs'), fs_size)[1]
self.assertTrue(fs is not None)
if fs:
self.assertTrue(self._fs_exists(fs.id))
self.assertTrue(self._system_exists(fs.system_id))
self.assertTrue(pool.system_id == fs.system_id)
return fs, pool
def _volume_delete(self, volume):
self.c.volume_delete(volume)
self.assertFalse(self._volume_exists(volume.id))
def _fs_delete(self, fs):
self.c.fs_delete(fs)
self.assertFalse(self._fs_exists(fs.id))
def _fs_snapshot_delete(self, fs, ss):
self.c.fs_snapshot_delete(fs, ss)
self.assertFalse(self._fs_snapshot_exists(fs, ss.id))
def _volume_exists(self, volume_id, pool_id=None):
volumes = self.c.volumes()
for v in volumes:
if v.id == volume_id:
if pool_id is not None:
if v.pool_id == pool_id:
return True
else:
return False
return True
return False
def _fs_exists(self, fs_id):
fs = self.c.fs()
for f in fs:
if f.id == fs_id:
return True
return False
def _system_exists(self, system_id):
systems = self.c.systems()
for s in systems:
if s.id == system_id:
return True
return False
def _fs_snapshot_exists(self, fs, ss_id):
snapshots = self.c.fs_snapshots(fs)
for s in snapshots:
if s.id == ss_id:
return True
return False
def test_volume_create_delete(self):
if self.pool_by_sys_id:
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.VOLUME_CREATE]):
vol = self._volume_create(s.id)[0]
self.assertTrue(vol is not None)
if vol is not None and \
supported(cap, [Cap.VOLUME_DELETE]):
self._volume_delete(vol)
def test_volume_resize(self):
if self.pool_by_sys_id:
for s in self.systems:
cap = self.c.capabilities(s)
# We need to make sure that the pool supports volume grow.
unsupported = lsm.Pool.UNSUPPORTED_VOLUME_GROW
if supported(cap, [Cap.VOLUME_CREATE,
Cap.VOLUME_DELETE,
Cap.VOLUME_RESIZE]):
vol = self._volume_create(
s.id,
unsupported_features=unsupported)[0]
vol_resize = self.c.volume_resize(
vol, vol.size_bytes + mb_in_bytes(16))[1]
self.assertTrue(vol.size_bytes < vol_resize.size_bytes)
# The 3par in the lab has a firmware bug that causes
# this assert to fail, lets skip this test for that box.
# we need to come up with a better way to handle these
# kind of things, ie. expected failures for certain things
if '10.1.134.160' not in TestPlugin.URI:
self.assertTrue(vol.id == vol_resize.id,
"Expecting re-sized volume to refer to "
"same volume. Expected %s, got %s" %
(vol.id, vol_resize.id))
if vol.id == vol_resize.id:
self._volume_delete(vol_resize)
else:
# Delete the original
self._volume_delete(vol)
def _replicate_test(self, capability, replication_type):
if self.pool_by_sys_id:
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.VOLUME_CREATE,
Cap.VOLUME_DELETE]):
vol, pool = self._volume_create(s.id)
# For the moment lets allow the array to pick the pool
# to supply the backing store for the replicate
if supported(cap, [capability]):
volume_clone = self.c.volume_replicate(
None, replication_type, vol,
rs('v_c_'))[1]
self.assertTrue(volume_clone is not None)
if volume_clone:
self.assertTrue(
self._volume_exists(volume_clone.id))
# Lets test for creating a clone with an
# existing name
error_num = None
try:
self.c.volume_replicate(
None, replication_type, vol,
volume_clone.name)[1]
except LsmError as le:
error_num = le.code
self.assertTrue(error_num ==
ErrorNumber.NAME_CONFLICT)
self._volume_delete(volume_clone)
self._volume_delete(vol)
def test_volume_replication(self):
self._replicate_test(Cap.VOLUME_REPLICATE_CLONE,
lsm.Volume.REPLICATE_CLONE)
self._replicate_test(Cap.VOLUME_REPLICATE_COPY,
lsm.Volume.REPLICATE_COPY)
self._replicate_test(Cap.VOLUME_REPLICATE_MIRROR_ASYNC,
lsm.Volume.REPLICATE_MIRROR_ASYNC)
self._replicate_test(Cap.VOLUME_REPLICATE_MIRROR_SYNC,
lsm.Volume.REPLICATE_MIRROR_SYNC)
def test_volume_replicate_range_block_size(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.VOLUME_COPY_RANGE_BLOCK_SIZE]):
size = self.c.volume_replicate_range_block_size(s)
self.assertTrue(size > 0)
else:
self.assertRaises(lsm.LsmError,
self.c.volume_replicate_range_block_size, s)
def test_replication_range(self):
if self.pool_by_sys_id:
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap,
[Cap.VOLUME_COPY_RANGE_BLOCK_SIZE,
Cap.VOLUME_CREATE,
Cap.VOLUME_DELETE,
Cap.VOLUME_COPY_RANGE]):
size = self.c.volume_replicate_range_block_size(s)
vol, pool = self._volume_create(s.id)
br = lsm.BlockRange(0, size, size)
if supported(
cap, [Cap.VOLUME_COPY_RANGE_CLONE]):
self.c.volume_replicate_range(
lsm.Volume.REPLICATE_CLONE, vol, vol, [br])
else:
self.assertRaises(
lsm.LsmError,
self.c.volume_replicate_range,
lsm.Volume.REPLICATE_CLONE, vol, vol, [br])
br = lsm.BlockRange(size * 2, size, size)
if supported(
cap, [Cap.VOLUME_COPY_RANGE_COPY]):
self.c.volume_replicate_range(
lsm.Volume.REPLICATE_COPY, vol, vol, [br])
else:
self.assertRaises(
lsm.LsmError,
self.c.volume_replicate_range,
lsm.Volume.REPLICATE_COPY, vol, vol, [br])
self._volume_delete(vol)
def test_fs_creation_deletion(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.FS_CREATE]):
fs, pool = self._fs_create(s.id)
if fs is not None:
if supported(cap, [Cap.FS_DELETE]):
self._fs_delete(fs)
def test_fs_resize(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.FS_CREATE]):
fs, pool = self._fs_create(s.id)
if fs is not None:
if supported(cap, [Cap.FS_RESIZE]):
fs_size = fs.total_space + mb_in_bytes(16)
fs_resized = self.c.fs_resize(fs, fs_size)[1]
self.assertTrue(fs_resized.total_space)
if supported(cap, [Cap.FS_DELETE]):
self._fs_delete(fs)
def test_fs_clone(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.FS_CREATE,
Cap.FS_CLONE]):
fs, pool = self._fs_create(s.id)
if fs is not None:
fs_clone = self.c.fs_clone(fs, rs('fs_c'))[1]
if supported(cap, [Cap.FS_DELETE]):
self._fs_delete(fs_clone)
self._fs_delete(fs)
def test_fs_snapshot(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.FS_CREATE,
Cap.FS_SNAPSHOT_CREATE]):
fs, pool = self._fs_create(s.id)
if fs is not None:
ss = self.c.fs_snapshot_create(fs, rs('ss'))[1]
self.assertTrue(self._fs_snapshot_exists(fs, ss.id))
if supported(cap, [Cap.FS_SNAPSHOT_RESTORE]):
self.c.fs_snapshot_restore(fs, ss, None, None, True)
# Delete snapshot
if supported(cap, [Cap.FS_SNAPSHOT_DELETE]):
self._fs_snapshot_delete(fs, ss)
def test_target_ports(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.TARGET_PORTS]):
ports = self.c.target_ports()
for p in ports:
self.assertTrue(p.id is not None)
self.assertTrue(p.port_type is not None)
self.assertTrue(p.service_address is not None)
self.assertTrue(p.network_address is not None)
self.assertTrue(p.physical_address is not None)
self.assertTrue(p.physical_name is not None)
self.assertTrue(p.system_id is not None)
def _masking_state(self, cap, ag, vol, masked):
if supported(cap,
[Cap.
VOLUMES_ACCESSIBLE_BY_ACCESS_GROUP]):
vol_masked = \
self.c.volumes_accessible_by_access_group(ag)
match = [x for x in vol_masked if x.id == vol.id]
if masked:
self.assertTrue(len(match) == 1, "len = %d" % len(match))
else:
self.assertTrue(len(match) == 0, "len = %d" % len(match))
if supported(cap,
[Cap.
ACCESS_GROUPS_GRANTED_TO_VOLUME]):
ag_masked = \
self.c.access_groups_granted_to_volume(vol)
match = [x for x in ag_masked if x.id == ag.id]
if masked:
self.assertTrue(len(match) == 1, "len = %d" % len(match))
else:
self.assertTrue(len(match) == 0, "len = %d" % len(match))
def test_mask_unmask(self):
for s in self.systems:
ag_created = None
cap = self.c.capabilities(s)
if supported(cap, [Cap.ACCESS_GROUPS,
Cap.VOLUME_MASK,
Cap.VOLUME_UNMASK,
Cap.VOLUME_CREATE,
Cap.VOLUME_DELETE]):
if supported(cap, [Cap.ACCESS_GROUP_CREATE_ISCSI_IQN]):
ag_name = rs("ag")
ag_iqn = r_iqn()
ag_created = self.c.access_group_create(
ag_name, ag_iqn, lsm.AccessGroup.INIT_TYPE_ISCSI_IQN,
s)
# Make sure we have an access group to test with, many
# smi-s providers don't provide functionality to create them!
ag_list = self.c.access_groups('system_id', s.id)
if len(ag_list):
vol = self._volume_create(s.id)[0]
self.assertTrue(vol is not None)
chose_ag = ag_created
if chose_ag is None:
for ag in ag_list:
if len(ag.init_ids) >= 1:
chose_ag = ag
break
if chose_ag is None:
raise Exception("No access group with 1+ member "
"found, cannot do volume mask test")
if vol is not None and chose_ag is not None:
self.c.volume_mask(chose_ag, vol)
self._masking_state(cap, chose_ag, vol, True)
# Test duplicate call for NO_STATE_CHANGE error
flag_dup_error_found = False
try:
self.c.volume_mask(chose_ag, vol)
except LsmError as lsm_error:
self.assertTrue(
lsm_error.code == ErrorNumber.NO_STATE_CHANGE)
flag_dup_error_found = True
self.assertTrue(flag_dup_error_found)
self.c.volume_unmask(chose_ag, vol)
self._masking_state(cap, chose_ag, vol, False)
# Test duplicate call for NO_STATE_CHANGE error
flag_dup_error_found = False
try:
self.c.volume_unmask(chose_ag, vol)
except LsmError as lsm_error:
self.assertTrue(
lsm_error.code == ErrorNumber.NO_STATE_CHANGE)
flag_dup_error_found = True
self.assertTrue(flag_dup_error_found)
if vol:
self._volume_delete(vol)
if ag_created:
self.assertTrue(s.id == ag_created.system_id)
self.c.access_group_delete(ag_created)
def _create_access_group(self, name, s, init_type):
ag_created = None
if init_type == lsm.AccessGroup.INIT_TYPE_ISCSI_IQN:
ag_created = self.c.access_group_create(
name, r_iqn(),
lsm.AccessGroup.INIT_TYPE_ISCSI_IQN, s)
elif init_type == lsm.AccessGroup.INIT_TYPE_WWPN:
ag_created = self.c.access_group_create(
name,
r_fcpn(),
lsm.AccessGroup.INIT_TYPE_WWPN, s)
self.assertTrue(ag_created is not None)
if ag_created is not None:
ag_list = self.c.access_groups()
match = [x for x in ag_list if x.id == ag_created.id]
self.assertTrue(
len(match) == 1,
"Newly created access group %s not in the access group listing"
% ag_created.name)
self.assertTrue(s.id == ag_created.system_id)
return ag_created
def _delete_access_group(self, ag):
self.c.access_group_delete(ag)
ag_list = self.c.access_groups()
match = [x for x in ag_list if x.id == ag.id]
self.assertTrue(len(match) == 0, "Expected access group that was "
"deleted to not show up in the "
"access group list!")
def _test_ag_create_dup(self, lsm_ag, lsm_system):
"""
Test NAME_CONFLICT and EXISTS_INITIATOR of access_group_create().
"""
flag_got_expected_error = False
if lsm_ag.init_type == lsm.AccessGroup.INIT_TYPE_ISCSI_IQN:
new_init_id = r_iqn()
else:
new_init_id = r_fcpn()
try:
self.c.access_group_create(
lsm_ag.name, new_init_id, lsm_ag.init_type, lsm_system)
except LsmError as lsm_error:
self.assertTrue(lsm_error.code == ErrorNumber.NAME_CONFLICT)
flag_got_expected_error = True
self.assertTrue(flag_got_expected_error)
flag_got_expected_error = False
try:
self.c.access_group_create(
rs('ag'), lsm_ag.init_ids[0], lsm_ag.init_type, lsm_system)
except LsmError as lsm_error:
self.assertTrue(lsm_error.code == ErrorNumber.EXISTS_INITIATOR)
flag_got_expected_error = True
self.assertTrue(flag_got_expected_error)
def _test_ag_create_delete(self, cap, s):
if supported(cap, [Cap.ACCESS_GROUPS,
Cap.ACCESS_GROUP_CREATE_ISCSI_IQN]):
ag = self._create_access_group(
rs('ag'), s, lsm.AccessGroup.INIT_TYPE_ISCSI_IQN)
if ag is not None and \
supported(cap, [Cap.ACCESS_GROUP_DELETE]):
self._test_ag_create_dup(ag, s)
self._delete_access_group(ag)
if supported(cap, [Cap.ACCESS_GROUPS, Cap.ACCESS_GROUP_CREATE_WWPN]):
ag = self._create_access_group(
rs('ag'), s, lsm.AccessGroup.INIT_TYPE_WWPN)
if ag is not None and \
supported(cap, [Cap.ACCESS_GROUP_DELETE]):
self._test_ag_create_dup(ag, s)
self._delete_access_group(ag)
def test_iscsi_chap(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.ACCESS_GROUPS,
Cap.ACCESS_GROUP_CREATE_ISCSI_IQN,
Cap.VOLUME_ISCSI_CHAP_AUTHENTICATION]):
ag = self._create_access_group(
rs('ag'), s, lsm.AccessGroup.INIT_TYPE_ISCSI_IQN)
self.assertTrue(ag is not None)
if ag:
self.c.iscsi_chap_auth(ag.init_ids[0], 'foo', rs(None, 12),
None, None)
if supported(cap, [Cap.ACCESS_GROUP_DELETE]):
self._test_ag_create_dup(ag, s)
self._delete_access_group(ag)
def test_access_group_create_delete(self):
for s in self.systems:
cap = self.c.capabilities(s)
self._test_ag_create_delete(cap, s)
def test_access_group_list(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.ACCESS_GROUPS]):
ag_list = self.c.access_groups('system_id', s.id)
if len(ag_list) == 0:
self._test_ag_create_delete(cap, s)
else:
self.assertTrue(len(ag_list) > 0,
"Need at least 1 access group for testing "
"and no support exists for creation of "
"access groups for this system")
def _ag_init_add(self, ag):
if ag.init_type == lsm.AccessGroup.INIT_TYPE_ISCSI_IQN:
t_id = r_iqn()
t = lsm.AccessGroup.INIT_TYPE_ISCSI_IQN
else:
# We will try FC PN
t_id = r_fcpn()
t = lsm.AccessGroup.INIT_TYPE_WWPN
ag_add = self.c.access_group_initiator_add(ag, t_id, t)
self.assertTrue(ag_add.system_id == ag.system_id)
ag_after = self.c.access_groups('id', ag.id)[0]
match = [x for x in ag_after.init_ids if x == t_id]
self.assertTrue(len(match) == 1)
return t_id
def _ag_init_delete(self, ag, init_id, init_type):
self.c.access_group_initiator_delete(ag, init_id, init_type)
ag_after = self.c.access_groups('id', ag.id)[0]
match = [x for x in ag_after.init_ids if x == init_id]
self.assertTrue(len(match) == 0)
def test_access_group_initiator_add_delete(self):
usable_ag_types = [lsm.AccessGroup.INIT_TYPE_WWPN,
lsm.AccessGroup.INIT_TYPE_ISCSI_IQN]
for s in self.systems:
ag_to_delete = None
cap = self.c.capabilities(s)
if supported(cap, [Cap.ACCESS_GROUPS]):
ag_list = self.c.access_groups('system_id', s.id)
if supported(cap, [Cap.ACCESS_GROUP_CREATE_WWPN])\
or supported(cap, [Cap.ACCESS_GROUP_CREATE_ISCSI_IQN]):
if supported(
cap, [Cap.ACCESS_GROUP_CREATE_ISCSI_IQN,
Cap.ACCESS_GROUP_DELETE]):
ag_to_delete = self._create_access_group(
rs('ag'), s,
lsm.AccessGroup.INIT_TYPE_ISCSI_IQN)
self.c.access_groups('system_id', s.id)
if supported(cap,
[Cap.
ACCESS_GROUP_INITIATOR_ADD_ISCSI_IQN]):
init_id = self._ag_init_add(ag_to_delete)
if supported(
cap, [Cap.ACCESS_GROUP_INITIATOR_DELETE]):
self._ag_init_delete(
ag_to_delete, init_id,
lsm.AccessGroup.INIT_TYPE_ISCSI_IQN)
if supported(
cap, [Cap.ACCESS_GROUP_CREATE_WWPN,
Cap.ACCESS_GROUP_DELETE]):
ag_to_delete = self._create_access_group(
rs('ag'), s, lsm.AccessGroup.INIT_TYPE_WWPN)
self.c.access_groups('system_id', s.id)
if ag_to_delete is not None:
self._delete_access_group(ag_to_delete)
else:
if len(ag_list):
# Try and find an initiator group that has a usable
# access group type instead of unknown or other...
ag = ag_list[0]
for a_tmp in ag_list:
if a_tmp.init_type in usable_ag_types:
ag = a_tmp
break
if supported(cap, [Cap.
ACCESS_GROUP_INITIATOR_ADD_WWPN]):
init_id = self._ag_init_add(ag)
if supported(
cap, [Cap.ACCESS_GROUP_INITIATOR_DELETE]):
self._ag_init_delete(
ag, init_id,
lsm.AccessGroup.INIT_TYPE_WWPN)
if supported(
cap,
[Cap.ACCESS_GROUP_INITIATOR_ADD_ISCSI_IQN]):
init_id = self._ag_init_add(ag)
if supported(cap,
[Cap.ACCESS_GROUP_INITIATOR_DELETE]):
self._ag_init_delete(
ag, init_id,
lsm.AccessGroup.INIT_TYPE_ISCSI_IQN)
def test_duplicate_volume_name(self):
if self.pool_by_sys_id:
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.VOLUME_CREATE]):
vol, pool = self._volume_create(s.id)
self.assertTrue(vol is not None)
# Try to create another with same name
try:
self.c.volume_create(
pool, vol.name, vol.size_bytes,
lsm.Volume.PROVISION_DEFAULT)[1]
except LsmError as le:
self.assertTrue(le.code == ErrorNumber.NAME_CONFLICT)
if vol is not None and \
supported(cap, [Cap.VOLUME_DELETE]):
self._volume_delete(vol)
def test_duplicate_access_group_name(self):
for s in self.systems:
ag_name = rs('ag_dupe')
cap = self.c.capabilities(s)
if supported(cap, [Cap.ACCESS_GROUPS,
Cap.ACCESS_GROUP_DELETE]):
self.c.access_groups('system_id', s.id)
if supported(
cap, [Cap.ACCESS_GROUP_CREATE_ISCSI_IQN]):
ag_type = lsm.AccessGroup.INIT_TYPE_ISCSI_IQN
elif supported(cap,
[Cap.ACCESS_GROUP_CREATE_WWPN]):
ag_type = lsm.AccessGroup.INIT_TYPE_WWPN
else:
return
ag_created = self._create_access_group(
ag_name, s, ag_type)
if ag_created is not None:
# Try to create a duplicate
got_exception = False
try:
self._create_access_group(
ag_name, s, ag_type)
except LsmError as le:
got_exception = True
self.assertTrue(le.code == ErrorNumber.NAME_CONFLICT)
self.assertTrue(got_exception)
self._delete_access_group(ag_created)
def test_ag_vol_delete_with_vol_masked(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.ACCESS_GROUPS,
Cap.ACCESS_GROUP_CREATE_ISCSI_IQN,
Cap.ACCESS_GROUP_DELETE,
Cap.VOLUME_UNMASK,
Cap.VOLUME_CREATE,
Cap.VOLUME_DELETE,
Cap.VOLUME_MASK,
Cap.VOLUME_UNMASK]):
ag_name = rs("ag")
ag_iqn = r_iqn()
ag = self.c.access_group_create(
ag_name, ag_iqn, lsm.AccessGroup.INIT_TYPE_ISCSI_IQN, s)
pool = self._get_pool_by_usage(s.id,
lsm.Pool.ELEMENT_TYPE_VOLUME)
if ag and pool:
vol_size = self._min_size()
vol = self.c.volume_create(
pool, rs('v'), vol_size,
lsm.Volume.PROVISION_DEFAULT)[1]
if vol:
got_exception = False
self.c.volume_mask(ag, vol)
# Try to delete the access group
try:
self.c.access_group_delete(ag)
except LsmError as le:
if le.code == lsm.ErrorNumber.IS_MASKED:
got_exception = True
self.assertTrue(le.code ==
lsm.ErrorNumber.IS_MASKED)
self.assertTrue(got_exception)
# Try to delete the volume
got_exception = False
try:
self.c.volume_delete(vol)
except LsmError as le:
if le.code == lsm.ErrorNumber.IS_MASKED:
got_exception = True
self.assertTrue(le.code ==
lsm.ErrorNumber.IS_MASKED)
self.assertTrue(got_exception)
# Clean up
self.c.volume_unmask(ag, vol)
self.c.volume_delete(vol)
self.c.access_group_delete(ag)
def test_volume_delete_with_replication(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.VOLUME_REPLICATE_CLONE,
Cap.VOLUME_CHILD_DEPENDENCY,
Cap.VOLUME_CREATE,
Cap.VOLUME_DELETE]):
pool = self._get_pool_by_usage(s.id,
lsm.Pool.ELEMENT_TYPE_VOLUME)
if pool:
vol_size = self._min_size()
vol = self.c.volume_create(
pool, rs('v'), vol_size,
lsm.Volume.PROVISION_DEFAULT)[1]
clone_vol = self.c.volume_replicate(
pool, lsm.Volume.REPLICATE_CLONE, vol,
rs('test_vd_with_repo_'))[1]
if self.c.volume_child_dependency(vol):
got_exception = False
try:
self.c.volume_delete(vol)
except LsmError as le:
if le.code == lsm.ErrorNumber.HAS_CHILD_DEPENDENCY:
got_exception = True
self.assertTrue(
le.code == lsm.ErrorNumber.HAS_CHILD_DEPENDENCY)
self.assertTrue(got_exception)
# Clean up
self.c.volume_delete(clone_vol)
self.c.volume_delete(vol)
else:
self._skip_current_test(
"Skip test: current system does not support "
"volume_create() or volume_replicate() with CLONE "
"or volume_delete()")
def test_fs_delete_with_clone(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.FS_CREATE, Cap.FS_CLONE, Cap.FS_DELETE,
Cap.FS_CHILD_DEPENDENCY]):
fs, pool = self._fs_create(s.id)
self.assertTrue(fs is not None)
fs_child = self.c.fs_clone(fs, rs('fs_c_'))[1]
if self.c.fs_child_dependency(fs, None):
got_exception = False
try:
self.c.fs_delete(fs)
except LsmError as le:
if le.code == lsm.ErrorNumber.HAS_CHILD_DEPENDENCY:
got_exception = True
self.assertTrue(le.code ==
lsm.ErrorNumber.HAS_CHILD_DEPENDENCY)
self.assertTrue(got_exception)
# Clean up
self.c.fs_delete(fs_child)
self.c.fs_delete(fs)
else:
self._skip_current_test(
"Skip test: current system does not support "
"fs_create() or fs_clone() or fs_delete()")
def test_volume_vpd83_verify(self):
failing = [None,
"012345678901234567890123456789AB",
"012345678901234567890123456789ax",
"012345678901234567890123456789ag",
"1234567890123456789012345abcdef",
"01234567890123456789012345abcdefa",
"55cd2e404beec32e0", "55cd2e404beec32ex",
"35cd2e404beec32A"]
for f in failing:
self.assertFalse(lsm.Volume.vpd83_verify(f))
self.assertTrue(
lsm.Volume.vpd83_verify("61234567890123456789012345abcdef"))
self.assertTrue(
lsm.Volume.vpd83_verify("55cd2e404beec32e"))
self.assertTrue(
lsm.Volume.vpd83_verify("35cd2e404beec32e"))
self.assertTrue(
lsm.Volume.vpd83_verify("25cd2e404beec32e"))
def test_available_plugins(self):
plugins = self.c.available_plugins(':')
self.assertTrue(plugins is not None)
self.assertTrue(len(plugins) > 0)
self.assertTrue(':' in plugins[0])
def test_volume_enable_disable(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.VOLUME_CREATE, Cap.VOLUME_DELETE,
Cap.VOLUME_ENABLE, Cap.VOLUME_DISABLE]):
vol, pool = self._volume_create(s.id)
self.c.volume_disable(vol)
self.c.volume_enable(vol)
self._volume_delete(vol)
def test_daemon_not_running(self):
current = None
got_exception = False
# Force a ErrorNumber.DAEMON_NOT_RUNNING
if 'LSM_UDS_PATH' in os.environ:
current = os.environ['LSM_UDS_PATH']
tmp_dir = tempfile.mkdtemp()
os.environ['LSM_UDS_PATH'] = tmp_dir
try:
lsm.Client(TestPlugin.URI, TestPlugin.PASSWORD)
except LsmError as expected_error:
got_exception = True
self.assertTrue(expected_error.code ==
ErrorNumber.DAEMON_NOT_RUNNING,
'Actual error %d' % expected_error.code)
self.assertTrue(got_exception)
os.rmdir(tmp_dir)
if current:
os.environ['LSM_UDS_PATH'] = current
else:
del os.environ['LSM_UDS_PATH']
def test_non_existent_plugin(self):
got_exception = False
try:
uri = "%s://user@host" % rs(None, 6)
lsm.Client(uri, TestPlugin.PASSWORD)
except LsmError as expected_error:
got_exception = True
self.assertTrue(expected_error.code ==
ErrorNumber.PLUGIN_NOT_EXIST,
'Actual error %d' % expected_error.code)
self.assertTrue(got_exception)
def test_volume_depends(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.VOLUME_CREATE, Cap.VOLUME_DELETE,
Cap.VOLUME_CHILD_DEPENDENCY,
Cap.VOLUME_CHILD_DEPENDENCY_RM]) and \
(supported(cap, [Cap.VOLUME_REPLICATE_COPY]) or
supported(cap, [Cap.VOLUME_REPLICATE_CLONE])):
vol, pol = self._volume_create(s.id)
if supported(cap, [Cap.VOLUME_REPLICATE_CLONE]):
vol_child = self.c.volume_replicate(
None,
lsm.Volume.REPLICATE_CLONE,
vol,
rs('v_tc_'))[1]
else:
vol_child = self.c.volume_replicate(
None,
lsm.Volume.REPLICATE_COPY,
vol,
rs('v_fc_'))[1]
self.assertTrue(vol_child is not None)
if self.c.volume_child_dependency(vol):
self.c.volume_child_dependency_rm(vol)
else:
self.assertTrue(self.c.volume_child_dependency_rm(vol)
is None)
self._volume_delete(vol)
if vol_child:
self._volume_delete(vol_child)
def test_fs_depends(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.FS_CREATE, Cap.FS_DELETE,
Cap.FS_CHILD_DEPENDENCY,
Cap.FS_CHILD_DEPENDENCY_RM,
Cap.FS_CLONE]):
fs, pol = self._fs_create(s.id)
fs_child = self.c.fs_clone(fs, rs('fs_c_'))[1]
self.assertTrue(fs_child is not None)
if self.c.fs_child_dependency(fs, None):
self.c.fs_child_dependency_rm(fs, None)
else:
self.assertTrue(self.c.fs_child_dependency_rm(fs, None)
is None)
self._fs_delete(fs)
if fs_child:
self._fs_delete(fs_child)
def test_nfs_auth_types(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.EXPORT_AUTH]):
auth_types = self.c.export_auth()
self.assertTrue(auth_types is not None)
def test_export_list(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.EXPORTS]):
self.c.exports()
# TODO verify export values
def test_create_delete_exports(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.FS_CREATE, Cap.EXPORTS, Cap.EXPORT_FS,
Cap.EXPORT_REMOVE]):
fs, pool = self._fs_create(s.id)
if supported(cap, [Cap.EXPORT_CUSTOM_PATH]):
path = "/mnt/%s" % rs(None, 6)
exp = self.c.export_fs(fs.id, path, [], [],
['192.168.2.1'])
else:
exp = self.c.export_fs(fs.id, None, [], [],
['192.168.2.1'])
self.c.export_remove(exp)
self._fs_delete(fs)
def test_pool_member_info(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.POOL_MEMBER_INFO]):
for pool in self.c.pools():
(raid_type, member_type, member_ids) = \
self.c.pool_member_info(pool)
self.assertTrue(type(raid_type) is int)
self.assertTrue(type(member_type) is int)
self.assertTrue(type(member_ids) is list)
def _skip_current_test(self, messsage):
"""
If skipTest is supported, skip this test with provided message.
Sliently return if not supported.
"""
if hasattr(unittest.TestCase, 'skipTest') is True:
self.skipTest(messsage)
return
def test_volume_raid_create(self):
for s in self.systems:
cap = self.c.capabilities(s)
# TODO(Gris Ge): Add test code for other RAID type and strip size
if supported(cap, [Cap.VOLUME_RAID_CREATE]):
supported_raid_types, supported_strip_sizes = \
self.c.volume_raid_create_cap_get(s)
if lsm.Volume.RAID_TYPE_RAID1 not in supported_raid_types:
self._skip_current_test(
"Skip test: current system does not support "
"creating RAID1 volume")
# Find two free disks
free_disks = []
for disk in self.c.disks():
if len(free_disks) == 2:
break
if disk.status & lsm.Disk.STATUS_FREE:
free_disks.append(disk)
if len(free_disks) != 2:
self._skip_current_test(
"Skip test: Failed to find two free disks for RAID 1")
return
new_vol = self.c.volume_raid_create(
rs('v'), lsm.Volume.RAID_TYPE_RAID1, free_disks,
lsm.Volume.VCR_STRIP_SIZE_DEFAULT)
self.assertTrue(new_vol is not None)
# TODO(Gris Ge): Use volume_raid_info() and pool_member_info()
# to verify size, raid_type, member type, member ids.
if supported(cap, [Cap.VOLUME_DELETE]):
self._volume_delete(new_vol)
updated_disks = []
# Check whether disks are been set back to free mode.
for disk in self.c.disks():
if disk.id in list(d.id for d in free_disks):
updated_disks.append(disk)
self.assertTrue(
disk.status & lsm.Disk.STATUS_FREE,
"Disk %s is not restored to free state "
"after volume_delete() on a raid volume"
% disk.id)
self.assertTrue(len(updated_disks) == 2,
"Disk missing after volume_delete() on "
"a raid volume")
else:
self._skip_current_test(
"Skip test: not support of VOLUME_RAID_CREATE")
def test_volume_raid_info(self):
flag_supported = False
pool_id_to_lsm_vols = dict()
lsm_vol = None
s = None
cap = None
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.VOLUME_RAID_INFO]):
flag_supported = True
break
if flag_supported is False:
self._skip_current_test(
"Skip test: current system does not support "
"query volume raid info(lsm.Capabilities.VOLUME_RAID_INFO)")
# Try to find a volume per pool.
lsm_vols = self.c.volumes()
for lsm_vol in lsm_vols:
pool_id = lsm_vol.pool_id
if len(pool_id) != 0 and \
pool_id_to_lsm_vols.get(pool_id) is None:
pool_id_to_lsm_vols[pool_id] = lsm_vol
lsm_vols = list(pool_id_to_lsm_vols.values())
if supported(cap, [Cap.VOLUME_CREATE]):
created_lsm_vol = self._volume_create(s.id)[0]
lsm_vols.append(created_lsm_vol)
for lsm_vol in lsm_vols:
[raid_type, strip_size, disk_count, min_io_size, opt_io_size] = \
self.c.volume_raid_info(lsm_vol)
# TODO Make these checks better by checking type and range
self.assertTrue(raid_type is not None)
self.assertTrue(strip_size is not None)
self.assertTrue(disk_count is not None)
self.assertTrue(min_io_size is not None)
self.assertTrue(opt_io_size is not None)
# Test NOT_FOUND_VOLUME error.
if supported(cap, [Cap.VOLUME_CREATE, Cap.VOLUME_DELETE]):
self._volume_delete(created_lsm_vol)
try:
self.c.volume_raid_info(lsm_vol)
except lsm.LsmError as le:
if le.code != ErrorNumber.NOT_FOUND_VOLUME:
raise
def test_volume_ident_led_on(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.VOLUME_LED]):
for volume in self.c.volumes():
volume_led_status = self.c.volume_ident_led_on(volume)
self.assertTrue(volume_led_status is None,
"Volume ident_led_on"
"failed")
def test_volume_ident_led_off(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.VOLUME_LED]):
for volume in self.c.volumes():
try:
volume_led_status = self.c.volume_ident_led_off(volume)
self.assertTrue(volume_led_status is None,
"Volume ident_led_off failed")
except lsm.LsmError as le:
# As we don't know if the LED in on/off we need to
# accept that either outcome is possible
if le.code != ErrorNumber.NO_STATE_CHANGE:
raise
def test_invalid_uri(self):
# Make sure we are getting an exception
self.assertRaises(lsm.LsmError, lsm.Client,
"ontap//root@na-sim", "some_password")
# Make sure exception has the correct error code
try:
lsm.Client("ontap//root@na-sim", "some_password")
except lsm.LsmError as le:
self.assertTrue(le.code == lsm.ErrorNumber.INVALID_ARGUMENT)
def test_battery_list(self):
for s in self.systems:
cap = self.c.capabilities(s)
if supported(cap, [Cap.BATTERIES]):
self.c.batteries()
def test_volume_cache_info(self):
flag_tested = False
flag_created = False
for s in self.systems:
cap = self.c.capabilities(s)
if not supported(cap, [Cap.VOLUME_CACHE_INFO]):
continue
lsm_vols = self.c.volumes(search_key='system_id',
search_value=s.id)
if len(lsm_vols) == 0:
if not supported(cap, [Cap.VOLUME_CREATE, Cap.VOLUME_DELETE]):
continue
lsm_vol = self._volume_create(s.id)[0]
flag_created = True
else:
lsm_vol = lsm_vols[0]
cache_info = self.c.volume_cache_info(lsm_vol)
self.assertTrue(len(cache_info) == 5)
if flag_created:
self._volume_delete(lsm_vol)
flag_tested = True
if flag_tested is False:
self._skip_current_test(
"Skip test: no storage system support volume cache info query")
def test_volume_cache_pdc_update(self):
flag_tested = False
for s in self.systems:
cap = self.c.capabilities(s)
if not supported(cap,
[Cap.VOLUME_CACHE_INFO, Cap.VOLUME_CREATE,
Cap.VOLUME_DELETE,
Cap.VOLUME_PHYSICAL_DISK_CACHE_UPDATE]):
continue
lsm_vol = self._volume_create(s.id)[0]
self.c.volume_physical_disk_cache_update(
lsm_vol, lsm.Volume.PHYSICAL_DISK_CACHE_ENABLED)
cache_info = self.c.volume_cache_info(lsm_vol)
self.assertTrue(cache_info[4] ==
lsm.Volume.PHYSICAL_DISK_CACHE_ENABLED)
self.c.volume_physical_disk_cache_update(
lsm_vol, lsm.Volume.PHYSICAL_DISK_CACHE_DISABLED)
cache_info = self.c.volume_cache_info(lsm_vol)
self.assertTrue(cache_info[4] ==
lsm.Volume.PHYSICAL_DISK_CACHE_DISABLED)
self._volume_delete(lsm_vol)
flag_tested = True
if flag_tested is False:
self._skip_current_test(
"Skip test: current system does not support required "
"capabilities for testing volume_physical_disk_cache_update()")
def test_volume_cache_wcp_update(self):
flag_tested = False
for s in self.systems:
cap = self.c.capabilities(s)
if not supported(cap,
[Cap.VOLUME_CACHE_INFO, Cap.VOLUME_CREATE,
Cap.VOLUME_DELETE]):
continue
lsm_vol = self._volume_create(s.id)[0]
if supported(cap,
[Cap.VOLUME_WRITE_CACHE_POLICY_UPDATE_WRITE_BACK]):
self.c.volume_write_cache_policy_update(
lsm_vol, lsm.Volume.WRITE_CACHE_POLICY_WRITE_BACK)
cache_info = self.c.volume_cache_info(lsm_vol)
self.assertTrue(cache_info[0] ==
lsm.Volume.WRITE_CACHE_POLICY_WRITE_BACK)
flag_tested = True
if supported(cap, [Cap.VOLUME_WRITE_CACHE_POLICY_UPDATE_AUTO]):
self.c.volume_write_cache_policy_update(
lsm_vol, lsm.Volume.WRITE_CACHE_POLICY_AUTO)
cache_info = self.c.volume_cache_info(lsm_vol)
self.assertTrue(cache_info[0] ==
lsm.Volume.WRITE_CACHE_POLICY_AUTO)
flag_tested = True
if supported(cap,
[Cap.VOLUME_WRITE_CACHE_POLICY_UPDATE_WRITE_THROUGH]):
self.c.volume_write_cache_policy_update(
lsm_vol, lsm.Volume.WRITE_CACHE_POLICY_WRITE_THROUGH)
cache_info = self.c.volume_cache_info(lsm_vol)
self.assertTrue(cache_info[0] ==
lsm.Volume.WRITE_CACHE_POLICY_WRITE_THROUGH)
flag_tested = True
self._volume_delete(lsm_vol)
if flag_tested is False:
self._skip_current_test(
"Skip test: current system does not support required "
"capabilities for testing volume_write_cache_policy_update()")
def test_volume_cache_rcp_update(self):
flag_tested = False
for s in self.systems:
cap = self.c.capabilities(s)
if not supported(cap,
[Cap.VOLUME_CACHE_INFO, Cap.VOLUME_CREATE,
Cap.VOLUME_DELETE,
Cap.VOLUME_READ_CACHE_POLICY_UPDATE]):
continue
lsm_vol = self._volume_create(s.id)[0]
self.c.volume_read_cache_policy_update(
lsm_vol, lsm.Volume.READ_CACHE_POLICY_ENABLED)
cache_info = self.c.volume_cache_info(lsm_vol)
self.assertTrue(cache_info[2] ==
lsm.Volume.READ_CACHE_POLICY_ENABLED)
self.c.volume_read_cache_policy_update(
lsm_vol, lsm.Volume.READ_CACHE_POLICY_DISABLED)
cache_info = self.c.volume_cache_info(lsm_vol)
self.assertTrue(cache_info[2] ==
lsm.Volume.READ_CACHE_POLICY_DISABLED)
self._volume_delete(lsm_vol)
flag_tested = True
if flag_tested is False:
self._skip_current_test(
"Skip test: current system does not support required "
"capabilities for testing volume_read_cache_policy_update()")
def dump_results():
"""
unittest.main exits when done so we need to register this handler to
get our results out.
If PyYAML is available we will output detailed results, else we will
output nothing. The detailed output results of what we called,
how it finished and how long it took.
"""
try:
# noinspection PyUnresolvedReferences
import yaml
sys.stdout.write(yaml.dump(dict(methods_called=results, stats=stats)))
except ImportError:
sys.stdout.write("NOTICE: Install PyYAML for detailed test results\n")
def add_our_params():
"""
There are probably easier ways to extend unittest, but this seems
easiest at the moment if we want to retain the default behavior and
introduce a couple of parameters.
"""
additional = """\
Options libStorageMgmt:
--password 'Array password'
--uri 'Array URI'
--skip 'Test case to skip. Repeatable argument'
"""
if sys.version_info[0] == 2:
unittest.TestProgram.USAGE += additional
if __name__ == "__main__":
atexit.register(dump_results)
add_our_params()
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument('--password', default=None)
parser.add_argument('--uri')
parser.add_argument('--skip', action='append')
options, other_args = parser.parse_known_args()
if options.uri:
TestPlugin.URI = options.uri
elif os.getenv('LSM_TEST_URI'):
TestPlugin.URI = os.getenv('LSM_TEST_URI')
else:
TestPlugin.URI = 'sim://'
if options.password:
TestPlugin.PASSWORD = options.password
elif os.getenv('LSM_TEST_PASSWORD'):
TestPlugin.PASSWORD = os.getenv('LSM_TEST_PASSWORD')
if options.skip:
if hasattr(unittest.TestCase, 'skipTest') is False:
raise Exception(
"Current python version is too old to support 'skipTest'")
TestPlugin.SKIP_TEST_CASES = options.skip
else:
TestPlugin.SKIP_TEST_CASES = []
print_stderr("\nPrefix: %s\n" % g_prefix)
unittest.main(argv=sys.argv[:1] + other_args)