]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Replication V2 for Pure Storage
authorDaniel Wilson <daniel.wilson@purestorage.com>
Mon, 17 Aug 2015 22:46:13 +0000 (15:46 -0700)
committerPatrick East <patrick.east@purestorage.com>
Thu, 28 Jan 2016 14:10:28 +0000 (14:10 +0000)
This implementation utilizes Pure Storage Protection Groups to
replicate volumes natively between Pure Storage backends.

Replication schedules can be set via config file options.
Replication targets are also set via config file options.

For example,
[puredriver-1]
volume_backend_name = puredriver-1
...
replication_device = target_device_id:<unique_array_id>,
                     managed_backend_name:puredriver-2,
                     san_ip:<ip_address>,
                     api_token:<authentication_token>
pure_replica_interval_default = 900
pure_replica_retention_short_term_default = 14400
pure_replica_retention_long_term_per_day_default = 3
pure_replica_retention_long_term_default = 7

replication_device describes the target array for replication. This can
be either another managed cinder backend or one not managed by cinder.

The above settings will cause a protection group to be created between
the two flash arrays. It will replicate any volumes placed in it with
settings that cause a volume to be replicated every 15 mins (900s).  All
snapshots will be kept for 4 hours (14400s).  Then 3 snapshots per day
will be kept for 7 days.

Note that replication-failover is supported from primary to any of
multiple secondary arrays, but subsequent replication-failover is
only supported back to the original primary.

DocImpact
Implements: bp pure-v2-replication
Change-Id: I69e567143b65e92f89969ff5036c4f242c17a1dd

cinder/tests/unit/test_pure.py
cinder/volume/drivers/pure.py
releasenotes/notes/pure-v2-replication-0246223caaa8a9b5.yaml [new file with mode: 0644]

index 4bcc10f003cd0a43d7c3069f78a3cc7683cf7fbd..b35d76e077e21382e305939bda470657263de4f0 100644 (file)
@@ -22,6 +22,7 @@ from oslo_utils import units
 
 from cinder import exception
 from cinder import test
+from cinder.tests.unit import fake_volume
 
 
 def fake_retry(exceptions, interval=1, retries=3, backoff_rate=2):
@@ -43,7 +44,24 @@ ISCSI_DRIVER_OBJ = DRIVER_PATH + ".PureISCSIDriver"
 FC_DRIVER_OBJ = DRIVER_PATH + ".PureFCDriver"
 ARRAY_OBJ = DRIVER_PATH + ".FlashArray"
 
-TARGET = "pure-target"
+GET_ARRAY_PRIMARY = {"version": "99.9.9",
+                     "revision": "201411230504+8a400f7",
+                     "array_name": "pure_target1",
+                     "id": "primary_array_id"}
+
+GET_ARRAY_SECONDARY = {"version": "99.9.9",
+                       "revision": "201411230504+8a400f7",
+                       "array_name": "pure_target2",
+                       "id": "secondary_array_id"}
+
+REPLICATION_TARGET_TOKEN = "12345678-abcd-1234-abcd-1234567890ab"
+REPLICATION_PROTECTION_GROUP = "cinder-group"
+REPLICATION_INTERVAL_IN_SEC = 900
+REPLICATION_RETENTION_SHORT_TERM = 14400
+REPLICATION_RETENTION_LONG_TERM = 6
+REPLICATION_RETENTION_LONG_TERM_PER_DAY = 3
+
+PRIMARY_MANAGEMENT_IP = GET_ARRAY_PRIMARY["array_name"]
 API_TOKEN = "12345678-abcd-1234-abcd-1234567890ab"
 VOLUME_BACKEND_NAME = "Pure_iSCSI"
 ISCSI_PORT_NAMES = ["ct0.eth2", "ct0.eth3", "ct1.eth2", "ct1.eth3"]
@@ -60,6 +78,7 @@ PURE_HOST = {
 }
 REST_VERSION = "1.2"
 VOLUME_ID = "abcdabcd-1234-abcd-1234-abcdeffedcba"
