def create_network(self, context, network):
result, mech_context = self._create_network_with_retries(context,
network)
+ self._notify_registry(
+ resources.NETWORK, events.AFTER_CREATE, context, result)
try:
self.mechanism_manager.create_network_postcommit(mech_context)
except ml2_exc.MechanismDriverError:
def create_network_bulk(self, context, networks):
objects = self._create_bulk_ml2(attributes.NETWORK, context, networks)
+
+ for obj in objects:
+ self._notify_registry(resources.NETWORK,
+ events.AFTER_CREATE,
+ context,
+ obj)
return [obj['result'] for obj in objects]
def update_network(self, context, id, network):
original_network=original_network)
self.mechanism_manager.update_network_precommit(mech_context)
+ # Notifications must be sent after the above transaction is complete
+ self._notify_registry(
+ resources.NETWORK, events.AFTER_UPDATE, context, updated_network)
+
# TODO(apech) - handle errors raised by update_network, potentially
# by re-calling update_network with the previous attributes. For
# now the error is propogated to the caller, which is expected to
if port:
return port.id
return device
+
+ def _notify_registry(self, resource_type, event_type, context, resource):
+ kwargs = {
+ 'context': context,
+ resource_type: resource,
+ }
+ registry.notify(resource_type, event_type, self, **kwargs)
# run the transaction balancing function defined in this test
plugin.delete_port(self.context, 'fake_id')
self.assertTrue(self.notify.call_count)
+
+
+class TestMl2PluginCreateUpdateNetwork(base.BaseTestCase):
+ def setUp(self):
+ super(TestMl2PluginCreateUpdateNetwork, self).setUp()
+ self.context = mock.MagicMock()
+ self.notify_p = mock.patch('neutron.callbacks.registry.notify')
+ self.notify = self.notify_p.start()
+
+ def _ensure_transaction_is_closed(self):
+ transaction = self.context.session.begin(subtransactions=True)
+ enter = transaction.__enter__.call_count
+ exit = transaction.__exit__.call_count
+ self.assertEqual(enter, exit)
+
+ def _create_plugin_for_create_update_network(self):
+ plugin = ml2_plugin.Ml2Plugin()
+ plugin.extension_manager = mock.Mock()
+ plugin.type_manager = mock.Mock()
+ plugin.mechanism_manager = mock.Mock()
+ plugin.notifier = mock.Mock()
+ mock.patch('neutron.extensions.providernet.'
+ '_raise_if_updates_provider_attributes').start()
+
+ self.notify.side_effect = (
+ lambda r, e, t, **kwargs: self._ensure_transaction_is_closed())
+
+ return plugin
+
+ def test_create_network_rpc_outside_transaction(self):
+ with mock.patch.object(ml2_plugin.Ml2Plugin, '__init__') as init,\
+ mock.patch.object(base_plugin.NeutronDbPluginV2,
+ 'create_network'):
+ init.return_value = None
+
+ plugin = self._create_plugin_for_create_update_network()
+
+ plugin.create_network(self.context, mock.MagicMock())
+
+ kwargs = {'context': self.context, 'network': mock.ANY}
+ self.notify.assert_called_once_with('network', 'after_create',
+ plugin, **kwargs)
+
+ def test_create_network_bulk_rpc_outside_transaction(self):
+ with mock.patch.object(ml2_plugin.Ml2Plugin, '__init__') as init,\
+ mock.patch.object(base_plugin.NeutronDbPluginV2,
+ 'create_network'):
+ init.return_value = None
+
+ plugin = self._create_plugin_for_create_update_network()
+
+ plugin.create_network_bulk(self.context,
+ {'networks':
+ [mock.MagicMock(), mock.MagicMock()]})
+
+ self.assertEqual(2, self.notify.call_count)
+
+ def test_update_network_rpc_outside_transaction(self):
+ with mock.patch.object(ml2_plugin.Ml2Plugin, '__init__') as init,\
+ mock.patch.object(base_plugin.NeutronDbPluginV2,
+ 'update_network'):
+ init.return_value = None
+ plugin = self._create_plugin_for_create_update_network()
+
+ plugin.update_network(self.context, 'fake_id', mock.MagicMock())
+
+ kwargs = {
+ 'context': self.context,
+ 'network': mock.ANY,
+ }
+ self.notify.assert_called_once_with('network', 'after_update',
+ plugin, **kwargs)