from oslo.config import cfg
+from neutron.common import exceptions as n_exception
from neutron.common import rpc as q_rpc
from neutron.common import topics
from neutron import context as neutron_context
)
+class FirewallCountExceeded(n_exception.NeutronException):
+
+ """Reference implementation specific exception for firewall count.
+
+ Only one firewall is supported per tenant. When a second
+ firewall is tried to be created, this exception will be raised.
+ """
+ message = _("Exceeded allowed count of firewalls for tenant "
+ "%(tenant_id)s. Only one firewall is supported per tenant.")
+
+
class FirewallPlugin(firewall_db.Firewall_db_mixin):
"""Implementation of the Neutron Firewall Service Plugin.
def create_firewall(self, context, firewall):
LOG.debug(_("create_firewall() called"))
+ tenant_id = self._get_tenant_id_for_create(context,
+ firewall['firewall'])
+ fw_count = self.get_firewalls_count(context)
+ if fw_count:
+ raise FirewallCountExceeded(tenant_id=tenant_id)
firewall['firewall']['status'] = const.PENDING_CREATE
fw = super(FirewallPlugin, self).create_firewall(context, firewall)
fw_with_rules = (
fwp_id = fwp['firewall_policy']['id']
attrs = self._get_test_firewall_attrs()
attrs['firewall_policy_id'] = fwp_id
- with contextlib.nested(self.firewall(
- firewall_policy_id=fwp_id,
- tenant_id=tenant_id,
- admin_state_up=test_db_firewall.ADMIN_STATE_UP,
- no_delete=True), self.firewall(
- firewall_policy_id=fwp_id,
- tenant_id=tenant_id,
- admin_state_up=test_db_firewall.ADMIN_STATE_UP,
- no_delete=True)) as fws:
- fw_list = [fw['firewall'] for fw in fws]
+ with self.firewall(firewall_policy_id=fwp_id, tenant_id=tenant_id,
+ admin_state_up=test_db_firewall.ADMIN_STATE_UP,
+ no_delete=True) as fw:
+ fw_list = [fw['firewall']]
f = self.callbacks.get_firewalls_for_tenant_without_rules
res = f(ctx, host='dummy')
for fw in res:
super(TestFirewallPluginBase, self).setUp(fw_plugin=FW_PLUGIN_KLASS)
self.callbacks = self.plugin.callbacks
+ def test_create_second_firewall_not_permitted(self):
+ with self.firewall(no_delete=True):
+ res = self._create_firewall(
+ None, 'firewall2', description='test',
+ firewall_policy_id=None, admin_state_up=True)
+ self.assertEqual(res.status_int, 500)
+
def test_update_firewall(self):
ctx = context.get_admin_context()
name = "new_firewall1"
fw_id = fw['firewall']['id']
fw_rules = self.plugin._make_firewall_dict_with_rules(ctx, fw_id)
self.assertEqual(fw_rules['firewall_rule_list'], [])
+
+ def test_list_firewalls(self):
+ with self.firewall_policy(no_delete=True) as fwp:
+ fwp_id = fwp['firewall_policy']['id']
+ with self.firewall(name='fw1', firewall_policy_id=fwp_id,
+ description='fw') as fwalls:
+ self._test_list_resources('firewall', [fwalls],
+ query_params='description=fw')