+VOLUME_TYPE_ID = "357aa1f1-4f9c-4f10-acec-626af66425ba"
 VOLUME = {
     "name": "volume-" + VOLUME_ID,
     "id": VOLUME_ID,
@@ -67,9 +86,12 @@ VOLUME = {
     "size": 2,
     "host": "irrelevant",
     "volume_type": None,
-    "volume_type_id": None,
+    "volume_type_id": VOLUME_TYPE_ID,
+    "replication_status": None,
     "consistencygroup_id": None,
+    "provider_location": GET_ARRAY_PRIMARY["id"]
 }
+VOLUME_PURITY_NAME = VOLUME['name'] + '-cinder'
 VOLUME_WITH_CGROUP = VOLUME.copy()
 VOLUME_WITH_CGROUP['consistencygroup_id'] = \
     "4a2f7e3a-312a-40c5-96a8-536b8a0fe074"
@@ -206,6 +228,97 @@ PURE_PGROUP = {
     "volumes": ["v1"]
 }
 
+PGROUP_ON_TARGET_NOT_ALLOWED = {
+    "name": "array1:replicated_pgroup",
+    "hgroups": None,
+    "source": "array1",
+    "hosts": None,
+    "volumes": ["array1:replicated_volume"],
+    "time_remaining": None,
+    "targets": [{"name": "array2",
+                 "allowed": False}]}
+PGROUP_ON_TARGET_ALLOWED = {
+    "name": "array1:replicated_pgroup",
+    "hgroups": None,
+    "source": "array1",
+    "hosts": None,
+    "volumes": ["array1:replicated_volume"],
+    "time_remaining": None,
+    "targets": [{"name": "array2",
+                 "allowed": True}]}
+CONNECTED_ARRAY = {
+    "id": "6b1a7ce3-da61-0d86-65a7-9772cd259fef",
+    "version": "99.9.9",
+    "connected": True,
+    "management_address": "10.42.10.229",
+    "replication_address": "192.168.10.229",
+    "type": ["replication"],
+    "array_name": "3rd-pure-generic2"}
+REPLICATED_PGSNAPS = [
+    {
+        "name": "array1:cinder-repl-pg.3",
+        "created": "2014-12-04T22:59:38Z",
+        "started": "2014-12-04T22:59:38Z",
+        "completed": "2014-12-04T22:59:39Z",
+        "source": "array1:cinder-repl-pg",
+        "logical_data_transferred": 0,
+        "progress": 1.0,
+        "data_transferred": 318
+    },
+    {
+        "name": "array1:cinder-repl-pg.2",
+        "created": "2014-12-04T21:59:38Z",
+        "started": "2014-12-04T21:59:38Z",
+        "completed": "2014-12-04T21:59:39Z",
+        "source": "array1:cinder-repl-pg",
+        "logical_data_transferred": 0,
+        "progress": 1.0,
+        "data_transferred": 318
+    },
+    {
+        "name": "array1:cinder-repl-pg.1",
+        "created": "2014-12-04T20:59:38Z",
+        "started": "2014-12-04T20:59:38Z",
+        "completed": "2014-12-04T20:59:39Z",
+        "source": "array1:cinder-repl-pg",
+        "logical_data_transferred": 0,
+        "progress": 1.0,
+        "data_transferred": 318
+    }]
+REPLICATED_VOLUME_SNAPS = [
+    {
+        "source": "array1:replicated_volume1",
+        "serial": "BBA481C01639104E0001D5F7",
+        "created": "2014-12-04T22:59:38Z",
+        "name": "array1:cinder-repl-pg.2.replicated_volume1",
+        "size": 1048576
+    },
+    {
+        "source": "array1:replicated_volume2",
+        "serial": "BBA481C01639104E0001D5F8",
+        "created": "2014-12-04T22:59:38Z",
+        "name": "array1:cinder-repl-pg.2.replicated_volume2",
+        "size": 1048576
+    },
+    {
+        "source": "array1:replicated_volume3",
+        "serial": "BBA481C01639104E0001D5F9",
+        "created": "2014-12-04T22:59:38Z",
+        "name": "array1:PureSRAEndToEndPGroup1.2.replicated_volume3",
+        "size": 1048576
+    }
+]
+NON_REPLICATED_VOL_TYPE = {"is_public": True,
+                           "extra_specs": {},
+                           "name": "volume_type_1",
+                           "id": VOLUME_TYPE_ID}
+REPLICATED_VOL_TYPE = {"is_public": True,
+                       "extra_specs":
+                       {pure.EXTRA_SPECS_REPL_ENABLED:
+                        "<is> True"},
+                       "name": "volume_type_2",
+                       "id": VOLUME_TYPE_ID}
+
 
 class FakePureStorageHTTPError(Exception):
     def __init__(self, target=None, rest_version=None, code=None,
@@ -221,13 +334,29 @@ class PureDriverTestCase(test.TestCase):
     def setUp(self):
         super(PureDriverTestCase, self).setUp()
         self.mock_config = mock.Mock()
-        self.mock_config.san_ip = TARGET
+        self.mock_config.san_ip = PRIMARY_MANAGEMENT_IP
         self.mock_config.pure_api_token = API_TOKEN
         self.mock_config.volume_backend_name = VOLUME_BACKEND_NAME
+        self.mock_config.safe_get.return_value = None
         self.array = mock.Mock()
+        self.array
+        self.array.get.return_value = GET_ARRAY_PRIMARY
+        self.array.array_name = GET_ARRAY_PRIMARY["array_name"]
+        self.array.array_id = GET_ARRAY_PRIMARY["id"]
+        self.array2 = mock.Mock()
+        self.array2.array_name = GET_ARRAY_SECONDARY["array_name"]
+        self.array2.array_id = GET_ARRAY_SECONDARY["id"]
+        self.array2.get.return_value = GET_ARRAY_SECONDARY
         self.purestorage_module = pure.purestorage
         self.purestorage_module.PureHTTPError = FakePureStorageHTTPError
 
+    def fake_get_array(*args, **kwargs):
+        if 'action' in kwargs and kwargs['action'] is 'monitor':
+            return PERF_INFO_RAW
+
+        if 'space' in kwargs and kwargs['space'] is True:
+            return SPACE_INFO
+
     def assert_error_propagates(self, mocks, func, *args, **kwargs):
         """Assert that errors from mocks propagate to func.
 
@@ -252,12 +381,119 @@ class PureBaseSharedDriverTestCase(PureDriverTestCase):
         self.driver = pure.PureBaseVolumeDriver(configuration=self.mock_config)
         self.driver._array = self.array
         self.array.get_rest_version.return_value = '1.4'
+        self.purestorage_module.FlashArray.side_effect = None
+        self.array2.get_rest_version.return_value = '1.4'
+
+    def tearDown(self):
+        super(PureBaseSharedDriverTestCase, self).tearDown()
 
 
 class PureBaseVolumeDriverTestCase(PureBaseSharedDriverTestCase):
     def setUp(self):
         super(PureBaseVolumeDriverTestCase, self).setUp()
 
+    def _setup_mocks_for_replication(self):
+        # Mock config values
+        self.mock_config.pure_replica_interval_default = (
+            REPLICATION_INTERVAL_IN_SEC)
+        self.mock_config.pure_replica_retention_short_term_default = (
+            REPLICATION_RETENTION_SHORT_TERM)
+        self.mock_config.pure_replica_retention_long_term_default = (
+            REPLICATION_RETENTION_LONG_TERM)
+        self.mock_config.pure_replica_retention_long_term_default = (
+            REPLICATION_RETENTION_LONG_TERM_PER_DAY)
+        self.mock_config.safe_get.return_value = [
+            {"target_device_id": self.driver._array.array_id,
+             "managed_backend_name": None,
+             "san_ip": "1.2.3.4",
+             "api_token": "abc123"}]
+
+    @mock.patch(BASE_DRIVER_OBJ + '._generate_replication_retention')
+    @mock.patch(BASE_DRIVER_OBJ + '._setup_replicated_pgroups')
+    def test_parse_replication_configs_single_target(
+            self,
+            mock_setup_repl_pgroups,
+            mock_generate_replication_retention):
+        retention = mock.MagicMock()
+        mock_generate_replication_retention.return_value = retention
+        mock_setup_repl_pgroups.return_value = None
+
+        # Test single array configured
+        self.mock_config.safe_get.return_value = [
+            {"target_device_id": self.driver._array.id,
+             "managed_backend_name": None,
+             "san_ip": "1.2.3.4",
+             "api_token": "abc123"}]
+        self.purestorage_module.FlashArray.return_value = self.array
+        self.driver.parse_replication_configs()
+        self.assertEqual(1, len(self.driver._replication_target_arrays))
+        self.assertEqual(self.array, self.driver._replication_target_arrays[0])
+        only_target_array = self.driver._replication_target_arrays[0]
+        self.assertEqual(self.driver._array.id,
+                         only_target_array._target_device_id)
+
+    @mock.patch(BASE_DRIVER_OBJ + '._generate_replication_retention')
+    @mock.patch(BASE_DRIVER_OBJ + '._setup_replicated_pgroups')
+    def test_parse_replication_configs_multiple_target(
+            self,
+            mock_setup_repl_pgroups,
+            mock_generate_replication_retention):
+
+        retention = mock.MagicMock()
+        mock_generate_replication_retention.return_value = retention
+        mock_setup_repl_pgroups.return_value = None
+
+        # Test multiple arrays configured
+        self.mock_config.safe_get.return_value = [
+            {"target_device_id": GET_ARRAY_PRIMARY["id"],
+             "managed_backend_name": None,
+             "san_ip": "1.2.3.4",
+             "api_token": "abc123"},
+            {"target_device_id": GET_ARRAY_SECONDARY["id"],
+             "managed_backend_name": None,
+             "san_ip": "1.2.3.5",
+             "api_token": "abc124"}]
+        self.purestorage_module.FlashArray.side_effect = \
+            [self.array, self.array2]
+        self.driver.parse_replication_configs()
+        self.assertEqual(2, len(self.driver._replication_target_arrays))
+        self.assertEqual(self.array, self.driver._replication_target_arrays[0])
+        first_target_array = self.driver._replication_target_arrays[0]
+        self.assertEqual(GET_ARRAY_PRIMARY["id"],
+                         first_target_array._target_device_id)
+        self.assertEqual(
+            self.array2, self.driver._replication_target_arrays[1])
+        second_target_array = self.driver._replication_target_arrays[1]
+        self.assertEqual(GET_ARRAY_SECONDARY["id"],
+                         second_target_array._target_device_id)
+
+    @mock.patch(BASE_DRIVER_OBJ + '._generate_replication_retention')
+    @mock.patch(BASE_DRIVER_OBJ + '._setup_replicated_pgroups')
+    @mock.patch('cinder.volume.volume_types.get_volume_type')
+    def test_do_setup_replicated(self, mock_get_volume_type,
+                                 mock_setup_repl_pgroups,
+                                 mock_generate_replication_retention):
+        retention = mock.MagicMock()
+        mock_generate_replication_retention.return_value = retention
+        mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
+        self._setup_mocks_for_replication()
+        self.array2.get.return_value = GET_ARRAY_SECONDARY
+        self.array.get.return_value = GET_ARRAY_PRIMARY
+        self.purestorage_module.FlashArray.side_effect = [self.array,
+                                                          self.array2]
+        self.driver.do_setup(None)
+        self.assertEqual(self.array, self.driver._array)
+        self.assertEqual(1, len(self.driver._replication_target_arrays))
+        self.assertEqual(self.array2,
+                         self.driver._replication_target_arrays[0])
+        calls = [
+            mock.call(self.array2, [self.array], 'cinder-group',
+                      REPLICATION_INTERVAL_IN_SEC, retention),
+            mock.call(self.array, [self.array2], 'cinder-group',
+                      REPLICATION_INTERVAL_IN_SEC, retention)
+        ]
+        mock_setup_repl_pgroups.assert_has_calls(calls)
+
     def test_generate_purity_host_name(self):
         result = self.driver._generate_purity_host_name(
             "really-long-string-thats-a-bit-too-long")
@@ -271,7 +507,9 @@ class PureBaseVolumeDriverTestCase(PureBaseSharedDriverTestCase):
         self.assertEqual(49, len(result))
         self.assertTrue(pure.GENERATED_NAME.match(result))
 
-    def test_create_volume(self):
+    @mock.patch(BASE_DRIVER_OBJ + "._is_volume_replicated_type", autospec=True)
+    def test_create_volume(self, mock_is_replicated_type):
+        mock_is_replicated_type.return_value = False
         self.driver.create_volume(VOLUME)
         self.array.create_volume.assert_called_with(
             VOLUME["name"] + "-cinder", 2 * units.Gi)
@@ -280,8 +518,11 @@ class PureBaseVolumeDriverTestCase(PureBaseSharedDriverTestCase):
 
     @mock.patch(BASE_DRIVER_OBJ + "._add_volume_to_consistency_group",
                 autospec=True)
-    def test_create_volume_with_cgroup(self, mock_add_to_cgroup):
+    @mock.patch(BASE_DRIVER_OBJ + "._is_volume_replicated_type", autospec=True)
+    def test_create_volume_with_cgroup(self, mock_is_replicated_type,
+                                       mock_add_to_cgroup):
         vol_name = VOLUME_WITH_CGROUP["name"] + "-cinder"
+        mock_is_replicated_type.return_value = False
 
         self.driver.create_volume(VOLUME_WITH_CGROUP)
 
@@ -290,9 +531,11 @@ class PureBaseVolumeDriverTestCase(PureBaseSharedDriverTestCase):
                                 VOLUME_WITH_CGROUP['consistencygroup_id'],
                                 vol_name)
 
-    def test_create_volume_from_snapshot(self):
+    @mock.patch(BASE_DRIVER_OBJ + "._is_volume_replicated_type", autospec=True)
+    def test_create_volume_from_snapshot(self, mock_is_replicated_type):
         vol_name = VOLUME["name"] + "-cinder"
         snap_name = SNAPSHOT["volume_name"] + "-cinder." + SNAPSHOT["name"]
+        mock_is_replicated_type.return_value = False
 
         # Branch where extend unneeded
         self.driver.create_volume_from_snapshot(VOLUME, SNAPSHOT)
@@ -332,7 +575,9 @@ class PureBaseVolumeDriverTestCase(PureBaseSharedDriverTestCase):
                 autospec=True)
     @mock.patch(BASE_DRIVER_OBJ + "._extend_if_needed", autospec=True)
     @mock.patch(BASE_DRIVER_OBJ + "._get_pgroup_snap_name_from_snapshot")
-    def test_create_volume_from_cgsnapshot(self, mock_get_snap_name,
+    @mock.patch(BASE_DRIVER_OBJ + "._is_volume_replicated_type", autospec=True)
+    def test_create_volume_from_cgsnapshot(self, mock_is_replicated_type,
+                                           mock_get_snap_name,
                                            mock_extend_if_needed,
                                            mock_add_to_cgroup):
         vol_name = VOLUME_WITH_CGROUP["name"] + "-cinder"
@@ -340,6 +585,7 @@ class PureBaseVolumeDriverTestCase(PureBaseSharedDriverTestCase):
                     "e074-cinder.4a2f7e3a-312a-40c5-96a8-536b8a0fe075."\
                     + vol_name
         mock_get_snap_name.return_value = snap_name
+        mock_is_replicated_type.return_value = False
 
         self.driver.create_volume_from_snapshot(VOLUME_WITH_CGROUP,
                                                 SNAPSHOT_WITH_CGROUP)
@@ -355,9 +601,12 @@ class PureBaseVolumeDriverTestCase(PureBaseSharedDriverTestCase):
                                 VOLUME_WITH_CGROUP['consistencygroup_id'],
                                 vol_name)
 
-    def test_create_cloned_volume(self):
+    # Tests cloning a volume that is not replicated type
+    @mock.patch(BASE_DRIVER_OBJ + "._is_volume_replicated_type", autospec=True)
+    def test_create_cloned_volume(self, mock_is_replicated_type):
         vol_name = VOLUME["name"] + "-cinder"
         src_name = SRC_VOL["name"] + "-cinder"
+        mock_is_replicated_type.return_value = False
         # Branch where extend unneeded
         self.driver.create_cloned_volume(VOLUME, SRC_VOL)
         self.array.copy_volume.assert_called_with(src_name, vol_name)
@@ -377,10 +626,14 @@ class PureBaseVolumeDriverTestCase(PureBaseSharedDriverTestCase):
             self.driver.create_cloned_volume, VOLUME, SRC_VOL)
         SRC_VOL["size"] = 2  # reset size
 
+    # Tests cloning a volume that is part of a consistency group
     @mock.patch(BASE_DRIVER_OBJ + "._add_volume_to_consistency_group",
                 autospec=True)
-    def test_create_cloned_volume_with_cgroup(self, mock_add_to_cgroup):
+    @mock.patch(BASE_DRIVER_OBJ + "._is_volume_replicated_type", autospec=True)
+    def test_create_cloned_volume_with_cgroup(self, mock_is_replicated_type,
+                                              mock_add_to_cgroup):
         vol_name = VOLUME_WITH_CGROUP["name"] + "-cinder"
+        mock_is_replicated_type.return_value = False
 
         self.driver.create_cloned_volume(VOLUME_WITH_CGROUP, SRC_VOL)
 
@@ -509,7 +762,8 @@ class PureBaseVolumeDriverTestCase(PureBaseSharedDriverTestCase):
         # Branch where connection is missing and the host is still deleted
         self.array.reset_mock()
         self.array.disconnect_host.side_effect = \
-            self.purestorage_module.PureHTTPError(code=400, text="reason")
+            self.purestorage_module.PureHTTPError(code=400,
+                                                  text="is not connected")
         self.driver.terminate_connection(VOLUME, ISCSI_CONNECTOR)
         self.array.disconnect_host.assert_called_with(PURE_HOST_NAME, vol_name)
         self.array.list_host_connections.assert_called_with(PURE_HOST_NAME,
@@ -867,7 +1121,8 @@ class PureBaseVolumeDriverTestCase(PureBaseSharedDriverTestCase):
 
         expected_snapshot_update = [{
             'id': mock_snap.id,
-            'status': 'available'
+            'status': 'available',
+            'provider_location': self.array.array_id
         }]
         self.assertEqual(expected_snapshot_update, snapshots)
 
@@ -1208,11 +1463,593 @@ class PureBaseVolumeDriverTestCase(PureBaseSharedDriverTestCase):
                           self.driver.unmanage_snapshot,
                           SNAPSHOT)
 
-    def test_retype(self):
-        # Ensure that we return true no matter what the inputs are
-        retyped, update = self.driver.retype(None, None, None, None, None)
-        self.assertTrue(retyped)
-        self.assertIsNone(update)
+    def _test_retype_repl(self, mock_is_repl, is_vol_repl, repl_cabability):
+        mock_is_repl.return_value = is_vol_repl
+        context = mock.MagicMock()
+        volume = fake_volume.fake_volume_obj(context)
+        new_type = {
+            'extra_specs': {
+                pure.EXTRA_SPECS_REPL_ENABLED:
+                '<is> ' + str(repl_cabability)
+            }
+        }
+
+        actual = self.driver.retype(context, volume, new_type, None, None)
+        expected = (True, None)
+        self.assertEqual(expected, actual)
+        return context, volume
+
+    @mock.patch(BASE_DRIVER_OBJ + '._is_volume_replicated_type', autospec=True)
+    def test_retype_repl_to_repl(self, mock_is_replicated_type):
+        self._test_retype_repl(mock_is_replicated_type, True, True)
+
+    @mock.patch(BASE_DRIVER_OBJ + '._is_volume_replicated_type', autospec=True)
+    def test_retype_non_repl_to_non_repl(self, mock_is_replicated_type):
+        self._test_retype_repl(mock_is_replicated_type, False, False)
+
+    @mock.patch(BASE_DRIVER_OBJ + '.replication_enable')
+    @mock.patch(BASE_DRIVER_OBJ + '.replication_disable')
+    @mock.patch(BASE_DRIVER_OBJ + '._is_volume_replicated_type', autospec=True)
+    def test_retype_non_repl_to_repl(self,
+                                     mock_is_replicated_type,
+                                     mock_replication_disable,
+                                     mock_replication_enable):
+
+        context, volume = self._test_retype_repl(mock_is_replicated_type,
+                                                 False,
+                                                 True)
+        self.assertFalse(mock_replication_disable.called)
+        mock_replication_enable.assert_called_with(context, volume)
+
+    @mock.patch(BASE_DRIVER_OBJ + '.replication_enable')
+    @mock.patch(BASE_DRIVER_OBJ + '.replication_disable')
+    @mock.patch(BASE_DRIVER_OBJ + '._is_volume_replicated_type', autospec=True)
+    def test_retype_repl_to_non_repl(self,
+                                     mock_is_replicated_type,
+                                     mock_replication_disable,
+                                     mock_replication_enable):
+        context, volume = self._test_retype_repl(mock_is_replicated_type,
+                                                 True,
+                                                 False)
+        self.assertFalse(mock_replication_enable.called)
+        mock_replication_disable.assert_called_with(context, volume)
+
+    def test_get_arrays_single_array_found(self):
+        volume = deepcopy(VOLUME)
+        # Test case where only single array, it's the one we're looking for
+        volume["provider_location"] = GET_ARRAY_PRIMARY["id"]
+        rslt_primary, rslt_secondary = self.driver._get_arrays(volume)
+        self.assertEqual(self.array, rslt_primary)
+        self.assertEqual(0, len(rslt_secondary))
+
+    def test_get_arrays_single_array_not_found(self):
+        volume = deepcopy(VOLUME)
+        # Test case where only single array, it's not the one we're looking for
+        volume["provider_location"] = "won't find me"
+        rslt_primary, rslt_secondary = self.driver._get_arrays(volume)
+        self.assertEqual(self.array, rslt_primary)
+        self.assertEqual(0, len(rslt_secondary))
+
+    def test_get_arrays_no_primary_configured(self):
+        volume = deepcopy(VOLUME)
+        # Test case where no primary is configured,
+        del volume["provider_location"]
+        rslt_primary, rslt_secondary = self.driver._get_arrays(volume)
+        self.assertEqual(self.array, rslt_primary)
+        self.assertEqual(0, len(rslt_secondary))
+
+    def test_get_arrays_after_failover(self):
+        volume = deepcopy(VOLUME)
+        # Test case where 2 arrays, volume is failed over to secondary
+        volume["provider_location"] = GET_ARRAY_SECONDARY["id"]
+        self.array.array_name = GET_ARRAY_PRIMARY["array_name"]
+        self.driver._replication_target_arrays = [self.array2]
+        rslt_primary, rslt_secondary = self.driver._get_arrays(volume)
+        self.assertEqual(self.array2, rslt_primary)
+        self.assertListEqual([self.array], rslt_secondary)
+
+    def test_get_arrays_primary_configured(self):
+        volume = deepcopy(VOLUME)
+        # Test case where 2 arrays, provider_location is primary
+        volume["provider_location"] = GET_ARRAY_PRIMARY["id"]
+        self.driver._replication_target_arrays = [self.array2]
+        rslt_primary, rslt_secondary = self.driver._get_arrays(volume)
+        self.assertEqual(self.array, rslt_primary)
+        self.assertListEqual([self.array2], rslt_secondary)
+
+    @mock.patch('cinder.volume.volume_types.get_volume_type')
+    def test_is_vol_replicated_no_extra_specs(self, mock_get_vol_type):
+        mock_get_vol_type.return_value = NON_REPLICATED_VOL_TYPE
+        volume = fake_volume.fake_volume_obj(mock.MagicMock())
+        actual = self.driver._is_volume_replicated_type(volume)
+        self.assertFalse(actual)
+
+    @mock.patch('cinder.volume.volume_types.get_volume_type')
+    def test_is_vol_replicated_has_repl_extra_specs(self, mock_get_vol_type):
+        mock_get_vol_type.return_value = REPLICATED_VOL_TYPE
+        volume = fake_volume.fake_volume_obj(mock.MagicMock())
+        actual = self.driver._is_volume_replicated_type(volume)
+        self.assertTrue(actual)
+
+    @mock.patch('cinder.volume.volume_types.get_volume_type')
+    def test_is_vol_replicated_has_other_extra_specs(self, mock_get_vol_type):
+        vtype_test = deepcopy(NON_REPLICATED_VOL_TYPE)
+        vtype_test["extra_specs"] = {"some_key": "some_value"}
+        mock_get_vol_type.return_value = vtype_test
+        volume = fake_volume.fake_volume_obj(mock.MagicMock())
+        actual = self.driver._is_volume_replicated_type(volume)
+        self.assertFalse(actual)
+
+    def test_does_pgroup_exist_not_exists(self):
+        self.array.get_pgroup.side_effect = (
+            self.purestorage_module.PureHTTPError(code=400,
+                                                  text="does not exist"))
+        exists = self.driver._does_pgroup_exist(self.array, "some_pgroup")
+        self.assertFalse(exists)
+
+    def test_does_pgroup_exist_exists(self):
+        self.array.get_pgroup.side_effect = None
+        self.array.get_pgroup.return_value = PGROUP_ON_TARGET_NOT_ALLOWED
+        exists = self.driver._does_pgroup_exist(self.array, "some_pgroup")
+        self.assertTrue(exists)
+
+    def test_does_pgroup_exist_error_propagates(self):
+        self.assert_error_propagates([self.array.get_pgroup],
+                                     self.driver._does_pgroup_exist,
+                                     self.array,
+                                     "some_pgroup")
+
+    @mock.patch(BASE_DRIVER_OBJ + "._does_pgroup_exist")
+    def test_wait_until_target_group_setting_propagates_ready(self,
+                                                              mock_exists):
+        mock_exists.return_value = True
+        self.driver._wait_until_target_group_setting_propagates(
+            self.array,
+            "some_pgroup"
+        )
+
+    @mock.patch(BASE_DRIVER_OBJ + "._does_pgroup_exist")
+    def test_wait_until_target_group_setting_propagates_not_ready(self,
+                                                                  mock_exists):
+        mock_exists.return_value = False
+        self.assertRaises(
+            exception.PureDriverException,
+            self.driver._wait_until_target_group_setting_propagates,
+            self.array,
+            "some_pgroup"
+        )
+
+    def test_wait_until_source_array_allowed_ready(self):
+        self.array.get_pgroup.return_value = PGROUP_ON_TARGET_ALLOWED
+        self.driver._wait_until_source_array_allowed(
+            self.array,
+            "some_pgroup",)
+
+    def test_wait_until_source_array_allowed_not_ready(self):
+        self.array.get_pgroup.return_value = PGROUP_ON_TARGET_NOT_ALLOWED
+        self.assertRaises(
+            exception.PureDriverException,
+            self.driver._wait_until_source_array_allowed,
+            self.array,
+            "some_pgroup",
+        )
+
+    @mock.patch('cinder.volume.volume_types.get_volume_type')
+    def test_create_volume_replicated(self, mock_get_volume_type):
+        mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
+        self._setup_mocks_for_replication()
+        self.driver._array = self.array
+        self.driver._array.array_name = GET_ARRAY_PRIMARY["array_name"]
+        self.driver._array.array_id = GET_ARRAY_PRIMARY["id"]
+        self.driver._replication_target_arrays = [mock.Mock()]
+        self.driver._replication_target_arrays[0].array_name = (
+            GET_ARRAY_SECONDARY["array_name"])
+        model_update = self.driver.create_volume(VOLUME)
+        self.assertEqual(
+            "enabled", model_update["replication_status"],
+            "create_volume should return valid replication_status")
+        self.assertEqual(GET_ARRAY_PRIMARY["id"],
+                         model_update["provider_location"])
+        self.array.create_volume.assert_called_with(
+            VOLUME["name"] + "-cinder", 2 * units.Gi)
+        self.array.set_pgroup.assert_called_with(
+            REPLICATION_PROTECTION_GROUP,
+            addvollist=[VOLUME["name"] + "-cinder"])
+        self.assert_error_propagates([self.array.create_volume],
+                                     self.driver.create_volume, VOLUME)
+
+    @mock.patch(BASE_DRIVER_OBJ + "._get_flasharray")
+    def test_get_latest_replicated_vol_snap(self, mock_get_flasharray):
+        mock_get_flasharray.return_value = self.array
+        replicated_vol_snaps_return_0 = deepcopy(REPLICATED_VOLUME_SNAPS)
+        for snap in replicated_vol_snaps_return_0:
+            snap["source"] = "not_the_volume_we_want"
+        replicated_vol_snaps_return_1 = deepcopy(REPLICATED_VOLUME_SNAPS)
+        self.array.get_pgroup.return_value = REPLICATED_PGSNAPS
+        self.array.get_volume = mock.Mock()
+        self.array.get_volume.side_effect = [
+            replicated_vol_snaps_return_0,
+            replicated_vol_snaps_return_1]
+        source_array_name = "array1"
+        replicated_pg_name = "cinder-repl-pg"
+        replicated_vol_name = "replicated_volume2"
+        result = self.driver._get_latest_replicated_vol_snap(
+            self.array,
+            source_array_name,
+            replicated_pg_name,
+            replicated_vol_name)
+        expected = {
+            "source": "array1:replicated_volume2",
+            "serial": "BBA481C01639104E0001D5F8",
+            "created": "2014-12-04T22:59:38Z",
+            "name": "array1:cinder-repl-pg.2.replicated_volume2",
+            "size": 1048576
+        }
+        self.assertEqual(expected, result)
+        # Test when no replicated PG snapshots available
+        self.array.get_pgroup.return_value = []
+        result = self.driver._get_latest_replicated_vol_snap(
+            self.array,
+            source_array_name,
+            replicated_pg_name,
+            replicated_vol_name)
+        self.assertIsNone(result)
+        # Test when no replicated PG snapshot contains the volume we want
+        self.array.get_pgroup.return_value = REPLICATED_PGSNAPS
+        self.array.get_volume.side_effect = None
+        self.array.get_volume.return_value = REPLICATED_VOLUME_SNAPS
+        result = self.driver._get_latest_replicated_vol_snap(
+            self.array,
+            source_array_name,
+            replicated_pg_name,
+            "missing_volume")
+        self.assertIsNone(result)
+
+    @mock.patch('cinder.volume.volume_types.get_volume_type')
+    def test_create_cloned_volume_failed_over(self, mock_get_volume_type):
+        # Tests cloning a volume that is failed over to secondary
+        mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
+        vol_name = VOLUME["name"] + "-cinder"
+        src_name = SRC_VOL["name"] + "-cinder"
+        volume = deepcopy(VOLUME)
+        del volume["provider_location"]
+        src_vol = deepcopy(SRC_VOL)
+        src_vol["provider_location"] = GET_ARRAY_SECONDARY["id"]
+        src_vol["replication_status"] = "enabled"
+        self.driver._array.array_name = GET_ARRAY_PRIMARY["array_name"]
+        self.driver._array.array_id = GET_ARRAY_PRIMARY["id"]
+        self.driver._replication_target_arrays = [self.array2]
+        rslt = self.driver.create_cloned_volume(volume, src_vol)
+        self.assertFalse(self.array.copy_volume.called)
+        self.array2.copy_volume.assert_called_with(src_name, vol_name)
+        self.assertEqual("enabled", rslt["replication_status"])
+        self.assertEqual(GET_ARRAY_SECONDARY["id"], rslt["provider_location"])
+        self.assertFalse(self.array.extend_volume.called)
+        self.assertFalse(self.array2.extend_volume.called)
+        self.assert_error_propagates(
+            [self.array2.copy_volume],
+            self.driver.create_cloned_volume, volume, src_vol)
+        self.assertTrue(self.array2.set_pgroup.called)
+        self.assertFalse(self.array.set_pgroup.called)
+
+    @mock.patch(BASE_DRIVER_OBJ + "._add_and_replicate_if_needed")
+    @mock.patch(BASE_DRIVER_OBJ + "._get_arrays")
+    @mock.patch('cinder.volume.volume_types.get_volume_type')
+    def test_replication_enable(self, mock_get_volume_type,
+                                mock_get_arrays,
+                                mock_add_vol_to_pg):
+        mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
+        mock_get_arrays.return_value = (self.array, self.array2)
+        volume = deepcopy(VOLUME)
+        # Test volume is added to replication PG
+        rslt = self.driver.replication_enable(None, volume)
+        mock_add_vol_to_pg.assert_called_with(self.array, volume)
+        # Verify model_update shows replication is re-enabled
+        self.assertEqual("enabled", rslt["replication_status"])
+
+    @mock.patch('cinder.volume.volume_types.get_volume_type')
+    @mock.patch(BASE_DRIVER_OBJ + "._get_latest_replicated_vol_snap",
+                autospec=True)
+    def test_failover_src_volume_has_no_repl_snapshot(
+            self,
+            mock_get_latest_replicated_vol_snap,
+            mock_get_volume_type):
+        # Test case when target volume doesn't have a replicated snapshot. this
+        # can happen if replication is still propagating the first snapshot
+        mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
+        src_vol = deepcopy(VOLUME)
+        target_array = "dummy_target_device_id"
+        src_vol["replication_status"] = "enabled"
+        src_vol["volume_type_id"] = REPLICATED_VOL_TYPE["id"]
+        self.driver._replication_target_array = mock.Mock()
+        mock_get_latest_replicated_vol_snap.return_value = None
+        context = mock.MagicMock()
+        self.assertRaises(exception.PureDriverException,
+                          self.driver.replication_failover,
+                          context,
+                          src_vol,
+                          target_array)
+
+    @mock.patch(BASE_DRIVER_OBJ + "._get_latest_replicated_vol_snap",
+                autospec=True)
+    @mock.patch('cinder.volume.volume_types.get_volume_type')
+    def test_failover(self, mock_get_volume_type,
+                      mock_get_latest_replicated_vol_snap):
+        # Test case when replication is set up correctly, expect call
+        # to copy_volume
+        src_vol = deepcopy(VOLUME)
+        src_vol["replication_status"] = "enabled"
+        src_vol["volume_type_id"] = REPLICATED_VOL_TYPE["id"]
+        self._setup_mocks_for_replication()
+        mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
+        self.array2._target_device_id = self.array2.array_id
+        self.driver._replication_target_arrays = [self.array2]
+        # Assume array_name is also the san_ip
+        self.array2._target = GET_ARRAY_SECONDARY["array_name"]
+        self.driver._array._target = GET_ARRAY_PRIMARY["array_name"]
+        mock_get_latest_replicated_vol_snap.return_value =\
+            REPLICATED_VOLUME_SNAPS[0]
+        target_array = self.driver._replication_target_arrays[0]
+        target_array.copy_volume = mock.Mock()
+        context = mock.MagicMock()
+        self.driver.replication_failover(context, src_vol,
+                                         self.array2._target_device_id)
+        target_array.copy_volume.assert_called_with(
+            REPLICATED_VOLUME_SNAPS[0]["name"],
+            src_vol["name"] + "-cinder",
+            overwrite=True
+        )
+
+    @mock.patch('cinder.volume.volume_types.get_volume_type')
+    def test_failover_no_replicated_array_configured(self,
+                                                     mock_get_volume_type):
+        # verify exception if no replication target is configured
+        mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
+        self.driver._replication_target_arrays = None
+        context = mock.Mock()
+        src_vol = VOLUME
+        target_array = "dummy_target_device_id"
+        self.assertRaises(exception.PureDriverException,
+                          self.driver.replication_failover,
+                          context,
+                          src_vol,
+                          target_array)
+
+    @mock.patch(BASE_DRIVER_OBJ + "._get_latest_replicated_vol_snap",
+                autospec=True)
+    @mock.patch('cinder.volume.volume_types.get_volume_type')
+    def test_failover_invalid_replicated_array_specified(
+            self,
+            mock_get_volume_type,
+            mock_get_latest_replicated_vol_snap):
+        src_vol = deepcopy(VOLUME)
+        src_vol["replication_status"] = "enabled"
+        src_vol["volume_type_id"] = REPLICATED_VOL_TYPE["id"]
+        mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
+        self.array2._target_device_id = self.array2.array_id
+        self.driver._replication_target_arrays = [self.array2]
+        # Assume array_name is also the san_ip
+        self.array2._target = GET_ARRAY_SECONDARY["array_name"]
+        self.driver._array._target = GET_ARRAY_PRIMARY["array_name"]
+        # we should not attempt the operation to find snapshot on secondary
+        assert not mock_get_latest_replicated_vol_snap.called
+
+    @mock.patch('cinder.volume.volume_types.get_volume_type')
+    @mock.patch(BASE_DRIVER_OBJ + '._get_current_array')
+    def test_replication_disable(self, mock_get_array, mock_get_volume_type):
+        mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
+        mock_get_array.return_value = self.array
+        context = mock.MagicMock()
+        volume = VOLUME
+
+        model_update = self.driver.replication_disable(context, volume)
+
+        self.assertDictMatch({'replication_status': 'disabled'}, model_update)
+        self.array.set_pgroup.assert_called_with(
+            self.driver._replication_pg_name,
+            remvollist=['volume-' + volume['id'] + '-cinder']
+        )
+
+    @mock.patch('cinder.volume.volume_types.get_volume_type')
+    @mock.patch(BASE_DRIVER_OBJ + '._get_current_array')
+    def test_replication_disable_error_propagates(self,
+                                                  mock_get_array,
+                                                  mock_get_volume_type):
+        mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
+        mock_get_array.return_value = self.array
+        context = mock.MagicMock()
+        volume = VOLUME
+        self.assert_error_propagates(
+            [mock_get_array, self.array.set_pgroup],
+            self.driver.replication_disable,
+            context, volume
+        )
+
+    @mock.patch('cinder.volume.volume_types.get_volume_type')
+    @mock.patch(BASE_DRIVER_OBJ + '._get_current_array')
+    def test_replication_disable_already_disabled(self,
+                                                  mock_get_array,
+                                                  mock_get_volume_type):
+        mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
+        mock_get_array.return_value = self.array
+        context = mock.MagicMock()
+        volume = VOLUME
+
+        self.array.set_pgroup.side_effect = FakePureStorageHTTPError(
+            code=400, text='could not be found')
+
+        model_update = self.driver.replication_disable(context, volume)
+
+        self.assertDictMatch({'replication_status': 'disabled'}, model_update)
+        self.array.set_pgroup.assert_called_with(
+            self.driver._replication_pg_name,
+            remvollist=['volume-' + volume["id"] + '-cinder']
+        )
+
+    @mock.patch('cinder.volume.volume_types.get_volume_type')
+    def test_list_replication_targets(self, mock_get_volume_type):
+        mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
+        mock_backend_1 = mock.MagicMock()
+        mock_backend_1.array_name = "cinder-backend-1"
+        mock_backend_1._target_device_id = "cinder-backend-1-id"
+        mock_backend_2 = mock.MagicMock()
+        mock_backend_2.array_name = "cinder-backend-2"
+        mock_backend_2._target_device_id = "cinder-backend-2-id"
+        self.driver._replication_target_arrays = [
+            mock_backend_1,
+            mock_backend_2
+        ]
+        context = mock.MagicMock()
+        vref = VOLUME
+        self.array.get_pgroup.return_value = {
+            "hgroups": None,
+            "hosts": None,
+            "name": "pg2",
+            "source": "pure01",
+            "targets": [
+                {
+                    "name": mock_backend_1.array_name,
+                    "allowed": True
+                },
+                {
+                    "name": mock_backend_2.array_name,
+                    "allowed": False
+                },
+            ],
+            "time_remaining": 86395,
+            "volumes": [
+                "volume-" + vref['id'] + "-cinder",
+                "volume-123456-cinder"
+            ]
+        }
+
+        data = self.driver.list_replication_targets(context, vref)
+
+        expected_data = {
+            'volume_id': vref['id'],
+            'targets': [
+                {
+                    'target_device_id': mock_backend_1._target_device_id
+                },
+                {
+                    'target_device_id': mock_backend_2._target_device_id
+                }
+            ]
+        }
+        self.assertDictMatch(expected_data, data)
+
+    @mock.patch('cinder.volume.volume_types.get_volume_type')
+    def test_list_replication_targets_error_propagates(self,
+                                                       mock_get_volume_type):
+        mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
+        self.array.get_pgroup = mock.MagicMock()
+        self.assert_error_propagates([self.array.get_pgroup],
+                                     self.driver.list_replication_targets,
+                                     mock.Mock(),
+                                     VOLUME)
+
+    @mock.patch('cinder.volume.volume_types.get_volume_type')
+    def test_list_replication_targets_no_targets(self, mock_get_volume_type):
+        mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
+        mock_backend_1 = mock.MagicMock()
+        mock_backend_1.array_name = "cinder-backend-1"
+        mock_backend_1._target_device_id = "cinder-backend-1-id"
+        mock_backend_2 = mock.MagicMock()
+        mock_backend_2.array_name = "cinder-backend-2"
+        mock_backend_2._target_device_id = "cinder-backend-2-id"
+        self.driver._replication_target_arrays = [
+            mock_backend_1,
+            mock_backend_2
+        ]
+        context = mock.MagicMock()
+        vref = VOLUME
+        self.array.get_pgroup.return_value = {
+            "hgroups": None,
+            "hosts": None,
+            "name": "pg2",
+            "source": "pure01",
+            "targets": None,
+            "time_remaining": 86395,
+            "volumes": [
+                "volume-" + vref['id'] + "-cinder",
+                "volume-123456-cinder"
+            ]
+        }
+
+        data = self.driver.list_replication_targets(context, vref)
+
+        expected_data = {
+            'volume_id': vref['id'],
+            'targets': []
+        }
+        self.assertDictMatch(expected_data, data)
+
+    @mock.patch('cinder.volume.volume_types.get_volume_type')
+    def test_list_replication_targets_no_volumes(self, mock_get_volume_type):
+        mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
+        mock_backend_1 = mock.MagicMock()
+        mock_backend_1.array_name = "cinder-backend-1"
+        mock_backend_1._target_device_id = "cinder-backend-1-id"
+        mock_backend_2 = mock.MagicMock()
+        mock_backend_2.array_name = "cinder-backend-2"
+        mock_backend_2._target_device_id = "cinder-backend-2-id"
+        self.driver._replication_target_arrays = [
+            mock_backend_1,
+            mock_backend_2
+        ]
+        context = mock.MagicMock()
+        vref = VOLUME
+        self.array.get_pgroup.return_value = {
+            "hgroups": None,
+            "hosts": None,
+            "name": "pg2",
+            "source": "pure01",
+            "targets": None,
+            "time_remaining": 86395,
+            "volumes": None
+        }
+
+        data = self.driver.list_replication_targets(context, vref)
+
+        expected_data = {
+            'volume_id': vref['id'],
+            'targets': []
+        }
+        self.assertDictMatch(expected_data, data)
+
+    @mock.patch('cinder.volume.volume_types.get_volume_type')
+    def test_list_replication_targets_no_secondary_configured(
+            self, mock_get_volume_type):
+        mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
+        self.driver._replication_target_arrays = []
+        context = mock.MagicMock()
+        vref = VOLUME
+        self.array.get_pgroup.return_value = {
+            "hgroups": None,
+            "hosts": None,
+            "name": "pg2",
+            "source": "pure01",
+            "targets": [
+                {
+                    "name": "dummy1",
+                    "allowed": True
+                },
+                {
+                    "name": "dummy2",
+                    "allowed": False
+                },
+            ],
+            "time_remaining": 86395,
+            "volumes": None
+        }
+
+        data = self.driver.list_replication_targets(context, vref)
+
+        expected_data = {
+            'volume_id': vref['id'],
+            'targets': []
+        }
+        self.assertDictMatch(expected_data, data)
 
 
 class PureISCSIDriverTestCase(PureDriverTestCase):
@@ -1223,33 +2060,20 @@ class PureISCSIDriverTestCase(PureDriverTestCase):
         self.driver = pure.PureISCSIDriver(configuration=self.mock_config)
         self.driver._array = self.array
 
-    def test_do_setup(self):
-        self.purestorage_module.FlashArray.return_value = self.array
-        self.array.get_rest_version.return_value = \
-            self.driver.SUPPORTED_REST_API_VERSIONS[0]
-        self.driver.do_setup(None)
-        self.purestorage_module.FlashArray.assert_called_with(
-            TARGET,
-            api_token=API_TOKEN
-        )
-        self.assertEqual(self.array, self.driver._array)
-        self.assertEqual(
-            self.driver.SUPPORTED_REST_API_VERSIONS,
-            self.purestorage_module.FlashArray.supported_rest_versions
-        )
-
     def test_get_host(self):
         good_host = PURE_HOST.copy()
         good_host.update(iqn=["another-wrong-iqn", INITIATOR_IQN])
         bad_host = {"name": "bad-host", "iqn": ["wrong-iqn"]}
         self.array.list_hosts.return_value = [bad_host]
-        real_result = self.driver._get_host(ISCSI_CONNECTOR)
+        real_result = self.driver._get_host(self.array, ISCSI_CONNECTOR)
         self.assertIs(None, real_result)
         self.array.list_hosts.return_value.append(good_host)
-        real_result = self.driver._get_host(ISCSI_CONNECTOR)
+        real_result = self.driver._get_host(self.array, ISCSI_CONNECTOR)
         self.assertEqual(good_host, real_result)
         self.assert_error_propagates([self.array.list_hosts],
-                                     self.driver._get_host, ISCSI_CONNECTOR)
+                                     self.driver._get_host,
+                                     self.array,
+                                     ISCSI_CONNECTOR)
 
     @mock.patch(ISCSI_DRIVER_OBJ + "._connect")
     @mock.patch(ISCSI_DRIVER_OBJ + "._get_target_iscsi_ports")
@@ -1373,7 +2197,7 @@ class PureISCSIDriverTestCase(PureDriverTestCase):
         self.array.connect_host.return_value = {"vol": vol_name, "lun": 1}
         real_result = self.driver._connect(VOLUME, ISCSI_CONNECTOR, None)
         self.assertEqual(result, real_result)
-        mock_host.assert_called_with(self.driver, ISCSI_CONNECTOR)
+        mock_host.assert_called_with(self.driver, self.array, ISCSI_CONNECTOR)
         self.assertFalse(mock_generate.called)
         self.assertFalse(self.array.create_host.called)
         self.array.connect_host.assert_called_with(PURE_HOST_NAME, vol_name)
@@ -1382,7 +2206,7 @@ class PureISCSIDriverTestCase(PureDriverTestCase):
         mock_host.return_value = None
         mock_generate.return_value = PURE_HOST_NAME
         real_result = self.driver._connect(VOLUME, ISCSI_CONNECTOR, None)
-        mock_host.assert_called_with(self.driver, ISCSI_CONNECTOR)
+        mock_host.assert_called_with(self.driver, self.array, ISCSI_CONNECTOR)
         mock_generate.assert_called_with(HOSTNAME)
         self.array.create_host.assert_called_with(PURE_HOST_NAME,
                                                   iqnlist=[INITIATOR_IQN])
@@ -1392,8 +2216,8 @@ class PureISCSIDriverTestCase(PureDriverTestCase):
         self.array.reset_mock()
         self.assert_error_propagates(
             [mock_host, mock_generate, self.array.connect_host,
-             self.array.create_host],
-            self.driver._connect, VOLUME, ISCSI_CONNECTOR, None)
+             self.array.create_host], self.driver._connect, VOLUME,
+            ISCSI_CONNECTOR, None)
 
         self.mock_config.use_chap_auth = True
         chap_user = ISCSI_CONNECTOR["host"]
@@ -1466,7 +2290,8 @@ class PureISCSIDriverTestCase(PureDriverTestCase):
                 text="Connection already exists"
             )
         self.assertRaises(self.purestorage_module.PureHTTPError,
-                          self.driver._connect, VOLUME, ISCSI_CONNECTOR, None)
+                          self.driver._connect, VOLUME,
+                          ISCSI_CONNECTOR, None)
         self.assertTrue(self.array.connect_host.called)
         self.assertTrue(self.array.list_volume_private_connections)
 
@@ -1479,33 +2304,20 @@ class PureFCDriverTestCase(PureDriverTestCase):
         self.driver._array = self.array
         self.driver._lookup_service = mock.Mock()
 
-    def test_do_setup(self):
-        self.purestorage_module.FlashArray.return_value = self.array
-        self.array.get_rest_version.return_value = \
-            self.driver.SUPPORTED_REST_API_VERSIONS[0]
-        self.driver.do_setup(None)
-        self.purestorage_module.FlashArray.assert_called_with(
-            TARGET,
-            api_token=API_TOKEN
-        )
-        self.assertEqual(self.array, self.driver._array)
-        self.assertEqual(
-            self.driver.SUPPORTED_REST_API_VERSIONS,
-            self.purestorage_module.FlashArray.supported_rest_versions
-        )
-
     def test_get_host(self):
         good_host = PURE_HOST.copy()
         good_host.update(wwn=["another-wrong-wwn", INITIATOR_WWN])
         bad_host = {"name": "bad-host", "wwn": ["wrong-wwn"]}
         self.array.list_hosts.return_value = [bad_host]
-        actual_result = self.driver._get_host(FC_CONNECTOR)
+        actual_result = self.driver._get_host(self.array, FC_CONNECTOR)
         self.assertIs(None, actual_result)
         self.array.list_hosts.return_value.append(good_host)
-        actual_result = self.driver._get_host(FC_CONNECTOR)
+        actual_result = self.driver._get_host(self.array, FC_CONNECTOR)
         self.assertEqual(good_host, actual_result)
         self.assert_error_propagates([self.array.list_hosts],
-                                     self.driver._get_host, FC_CONNECTOR)
+                                     self.driver._get_host,
+                                     self.array,
+                                     FC_CONNECTOR)
 
     @mock.patch(FC_DRIVER_OBJ + "._connect")
     def test_initialize_connection(self, mock_connection):
@@ -1530,7 +2342,7 @@ class PureFCDriverTestCase(PureDriverTestCase):
         self.array.connect_host.return_value = {"vol": vol_name, "lun": 1}
         real_result = self.driver._connect(VOLUME, FC_CONNECTOR)
         self.assertEqual(result, real_result)
-        mock_host.assert_called_with(self.driver, FC_CONNECTOR)
+        mock_host.assert_called_with(self.driver, self.array, FC_CONNECTOR)
         self.assertFalse(mock_generate.called)
         self.assertFalse(self.array.create_host.called)
         self.array.connect_host.assert_called_with(PURE_HOST_NAME, vol_name)
@@ -1539,7 +2351,7 @@ class PureFCDriverTestCase(PureDriverTestCase):
         mock_host.return_value = None
         mock_generate.return_value = PURE_HOST_NAME
         real_result = self.driver._connect(VOLUME, FC_CONNECTOR)
-        mock_host.assert_called_with(self.driver, FC_CONNECTOR)
+        mock_host.assert_called_with(self.driver, self.array, FC_CONNECTOR)
         mock_generate.assert_called_with(HOSTNAME)
         self.array.create_host.assert_called_with(PURE_HOST_NAME,
                                                   wwnlist={INITIATOR_WWN})
@@ -1604,13 +2416,6 @@ class PureVolumeUpdateStatsTestCase(PureBaseSharedDriverTestCase):
         super(PureVolumeUpdateStatsTestCase, self).setUp()
         self.array.get.side_effect = self.fake_get_array
 
-    def fake_get_array(*args, **kwargs):
-        if 'action' in kwargs and kwargs['action'] is 'monitor':
-            return PERF_INFO_RAW
-
-        if 'space' in kwargs and kwargs['space'] is True:
-            return SPACE_INFO
-
     @ddt.data(dict(used=10,
                    provisioned=100,
                    config_ratio=5,
@@ -1693,6 +2498,9 @@ class PureVolumeUpdateStatsTestCase(PureBaseSharedDriverTestCase):
             'usec_per_read_op': PERF_INFO['usec_per_read_op'],
             'usec_per_write_op': PERF_INFO['usec_per_write_op'],
             'queue_depth': PERF_INFO['queue_depth'],
+            'replication_enabled': False,
+            'replication_type': ['async'],
+            'replication_count': 0
         }
 
         real_result = self.driver.get_volume_stats(refresh=True)
index 04e02351571b5726c377017fc04966332a7c5c15..d329310f488f769eb44e3ca96e37ea1c920e004b 100644 (file)
@@ -35,6 +35,7 @@ from cinder import utils
 from cinder.volume import driver
 from cinder.volume.drivers.san import san
 from cinder.volume import utils as volume_utils
+from cinder.volume import volume_types
 from cinder.zonemanager import utils as fczm_utils
 
 try:
@@ -52,7 +53,19 @@ PURE_OPTS = [
                 help="Automatically determine an oversubscription ratio based "
                      "on the current total data reduction values. If used "
                      "this calculated value will override the "
-                     "max_over_subscription_ratio config option.")
+                     "max_over_subscription_ratio config option."),
+    # These are used as default settings.  In future these can be overridden
+    # by settings in volume-type.
+    cfg.IntOpt("pure_replica_interval_default", default=900,
+               help="Snapshot replication interval in seconds."),
+    cfg.IntOpt("pure_replica_retention_short_term_default", default=14400,
+               help="Retain all snapshots on target for this "
+                    "time (in seconds.)"),
+    cfg.IntOpt("pure_replica_retention_long_term_per_day_default", default=3,
+               help="Retain how many snapshots for each day."),
+    cfg.IntOpt("pure_replica_retention_long_term_default", default=7,
+               help="Retain snapshots per day on target for this time "
+                    "(in days.)"),
 ]
 
 CONF = cfg.CONF
@@ -61,15 +74,30 @@ CONF.register_opts(PURE_OPTS)
 INVALID_CHARACTERS = re.compile(r"[^-a-zA-Z0-9]")
 GENERATED_NAME = re.compile(r".*-[a-f0-9]{32}-cinder$")
 
+REPLICATION_CG_NAME = "cinder-group"
+
 CHAP_SECRET_KEY = "PURE_TARGET_CHAP_SECRET"
 
 ERR_MSG_NOT_EXIST = "does not exist"
 ERR_MSG_PENDING_ERADICATION = "has been destroyed"
+ERR_MSG_ALREADY_EXISTS = "already exists"
+ERR_MSG_COULD_NOT_BE_FOUND = "could not be found"
+ERR_MSG_ALREADY_INCLUDES = "already includes"
+ERR_MSG_ALREADY_ALLOWED = "already allowed on"
+ERR_MSG_NOT_CONNECTED = "is not connected"
+ERR_MSG_ALREADY_BELONGS = "already belongs to"
+
+EXTRA_SPECS_REPL_ENABLED = "replication_enabled"
 
 CONNECT_LOCK_NAME = 'PureVolumeDriver_connect'
 
+
 UNMANAGED_SUFFIX = '-unmanaged'
 MANAGE_SNAP_REQUIRED_API_VERSIONS = ['1.4', '1.5']
+REPLICATION_REQUIRED_API_VERSIONS = ['1.3', '1.4', '1.5']
+
+REPL_SETTINGS_PROPAGATE_RETRY_INTERVAL = 5  # 5 seconds
+REPL_SETTINGS_PROPAGATE_MAX_RETRIES = 36  # 36 * 5 = 180 seconds
 
 
 def log_debug_trace(f):
@@ -99,6 +127,61 @@ class PureBaseVolumeDriver(san.SanDriver):
         self._storage_protocol = None
         self._backend_name = (self.configuration.volume_backend_name or
                               self.__class__.__name__)
+        self._replication_target_arrays = []
+        self._replication_pg_name = REPLICATION_CG_NAME
+        self._replication_interval = None
+        self._replication_retention_short_term = None
+        self._replication_retention_long_term = None
+        self._replication_retention_long_term_per_day = None
+        self._is_replication_enabled = False
+
+    def parse_replication_configs(self):
+        self._replication_interval = (
+            self.configuration.pure_replica_interval_default)
+        self._replication_retention_short_term = (
+            self.configuration.pure_replica_retention_short_term_default)
+        self._replication_retention_long_term = (
+            self.configuration.pure_replica_retention_long_term_default)
+        self._replication_retention_long_term_per_day = (
+            self.configuration.
+            pure_replica_retention_long_term_per_day_default)
+
+        retention_policy = self._generate_replication_retention()
+        replication_devices = self.configuration.safe_get(
+            'replication_device')
+        if replication_devices:
+            for replication_device in replication_devices:
+                target_device_id = replication_device["target_device_id"]
+                san_ip = replication_device["san_ip"]
+                api_token = replication_device["api_token"]
+                target_array = self._get_flasharray(san_ip, api_token)
+                target_array._target_device_id = target_device_id
+                LOG.debug("Adding san_ip %(san_ip)s to replication_targets.",
+                          {"san_ip": san_ip})
+                api_version = target_array.get_rest_version()
+                if api_version not in REPLICATION_REQUIRED_API_VERSIONS:
+                    msg = _('Unable to do replication with Purity REST '
+                            'API version %(api_version)s, requires one of '
+                            '%(required_versions)s.') % {
+                        'api_version': api_version,
+                        'required_versions': REPLICATION_REQUIRED_API_VERSIONS
+                    }
+                    raise exception.PureDriverException(reason=msg)
+                target_array_info = target_array.get()
+                target_array.array_name = target_array_info["array_name"]
+                target_array.array_id = target_array_info["id"]
+                LOG.debug("secondary array name: %s", self._array.array_name)
+                LOG.debug("secondary array id: %s", self._array.array_id)
+                self._setup_replicated_pgroups(target_array, [self._array],
+                                               self._replication_pg_name,
+                                               self._replication_interval,
+                                               retention_policy)
+                self._replication_target_arrays.append(target_array)
+        self._setup_replicated_pgroups(self._array,
+                                       self._replication_target_arrays,
+                                       self._replication_pg_name,
+                                       self._replication_interval,
+                                       retention_policy)
 
     def do_setup(self, context):
         """Performs driver initialization steps that could raise exceptions."""
@@ -111,9 +194,22 @@ class PureBaseVolumeDriver(san.SanDriver):
         # if unable to authenticate.
         purestorage.FlashArray.supported_rest_versions = \
             self.SUPPORTED_REST_API_VERSIONS
-        self._array = purestorage.FlashArray(
+        self._array = self._get_flasharray(
             self.configuration.san_ip,
             api_token=self.configuration.pure_api_token)
+        self._array._target_device_id = self.configuration.config_group
+        LOG.debug("Primary array target_device_id: %s",
+                  self.configuration.config_group)
+        LOG.debug("Primary array name: %s", self._array.array_name)
+        LOG.debug("Primary array id: %s", self._array.array_id)
+        self.do_setup_replication()
+
+    def do_setup_replication(self):
+        replication_devices = self.configuration.safe_get(
+            'replication_device')
+        if replication_devices:
+            self.parse_replication_configs()
+            self._is_replication_enabled = True
 
     def check_for_setup_error(self):
         # Avoid inheriting check_for_setup_error from SanDriver, which checks
@@ -133,6 +229,12 @@ class PureBaseVolumeDriver(san.SanDriver):
                 vol_name
             )
 
+        model_update = {'provider_location': self._array.array_id}
+        if self._add_and_replicate_if_needed(self._array, volume):
+            model_update['replication_status'] = 'enabled'
+
+        return model_update
+
     @log_debug_trace
     def create_volume_from_snapshot(self, volume, snapshot):
         """Creates a volume from a snapshot."""
@@ -147,50 +249,109 @@ class PureBaseVolumeDriver(san.SanDriver):
                     '%(id)s.') % {'id': snapshot['id']}
             raise exception.PureDriverException(reason=msg)
 
-        self._array.copy_volume(snap_name, vol_name)
-        self._extend_if_needed(vol_name, snapshot["volume_size"],
+        # Check which backend the snapshot is on. In case of failover and
+        # snapshot on failed over volume the snapshot may be on the
+        # secondary array.
+        current_array = self._get_current_array(snapshot)
+
+        current_array.copy_volume(snap_name, vol_name)
+        self._extend_if_needed(current_array,
+                               vol_name,
+                               snapshot["volume_size"],
                                volume["size"])
+
+        # TODO(dwilson): figure out if we need to mirror consisgroup on
+        # target array if failover has occurred.
         if volume['consistencygroup_id']:
-            self._add_volume_to_consistency_group(
-                volume['consistencygroup_id'],
-                vol_name
-            )
+            if current_array.array_id == self._array.array_id:
+                self._add_volume_to_consistency_group(
+                    volume['consistencygroup_id'],
+                    vol_name)
+            else:
+                LOG.warning(_LW("Volume %s is failed over - skipping addition"
+                                " to Consistency Group."), volume["id"])
+
+        model_update = {"provider_location": current_array.array_id}
+        if self._add_and_replicate_if_needed(current_array, volume):
+            model_update['replication_status'] = 'enabled'
+
+        return model_update
+
+    def _add_and_replicate_if_needed(self, array, volume):
+        """Add volume to protection group and create a snapshot."""
+        if self._is_volume_replicated_type(volume):
+            try:
+                array.set_pgroup(self._replication_pg_name,
+                                 addvollist=[self._get_vol_name(volume)])
+            except purestorage.PureHTTPError as err:
+                with excutils.save_and_reraise_exception() as ctxt:
+                    if (err.code == 400 and
+                            ERR_MSG_ALREADY_BELONGS in err.text):
+                        # Happens if the volume already added to PG.
+                        ctxt.reraise = False
+                        LOG.warning(_LW("Adding Volume to Protection Group "
+                                        "failed with message: %s"), err.text)
+            array.create_pgroup_snapshot(self._replication_pg_name,
+                                         replicate_now=True,
+                                         apply_retention=True)
+            return True
+        else:
+            return False
 
     @log_debug_trace
     def create_cloned_volume(self, volume, src_vref):
         """Creates a clone of the specified volume."""
         vol_name = self._get_vol_name(volume)
         src_name = self._get_vol_name(src_vref)
-        self._array.copy_volume(src_name, vol_name)
-        self._extend_if_needed(vol_name, src_vref["size"], volume["size"])
 
+        # Check which backend the source volume is on. In case of failover
+        # the source volume may be on the secondary array.
+        current_array = self._get_current_array(src_vref)
+        current_array.copy_volume(src_name, vol_name)
+        self._extend_if_needed(current_array,
+                               vol_name,
+                               src_vref["size"],
+                               volume["size"])
+
+        # TODO(dwilson): figure out if we need to mirror consisgroup on
+        # target array if failover has occurred.
         if volume['consistencygroup_id']:
-            self._add_volume_to_consistency_group(
-                volume['consistencygroup_id'],
-                vol_name
-            )
+            if current_array.array_id == self._array.array_id:
+                self._add_volume_to_consistency_group(
+                    volume['consistencygroup_id'],
+                    vol_name)
+            else:
+                LOG.warning(_LW("Volume %s is failed over - skipping addition"
+                                " to Consistency Group."), volume["id"])
 
-    def _extend_if_needed(self, vol_name, src_size, vol_size):
+        model_update = {"provider_location": current_array.array_id}
+        if self._add_and_replicate_if_needed(current_array, volume):
+            model_update['replication_status'] = 'enabled'
+
+        return model_update
+
+    def _extend_if_needed(self, array, vol_name, src_size, vol_size):
         """Extend the volume from size src_size to size vol_size."""
         if vol_size > src_size:
             vol_size = vol_size * units.Gi
-            self._array.extend_volume(vol_name, vol_size)
+            array.extend_volume(vol_name, vol_size)
 
     @log_debug_trace
     def delete_volume(self, volume):
         """Disconnect all hosts and delete the volume"""
         vol_name = self._get_vol_name(volume)
+        current_array = self._get_current_array(volume)
         try:
-            connected_hosts = \
-                self._array.list_volume_private_connections(vol_name)
+            connected_hosts = current_array.list_volume_private_connections(
+                vol_name)
             for host_info in connected_hosts:
                 host_name = host_info["host"]
-                self._disconnect_host(host_name, vol_name)
-            self._array.destroy_volume(vol_name)
+                self._disconnect_host(current_array, host_name, vol_name)
+            current_array.destroy_volume(vol_name)
         except purestorage.PureHTTPError as err:
             with excutils.save_and_reraise_exception() as ctxt:
-                if err.code == 400 and \
-                        ERR_MSG_NOT_EXIST in err.text:
+                if (err.code == 400 and
+                        ERR_MSG_NOT_EXIST in err.text):
                     # Happens if the volume does not exist.
                     ctxt.reraise = False
                     LOG.warning(_LW("Volume deletion failed with message: %s"),
@@ -199,15 +360,23 @@ class PureBaseVolumeDriver(san.SanDriver):
     @log_debug_trace
     def create_snapshot(self, snapshot):
         """Creates a snapshot."""
+
+        # Get current array in case we have failed over via replication.
+        current_array = self._get_current_array(snapshot)
         vol_name, snap_suff = self._get_snap_name(snapshot).split(".")
-        self._array.create_snapshot(vol_name, suffix=snap_suff)
+        current_array.create_snapshot(vol_name, suffix=snap_suff)
+        return {'provider_location': current_array.array_id}
 
     @log_debug_trace
     def delete_snapshot(self, snapshot):
         """Deletes a snapshot."""
+
+        # Get current array in case we have failed over via replication.
+        current_array = self._get_current_array(snapshot)
+
         snap_name = self._get_snap_name(snapshot)
         try:
-            self._array.destroy_volume(snap_name)
+            current_array.destroy_volume(snap_name)
         except purestorage.PureHTTPError as err:
             with excutils.save_and_reraise_exception() as ctxt:
                 if err.code == 400 and (
@@ -230,7 +399,7 @@ class PureBaseVolumeDriver(san.SanDriver):
         """
         raise NotImplementedError
 
-    def _get_host(self, connector):
+    def _get_host(self, array, connector):
         """Get a Purity Host that corresponds to the host in the connector.
 
         This implementation is specific to the host type (iSCSI, FC, etc).
@@ -238,14 +407,15 @@ class PureBaseVolumeDriver(san.SanDriver):
         raise NotImplementedError
 
     @utils.synchronized(CONNECT_LOCK_NAME, external=True)
-    def _disconnect(self, volume, connector, **kwargs):
+    def _disconnect(self, array, volume, connector, **kwargs):
         vol_name = self._get_vol_name(volume)
-        host = self._get_host(connector)
+        host = self._get_host(array, connector)
         if host:
             host_name = host["name"]
-            result = self._disconnect_host(host_name, vol_name)
+            result = self._disconnect_host(array, host_name, vol_name)
         else:
-            LOG.error(_LE("Unable to disconnect host from volume."))
+            LOG.error(_LE("Unable to disconnect host from volume, could not "
+                          "determine Purity host"))
             result = False
 
         return result
@@ -253,27 +423,28 @@ class PureBaseVolumeDriver(san.SanDriver):
     @log_debug_trace
     def terminate_connection(self, volume, connector, **kwargs):
         """Terminate connection."""
-        self._disconnect(volume, connector, **kwargs)
+        # Get current array in case we have failed over via replication.
+        current_array = self._get_current_array(volume)
+        self._disconnect(current_array, volume, connector, **kwargs)
 
     @log_debug_trace
-    def _disconnect_host(self, host_name, vol_name):
+    def _disconnect_host(self, array, host_name, vol_name):
         """Return value indicates if host was deleted on array or not"""
         try:
-            self._array.disconnect_host(host_name, vol_name)
+            array.disconnect_host(host_name, vol_name)
         except purestorage.PureHTTPError as err:
             with excutils.save_and_reraise_exception() as ctxt:
-                if err.code == 400:
+                if err.code == 400 and ERR_MSG_NOT_CONNECTED in err.text:
                     # Happens if the host and volume are not connected.
                     ctxt.reraise = False
                     LOG.error(_LE("Disconnection failed with message: "
                                   "%(msg)s."), {"msg": err.text})
         if (GENERATED_NAME.match(host_name) and
-            not self._array.list_host_connections(host_name,
-                                                  private=True)):
+                not array.list_host_connections(host_name, private=True)):
             LOG.info(_LI("Deleting unneeded host %(host_name)r."),
                      {"host_name": host_name})
             try:
-                self._array.delete_host(host_name)
+                array.delete_host(host_name)
             except purestorage.PureHTTPError as err:
                 with excutils.save_and_reraise_exception() as ctxt:
                     if err.code == 400 and ERR_MSG_NOT_EXIST in err.text:
@@ -364,6 +535,9 @@ class PureBaseVolumeDriver(san.SanDriver):
         data['usec_per_write_op'] = perf_info['usec_per_write_op']
         data['queue_depth'] = perf_info['queue_depth']
 
+        data["replication_enabled"] = self._is_replication_enabled
+        data["replication_type"] = ["async"]
+        data["replication_count"] = len(self._replication_target_arrays)
         self._stats = data
 
     def _get_provisioned_space(self):
@@ -394,9 +568,13 @@ class PureBaseVolumeDriver(san.SanDriver):
     @log_debug_trace
     def extend_volume(self, volume, new_size):
         """Extend volume to new_size."""
+
+        # Get current array in case we have failed over via replication.
+        current_array = self._get_current_array(volume)
+
         vol_name = self._get_vol_name(volume)
         new_size = new_size * units.Gi
-        self._array.extend_volume(vol_name, new_size)
+        current_array.extend_volume(vol_name, new_size)
 
     def _add_volume_to_consistency_group(self, consistencygroup_id, vol_name):
         pgroup_name = self._get_pgroup_name_from_id(consistencygroup_id)
@@ -463,7 +641,13 @@ class PureBaseVolumeDriver(san.SanDriver):
         elif source_cg:
             self._create_cg_from_cg(group, source_cg, volumes, source_vols)
 
-        return None, None
+        return_volumes = []
+        for volume in volumes:
+            return_volume = {'id': volume.id, 'status': 'available',
+                             'provider_location': self._array.array_id}
+            return_volumes.append(return_volume)
+        model_update = {'status': 'available'}
+        return model_update, return_volumes
 
     @log_debug_trace
     def delete_consistencygroup(self, context, group, volumes):
@@ -527,7 +711,8 @@ class PureBaseVolumeDriver(san.SanDriver):
         for snapshot in snapshots:
             snapshot_updates.append({
                 'id': snapshot.id,
-                'status': 'available'
+                'status': 'available',
+                'provider_location': self._array.array_id
             })
 
         model_update = {'status': 'available'}
@@ -584,7 +769,7 @@ class PureBaseVolumeDriver(san.SanDriver):
                          " key to identify an existing volume."))
 
         if is_snap:
-            # Purity snapshot names are prefixed with the source volume name
+            # Purity snapshot names are prefixed with the source volume name.
             ref_vol_name, ref_snap_suffix = existing_ref['name'].split('.')
         else:
             ref_vol_name = existing_ref['name']
@@ -605,7 +790,7 @@ class PureBaseVolumeDriver(san.SanDriver):
                     ctxt.reraise = False
 
         # If volume information was unable to be retrieved we need
-        # to throw a Invalid Reference exception
+        # to throw a Invalid Reference exception.
         raise exception.ManageExistingInvalidReference(
             existing_ref=existing_ref,
             reason=_("Unable to find Purity ref with name=%s") % ref_vol_name)
@@ -675,6 +860,7 @@ class PureBaseVolumeDriver(san.SanDriver):
 
         The volume will be renamed with "-unmanaged" as a suffix
         """
+
         vol_name = self._get_vol_name(volume)
         unmanaged_vol_name = vol_name + UNMANAGED_SUFFIX
         LOG.info(_LI("Renaming existing volume %(ref_name)s to %(new_name)s"),
@@ -738,6 +924,19 @@ class PureBaseVolumeDriver(san.SanDriver):
                                        "new_name": unmanaged_snap_name})
         self._rename_volume_object(snap_name, unmanaged_snap_name)
 
+    @staticmethod
+    def _get_flasharray(san_ip, api_token, rest_version=None):
+        array = purestorage.FlashArray(san_ip,
+                                       api_token=api_token,
+                                       rest_version=rest_version)
+        array_info = array.get()
+        array.array_name = array_info["array_name"]
+        array.array_id = array_info["id"]
+        LOG.debug("connected to %(array_name)s with REST API %(api_version)s",
+                  {"array_name": array.array_name,
+                   "api_version": array._rest_version})
+        return array
+
     @staticmethod
     def _get_vol_name(volume):
         """Return the name of the volume Purity will use."""
@@ -794,23 +993,24 @@ class PureBaseVolumeDriver(san.SanDriver):
         name = name.lstrip("-")
         return "{name}-{uuid}-cinder".format(name=name, uuid=uuid.uuid4().hex)
 
-    def _connect_host_to_vol(self, host_name, vol_name):
+    @staticmethod
+    def _connect_host_to_vol(array, host_name, vol_name):
         connection = None
         try:
-            connection = self._array.connect_host(host_name, vol_name)
+            connection = array.connect_host(host_name, vol_name)
         except purestorage.PureHTTPError as err:
             with excutils.save_and_reraise_exception() as ctxt:
                 if (err.code == 400 and
-                        "Connection already exists" in err.text):
+                        ERR_MSG_ALREADY_EXISTS in err.text):
                     # Happens if the volume is already connected to the host.
                     # Treat this as a success.
                     ctxt.reraise = False
                     LOG.debug("Volume connection already exists for Purity "
                               "host with message: %s", err.text)
 
