{'admin_state_up': admin_state_up},
host)
- def notify(self, context, data, methodname):
+ def notify(self, context, data, method_name):
# data is {'key' : 'value'} with only one key
- if methodname not in self.VALID_METHOD_NAMES:
+ if method_name not in self.VALID_METHOD_NAMES:
return
obj_type = data.keys()[0]
if obj_type not in self.VALID_RESOURCES:
network_id = obj_value['network_id']
if not network_id:
return
- methodname = methodname.replace(".", "_")
- if methodname.endswith("_delete_end"):
+ method_name = method_name.replace(".", "_")
+ if method_name.endswith("_delete_end"):
if 'id' in obj_value:
- self._notification(context, methodname,
+ self._notification(context, method_name,
{obj_type + '_id': obj_value['id']},
network_id)
else:
- self._notification(context, methodname, data, network_id)
+ self._notification(context, method_name, data, network_id)
def _send_dhcp_notification(self, context, data, methodname):
if cfg.CONF.dhcp_agent_notification:
- self._dhcp_agent_notifier.notify(context, data, methodname)
+ if self._collection in data:
+ for body in data[self._collection]:
+ item = {self._resource: body}
+ self._dhcp_agent_notifier.notify(context, item, methodname)
+ else:
+ self._dhcp_agent_notifier.notify(context, data, methodname)
def index(self, request, **kwargs):
"""Returns a list of the requested entity."""
from neutron.api import api_common
from neutron.api.extensions import PluginAwareExtensionManager
+from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.v2 import attributes
from neutron.api.v2 import base as v2_base
from neutron.api.v2 import router
notification_level='DEBUG')
+class DHCPNotificationTest(APIv2TestBase):
+ def _test_dhcp_notifier(self, opname, resource, initial_input=None):
+ instance = self.plugin.return_value
+ instance.get_networks.return_value = initial_input
+ instance.get_networks_count.return_value = 0
+ expected_code = exc.HTTPCreated.code
+ with mock.patch.object(dhcp_rpc_agent_api.DhcpAgentNotifyAPI,
+ 'notify') as dhcp_notifier:
+ if opname == 'create':
+ res = self.api.post_json(
+ _get_path('networks'),
+ initial_input)
+ if opname == 'update':
+ res = self.api.put_json(
+ _get_path('networks', id=_uuid()),
+ initial_input)
+ expected_code = exc.HTTPOk.code
+ if opname == 'delete':
+ res = self.api.delete(_get_path('networks', id=_uuid()))
+ expected_code = exc.HTTPNoContent.code
+ expected_item = mock.call(mock.ANY, mock.ANY,
+ resource + "." + opname + ".end")
+ if initial_input and resource not in initial_input:
+ resource += 's'
+ num = len(initial_input[resource]) if initial_input and isinstance(
+ initial_input[resource], list) else 1
+ expected = [expected_item for x in xrange(num)]
+ self.assertEqual(expected, dhcp_notifier.call_args_list)
+ self.assertEqual(num, dhcp_notifier.call_count)
+ self.assertEqual(expected_code, res.status_int)
+
+ def test_network_create_dhcp_notifer(self):
+ input = {'network': {'name': 'net',
+ 'tenant_id': _uuid()}}
+ self._test_dhcp_notifier('create', 'network', input)
+
+ def test_network_delete_dhcp_notifer(self):
+ self._test_dhcp_notifier('delete', 'network')
+
+ def test_network_update_dhcp_notifer(self):
+ input = {'network': {'name': 'net'}}
+ self._test_dhcp_notifier('update', 'network', input)
+
+ def test_networks_create_bulk_dhcp_notifer(self):
+ input = {'networks': [{'name': 'net1',
+ 'tenant_id': _uuid()},
+ {'name': 'net2',
+ 'tenant_id': _uuid()}]}
+ self._test_dhcp_notifier('create', 'network', input)
+
+
class QuotaTest(APIv2TestBase):
def test_create_network_quota(self):
cfg.CONF.set_override('quota_network', 1, group='QUOTAS')