from neutron.db import l3_db
from neutron.db import l3_gwmode_db
from neutron.db import models_v2
+from neutron.db import portbindings_db
from neutron.db import portsecurity_db
from neutron.db import quota_db # noqa
from neutron.db import securitygroups_db
from neutron.extensions import extraroute
from neutron.extensions import l3
+from neutron.extensions import portbindings as pbin
from neutron.extensions import portsecurity as psec
from neutron.extensions import providernet as pnet
from neutron.extensions import securitygroup as ext_sg
class NvpPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
extraroute_db.ExtraRoute_db_mixin,
l3_gwmode_db.L3_NAT_db_mixin,
+ portbindings_db.PortBindingMixin,
portsecurity_db.PortSecurityDbMixin,
securitygroups_db.SecurityGroupDbMixin,
mac_db.MacLearningDbMixin,
"""
supported_extension_aliases = ["agent",
+ "binding",
"dhcp_agent_scheduler",
"ext_gw_mode",
"extraroute",
self.nvp_opts.concurrent_connections,
self.nvp_opts.nvp_gen_timeout)
+ self.extra_binding_dict = {
+ pbin.VIF_TYPE: pbin.VIF_TYPE_OVS,
+ pbin.CAPABILITIES: {
+ pbin.CAP_PORT_FILTER:
+ 'security-group' in self.supported_extension_aliases}}
+
db.configure_db()
# Extend the fault map
self._extend_fault_map()
return net
def get_ports(self, context, filters=None, fields=None):
- if filters is None:
- filters = {}
+ filters = filters or {}
with context.session.begin(subtransactions=True):
neutron_lports = super(NvpPluginV2, self).get_ports(
context, filters)
del port_data[ext_qos.QUEUE]
self._extend_port_port_security_dict(context, port_data)
self._extend_port_qos_queue(context, port_data)
+ self._process_portbindings_create_and_update(context,
+ port, port_data)
net = self.get_network(context, port_data['network_id'])
self.schedule_network(context, net)
if notify_dhcp_agent:
# remove since it will be added in extend based on policy
del ret_port[ext_qos.QUEUE]
self._extend_port_qos_queue(context, ret_port)
+ self._process_portbindings_create_and_update(context,
+ port['port'],
+ port)
return ret_port
def delete_port(self, context, id, l3_port_check=True,
import neutron.common.test_lib as test_lib
from neutron import context
from neutron.extensions import l3
+from neutron.extensions import portbindings
from neutron.extensions import providernet as pnet
from neutron.extensions import securitygroup as secgrp
from neutron import manager
from neutron.plugins.nicira import NvpApiClient
from neutron.plugins.nicira.NvpApiClient import NVPVersion
from neutron.plugins.nicira import nvplib
+from neutron.tests.unit import _test_extension_portbindings as test_bindings
from neutron.tests.unit.nicira import fake_nvpapiclient
import neutron.tests.unit.nicira.test_networkgw as test_l2_gw
import neutron.tests.unit.test_db_plugin as test_plugin
pass
-class TestNiciraPortsV2(test_plugin.TestPortsV2, NiciraPluginV2TestCase):
+class TestNiciraPortsV2(test_plugin.TestPortsV2,
+ NiciraPluginV2TestCase,
+ test_bindings.PortBindingsTestCase):
+
+ VIF_TYPE = portbindings.VIF_TYPE_OVS
+ HAS_PORT_FILTER = True
def test_exhaust_ports_overlay_network(self):
cfg.CONF.set_override('max_lp_per_overlay_ls', 1, group='NVP')