-                    # Get the info for the existing connection
-                    connected_hosts = \
-                        self._array.list_volume_private_connections(vol_name)
+                    # Get the info for the existing connection.
+                    connected_hosts = (
+                        array.list_volume_private_connections(vol_name))
                     for host_info in connected_hosts:
                         if host_info["host"] == host_name:
                             connection = host_info
@@ -825,11 +1025,417 @@ class PureBaseVolumeDriver(san.SanDriver):
         """Retype from one volume type to another on the same backend.
 
         For a Pure Array there is currently no differentiation between types
-        of volumes. This means that changing from one type to another on the
-        same array should be a no-op.
+        of volumes other than some being part of a protection group to be
+        replicated.
         """
+        previous_vol_replicated = self._is_volume_replicated_type(volume)
+
+        new_vol_replicated = False
+        if new_type:
+            specs = new_type.get("extra_specs")
+            if specs and EXTRA_SPECS_REPL_ENABLED in specs:
+                replication_capability = specs[EXTRA_SPECS_REPL_ENABLED]
+                # Do not validate settings, ignore invalid.
+                new_vol_replicated = (replication_capability == "<is> True")
+
+        if previous_vol_replicated and not new_vol_replicated:
+            # Remove from protection group.
+            self.replication_disable(context, volume)
+        elif not previous_vol_replicated and new_vol_replicated:
+            # Add to protection group.
+            self.replication_enable(context, volume)
+
         return True, None
 
