From: David Ripton Date: Wed, 16 Jan 2013 14:14:19 +0000 (-0500) Subject: Fix line endings from CRLF to LF. X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=d8b0f10ec7a5e0726d1b09e84e6cc388d957ba03;p=openstack-build%2Fneutron-build.git Fix line endings from CRLF to LF. Fixes bug #1100234 Change-Id: Idb28558547b69ed4ad64334651405dc70fa9ddb3 --- diff --git a/quantum/plugins/hyperv/common/__init__.py b/quantum/plugins/hyperv/common/__init__.py index c5618535c..7ef4e09fa 100644 --- a/quantum/plugins/hyperv/common/__init__.py +++ b/quantum/plugins/hyperv/common/__init__.py @@ -1,16 +1,16 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2013 Cloudbase Solutions SRL -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Cloudbase Solutions SRL +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/quantum/tests/unit/hyperv/__init__.py b/quantum/tests/unit/hyperv/__init__.py index c5618535c..7ef4e09fa 100644 --- a/quantum/tests/unit/hyperv/__init__.py +++ b/quantum/tests/unit/hyperv/__init__.py @@ -1,16 +1,16 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2013 Cloudbase Solutions SRL -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Cloudbase Solutions SRL +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/quantum/tests/unit/hyperv/test_hyperv_quantum_agent.py b/quantum/tests/unit/hyperv/test_hyperv_quantum_agent.py index aa8184f88..f58fe232f 100644 --- a/quantum/tests/unit/hyperv/test_hyperv_quantum_agent.py +++ b/quantum/tests/unit/hyperv/test_hyperv_quantum_agent.py @@ -1,113 +1,113 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2013 Cloudbase Solutions SRL -# Copyright 2013 Pedro Navarro Perez -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Unit tests for Windows Hyper-V virtual switch quantum driver -""" - +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Cloudbase Solutions SRL +# Copyright 2013 Pedro Navarro Perez +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Unit tests for Windows Hyper-V virtual switch quantum driver +""" + import sys - + import mock import unittest2 as unittest - + from quantum.openstack.common import cfg from quantum.plugins.hyperv.agent import hyperv_quantum_agent - - -class TestHyperVQuantumAgent(unittest.TestCase): - - def setUp(self): - self.addCleanup(cfg.CONF.reset) - # Avoid rpc initialization for unit tests - cfg.CONF.set_override('rpc_backend', - 'quantum.openstack.common.rpc.impl_fake') - self.agent = hyperv_quantum_agent.HyperVQuantumAgent() - self.agent.plugin_rpc = mock.Mock() - self.agent.context = mock.Mock() - self.agent.agent_id = mock.Mock() - self.agent._utils = mock.Mock() - - def tearDown(self): - cfg.CONF.reset() - - def test_port_bound(self): - port = mock.Mock() - net_uuid = 'my-net-uuid' - with mock.patch.object( - self.agent._utils, 'connect_vnic_to_vswitch'): - with mock.patch.object( - self.agent._utils, 'set_vswitch_port_vlan_id'): - self.agent._port_bound(port, net_uuid, 'vlan', None, None) - - def test_port_unbound(self): - map = { - 'network_type': 'vlan', - 'vswitch_name': 'fake-vswitch', - 'ports': [], - 'vlan_id': 1} - net_uuid = 'my-net-uuid' - network_vswitch_map = (net_uuid, map) - with mock.patch.object(self.agent, - '_get_network_vswitch_map_by_port_id', - return_value=network_vswitch_map): - with mock.patch.object( - self.agent._utils, - 'disconnect_switch_port'): - self.agent._port_unbound(net_uuid) - - def test_treat_devices_added_returns_true_for_missing_device(self): - attrs = {'get_device_details.side_effect': Exception()} - self.agent.plugin_rpc.configure_mock(**attrs) - self.assertTrue(self.agent._treat_devices_added([{}])) - - def mock_treat_devices_added(self, details, func_name): - """ - :param details: the details to return for the device - :param func_name: the function that should be called - :returns: whether the named function was called - """ - attrs = {'get_device_details.return_value': details} - self.agent.plugin_rpc.configure_mock(**attrs) - with mock.patch.object(self.agent, func_name) as func: - self.assertFalse(self.agent._treat_devices_added([{}])) - return func.called - - def test_treat_devices_added_updates_known_port(self): - details = mock.MagicMock() - details.__contains__.side_effect = lambda x: True - self.assertTrue(self.mock_treat_devices_added(details, - '_treat_vif_port')) - - def test_treat_devices_removed_returns_true_for_missing_device(self): - attrs = {'update_device_down.side_effect': Exception()} - self.agent.plugin_rpc.configure_mock(**attrs) - self.assertTrue(self.agent._treat_devices_removed([{}])) - - def mock_treat_devices_removed(self, port_exists): - details = dict(exists=port_exists) - attrs = {'update_device_down.return_value': details} - self.agent.plugin_rpc.configure_mock(**attrs) - with mock.patch.object(self.agent, '_port_unbound') as func: - self.assertFalse(self.agent._treat_devices_removed([{}])) - self.assertEqual(func.called, not port_exists) - - def test_treat_devices_removed_unbinds_port(self): - self.mock_treat_devices_removed(False) - - def test_treat_devices_removed_ignores_missing_port(self): - self.mock_treat_devices_removed(False) + + +class TestHyperVQuantumAgent(unittest.TestCase): + + def setUp(self): + self.addCleanup(cfg.CONF.reset) + # Avoid rpc initialization for unit tests + cfg.CONF.set_override('rpc_backend', + 'quantum.openstack.common.rpc.impl_fake') + self.agent = hyperv_quantum_agent.HyperVQuantumAgent() + self.agent.plugin_rpc = mock.Mock() + self.agent.context = mock.Mock() + self.agent.agent_id = mock.Mock() + self.agent._utils = mock.Mock() + + def tearDown(self): + cfg.CONF.reset() + + def test_port_bound(self): + port = mock.Mock() + net_uuid = 'my-net-uuid' + with mock.patch.object( + self.agent._utils, 'connect_vnic_to_vswitch'): + with mock.patch.object( + self.agent._utils, 'set_vswitch_port_vlan_id'): + self.agent._port_bound(port, net_uuid, 'vlan', None, None) + + def test_port_unbound(self): + map = { + 'network_type': 'vlan', + 'vswitch_name': 'fake-vswitch', + 'ports': [], + 'vlan_id': 1} + net_uuid = 'my-net-uuid' + network_vswitch_map = (net_uuid, map) + with mock.patch.object(self.agent, + '_get_network_vswitch_map_by_port_id', + return_value=network_vswitch_map): + with mock.patch.object( + self.agent._utils, + 'disconnect_switch_port'): + self.agent._port_unbound(net_uuid) + + def test_treat_devices_added_returns_true_for_missing_device(self): + attrs = {'get_device_details.side_effect': Exception()} + self.agent.plugin_rpc.configure_mock(**attrs) + self.assertTrue(self.agent._treat_devices_added([{}])) + + def mock_treat_devices_added(self, details, func_name): + """ + :param details: the details to return for the device + :param func_name: the function that should be called + :returns: whether the named function was called + """ + attrs = {'get_device_details.return_value': details} + self.agent.plugin_rpc.configure_mock(**attrs) + with mock.patch.object(self.agent, func_name) as func: + self.assertFalse(self.agent._treat_devices_added([{}])) + return func.called + + def test_treat_devices_added_updates_known_port(self): + details = mock.MagicMock() + details.__contains__.side_effect = lambda x: True + self.assertTrue(self.mock_treat_devices_added(details, + '_treat_vif_port')) + + def test_treat_devices_removed_returns_true_for_missing_device(self): + attrs = {'update_device_down.side_effect': Exception()} + self.agent.plugin_rpc.configure_mock(**attrs) + self.assertTrue(self.agent._treat_devices_removed([{}])) + + def mock_treat_devices_removed(self, port_exists): + details = dict(exists=port_exists) + attrs = {'update_device_down.return_value': details} + self.agent.plugin_rpc.configure_mock(**attrs) + with mock.patch.object(self.agent, '_port_unbound') as func: + self.assertFalse(self.agent._treat_devices_removed([{}])) + self.assertEqual(func.called, not port_exists) + + def test_treat_devices_removed_unbinds_port(self): + self.mock_treat_devices_removed(False) + + def test_treat_devices_removed_ignores_missing_port(self): + self.mock_treat_devices_removed(False) diff --git a/quantum/tests/unit/hyperv/test_hyperv_quantum_plugin.py b/quantum/tests/unit/hyperv/test_hyperv_quantum_plugin.py index c278f4986..c6911b885 100644 --- a/quantum/tests/unit/hyperv/test_hyperv_quantum_plugin.py +++ b/quantum/tests/unit/hyperv/test_hyperv_quantum_plugin.py @@ -1,88 +1,88 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2013 Cloudbase Solutions SRL -# Copyright 2013 Pedro Navarro Perez -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Cloudbase Solutions SRL +# Copyright 2013 Pedro Navarro Perez +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + import contextlib - + from quantum import context from quantum.extensions import portbindings from quantum.manager import QuantumManager from quantum.openstack.common import cfg from quantum.tests.unit import test_db_plugin as test_plugin - - -class HyperVQuantumPluginTestCase(test_plugin.QuantumDbPluginV2TestCase): - - _plugin_name = ('quantum.plugins.hyperv.' - 'hyperv_quantum_plugin.HyperVQuantumPlugin') - - def setUp(self): - super(HyperVQuantumPluginTestCase, self).setUp(self._plugin_name) - - -class TestHyperVVirtualSwitchBasicGet( - test_plugin.TestBasicGet, HyperVQuantumPluginTestCase): - pass - - -class TestHyperVVirtualSwitchV2HTTPResponse( - test_plugin.TestV2HTTPResponse, HyperVQuantumPluginTestCase): - pass - - -class TestHyperVVirtualSwitchPortsV2( - test_plugin.TestPortsV2, HyperVQuantumPluginTestCase): - def test_port_vif_details(self): - plugin = QuantumManager.get_plugin() - with self.port(name='name') as port: - port_id = port['port']['id'] - self.assertEqual(port['port']['binding:vif_type'], - portbindings.VIF_TYPE_HYPERV) - # By default user is admin - now test non admin user - ctx = context.Context(user_id=None, - tenant_id=self._tenant_id, - is_admin=False, - read_deleted="no") - non_admin_port = plugin.get_port(ctx, port_id) - self.assertTrue('status' in non_admin_port) - self.assertFalse('binding:vif_type' in non_admin_port) - - def test_ports_vif_details(self): - cfg.CONF.set_default('allow_overlapping_ips', True) - plugin = QuantumManager.get_plugin() - with contextlib.nested(self.port(), self.port()) as (port1, port2): - ctx = context.get_admin_context() - ports = plugin.get_ports(ctx) - self.assertEqual(len(ports), 2) - for port in ports: - self.assertEqual(port['binding:vif_type'], - portbindings.VIF_TYPE_HYPERV) - # By default user is admin - now test non admin user - ctx = context.Context(user_id=None, - tenant_id=self._tenant_id, - is_admin=False, - read_deleted="no") - ports = plugin.get_ports(ctx) - self.assertEqual(len(ports), 2) - for non_admin_port in ports: - self.assertTrue('status' in non_admin_port) - self.assertFalse('binding:vif_type' in non_admin_port) - - -class TestHyperVVirtualSwitchNetworksV2( - test_plugin.TestNetworksV2, HyperVQuantumPluginTestCase): - pass + + +class HyperVQuantumPluginTestCase(test_plugin.QuantumDbPluginV2TestCase): + + _plugin_name = ('quantum.plugins.hyperv.' + 'hyperv_quantum_plugin.HyperVQuantumPlugin') + + def setUp(self): + super(HyperVQuantumPluginTestCase, self).setUp(self._plugin_name) + + +class TestHyperVVirtualSwitchBasicGet( + test_plugin.TestBasicGet, HyperVQuantumPluginTestCase): + pass + + +class TestHyperVVirtualSwitchV2HTTPResponse( + test_plugin.TestV2HTTPResponse, HyperVQuantumPluginTestCase): + pass + + +class TestHyperVVirtualSwitchPortsV2( + test_plugin.TestPortsV2, HyperVQuantumPluginTestCase): + def test_port_vif_details(self): + plugin = QuantumManager.get_plugin() + with self.port(name='name') as port: + port_id = port['port']['id'] + self.assertEqual(port['port']['binding:vif_type'], + portbindings.VIF_TYPE_HYPERV) + # By default user is admin - now test non admin user + ctx = context.Context(user_id=None, + tenant_id=self._tenant_id, + is_admin=False, + read_deleted="no") + non_admin_port = plugin.get_port(ctx, port_id) + self.assertTrue('status' in non_admin_port) + self.assertFalse('binding:vif_type' in non_admin_port) + + def test_ports_vif_details(self): + cfg.CONF.set_default('allow_overlapping_ips', True) + plugin = QuantumManager.get_plugin() + with contextlib.nested(self.port(), self.port()) as (port1, port2): + ctx = context.get_admin_context() + ports = plugin.get_ports(ctx) + self.assertEqual(len(ports), 2) + for port in ports: + self.assertEqual(port['binding:vif_type'], + portbindings.VIF_TYPE_HYPERV) + # By default user is admin - now test non admin user + ctx = context.Context(user_id=None, + tenant_id=self._tenant_id, + is_admin=False, + read_deleted="no") + ports = plugin.get_ports(ctx) + self.assertEqual(len(ports), 2) + for non_admin_port in ports: + self.assertTrue('status' in non_admin_port) + self.assertFalse('binding:vif_type' in non_admin_port) + + +class TestHyperVVirtualSwitchNetworksV2( + test_plugin.TestNetworksV2, HyperVQuantumPluginTestCase): + pass diff --git a/quantum/tests/unit/hyperv/test_hyperv_rpcapi.py b/quantum/tests/unit/hyperv/test_hyperv_rpcapi.py index dcb412236..098fcea12 100644 --- a/quantum/tests/unit/hyperv/test_hyperv_rpcapi.py +++ b/quantum/tests/unit/hyperv/test_hyperv_rpcapi.py @@ -1,126 +1,126 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2013 Cloudbase Solutions SRL -# Copyright 2013 Pedro Navarro Perez -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Unit Tests for hyperv quantum rpc -""" - +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Cloudbase Solutions SRL +# Copyright 2013 Pedro Navarro Perez +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Unit Tests for hyperv quantum rpc +""" + import mock import unittest2 - + from quantum.agent import rpc as agent_rpc from quantum.common import topics from quantum.openstack.common import context from quantum.openstack.common import rpc from quantum.plugins.hyperv import agent_notifier_api as ana from quantum.plugins.hyperv.common import constants - - -class rpcHyperVApiTestCase(unittest2.TestCase): - - def _test_hyperv_quantum_api( - self, rpcapi, topic, method, rpc_method, **kwargs): - ctxt = context.RequestContext('fake_user', 'fake_project') - expected_retval = 'foo' if method == 'call' else None - expected_msg = rpcapi.make_msg(method, **kwargs) - expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION - if rpc_method == 'cast' and method == 'run_instance': - kwargs['call'] = False - - rpc_method_mock = mock.Mock() - rpc_method_mock.return_value = expected_retval - setattr(rpc, rpc_method, rpc_method_mock) - - retval = getattr(rpcapi, method)(ctxt, **kwargs) - - self.assertEqual(retval, expected_retval) - - expected_args = [ctxt, topic, expected_msg] - for arg, expected_arg in zip(rpc_method_mock.call_args[0], - expected_args): - self.assertEqual(arg, expected_arg) - - def test_delete_network(self): - rpcapi = ana.AgentNotifierApi(topics.AGENT) - self._test_hyperv_quantum_api( - rpcapi, - topics.get_topic_name( - topics.AGENT, - topics.NETWORK, - topics.DELETE), - 'network_delete', rpc_method='fanout_cast', - network_id='fake_request_spec') - - def test_port_update(self): - rpcapi = ana.AgentNotifierApi(topics.AGENT) - self._test_hyperv_quantum_api( - rpcapi, - topics.get_topic_name( - topics.AGENT, - topics.PORT, - topics.UPDATE), - 'port_update', rpc_method='fanout_cast', - port='fake_port', - network_type='fake_network_type', - segmentation_id='fake_segmentation_id', - physical_network='fake_physical_network') - - def test_port_delete(self): - rpcapi = ana.AgentNotifierApi(topics.AGENT) - self._test_hyperv_quantum_api( - rpcapi, - topics.get_topic_name( - topics.AGENT, - topics.PORT, - topics.DELETE), - 'port_delete', rpc_method='fanout_cast', - port_id='port_id') - - def test_tunnel_update(self): - rpcapi = ana.AgentNotifierApi(topics.AGENT) - self._test_hyperv_quantum_api( - rpcapi, - topics.get_topic_name( - topics.AGENT, - constants.TUNNEL, - topics.UPDATE), - 'tunnel_update', rpc_method='fanout_cast', - tunnel_ip='fake_ip', tunnel_id='fake_id') - - def test_device_details(self): - rpcapi = agent_rpc.PluginApi(topics.PLUGIN) - self._test_hyperv_quantum_api( - rpcapi, topics.PLUGIN, - 'get_device_details', rpc_method='call', - device='fake_device', - agent_id='fake_agent_id') - - def test_update_device_down(self): - rpcapi = agent_rpc.PluginApi(topics.PLUGIN) - self._test_hyperv_quantum_api( - rpcapi, topics.PLUGIN, - 'update_device_down', rpc_method='call', - device='fake_device', - agent_id='fake_agent_id') - - def test_tunnel_sync(self): - rpcapi = agent_rpc.PluginApi(topics.PLUGIN) - self._test_hyperv_quantum_api( - rpcapi, topics.PLUGIN, - 'tunnel_sync', rpc_method='call', - tunnel_ip='fake_tunnel_ip') + + +class rpcHyperVApiTestCase(unittest2.TestCase): + + def _test_hyperv_quantum_api( + self, rpcapi, topic, method, rpc_method, **kwargs): + ctxt = context.RequestContext('fake_user', 'fake_project') + expected_retval = 'foo' if method == 'call' else None + expected_msg = rpcapi.make_msg(method, **kwargs) + expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION + if rpc_method == 'cast' and method == 'run_instance': + kwargs['call'] = False + + rpc_method_mock = mock.Mock() + rpc_method_mock.return_value = expected_retval + setattr(rpc, rpc_method, rpc_method_mock) + + retval = getattr(rpcapi, method)(ctxt, **kwargs) + + self.assertEqual(retval, expected_retval) + + expected_args = [ctxt, topic, expected_msg] + for arg, expected_arg in zip(rpc_method_mock.call_args[0], + expected_args): + self.assertEqual(arg, expected_arg) + + def test_delete_network(self): + rpcapi = ana.AgentNotifierApi(topics.AGENT) + self._test_hyperv_quantum_api( + rpcapi, + topics.get_topic_name( + topics.AGENT, + topics.NETWORK, + topics.DELETE), + 'network_delete', rpc_method='fanout_cast', + network_id='fake_request_spec') + + def test_port_update(self): + rpcapi = ana.AgentNotifierApi(topics.AGENT) + self._test_hyperv_quantum_api( + rpcapi, + topics.get_topic_name( + topics.AGENT, + topics.PORT, + topics.UPDATE), + 'port_update', rpc_method='fanout_cast', + port='fake_port', + network_type='fake_network_type', + segmentation_id='fake_segmentation_id', + physical_network='fake_physical_network') + + def test_port_delete(self): + rpcapi = ana.AgentNotifierApi(topics.AGENT) + self._test_hyperv_quantum_api( + rpcapi, + topics.get_topic_name( + topics.AGENT, + topics.PORT, + topics.DELETE), + 'port_delete', rpc_method='fanout_cast', + port_id='port_id') + + def test_tunnel_update(self): + rpcapi = ana.AgentNotifierApi(topics.AGENT) + self._test_hyperv_quantum_api( + rpcapi, + topics.get_topic_name( + topics.AGENT, + constants.TUNNEL, + topics.UPDATE), + 'tunnel_update', rpc_method='fanout_cast', + tunnel_ip='fake_ip', tunnel_id='fake_id') + + def test_device_details(self): + rpcapi = agent_rpc.PluginApi(topics.PLUGIN) + self._test_hyperv_quantum_api( + rpcapi, topics.PLUGIN, + 'get_device_details', rpc_method='call', + device='fake_device', + agent_id='fake_agent_id') + + def test_update_device_down(self): + rpcapi = agent_rpc.PluginApi(topics.PLUGIN) + self._test_hyperv_quantum_api( + rpcapi, topics.PLUGIN, + 'update_device_down', rpc_method='call', + device='fake_device', + agent_id='fake_agent_id') + + def test_tunnel_sync(self): + rpcapi = agent_rpc.PluginApi(topics.PLUGIN) + self._test_hyperv_quantum_api( + rpcapi, topics.PLUGIN, + 'tunnel_sync', rpc_method='call', + tunnel_ip='fake_tunnel_ip')