#!/usr/bin/env python@PY_VERSION@ # Copyright (C) 2013-2016 Red Hat, Inc. # (C) Copyright 2015-2016 Hewlett Packard Enterprise Development LP # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; If not, see . # # Author: tasleson # Joe Handzik # Gris Ge import lsm import functools import time import random import string import traceback import unittest import argparse try: # Python 3.8 required change from collections.abc import Sequence except ImportError: from collections import Sequence import atexit import sys import os import tempfile from lsm import LsmError, ErrorNumber from lsm import Capabilities as Cap results = {} stats = {} MIN_POOL_SIZE = 4096 MIN_OBJECT_SIZE = 512 def _rs(l): return ''.join(random.choice(string.ascii_uppercase) for _ in range(l)) # Lets create a random prefix so we can run multiple copies at the same time # for hardware that will allow it. g_prefix = _rs(4) def rs(component, l=4): """ Generate a random string """ rp = _rs(l) if component is not None: return '%s_%s_%s' % (g_prefix, component, rp) return rp # If you print anything during execution make sure it's to stderr as the # automated tests are making the assumption that test messages go to stderr and # execution information gets written to stdout as yaml def print_stderr(msg): sys.stderr.write(msg) sys.stderr.flush() def mb_in_bytes(mib): return 1024 * 1024 * mib def record_result(method): def recorder(*args, **kwargs): try: result = method(*args, **kwargs) results[method.__name__] = dict(rc=True, msg=None) return result except Exception as e: results[method.__name__] = dict(rc=False, stack_trace=traceback.format_exc(), msg=str(e)) return recorder def update_stats(method_name, duration, number_results): if method_name in stats: stats[method_name]["count"] += 1 else: stats[method_name] = dict(count=1, total_time=0, number_items=0) stats[method_name]["total_time"] += duration if number_results > 0: stats[method_name]["number_items"] += number_results def r_fcpn(): """ Generate a random 16 character hex number """ rnd_fcpn = '%016x' % random.randrange(2 ** 64) return ':'.join(rnd_fcpn[i:i + 2] for i in range(0, len(rnd_fcpn), 2)) def r_iqn(): # RFC 3722, only lower case characters are allowed in IQN return 'iqn.1994-05.com.domain:01.' + rs(None, 6).lower() class Duration(object): def __init__(self): self.start = 0 self.end = 0 def __enter__(self): self.start = time.time() return self def __exit__(self, *ignore): self.end = time.time() def amount(self): return self.end - self.start def supported(cap, capability): for c in capability: if not cap.supported(c): return False return True class TestProxy(object): # Errors that we are forcing to occur not_logging = [lsm.ErrorNumber.NO_SUPPORT, lsm.ErrorNumber.NAME_CONFLICT, lsm.ErrorNumber.NO_STATE_CHANGE, lsm.ErrorNumber.IS_MASKED, lsm.ErrorNumber.HAS_CHILD_DEPENDENCY, lsm.ErrorNumber.EXISTS_INITIATOR] # Hash of all calls that can be async async_calls = {'volume_create': (str, lsm.Volume), 'volume_resize': (str, lsm.Volume), 'volume_replicate': (str, lsm.Volume), 'volume_replicate_range': (str,), 'volume_delete': (str,), 'volume_child_dependency_rm': (str,), 'fs_delete': (str,), 'fs_resize': (str, lsm.FileSystem), 'fs_create': (str, lsm.FileSystem), 'fs_clone': (str, lsm.FileSystem), 'fs_file_clone': (str,), 'fs_snapshot_create': (str, lsm.FsSnapshot), 'fs_snapshot_delete': (str,), 'fs_snapshot_restore': (str,), 'fs_child_dependency_rm': (str,)} # The constructor. # @param self The object self # @param obj The object instance to wrap def __init__(self, obj=None): """ Constructor which takes an object to wrap. """ self.o = obj # Called each time an attribute is requested of the object # @param self The object self # @param name Name of the attribute being accessed # @return The result of the method def __getattr__(self, name): """ Called each time an attribute is requested of the object """ if hasattr(self.o, name): return functools.partial(self.present, name) else: raise AttributeError("No such method %s" % name) @staticmethod def log_result(method, v): if method not in results: results[method] = [] results[method].append(v) # Method which is called to invoke the actual method of interest. # # The intentions of this method is this: # - Invoke the method just like it normally would without this # so signature in & out is identical # - Collect results of the method call # - Collect stats on the execution time of call # # @param self The object self # @param _proxy_method_name Method to invoke # @param args Arguments # @param kwargs Keyword arguments # @return The result of the method invocation def present(self, _proxy_method_name, *args, **kwargs): """ Method which is called to invoke the actual method of interest. """ job_possible = _proxy_method_name in TestProxy.async_calls # Timer block with Duration() as method_time: try: rc = getattr(self.o, _proxy_method_name)(*args, **kwargs) TestProxy.log_result(_proxy_method_name, dict(rc=True, stack_trace=None, msg=None)) except lsm.LsmError as le: # We are forcing some types of error, for these we won't log # but will allow the test case asserts to check to make sure # we actually got them. if le.code not in self.not_logging: TestProxy.log_result( _proxy_method_name, dict(rc=False, stack_trace=traceback.format_exc(), msg=str(le))) raise # If the job can do async, we will block looping on it. if job_possible and rc is not None: # Note: Some return a single unicode or None, # others return a tuple (job, object) if type(rc) != tuple and type(rc) != list: rc = (rc, None) rc = self.wait_for_it(_proxy_method_name, *rc) # Fix up return value to match what it would normally be if job_possible: if 2 == len(TestProxy.async_calls[_proxy_method_name]): rc = (None, rc) # We don't care about time per operation when there is only one # possible. if not job_possible and isinstance(rc, Sequence) and len(rc) > 2: num_results = len(rc) else: num_results = 0 update_stats(_proxy_method_name, method_time.amount(), num_results) return rc def wait_for_it(self, msg, job, item): if not job: return item else: while True: (s, percent, i) = self.job_status(job) if s == lsm.JobStatus.INPROGRESS: time.sleep(0.25) elif s == lsm.JobStatus.COMPLETE: self.job_free(job) return i else: raise Exception(msg + " job error code= " + str(s)) def check_type(value, *expected): assert type(value) in expected, "type expected (%s), type actual (%s)" % \ (str(type(value)), str(expected)) class TestPlugin(unittest.TestCase): """ Anything that starts with test_ will be run as a separate unit test with the setUp and tearDown methods called before and after respectively """ URI = 'sim://' PASSWORD = None @staticmethod def _min_size(): return mb_in_bytes(MIN_OBJECT_SIZE) def setUp(self): for skip_test_case in TestPlugin.SKIP_TEST_CASES: if self.id().endswith(skip_test_case): self.skipTest("Tested has been skiped as requested") self.c = TestProxy(lsm.Client(TestPlugin.URI, TestPlugin.PASSWORD)) self.systems = self.c.systems() self.pools = self.c.pools() self.pool_by_sys_id = {} for s in self.systems: self.pool_by_sys_id[s.id] = [p for p in self.pools if p.system_id == s.id] # TODO Store what exists, so that we don't remove it def _get_pool_by_usage(self, system_id, element_type, unsupported_features=0): largest_free = 0 rc = None for p in self.pool_by_sys_id[system_id]: # If the pool matches our criteria and min size we will consider # it, but we will select the one with the most free space for # testing and one that support volume expansion if p.element_type & element_type and \ p.free_space > mb_in_bytes(MIN_POOL_SIZE) and \ (not p.unsupported_actions & unsupported_features): if p.free_space > largest_free: largest_free = p.free_space rc = p return rc def _clean_up(self): # Note: We make best effort to clean things up, thus we are ignoring # exceptions in this code, depending on what's failing in the plugin # we may not be able to remove something we created # noinspection PyBroadException try: for s in self.systems: cap = self.c.capabilities(s) # Remove any access groups we created, removing any mappings # first. if supported(cap, [Cap.ACCESS_GROUP_DELETE, Cap.VOLUMES_ACCESSIBLE_BY_ACCESS_GROUP]): for ag in self.c.access_groups(): if g_prefix in ag.name and s.id == ag.system_id: try: # Make sure it doesn't have any mappings mapped_volume = self.c.\ volumes_accessible_by_access_group(ag) # Remove any mappings for vol in mapped_volume: self.c.volume_unmask(ag, vol) # Lastly, remove the access group self.c.access_group_delete(ag) except LsmError as le: print_stderr("[WARNING] error when " "removing ag %s\n" % str(le)) pass # Remove any volumes we created if supported(cap, [Cap.VOLUME_DELETE]): for v in self.c.volumes(): if g_prefix in v.name and s.id == v.system_id: # Check to see if this volume is participating in # an access group, if it is we will remove the # volume from it, but we will not delete the access # group as we likely didn't create it access_groups = self.c.\ access_groups_granted_to_volume(v) for ag in access_groups: try: self.c.volume_unmask(ag, v) except LsmError as le: print_stderr( "[WARNING] error when unmasking " "volume %s\n" % str(le)) pass try: self.c.volume_delete(v) except LsmError as le: print_stderr("[WARNING] error when removing " "volume %s\n" % str(le)) pass # Remove any fs exports we created if supported(cap, [Cap.FS, Cap.EXPORTS, Cap.EXPORT_REMOVE]): # Get all the FS fs_list = self.c.fs() for e_fs in self.c.exports(): # Get the fs object that is exported fs = [x for x in fs_list if x.id == e_fs.fs_id][0] # Make sure we are un-exporting a FS we created if g_prefix in fs.name and s.id == fs.system_id: try: print_stderr("Export remove: %s" % str(e_fs)) self.c.export_remove(e_fs) except LsmError as le: print_stderr("[WARNING] error when removing " "export %s\n" % str(le)) pass # Remove any fs we created if supported(cap, [Cap.FS, Cap.FS_DELETE]): for f in self.c.fs(): if g_prefix in f.name and s.id == f.system_id: if supported(cap, [Cap.FS_CHILD_DEPENDENCY, Cap.FS_CHILD_DEPENDENCY_RM]): if self.c.fs_child_dependency(f, None): self.c.fs_child_dependency_rm(f, None) try: self.c.fs_delete(f) except LsmError as le: print_stderr("[WARNING] error when removing " "fs %s\n" % str(le)) pass except Exception: msg = str(traceback.format_exc()) print_stderr("[WARNING] exception in _clean_up %s\n" % msg) pass def tearDown(self): self._clean_up() self.c.close() def test_plugin_info(self): (desc, version) = self.c.plugin_info() self.assertTrue(desc is not None and len(desc) > 0) self.assertTrue(version is not None and len(version) > 0) def test_fw_version_get(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.SYS_FW_VERSION_GET]): fw_ver = s.fw_version self.assertTrue(fw_ver is not None and len(fw_ver) > 0, "Firmware version retrieval failed") def test_sys_mode_get(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.SYS_MODE_GET]): sys_mode = s.mode self.assertTrue(sys_mode is not None, "System mode retrieval failed") def test_sys_read_cache_pct_get(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.SYS_READ_CACHE_PCT_GET]): read_pct = s.read_cache_pct self.assertTrue(read_pct is not None, "Read cache percentage retrieval failed") def test_system_read_cache_pct_update(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.SYS_READ_CACHE_PCT_UPDATE]): cache_status = self.c.system_read_cache_pct_update(s, 100) self.assertTrue(cache_status is None, "system_read_cache_pct_update failed") def test_timeout(self): tmo = 40000 self.c.time_out_set(tmo) self.assertEqual(self.c.time_out_get(), tmo) def test_systems_list(self): self.c.systems() def test_pools_list(self): self.c.pools() def _find_or_create_volumes(self): """ Find existing volumes, if not found, try to create one. Return (volumes, flag_created) If 'flag_created' is True, then returned volumes is newly created. """ volumes = self.c.volumes() flag_created = False if len(self.c.volumes()) == 0: for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.VOLUME_CREATE, Cap.VOLUME_DELETE]): self._volume_create(s.id) flag_created = True break volumes = self.c.volumes() return volumes, flag_created def test_volume_list(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.VOLUMES]): (volumes, flag_created) = self._find_or_create_volumes() self.assertTrue( len(volumes) > 0, "We need at least 1 volume to test") if flag_created: self._volume_delete(volumes[0]) def test_volume_vpd83(self): # You cannot test for vpd83 if the device doesn't support volumes for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.VOLUMES]): (volumes, flag_created) = self._find_or_create_volumes() self.assertTrue( len(volumes) > 0, "We need at least 1 volume to test") for v in volumes: # Don't test this on HDS as VPD is empty if '10.1.134.227' not in TestPlugin.URI: self.assertTrue( lsm.Volume.vpd83_verify(v.vpd83), "VPD is not as expected '%s' for volume id: '%s'" % (v.vpd83, v.id)) if flag_created: self._volume_delete(volumes[0]) def test_disks_list(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.DISKS]): disks = self.c.disks() self.assertTrue(len(disks) > 0, "We need at least 1 disk to test") if supported(cap, [Cap.DISK_VPD83_GET]): try: list(disk.vpd83 for disk in disks) list(disk.rpm for disk in disks) list(disk.link_type for disk in disks) except LsmError as lsm_err: if lsm_err.code != ErrorNumber.NO_SUPPORT: raise def test_disk_location_get(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.DISK_LOCATION]): for disk in self.c.disks(): disk_location = disk.location self.assertTrue(disk_location is not None and len(disk_location) > 0, "Disk location retrieval failed") def _volume_create(self, system_id, element_type=lsm.Pool.ELEMENT_TYPE_VOLUME, unsupported_features=0): if system_id in self.pool_by_sys_id: p = self._get_pool_by_usage(system_id, element_type, unsupported_features) self.assertTrue(p is not None, "Unable to find a suitable pool") if p: vol_size = self._min_size() vol = self.c.volume_create(p, rs('v'), vol_size, lsm.Volume.PROVISION_DEFAULT)[1] self.assertTrue(self._volume_exists(vol.id), p.id) self.assertTrue(vol.pool_id == p.id) self.assertTrue(vol.system_id == p.system_id) return vol, p def _fs_create(self, system_id): if system_id in self.pool_by_sys_id: fs = None pool = self._get_pool_by_usage(system_id, lsm.Pool.ELEMENT_TYPE_FS) self.assertTrue(pool is not None, "Unable to find a suitable pool " "for fs creation") if pool is not None: fs_size = self._min_size() fs = self.c.fs_create(pool, rs('fs'), fs_size)[1] self.assertTrue(fs is not None) if fs: self.assertTrue(self._fs_exists(fs.id)) self.assertTrue(self._system_exists(fs.system_id)) self.assertTrue(pool.system_id == fs.system_id) return fs, pool def _volume_delete(self, volume): self.c.volume_delete(volume) self.assertFalse(self._volume_exists(volume.id)) def _fs_delete(self, fs): self.c.fs_delete(fs) self.assertFalse(self._fs_exists(fs.id)) def _fs_snapshot_delete(self, fs, ss): self.c.fs_snapshot_delete(fs, ss) self.assertFalse(self._fs_snapshot_exists(fs, ss.id)) def _volume_exists(self, volume_id, pool_id=None): volumes = self.c.volumes() for v in volumes: if v.id == volume_id: if pool_id is not None: if v.pool_id == pool_id: return True else: return False return True return False def _fs_exists(self, fs_id): fs = self.c.fs() for f in fs: if f.id == fs_id: return True return False def _system_exists(self, system_id): systems = self.c.systems() for s in systems: if s.id == system_id: return True return False def _fs_snapshot_exists(self, fs, ss_id): snapshots = self.c.fs_snapshots(fs) for s in snapshots: if s.id == ss_id: return True return False def test_volume_create_delete(self): if self.pool_by_sys_id: for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.VOLUME_CREATE]): vol = self._volume_create(s.id)[0] self.assertTrue(vol is not None) if vol is not None and \ supported(cap, [Cap.VOLUME_DELETE]): self._volume_delete(vol) def test_volume_resize(self): if self.pool_by_sys_id: for s in self.systems: cap = self.c.capabilities(s) # We need to make sure that the pool supports volume grow. unsupported = lsm.Pool.UNSUPPORTED_VOLUME_GROW if supported(cap, [Cap.VOLUME_CREATE, Cap.VOLUME_DELETE, Cap.VOLUME_RESIZE]): vol = self._volume_create( s.id, unsupported_features=unsupported)[0] vol_resize = self.c.volume_resize( vol, vol.size_bytes + mb_in_bytes(16))[1] self.assertTrue(vol.size_bytes < vol_resize.size_bytes) # The 3par in the lab has a firmware bug that causes # this assert to fail, lets skip this test for that box. # we need to come up with a better way to handle these # kind of things, ie. expected failures for certain things if '10.1.134.160' not in TestPlugin.URI: self.assertTrue(vol.id == vol_resize.id, "Expecting re-sized volume to refer to " "same volume. Expected %s, got %s" % (vol.id, vol_resize.id)) if vol.id == vol_resize.id: self._volume_delete(vol_resize) else: # Delete the original self._volume_delete(vol) def _replicate_test(self, capability, replication_type): if self.pool_by_sys_id: for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.VOLUME_CREATE, Cap.VOLUME_DELETE]): vol, pool = self._volume_create(s.id) # For the moment lets allow the array to pick the pool # to supply the backing store for the replicate if supported(cap, [capability]): volume_clone = self.c.volume_replicate( None, replication_type, vol, rs('v_c_'))[1] self.assertTrue(volume_clone is not None) if volume_clone: self.assertTrue( self._volume_exists(volume_clone.id)) # Lets test for creating a clone with an # existing name error_num = None try: self.c.volume_replicate( None, replication_type, vol, volume_clone.name)[1] except LsmError as le: error_num = le.code self.assertTrue(error_num == ErrorNumber.NAME_CONFLICT) self._volume_delete(volume_clone) self._volume_delete(vol) def test_volume_replication(self): self._replicate_test(Cap.VOLUME_REPLICATE_CLONE, lsm.Volume.REPLICATE_CLONE) self._replicate_test(Cap.VOLUME_REPLICATE_COPY, lsm.Volume.REPLICATE_COPY) self._replicate_test(Cap.VOLUME_REPLICATE_MIRROR_ASYNC, lsm.Volume.REPLICATE_MIRROR_ASYNC) self._replicate_test(Cap.VOLUME_REPLICATE_MIRROR_SYNC, lsm.Volume.REPLICATE_MIRROR_SYNC) def test_volume_replicate_range_block_size(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.VOLUME_COPY_RANGE_BLOCK_SIZE]): size = self.c.volume_replicate_range_block_size(s) self.assertTrue(size > 0) else: self.assertRaises(lsm.LsmError, self.c.volume_replicate_range_block_size, s) def test_replication_range(self): if self.pool_by_sys_id: for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.VOLUME_COPY_RANGE_BLOCK_SIZE, Cap.VOLUME_CREATE, Cap.VOLUME_DELETE, Cap.VOLUME_COPY_RANGE]): size = self.c.volume_replicate_range_block_size(s) vol, pool = self._volume_create(s.id) br = lsm.BlockRange(0, size, size) if supported( cap, [Cap.VOLUME_COPY_RANGE_CLONE]): self.c.volume_replicate_range( lsm.Volume.REPLICATE_CLONE, vol, vol, [br]) else: self.assertRaises( lsm.LsmError, self.c.volume_replicate_range, lsm.Volume.REPLICATE_CLONE, vol, vol, [br]) br = lsm.BlockRange(size * 2, size, size) if supported( cap, [Cap.VOLUME_COPY_RANGE_COPY]): self.c.volume_replicate_range( lsm.Volume.REPLICATE_COPY, vol, vol, [br]) else: self.assertRaises( lsm.LsmError, self.c.volume_replicate_range, lsm.Volume.REPLICATE_COPY, vol, vol, [br]) self._volume_delete(vol) def test_fs_creation_deletion(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.FS_CREATE]): fs, pool = self._fs_create(s.id) if fs is not None: if supported(cap, [Cap.FS_DELETE]): self._fs_delete(fs) def test_fs_resize(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.FS_CREATE]): fs, pool = self._fs_create(s.id) if fs is not None: if supported(cap, [Cap.FS_RESIZE]): fs_size = fs.total_space + mb_in_bytes(16) fs_resized = self.c.fs_resize(fs, fs_size)[1] self.assertTrue(fs_resized.total_space) if supported(cap, [Cap.FS_DELETE]): self._fs_delete(fs) def test_fs_clone(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.FS_CREATE, Cap.FS_CLONE]): fs, pool = self._fs_create(s.id) if fs is not None: fs_clone = self.c.fs_clone(fs, rs('fs_c'))[1] if supported(cap, [Cap.FS_DELETE]): self._fs_delete(fs_clone) self._fs_delete(fs) def test_fs_snapshot(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.FS_CREATE, Cap.FS_SNAPSHOT_CREATE]): fs, pool = self._fs_create(s.id) if fs is not None: ss = self.c.fs_snapshot_create(fs, rs('ss'))[1] self.assertTrue(self._fs_snapshot_exists(fs, ss.id)) if supported(cap, [Cap.FS_SNAPSHOT_RESTORE]): self.c.fs_snapshot_restore(fs, ss, None, None, True) # Delete snapshot if supported(cap, [Cap.FS_SNAPSHOT_DELETE]): self._fs_snapshot_delete(fs, ss) def test_target_ports(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.TARGET_PORTS]): ports = self.c.target_ports() for p in ports: self.assertTrue(p.id is not None) self.assertTrue(p.port_type is not None) self.assertTrue(p.service_address is not None) self.assertTrue(p.network_address is not None) self.assertTrue(p.physical_address is not None) self.assertTrue(p.physical_name is not None) self.assertTrue(p.system_id is not None) def _masking_state(self, cap, ag, vol, masked): if supported(cap, [Cap. VOLUMES_ACCESSIBLE_BY_ACCESS_GROUP]): vol_masked = \ self.c.volumes_accessible_by_access_group(ag) match = [x for x in vol_masked if x.id == vol.id] if masked: self.assertTrue(len(match) == 1, "len = %d" % len(match)) else: self.assertTrue(len(match) == 0, "len = %d" % len(match)) if supported(cap, [Cap. ACCESS_GROUPS_GRANTED_TO_VOLUME]): ag_masked = \ self.c.access_groups_granted_to_volume(vol) match = [x for x in ag_masked if x.id == ag.id] if masked: self.assertTrue(len(match) == 1, "len = %d" % len(match)) else: self.assertTrue(len(match) == 0, "len = %d" % len(match)) def test_mask_unmask(self): for s in self.systems: ag_created = None cap = self.c.capabilities(s) if supported(cap, [Cap.ACCESS_GROUPS, Cap.VOLUME_MASK, Cap.VOLUME_UNMASK, Cap.VOLUME_CREATE, Cap.VOLUME_DELETE]): if supported(cap, [Cap.ACCESS_GROUP_CREATE_ISCSI_IQN]): ag_name = rs("ag") ag_iqn = r_iqn() ag_created = self.c.access_group_create( ag_name, ag_iqn, lsm.AccessGroup.INIT_TYPE_ISCSI_IQN, s) # Make sure we have an access group to test with, many # smi-s providers don't provide functionality to create them! ag_list = self.c.access_groups('system_id', s.id) if len(ag_list): vol = self._volume_create(s.id)[0] self.assertTrue(vol is not None) chose_ag = ag_created if chose_ag is None: for ag in ag_list: if len(ag.init_ids) >= 1: chose_ag = ag break if chose_ag is None: raise Exception("No access group with 1+ member " "found, cannot do volume mask test") if vol is not None and chose_ag is not None: self.c.volume_mask(chose_ag, vol) self._masking_state(cap, chose_ag, vol, True) # Test duplicate call for NO_STATE_CHANGE error flag_dup_error_found = False try: self.c.volume_mask(chose_ag, vol) except LsmError as lsm_error: self.assertTrue( lsm_error.code == ErrorNumber.NO_STATE_CHANGE) flag_dup_error_found = True self.assertTrue(flag_dup_error_found) self.c.volume_unmask(chose_ag, vol) self._masking_state(cap, chose_ag, vol, False) # Test duplicate call for NO_STATE_CHANGE error flag_dup_error_found = False try: self.c.volume_unmask(chose_ag, vol) except LsmError as lsm_error: self.assertTrue( lsm_error.code == ErrorNumber.NO_STATE_CHANGE) flag_dup_error_found = True self.assertTrue(flag_dup_error_found) if vol: self._volume_delete(vol) if ag_created: self.assertTrue(s.id == ag_created.system_id) self.c.access_group_delete(ag_created) def _create_access_group(self, name, s, init_type): ag_created = None if init_type == lsm.AccessGroup.INIT_TYPE_ISCSI_IQN: ag_created = self.c.access_group_create( name, r_iqn(), lsm.AccessGroup.INIT_TYPE_ISCSI_IQN, s) elif init_type == lsm.AccessGroup.INIT_TYPE_WWPN: ag_created = self.c.access_group_create( name, r_fcpn(), lsm.AccessGroup.INIT_TYPE_WWPN, s) self.assertTrue(ag_created is not None) if ag_created is not None: ag_list = self.c.access_groups() match = [x for x in ag_list if x.id == ag_created.id] self.assertTrue( len(match) == 1, "Newly created access group %s not in the access group listing" % ag_created.name) self.assertTrue(s.id == ag_created.system_id) return ag_created def _delete_access_group(self, ag): self.c.access_group_delete(ag) ag_list = self.c.access_groups() match = [x for x in ag_list if x.id == ag.id] self.assertTrue(len(match) == 0, "Expected access group that was " "deleted to not show up in the " "access group list!") def _test_ag_create_dup(self, lsm_ag, lsm_system): """ Test NAME_CONFLICT and EXISTS_INITIATOR of access_group_create(). """ flag_got_expected_error = False if lsm_ag.init_type == lsm.AccessGroup.INIT_TYPE_ISCSI_IQN: new_init_id = r_iqn() else: new_init_id = r_fcpn() try: self.c.access_group_create( lsm_ag.name, new_init_id, lsm_ag.init_type, lsm_system) except LsmError as lsm_error: self.assertTrue(lsm_error.code == ErrorNumber.NAME_CONFLICT) flag_got_expected_error = True self.assertTrue(flag_got_expected_error) flag_got_expected_error = False try: self.c.access_group_create( rs('ag'), lsm_ag.init_ids[0], lsm_ag.init_type, lsm_system) except LsmError as lsm_error: self.assertTrue(lsm_error.code == ErrorNumber.EXISTS_INITIATOR) flag_got_expected_error = True self.assertTrue(flag_got_expected_error) def _test_ag_create_delete(self, cap, s): if supported(cap, [Cap.ACCESS_GROUPS, Cap.ACCESS_GROUP_CREATE_ISCSI_IQN]): ag = self._create_access_group( rs('ag'), s, lsm.AccessGroup.INIT_TYPE_ISCSI_IQN) if ag is not None and \ supported(cap, [Cap.ACCESS_GROUP_DELETE]): self._test_ag_create_dup(ag, s) self._delete_access_group(ag) if supported(cap, [Cap.ACCESS_GROUPS, Cap.ACCESS_GROUP_CREATE_WWPN]): ag = self._create_access_group( rs('ag'), s, lsm.AccessGroup.INIT_TYPE_WWPN) if ag is not None and \ supported(cap, [Cap.ACCESS_GROUP_DELETE]): self._test_ag_create_dup(ag, s) self._delete_access_group(ag) def test_iscsi_chap(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.ACCESS_GROUPS, Cap.ACCESS_GROUP_CREATE_ISCSI_IQN, Cap.VOLUME_ISCSI_CHAP_AUTHENTICATION]): ag = self._create_access_group( rs('ag'), s, lsm.AccessGroup.INIT_TYPE_ISCSI_IQN) self.assertTrue(ag is not None) if ag: self.c.iscsi_chap_auth(ag.init_ids[0], 'foo', rs(None, 12), None, None) if supported(cap, [Cap.ACCESS_GROUP_DELETE]): self._test_ag_create_dup(ag, s) self._delete_access_group(ag) def test_access_group_create_delete(self): for s in self.systems: cap = self.c.capabilities(s) self._test_ag_create_delete(cap, s) def test_access_group_list(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.ACCESS_GROUPS]): ag_list = self.c.access_groups('system_id', s.id) if len(ag_list) == 0: self._test_ag_create_delete(cap, s) else: self.assertTrue(len(ag_list) > 0, "Need at least 1 access group for testing " "and no support exists for creation of " "access groups for this system") def _ag_init_add(self, ag): if ag.init_type == lsm.AccessGroup.INIT_TYPE_ISCSI_IQN: t_id = r_iqn() t = lsm.AccessGroup.INIT_TYPE_ISCSI_IQN else: # We will try FC PN t_id = r_fcpn() t = lsm.AccessGroup.INIT_TYPE_WWPN ag_add = self.c.access_group_initiator_add(ag, t_id, t) self.assertTrue(ag_add.system_id == ag.system_id) ag_after = self.c.access_groups('id', ag.id)[0] match = [x for x in ag_after.init_ids if x == t_id] self.assertTrue(len(match) == 1) return t_id def _ag_init_delete(self, ag, init_id, init_type): self.c.access_group_initiator_delete(ag, init_id, init_type) ag_after = self.c.access_groups('id', ag.id)[0] match = [x for x in ag_after.init_ids if x == init_id] self.assertTrue(len(match) == 0) def test_access_group_initiator_add_delete(self): usable_ag_types = [lsm.AccessGroup.INIT_TYPE_WWPN, lsm.AccessGroup.INIT_TYPE_ISCSI_IQN] for s in self.systems: ag_to_delete = None cap = self.c.capabilities(s) if supported(cap, [Cap.ACCESS_GROUPS]): ag_list = self.c.access_groups('system_id', s.id) if supported(cap, [Cap.ACCESS_GROUP_CREATE_WWPN])\ or supported(cap, [Cap.ACCESS_GROUP_CREATE_ISCSI_IQN]): if supported( cap, [Cap.ACCESS_GROUP_CREATE_ISCSI_IQN, Cap.ACCESS_GROUP_DELETE]): ag_to_delete = self._create_access_group( rs('ag'), s, lsm.AccessGroup.INIT_TYPE_ISCSI_IQN) self.c.access_groups('system_id', s.id) if supported(cap, [Cap. ACCESS_GROUP_INITIATOR_ADD_ISCSI_IQN]): init_id = self._ag_init_add(ag_to_delete) if supported( cap, [Cap.ACCESS_GROUP_INITIATOR_DELETE]): self._ag_init_delete( ag_to_delete, init_id, lsm.AccessGroup.INIT_TYPE_ISCSI_IQN) if supported( cap, [Cap.ACCESS_GROUP_CREATE_WWPN, Cap.ACCESS_GROUP_DELETE]): ag_to_delete = self._create_access_group( rs('ag'), s, lsm.AccessGroup.INIT_TYPE_WWPN) self.c.access_groups('system_id', s.id) if ag_to_delete is not None: self._delete_access_group(ag_to_delete) else: if len(ag_list): # Try and find an initiator group that has a usable # access group type instead of unknown or other... ag = ag_list[0] for a_tmp in ag_list: if a_tmp.init_type in usable_ag_types: ag = a_tmp break if supported(cap, [Cap. ACCESS_GROUP_INITIATOR_ADD_WWPN]): init_id = self._ag_init_add(ag) if supported( cap, [Cap.ACCESS_GROUP_INITIATOR_DELETE]): self._ag_init_delete( ag, init_id, lsm.AccessGroup.INIT_TYPE_WWPN) if supported( cap, [Cap.ACCESS_GROUP_INITIATOR_ADD_ISCSI_IQN]): init_id = self._ag_init_add(ag) if supported(cap, [Cap.ACCESS_GROUP_INITIATOR_DELETE]): self._ag_init_delete( ag, init_id, lsm.AccessGroup.INIT_TYPE_ISCSI_IQN) def test_duplicate_volume_name(self): if self.pool_by_sys_id: for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.VOLUME_CREATE]): vol, pool = self._volume_create(s.id) self.assertTrue(vol is not None) # Try to create another with same name try: self.c.volume_create( pool, vol.name, vol.size_bytes, lsm.Volume.PROVISION_DEFAULT)[1] except LsmError as le: self.assertTrue(le.code == ErrorNumber.NAME_CONFLICT) if vol is not None and \ supported(cap, [Cap.VOLUME_DELETE]): self._volume_delete(vol) def test_duplicate_access_group_name(self): for s in self.systems: ag_name = rs('ag_dupe') cap = self.c.capabilities(s) if supported(cap, [Cap.ACCESS_GROUPS, Cap.ACCESS_GROUP_DELETE]): self.c.access_groups('system_id', s.id) if supported( cap, [Cap.ACCESS_GROUP_CREATE_ISCSI_IQN]): ag_type = lsm.AccessGroup.INIT_TYPE_ISCSI_IQN elif supported(cap, [Cap.ACCESS_GROUP_CREATE_WWPN]): ag_type = lsm.AccessGroup.INIT_TYPE_WWPN else: return ag_created = self._create_access_group( ag_name, s, ag_type) if ag_created is not None: # Try to create a duplicate got_exception = False try: self._create_access_group( ag_name, s, ag_type) except LsmError as le: got_exception = True self.assertTrue(le.code == ErrorNumber.NAME_CONFLICT) self.assertTrue(got_exception) self._delete_access_group(ag_created) def test_ag_vol_delete_with_vol_masked(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.ACCESS_GROUPS, Cap.ACCESS_GROUP_CREATE_ISCSI_IQN, Cap.ACCESS_GROUP_DELETE, Cap.VOLUME_UNMASK, Cap.VOLUME_CREATE, Cap.VOLUME_DELETE, Cap.VOLUME_MASK, Cap.VOLUME_UNMASK]): ag_name = rs("ag") ag_iqn = r_iqn() ag = self.c.access_group_create( ag_name, ag_iqn, lsm.AccessGroup.INIT_TYPE_ISCSI_IQN, s) pool = self._get_pool_by_usage(s.id, lsm.Pool.ELEMENT_TYPE_VOLUME) if ag and pool: vol_size = self._min_size() vol = self.c.volume_create( pool, rs('v'), vol_size, lsm.Volume.PROVISION_DEFAULT)[1] if vol: got_exception = False self.c.volume_mask(ag, vol) # Try to delete the access group try: self.c.access_group_delete(ag) except LsmError as le: if le.code == lsm.ErrorNumber.IS_MASKED: got_exception = True self.assertTrue(le.code == lsm.ErrorNumber.IS_MASKED) self.assertTrue(got_exception) # Try to delete the volume got_exception = False try: self.c.volume_delete(vol) except LsmError as le: if le.code == lsm.ErrorNumber.IS_MASKED: got_exception = True self.assertTrue(le.code == lsm.ErrorNumber.IS_MASKED) self.assertTrue(got_exception) # Clean up self.c.volume_unmask(ag, vol) self.c.volume_delete(vol) self.c.access_group_delete(ag) def test_volume_delete_with_replication(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.VOLUME_REPLICATE_CLONE, Cap.VOLUME_CHILD_DEPENDENCY, Cap.VOLUME_CREATE, Cap.VOLUME_DELETE]): pool = self._get_pool_by_usage(s.id, lsm.Pool.ELEMENT_TYPE_VOLUME) if pool: vol_size = self._min_size() vol = self.c.volume_create( pool, rs('v'), vol_size, lsm.Volume.PROVISION_DEFAULT)[1] clone_vol = self.c.volume_replicate( pool, lsm.Volume.REPLICATE_CLONE, vol, rs('test_vd_with_repo_'))[1] if self.c.volume_child_dependency(vol): got_exception = False try: self.c.volume_delete(vol) except LsmError as le: if le.code == lsm.ErrorNumber.HAS_CHILD_DEPENDENCY: got_exception = True self.assertTrue( le.code == lsm.ErrorNumber.HAS_CHILD_DEPENDENCY) self.assertTrue(got_exception) # Clean up self.c.volume_delete(clone_vol) self.c.volume_delete(vol) else: self._skip_current_test( "Skip test: current system does not support " "volume_create() or volume_replicate() with CLONE " "or volume_delete()") def test_fs_delete_with_clone(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.FS_CREATE, Cap.FS_CLONE, Cap.FS_DELETE, Cap.FS_CHILD_DEPENDENCY]): fs, pool = self._fs_create(s.id) self.assertTrue(fs is not None) fs_child = self.c.fs_clone(fs, rs('fs_c_'))[1] if self.c.fs_child_dependency(fs, None): got_exception = False try: self.c.fs_delete(fs) except LsmError as le: if le.code == lsm.ErrorNumber.HAS_CHILD_DEPENDENCY: got_exception = True self.assertTrue(le.code == lsm.ErrorNumber.HAS_CHILD_DEPENDENCY) self.assertTrue(got_exception) # Clean up self.c.fs_delete(fs_child) self.c.fs_delete(fs) else: self._skip_current_test( "Skip test: current system does not support " "fs_create() or fs_clone() or fs_delete()") def test_volume_vpd83_verify(self): failing = [None, "012345678901234567890123456789AB", "012345678901234567890123456789ax", "012345678901234567890123456789ag", "1234567890123456789012345abcdef", "01234567890123456789012345abcdefa", "55cd2e404beec32e0", "55cd2e404beec32ex", "35cd2e404beec32A"] for f in failing: self.assertFalse(lsm.Volume.vpd83_verify(f)) self.assertTrue( lsm.Volume.vpd83_verify("61234567890123456789012345abcdef")) self.assertTrue( lsm.Volume.vpd83_verify("55cd2e404beec32e")) self.assertTrue( lsm.Volume.vpd83_verify("35cd2e404beec32e")) self.assertTrue( lsm.Volume.vpd83_verify("25cd2e404beec32e")) def test_available_plugins(self): plugins = self.c.available_plugins(':') self.assertTrue(plugins is not None) self.assertTrue(len(plugins) > 0) self.assertTrue(':' in plugins[0]) def test_volume_enable_disable(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.VOLUME_CREATE, Cap.VOLUME_DELETE, Cap.VOLUME_ENABLE, Cap.VOLUME_DISABLE]): vol, pool = self._volume_create(s.id) self.c.volume_disable(vol) self.c.volume_enable(vol) self._volume_delete(vol) def test_daemon_not_running(self): current = None got_exception = False # Force a ErrorNumber.DAEMON_NOT_RUNNING if 'LSM_UDS_PATH' in os.environ: current = os.environ['LSM_UDS_PATH'] tmp_dir = tempfile.mkdtemp() os.environ['LSM_UDS_PATH'] = tmp_dir try: lsm.Client(TestPlugin.URI, TestPlugin.PASSWORD) except LsmError as expected_error: got_exception = True self.assertTrue(expected_error.code == ErrorNumber.DAEMON_NOT_RUNNING, 'Actual error %d' % expected_error.code) self.assertTrue(got_exception) os.rmdir(tmp_dir) if current: os.environ['LSM_UDS_PATH'] = current else: del os.environ['LSM_UDS_PATH'] def test_non_existent_plugin(self): got_exception = False try: uri = "%s://user@host" % rs(None, 6) lsm.Client(uri, TestPlugin.PASSWORD) except LsmError as expected_error: got_exception = True self.assertTrue(expected_error.code == ErrorNumber.PLUGIN_NOT_EXIST, 'Actual error %d' % expected_error.code) self.assertTrue(got_exception) def test_volume_depends(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.VOLUME_CREATE, Cap.VOLUME_DELETE, Cap.VOLUME_CHILD_DEPENDENCY, Cap.VOLUME_CHILD_DEPENDENCY_RM]) and \ (supported(cap, [Cap.VOLUME_REPLICATE_COPY]) or supported(cap, [Cap.VOLUME_REPLICATE_CLONE])): vol, pol = self._volume_create(s.id) if supported(cap, [Cap.VOLUME_REPLICATE_CLONE]): vol_child = self.c.volume_replicate( None, lsm.Volume.REPLICATE_CLONE, vol, rs('v_tc_'))[1] else: vol_child = self.c.volume_replicate( None, lsm.Volume.REPLICATE_COPY, vol, rs('v_fc_'))[1] self.assertTrue(vol_child is not None) if self.c.volume_child_dependency(vol): self.c.volume_child_dependency_rm(vol) else: self.assertTrue(self.c.volume_child_dependency_rm(vol) is None) self._volume_delete(vol) if vol_child: self._volume_delete(vol_child) def test_fs_depends(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.FS_CREATE, Cap.FS_DELETE, Cap.FS_CHILD_DEPENDENCY, Cap.FS_CHILD_DEPENDENCY_RM, Cap.FS_CLONE]): fs, pol = self._fs_create(s.id) fs_child = self.c.fs_clone(fs, rs('fs_c_'))[1] self.assertTrue(fs_child is not None) if self.c.fs_child_dependency(fs, None): self.c.fs_child_dependency_rm(fs, None) else: self.assertTrue(self.c.fs_child_dependency_rm(fs, None) is None) self._fs_delete(fs) if fs_child: self._fs_delete(fs_child) def test_nfs_auth_types(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.EXPORT_AUTH]): auth_types = self.c.export_auth() self.assertTrue(auth_types is not None) def test_export_list(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.EXPORTS]): self.c.exports() # TODO verify export values def test_create_delete_exports(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.FS_CREATE, Cap.EXPORTS, Cap.EXPORT_FS, Cap.EXPORT_REMOVE]): fs, pool = self._fs_create(s.id) if supported(cap, [Cap.EXPORT_CUSTOM_PATH]): path = "/mnt/%s" % rs(None, 6) exp = self.c.export_fs(fs.id, path, [], [], ['192.168.2.1']) else: exp = self.c.export_fs(fs.id, None, [], [], ['192.168.2.1']) self.c.export_remove(exp) self._fs_delete(fs) def test_pool_member_info(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.POOL_MEMBER_INFO]): for pool in self.c.pools(): (raid_type, member_type, member_ids) = \ self.c.pool_member_info(pool) self.assertTrue(type(raid_type) is int) self.assertTrue(type(member_type) is int) self.assertTrue(type(member_ids) is list) def _skip_current_test(self, messsage): """ If skipTest is supported, skip this test with provided message. Sliently return if not supported. """ if hasattr(unittest.TestCase, 'skipTest') is True: self.skipTest(messsage) return def test_volume_raid_create(self): for s in self.systems: cap = self.c.capabilities(s) # TODO(Gris Ge): Add test code for other RAID type and strip size if supported(cap, [Cap.VOLUME_RAID_CREATE]): supported_raid_types, supported_strip_sizes = \ self.c.volume_raid_create_cap_get(s) if lsm.Volume.RAID_TYPE_RAID1 not in supported_raid_types: self._skip_current_test( "Skip test: current system does not support " "creating RAID1 volume") # Find two free disks free_disks = [] for disk in self.c.disks(): if len(free_disks) == 2: break if disk.status & lsm.Disk.STATUS_FREE: free_disks.append(disk) if len(free_disks) != 2: self._skip_current_test( "Skip test: Failed to find two free disks for RAID 1") return new_vol = self.c.volume_raid_create( rs('v'), lsm.Volume.RAID_TYPE_RAID1, free_disks, lsm.Volume.VCR_STRIP_SIZE_DEFAULT) self.assertTrue(new_vol is not None) # TODO(Gris Ge): Use volume_raid_info() and pool_member_info() # to verify size, raid_type, member type, member ids. if supported(cap, [Cap.VOLUME_DELETE]): self._volume_delete(new_vol) updated_disks = [] # Check whether disks are been set back to free mode. for disk in self.c.disks(): if disk.id in list(d.id for d in free_disks): updated_disks.append(disk) self.assertTrue( disk.status & lsm.Disk.STATUS_FREE, "Disk %s is not restored to free state " "after volume_delete() on a raid volume" % disk.id) self.assertTrue(len(updated_disks) == 2, "Disk missing after volume_delete() on " "a raid volume") else: self._skip_current_test( "Skip test: not support of VOLUME_RAID_CREATE") def test_volume_raid_info(self): flag_supported = False pool_id_to_lsm_vols = dict() lsm_vol = None s = None cap = None for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.VOLUME_RAID_INFO]): flag_supported = True break if flag_supported is False: self._skip_current_test( "Skip test: current system does not support " "query volume raid info(lsm.Capabilities.VOLUME_RAID_INFO)") # Try to find a volume per pool. lsm_vols = self.c.volumes() for lsm_vol in lsm_vols: pool_id = lsm_vol.pool_id if len(pool_id) != 0 and \ pool_id_to_lsm_vols.get(pool_id) is None: pool_id_to_lsm_vols[pool_id] = lsm_vol lsm_vols = list(pool_id_to_lsm_vols.values()) if supported(cap, [Cap.VOLUME_CREATE]): created_lsm_vol = self._volume_create(s.id)[0] lsm_vols.append(created_lsm_vol) for lsm_vol in lsm_vols: [raid_type, strip_size, disk_count, min_io_size, opt_io_size] = \ self.c.volume_raid_info(lsm_vol) # TODO Make these checks better by checking type and range self.assertTrue(raid_type is not None) self.assertTrue(strip_size is not None) self.assertTrue(disk_count is not None) self.assertTrue(min_io_size is not None) self.assertTrue(opt_io_size is not None) # Test NOT_FOUND_VOLUME error. if supported(cap, [Cap.VOLUME_CREATE, Cap.VOLUME_DELETE]): self._volume_delete(created_lsm_vol) try: self.c.volume_raid_info(lsm_vol) except lsm.LsmError as le: if le.code != ErrorNumber.NOT_FOUND_VOLUME: raise def test_volume_ident_led_on(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.VOLUME_LED]): for volume in self.c.volumes(): volume_led_status = self.c.volume_ident_led_on(volume) self.assertTrue(volume_led_status is None, "Volume ident_led_on" "failed") def test_volume_ident_led_off(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.VOLUME_LED]): for volume in self.c.volumes(): try: volume_led_status = self.c.volume_ident_led_off(volume) self.assertTrue(volume_led_status is None, "Volume ident_led_off failed") except lsm.LsmError as le: # As we don't know if the LED in on/off we need to # accept that either outcome is possible if le.code != ErrorNumber.NO_STATE_CHANGE: raise def test_invalid_uri(self): # Make sure we are getting an exception self.assertRaises(lsm.LsmError, lsm.Client, "ontap//root@na-sim", "some_password") # Make sure exception has the correct error code try: lsm.Client("ontap//root@na-sim", "some_password") except lsm.LsmError as le: self.assertTrue(le.code == lsm.ErrorNumber.INVALID_ARGUMENT) def test_battery_list(self): for s in self.systems: cap = self.c.capabilities(s) if supported(cap, [Cap.BATTERIES]): self.c.batteries() def test_volume_cache_info(self): flag_tested = False flag_created = False for s in self.systems: cap = self.c.capabilities(s) if not supported(cap, [Cap.VOLUME_CACHE_INFO]): continue lsm_vols = self.c.volumes(search_key='system_id', search_value=s.id) if len(lsm_vols) == 0: if not supported(cap, [Cap.VOLUME_CREATE, Cap.VOLUME_DELETE]): continue lsm_vol = self._volume_create(s.id)[0] flag_created = True else: lsm_vol = lsm_vols[0] cache_info = self.c.volume_cache_info(lsm_vol) self.assertTrue(len(cache_info) == 5) if flag_created: self._volume_delete(lsm_vol) flag_tested = True if flag_tested is False: self._skip_current_test( "Skip test: no storage system support volume cache info query") def test_volume_cache_pdc_update(self): flag_tested = False for s in self.systems: cap = self.c.capabilities(s) if not supported(cap, [Cap.VOLUME_CACHE_INFO, Cap.VOLUME_CREATE, Cap.VOLUME_DELETE, Cap.VOLUME_PHYSICAL_DISK_CACHE_UPDATE]): continue lsm_vol = self._volume_create(s.id)[0] self.c.volume_physical_disk_cache_update( lsm_vol, lsm.Volume.PHYSICAL_DISK_CACHE_ENABLED) cache_info = self.c.volume_cache_info(lsm_vol) self.assertTrue(cache_info[4] == lsm.Volume.PHYSICAL_DISK_CACHE_ENABLED) self.c.volume_physical_disk_cache_update( lsm_vol, lsm.Volume.PHYSICAL_DISK_CACHE_DISABLED) cache_info = self.c.volume_cache_info(lsm_vol) self.assertTrue(cache_info[4] == lsm.Volume.PHYSICAL_DISK_CACHE_DISABLED) self._volume_delete(lsm_vol) flag_tested = True if flag_tested is False: self._skip_current_test( "Skip test: current system does not support required " "capabilities for testing volume_physical_disk_cache_update()") def test_volume_cache_wcp_update(self): flag_tested = False for s in self.systems: cap = self.c.capabilities(s) if not supported(cap, [Cap.VOLUME_CACHE_INFO, Cap.VOLUME_CREATE, Cap.VOLUME_DELETE]): continue lsm_vol = self._volume_create(s.id)[0] if supported(cap, [Cap.VOLUME_WRITE_CACHE_POLICY_UPDATE_WRITE_BACK]): self.c.volume_write_cache_policy_update( lsm_vol, lsm.Volume.WRITE_CACHE_POLICY_WRITE_BACK) cache_info = self.c.volume_cache_info(lsm_vol) self.assertTrue(cache_info[0] == lsm.Volume.WRITE_CACHE_POLICY_WRITE_BACK) flag_tested = True if supported(cap, [Cap.VOLUME_WRITE_CACHE_POLICY_UPDATE_AUTO]): self.c.volume_write_cache_policy_update( lsm_vol, lsm.Volume.WRITE_CACHE_POLICY_AUTO) cache_info = self.c.volume_cache_info(lsm_vol) self.assertTrue(cache_info[0] == lsm.Volume.WRITE_CACHE_POLICY_AUTO) flag_tested = True if supported(cap, [Cap.VOLUME_WRITE_CACHE_POLICY_UPDATE_WRITE_THROUGH]): self.c.volume_write_cache_policy_update( lsm_vol, lsm.Volume.WRITE_CACHE_POLICY_WRITE_THROUGH) cache_info = self.c.volume_cache_info(lsm_vol) self.assertTrue(cache_info[0] == lsm.Volume.WRITE_CACHE_POLICY_WRITE_THROUGH) flag_tested = True self._volume_delete(lsm_vol) if flag_tested is False: self._skip_current_test( "Skip test: current system does not support required " "capabilities for testing volume_write_cache_policy_update()") def test_volume_cache_rcp_update(self): flag_tested = False for s in self.systems: cap = self.c.capabilities(s) if not supported(cap, [Cap.VOLUME_CACHE_INFO, Cap.VOLUME_CREATE, Cap.VOLUME_DELETE, Cap.VOLUME_READ_CACHE_POLICY_UPDATE]): continue lsm_vol = self._volume_create(s.id)[0] self.c.volume_read_cache_policy_update( lsm_vol, lsm.Volume.READ_CACHE_POLICY_ENABLED) cache_info = self.c.volume_cache_info(lsm_vol) self.assertTrue(cache_info[2] == lsm.Volume.READ_CACHE_POLICY_ENABLED) self.c.volume_read_cache_policy_update( lsm_vol, lsm.Volume.READ_CACHE_POLICY_DISABLED) cache_info = self.c.volume_cache_info(lsm_vol) self.assertTrue(cache_info[2] == lsm.Volume.READ_CACHE_POLICY_DISABLED) self._volume_delete(lsm_vol) flag_tested = True if flag_tested is False: self._skip_current_test( "Skip test: current system does not support required " "capabilities for testing volume_read_cache_policy_update()") def dump_results(): """ unittest.main exits when done so we need to register this handler to get our results out. If PyYAML is available we will output detailed results, else we will output nothing. The detailed output results of what we called, how it finished and how long it took. """ try: # noinspection PyUnresolvedReferences import yaml sys.stdout.write(yaml.dump(dict(methods_called=results, stats=stats))) except ImportError: sys.stdout.write("NOTICE: Install PyYAML for detailed test results\n") def add_our_params(): """ There are probably easier ways to extend unittest, but this seems easiest at the moment if we want to retain the default behavior and introduce a couple of parameters. """ additional = """\ Options libStorageMgmt: --password 'Array password' --uri 'Array URI' --skip 'Test case to skip. Repeatable argument' """ if sys.version_info[0] == 2: unittest.TestProgram.USAGE += additional if __name__ == "__main__": atexit.register(dump_results) add_our_params() parser = argparse.ArgumentParser(add_help=False) parser.add_argument('--password', default=None) parser.add_argument('--uri') parser.add_argument('--skip', action='append') options, other_args = parser.parse_known_args() if options.uri: TestPlugin.URI = options.uri elif os.getenv('LSM_TEST_URI'): TestPlugin.URI = os.getenv('LSM_TEST_URI') else: TestPlugin.URI = 'sim://' if options.password: TestPlugin.PASSWORD = options.password elif os.getenv('LSM_TEST_PASSWORD'): TestPlugin.PASSWORD = os.getenv('LSM_TEST_PASSWORD') if options.skip: if hasattr(unittest.TestCase, 'skipTest') is False: raise Exception( "Current python version is too old to support 'skipTest'") TestPlugin.SKIP_TEST_CASES = options.skip else: TestPlugin.SKIP_TEST_CASES = [] print_stderr("\nPrefix: %s\n" % g_prefix) unittest.main(argv=sys.argv[:1] + other_args)