+    # Replication v2
+    @log_debug_trace
+    def replication_enable(self, context, volume):
+        """Enable replication on the given volume."""
+
+        # Get current array in case we have failed over.
+        current_array = self._get_current_array(volume)
+        LOG.debug("Enabling replication for volume %(id)s residing on "
+                  "array %(target_device_id)s." %
+                  {"id": volume["id"],
+                   "target_device_id": current_array._target_device_id})
+
+        model_update = {"provider_location": current_array.array_id}
+        if self._add_and_replicate_if_needed(current_array, volume):
+            model_update['replication_status'] = 'enabled'
+
+        return model_update
+
+    @log_debug_trace
+    def replication_disable(self, context, volume):
+        """Disable replication on the given volume."""
+
+        # Get current array in case we have failed over via replication.
+        current_array = self._get_current_array(volume)
+        LOG.debug("Disabling replication for volume %(id)s residing on "
+                  "array %(target_device_id)s." %
+                  {"id": volume["id"],
+                   "target_device_id": current_array._target_device_id})
+
+        try:
+            current_array.set_pgroup(self._replication_pg_name,
+                                     remvollist=(
+                                         [self._get_vol_name(volume)]))
+        except purestorage.PureHTTPError as err:
+            with excutils.save_and_reraise_exception() as ctxt:
+                if (err.code == 400 and
+                        ERR_MSG_COULD_NOT_BE_FOUND in err.text):
+                    ctxt.reraise = False
+                    LOG.warning(_LW("Disable replication on volume failed: "
+                                    "already disabled: %s"), err.text)
+                else:
+                    LOG.error(_LE("Disable replication on volume failed with "
+                                  "message: %s"), err.text)
+        return {'replication_status': 'disabled'}
+
+    @log_debug_trace
+    def replication_failover(self, context, volume, secondary):
+        """Failover volume to the secondary array
+
+        This action will not affect the original volume in any
+        way and it will stay as is. If a subsequent replication_enable
+        and failover is performed we will simply overwrite the original
+        volume.
+        """
+        vol_name = self._get_vol_name(volume)
+        # Get the latest replicated snapshot for src_name volume.
+        # Find "source": "<source_array>:<vol_name>" in snapshot attributes.
+        secondary_array = None
+        current_array, failover_candidate_arrays = self._get_arrays(volume)
+        LOG.debug("Failover replication for volume %(id)s residing on "
+                  "array %(primary)s to %(secondary)s." %
+                  {"id": volume["id"],
+                   "primary": current_array._target_device_id,
+                   "secondary": secondary})
+
+        if not failover_candidate_arrays:
+                raise exception.PureDriverException(
+                    reason=_("Unable to failover volume %(volume)s, no "
+                             "secondary targets configured.") %
+                    {'volume': vol_name})
+
+        if secondary:
+            for array in failover_candidate_arrays:
+                if array._target_device_id == secondary:
+                    secondary_array = array
+        if not secondary_array:
+            raise exception.PureDriverException(
+                reason=_("Unable to determine secondary_array from supplied "
+                         "secondary: %(secondary)s.") %
+                {"secondary": secondary}
+            )
+        LOG.debug("Starting failover from %(primary)s to %(secondary)s",
+                  {"primary": current_array.array_name,
+                   "secondary": secondary_array.array_name})
+
+        vol_source_name_to_find = "%s:%s" % (current_array.array_name,
+                                             vol_name)
+        volume_snap = self._get_latest_replicated_vol_snap(
+            secondary_array,
+            current_array.array_name,
+            self._replication_pg_name,
+            vol_name)
+        if not volume_snap:
+            raise exception.PureDriverException(
+                reason=_("Unable to find volume snapshot for %s.")
+                % vol_source_name_to_find)
+        # Create volume from snapshot.
+        secondary_array.copy_volume(volume_snap["name"],
+                                    vol_name,
+                                    overwrite=True)
+        # New volume inherits replicated type, but is not actively replicating.
+        model_update = {"provider_location": secondary_array.array_id,
+                        "replication_status": "failed-over"}
+        return model_update
+
+    @log_debug_trace
+    def list_replication_targets(self, context, vref):
+        """Return all connected arrays that are active."""
+        data = {'volume_id': vref['id']}
+        status = {}
+        current_array, failover_candidate_arrays = self._get_arrays(vref)
+        LOG.debug("List replication targets for volume %(id)s residing on "
+                  "array %(primary)s." %
+                  {"id": vref["id"],
+                   "primary": current_array._target_device_id})
+        pgroup = current_array.get_pgroup(self._replication_pg_name)
+        volume_name = self._get_vol_name(vref)
+        volumes_in_pgroup = pgroup["volumes"]
+        is_vol_in_pgroup = (volumes_in_pgroup and
+                            volume_name in pgroup["volumes"])
+        # Purity returns None instead of empty list if no targets
+        target_arrays = pgroup.get("targets") or []
+        for target_array in target_arrays:
+            if is_vol_in_pgroup:
+                status[target_array["name"]] = target_array["allowed"]
+            else:
+                status[target_array["name"]] = False
+
+        remote_targets = []
+
+        for flash_array in (failover_candidate_arrays or []):
+            if flash_array.array_name in status:
+                remote_targets.append(
+                    {'target_device_id': flash_array._target_device_id})
+
+        data["targets"] = remote_targets
+        return data
+
+    def get_replication_updates(self, context):
+        # currently, the manager does not use these updates.
+        # TODO(mudassir): update this when implemented in manager
+        return []
+
+    def _get_current_array(self, volume):
+        current_array, _ = self._get_arrays(volume)
+        return current_array
+
+    def _get_arrays(self, volume):
+        """Returns the current and secondary arrays for a volume or snapshot
+
+        :param volume: volume or snapshot object
+        :return: the current_array, list of secondary_arrays for the volume
+        """
+        current_array_id = volume.get("provider_location", None)
+        # Default to configured current array, including case when
+        # provider_location is misconfigured.
+        primary_array = self._array
+        secondary_arrays = []
+
+        if self._replication_target_arrays:
+            secondary_arrays = self._replication_target_arrays
+
+        if current_array_id and not current_array_id == self._array.array_id:
+            for flash_array in self._replication_target_arrays:
+                if flash_array.array_id == current_array_id:
+                    primary_array = flash_array
+                    secondary_arrays = [self._array]
+                    break
+
+        return primary_array, secondary_arrays
+
+    def _does_pgroup_exist(self, array, pgroup_name):
+        """Return True/False"""
+        try:
+            array.get_pgroup(pgroup_name)
+            return True
+        except purestorage.PureHTTPError as err:
+            with excutils.save_and_reraise_exception() as ctxt:
+                if err.code == 400 and ERR_MSG_NOT_EXIST in err.text:
+                    ctxt.reraise = False
+                    return False
+            # Any unexpected exception to be handled by caller.
+
+    @log_debug_trace
+    @utils.retry(exception.PureDriverException,
+                 REPL_SETTINGS_PROPAGATE_RETRY_INTERVAL,
+                 REPL_SETTINGS_PROPAGATE_MAX_RETRIES)
+    def _wait_until_target_group_setting_propagates(
+            self, target_array, pgroup_name_on_target):
+        # Wait for pgroup to show up on target array.
+        if self._does_pgroup_exist(target_array, pgroup_name_on_target):
+            return
+        else:
+            raise exception.PureDriverException(message=
+                                                _('Protection Group not '
+                                                  'ready.'))
+
+    @log_debug_trace
+    @utils.retry(exception.PureDriverException,
+                 REPL_SETTINGS_PROPAGATE_RETRY_INTERVAL,
+                 REPL_SETTINGS_PROPAGATE_MAX_RETRIES)
+    def _wait_until_source_array_allowed(self, source_array, pgroup_name):
+        result = source_array.get_pgroup(pgroup_name)
+        if result["targets"][0]["allowed"]:
+            return
+        else:
+            raise exception.PureDriverException(message=_('Replication not '
+                                                          'allowed yet.'))
+
+    def _get_pgroup_name_on_target(self, source_array_name, pgroup_name):
+        return "%s:%s" % (source_array_name, pgroup_name)
+
+    @log_debug_trace
+    def _setup_replicated_pgroups(self, primary, secondaries, pg_name,
+                                  replication_interval, retention_policy):
+            self._create_protection_group_if_not_exist(
+                primary, pg_name)
+
+            # Apply retention policies to a protection group.
+            # These retention policies will be applied on the replicated
+            # snapshots on the target array.
+            primary.set_pgroup(pg_name, **retention_policy)
+
+            # Configure replication propagation frequency on a
+            # protection group.
+            primary.set_pgroup(pg_name,
+                               replicate_frequency=replication_interval)
+            for target_array in secondaries:
+                try:
+                    # Configure PG to replicate to target_array.
+                    primary.set_pgroup(pg_name,
+                                       addtargetlist=[target_array.array_name])
+                except purestorage.PureHTTPError as err:
+                    with excutils.save_and_reraise_exception() as ctxt:
+                        if err.code == 400 and (
+                                ERR_MSG_ALREADY_INCLUDES
+                                in err.text):
+                            ctxt.reraise = False
+                            LOG.info(_LI("Skipping add target %(target_array)s"
+                                         " to protection group %(pgname)s"
+                                         " since it's already added."),
+                                     {"target_array": target_array.array_name,
+                                      "pgname": pg_name})
+
+            # Wait until "Target Group" setting propagates to target_array.
+            pgroup_name_on_target = self._get_pgroup_name_on_target(
+                primary.array_name, pg_name)
+
+            for target_array in secondaries:
+                self._wait_until_target_group_setting_propagates(
+                    target_array,
+                    pgroup_name_on_target)
+                try:
+                    # Configure the target_array to allow replication from the
+                    # PG on source_array.
+                    target_array.set_pgroup(pgroup_name_on_target,
+                                            allowed=True)
+                except purestorage.PureHTTPError as err:
+                    with excutils.save_and_reraise_exception() as ctxt:
+                        if (err.code == 400 and
+                                ERR_MSG_ALREADY_ALLOWED in err.text):
+                            ctxt.reraise = False
+                            LOG.info(_LI("Skipping allow pgroup %(pgname)s on "
+                                         "target array %(target_array)s since "
+                                         "it is already allowed."),
+                                     {"pgname": pg_name,
+                                      "target_array": target_array.array_name})
+
+            # Wait until source array acknowledges previous operation.
+            self._wait_until_source_array_allowed(primary, pg_name)
+            # Start replication on the PG.
+            primary.set_pgroup(pg_name, replicate_enabled=True)
+
+    @log_debug_trace
+    def _generate_replication_retention(self):
+        """Generates replication retention settings in Purity compatible format
+
+        An example of the settings:
+        target_all_for = 14400 (i.e. 4 hours)
+        target_per_day = 6
+        target_days = 4
+        The settings above configure the target array to retain 4 hours of
+        the most recent snapshots.
+        After the most recent 4 hours, the target will choose 4 snapshots
+        per day from the previous 6 days for retention
+
+        :return: a dictionary representing replication retention settings
+        """
+        replication_retention = dict(
+            target_all_for=self._replication_retention_short_term,
+            target_per_day=self._replication_retention_long_term_per_day,
+            target_days=self._replication_retention_long_term
+        )
+        return replication_retention
+
+    @log_debug_trace
+    def _get_latest_replicated_vol_snap(self,
+                                        array,
+                                        source_array_name,
+                                        pgroup_name,
+                                        vol_name):
+        # NOTE(patrickeast): This currently requires a call with REST API 1.3
+        # if we need to create a temporary FlashArray for this operation.
+        api_version = array.get_rest_version()
+        LOG.debug("Current REST API for array id %(id)s is %(api_version)s",
+                  {"id": array.array_id, "api_version": api_version})
+        if api_version != '1.3':
+            target_array = self._get_flasharray(array._target,
+                                                api_token=array._api_token,
+                                                rest_version='1.3')
+        else:
+            target_array = array
+
+        # Get all protection group snapshots.
+        snap_name = "%s:%s" % (source_array_name, pgroup_name)
+        LOG.debug("Looking for snap %(snap)s on array id %(array_id)s",
+                  {"snap": snap_name, "array_id": target_array.array_id})
+        pg_snaps = target_array.get_pgroup(snap_name, snap=True, transfer=True)
+        LOG.debug("Retrieved snapshots on target %(pg_snaps)s",
+                  {"pg_snaps": pg_snaps})
+
+        # Only use snapshots that are replicated completely.
+        pg_snaps_filtered = [s for s in pg_snaps if s["progress"] == 1]
+        LOG.debug("Filtered list of snapshots %(pg_snaps_filtered)s",
+                  {"pg_snaps_filtered": pg_snaps_filtered})
+
+        # Go through the protection group snapshots, latest first ....
+        #   stop when we find required volume snapshot.
+        pg_snaps_filtered.sort(key=lambda x: x["created"], reverse=True)
+        LOG.debug("Sorted list of snapshots %(pg_snaps_filtered)s",
+                  {"pg_snaps_filtered": pg_snaps_filtered})
+        volume_snap = None
+        vol_snap_source_to_find = "%s:%s" % (source_array_name, vol_name)
+
+        LOG.debug("Searching for snapshot of volume %(vol)s on array "
+                  "%(array)s.",
+                  {"vol": vol_snap_source_to_find, "array": array.array_name})
+        for pg_snap in pg_snaps_filtered:
+            # Get volume snapshots inside the replicated PG snapshot.
+            volume_snaps = target_array.get_volume(pg_snap["name"],
+                                                   snap=True,
+                                                   pgroup=True)
+            for snap in volume_snaps:
+                LOG.debug("Examining snapshot %(snap)s.", {"snap": snap})
+                if snap["source"] == vol_snap_source_to_find:
+                    volume_snap = snap
+                    break
+            if volume_snap:  # Found the volume snapshot we needed.
+                    LOG.debug("Found snapshot for volume %(vol)s in "
+                              "snap %(snap)s.",
+                              {"snap": pg_snap["name"],
+                               "vol": vol_snap_source_to_find})
+                    break
+        return volume_snap
+
+    @log_debug_trace
+    def _create_protection_group_if_not_exist(self, source_array, pgname):
+        try:
+            source_array.create_pgroup(pgname)
+        except purestorage.PureHTTPError as err:
+            with excutils.save_and_reraise_exception() as ctxt:
+                if err.code == 400 and ERR_MSG_ALREADY_EXISTS in err.text:
+                    # Happens if the PG already exists
+                    ctxt.reraise = False
+                    LOG.warning(_LW("Skipping creation of PG %s since it "
+                                    "already exists."), pgname)
+                    # We assume PG has already been setup with correct
+                    # replication settings.
+                    return
+                if err.code == 400 and (
+                        ERR_MSG_PENDING_ERADICATION in err.text):
+                    ctxt.reraise = False
+                    LOG.warning(_LW("Protection group %s is deleted but not"
+                                    " eradicated - will recreate."), pgname)
+                    source_array.eradicate_pgroup(pgname)
+                    source_array.create_pgroup(pgname)
+
+    def _is_volume_replicated_type(self, volume):
+        ctxt = context.get_admin_context()
+        volume_type = volume_types.get_volume_type(ctxt,
+                                                   volume["volume_type_id"])
+        replication_flag = False
+        specs = volume_type.get("extra_specs")
+        if specs and EXTRA_SPECS_REPL_ENABLED in specs:
+            replication_capability = specs[EXTRA_SPECS_REPL_ENABLED]
+            # Do not validate settings, ignore invalid.
+            replication_flag = (replication_capability == "<is> True")
+        return replication_flag
+
 
 class PureISCSIDriver(PureBaseVolumeDriver, san.SanISCSIDriver):
 
@@ -840,12 +1446,9 @@ class PureISCSIDriver(PureBaseVolumeDriver, san.SanISCSIDriver):
         super(PureISCSIDriver, self).__init__(execute=execute, *args, **kwargs)
         self._storage_protocol = "iSCSI"
 
-    def do_setup(self, context):
-        super(PureISCSIDriver, self).do_setup(context)
-
-    def _get_host(self, connector):
+    def _get_host(self, array, connector):
         """Return dict describing existing Purity host object or None."""
-        hosts = self._array.list_hosts()
+        hosts = array.list_hosts()
         for host in hosts:
             if connector["initiator"] in host["iqn"]:
                 return host
@@ -895,7 +1498,7 @@ class PureISCSIDriver(PureBaseVolumeDriver, san.SanISCSIDriver):
             target_iqns.append(port["iqn"])
             target_portals.append(port["portal"])
 
-        # If we have multiple ports always report them
+        # If we have multiple ports always report them.
         if target_luns and target_iqns and target_portals:
             props["data"]["target_luns"] = target_luns
             props["data"]["target_iqns"] = target_iqns
@@ -944,8 +1547,9 @@ class PureISCSIDriver(PureBaseVolumeDriver, san.SanISCSIDriver):
             (chap_username, chap_password, initiator_update) = \
                 self._get_chap_credentials(connector['host'], initiator_data)
 
+        current_array = self._get_current_array(volume)
         vol_name = self._get_vol_name(volume)
-        host = self._get_host(connector)
+        host = self._get_host(current_array, connector)
 
         if host:
             host_name = host["name"]
@@ -974,14 +1578,16 @@ class PureISCSIDriver(PureBaseVolumeDriver, san.SanISCSIDriver):
             host_name = self._generate_purity_host_name(connector["host"])
             LOG.info(_LI("Creating host object %(host_name)r with IQN:"
                          " %(iqn)s."), {"host_name": host_name, "iqn": iqn})
-            self._array.create_host(host_name, iqnlist=[iqn])
+            current_array.create_host(host_name, iqnlist=[iqn])
 
             if self.configuration.use_chap_auth:
-                self._array.set_host(host_name,
-                                     host_user=chap_username,
-                                     host_password=chap_password)
+                current_array.set_host(host_name,
+                                       host_user=chap_username,
+                                       host_password=chap_password)
 
-        connection = self._connect_host_to_vol(host_name, vol_name)
+        connection = self._connect_host_to_vol(current_array,
+                                               host_name,
+                                               vol_name)
 
         if self.configuration.use_chap_auth:
             connection["auth_username"] = chap_username
@@ -1003,29 +1609,27 @@ class PureFCDriver(PureBaseVolumeDriver, driver.FibreChannelDriver):
         self._storage_protocol = "FC"
         self._lookup_service = fczm_utils.create_lookup_service()
 
-    def do_setup(self, context):
-        super(PureFCDriver, self).do_setup(context)
-
-    def _get_host(self, connector):
+    def _get_host(self, array, connector):
         """Return dict describing existing Purity host object or None."""
-        hosts = self._array.list_hosts()
+        hosts = array.list_hosts()
         for host in hosts:
             for wwn in connector["wwpns"]:
                 if wwn in str(host["wwn"]).lower():
                     return host
 
-    def _get_array_wwns(self):
+    @staticmethod
+    def _get_array_wwns(array):
         """Return list of wwns from the array"""
-        ports = self._array.list_ports()
+        ports = array.list_ports()
         return [port["wwn"] for port in ports if port["wwn"]]
 
     @fczm_utils.AddFCZone
     @log_debug_trace
     def initialize_connection(self, volume, connector, initiator_data=None):
         """Allow connection to connector and return connection info."""
-
+        current_array = self._get_current_array(volume)
         connection = self._connect(volume, connector)
-        target_wwns = self._get_array_wwns()
+        target_wwns = self._get_array_wwns(current_array)
         init_targ_map = self._build_initiator_target_map(target_wwns,
                                                          connector)
         properties = {
@@ -1047,8 +1651,9 @@ class PureFCDriver(PureBaseVolumeDriver, driver.FibreChannelDriver):
         """Connect the host and volume; return dict describing connection."""
         wwns = connector["wwpns"]
 
+        current_array = self._get_current_array(volume)
         vol_name = self._get_vol_name(volume)
-        host = self._get_host(connector)
+        host = self._get_host(current_array, connector)
 
         if host:
             host_name = host["name"]
@@ -1058,9 +1663,9 @@ class PureFCDriver(PureBaseVolumeDriver, driver.FibreChannelDriver):
             host_name = self._generate_purity_host_name(connector["host"])
             LOG.info(_LI("Creating host object %(host_name)r with WWN:"
                          " %(wwn)s."), {"host_name": host_name, "wwn": wwns})
-            self._array.create_host(host_name, wwnlist=wwns)
+            current_array.create_host(host_name, wwnlist=wwns)
 
-        return self._connect_host_to_vol(host_name, vol_name)
+        return self._connect_host_to_vol(current_array, host_name, vol_name)
 
     def _build_initiator_target_map(self, target_wwns, connector):
         """Build the target_wwns and the initiator target map."""
@@ -1090,12 +1695,15 @@ class PureFCDriver(PureBaseVolumeDriver, driver.FibreChannelDriver):
     @log_debug_trace
     def terminate_connection(self, volume, connector, **kwargs):
         """Terminate connection."""
-        no_more_connections = self._disconnect(volume, connector, **kwargs)
+        current_array = self._get_current_array(volume)
+
+        no_more_connections = self._disconnect(current_array, volume,
+                                               connector, **kwargs)
 
         properties = {"driver_volume_type": "fibre_channel", "data": {}}
 
         if no_more_connections:
-            target_wwns = self._get_array_wwns()
+            target_wwns = self._get_array_wwns(current_array)
             init_targ_map = self._build_initiator_target_map(target_wwns,
                                                              connector)
             properties["data"] = {"target_wwn": target_wwns,
diff --git a/releasenotes/notes/pure-v2-replication-0246223caaa8a9b5.yaml b/releasenotes/notes/pure-v2-replication-0246223caaa8a9b5.yaml
new file mode 100644 (file)
index 0000000..817dcf7
--- /dev/null
@@ -0,0 +1,3 @@
+---
+features:
+  - Added v2 replication support to the Pure Volume driver.
\ No newline at end of file