--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+"""NEC Port Binding
+
+Revision ID: 2a3bae1ceb8
+Revises: 46a0efbd8f0
+Create Date: 2013-08-22 11:09:19.955386
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '2a3bae1ceb8'
+down_revision = '46a0efbd8f0'
+
+# Change to ['*'] if this migration applies to all plugins
+
+migration_for_plugins = [
+ 'neutron.plugins.nec.nec_plugin.NECPluginV2'
+]
+
+from alembic import op
+import sqlalchemy as sa
+
+from neutron.db import migration
+
+
+def upgrade(active_plugins=None, options=None):
+ if not migration.should_run(active_plugins, migration_for_plugins):
+ return
+
+ op.create_table(
+ 'portbindingports',
+ sa.Column('port_id', sa.String(length=36), nullable=False),
+ sa.Column('host', sa.String(length=255), nullable=False),
+ sa.ForeignKeyConstraint(['port_id'], ['ports.id'], ondelete='CASCADE'),
+ sa.PrimaryKeyConstraint('port_id')
+ )
+ op.create_foreign_key(
+ 'portinfos_ibfk_1',
+ source='portinfos', referent='ports',
+ local_cols=['id'], remote_cols=['id'],
+ ondelete='CASCADE')
+
+
+def downgrade(active_plugins=None, options=None):
+ if not migration.should_run(active_plugins, migration_for_plugins):
+ return
+
+ op.drop_constraint('portinfos_ibfk_1', 'portinfos', 'foreignkey')
+ op.drop_table('portbindingports')
class PortInfoNotFound(qexc.NotFound):
message = _("PortInfo %(id)s could not be found")
+
+
+class ProfilePortInfoInvalidDataPathId(qexc.InvalidInput):
+ message = _('Invalid input for operation: '
+ 'portinfo:datapath_id should be a hex string '
+ 'with at most 8 bytes')
+
+
+class ProfilePortInfoInvalidPortNo(qexc.InvalidInput):
+ message = _('Invalid input for operation: '
+ 'portinfo:port_no should be [0:65535]')
# @author: Ryota MIBU
import sqlalchemy as sa
+from sqlalchemy import orm
from neutron.db import model_base
from neutron.db import models_v2
"""Represents a Filter on OpenFlow Network/Controller."""
-class PortInfo(model_base.BASEV2, models_v2.HasId):
+class PortInfo(model_base.BASEV2):
"""Represents a Virtual Interface."""
+ id = sa.Column(sa.String(36),
+ sa.ForeignKey('ports.id', ondelete="CASCADE"),
+ primary_key=True)
datapath_id = sa.Column(sa.String(36), nullable=False)
port_no = sa.Column(sa.Integer, nullable=False)
vlan_id = sa.Column(sa.Integer, nullable=False)
mac = sa.Column(sa.String(32), nullable=False)
+ port = orm.relationship(
+ models_v2.Port,
+ backref=orm.backref("portinfo",
+ lazy='joined', uselist=False,
+ cascade='delete'))
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
+from neutron.api.v2 import attributes as attrs
from neutron.common import constants as const
from neutron.common import exceptions as q_exc
from neutron.common import rpc as q_rpc
from neutron.db import l3_gwmode_db
from neutron.db import l3_rpc_base
from neutron.db import portbindings_base
+from neutron.db import portbindings_db
from neutron.db import quota_db # noqa
from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.extensions import portbindings
agentschedulers_db.L3AgentSchedulerDbMixin,
agentschedulers_db.DhcpAgentSchedulerDbMixin,
packet_filter.PacketFilterMixin,
- portbindings_base.PortBindingBaseMixin):
+ portbindings_db.PortBindingMixin):
"""NECPluginV2 controls an OpenFlow Controller.
The Neutron NECPluginV2 maps L2 logical networks to L2 virtualized networks
'security-group' in self.supported_extension_aliases}}
return binding
+ def _extend_port_dict_binding_portinfo(self, port_res, portinfo):
+ if portinfo:
+ port_res[portbindings.PROFILE] = {
+ 'portinfo:datapath_id': portinfo['datapath_id'],
+ 'portinfo:port_no': portinfo['port_no'],
+ }
+ elif portbindings.PROFILE in port_res:
+ del port_res[portbindings.PROFILE]
+
+ def _validate_portinfo(self, profile):
+ key_specs = {
+ 'portinfo:datapath_id': {'type:string': None, 'required': True},
+ 'portinfo:port_no': {'type:non_negative': None, 'required': True,
+ 'convert_to': attrs.convert_to_int}
+ }
+ msg = attrs._validate_dict_or_empty(profile, key_specs=key_specs)
+ if msg:
+ raise q_exc.InvalidInput(error_message=msg)
+
+ datapath_id = profile.get('portinfo:datapath_id')
+ port_no = profile.get('portinfo:port_no')
+ try:
+ dpid = int(datapath_id, 16)
+ except ValueError:
+ raise nexc.ProfilePortInfoInvalidDataPathId()
+ if dpid > 0xffffffffffffffffL:
+ raise nexc.ProfilePortInfoInvalidDataPathId()
+ # Make sure dpid is a hex string beginning with 0x.
+ dpid = hex(dpid)
+
+ if int(port_no) > 65535:
+ raise nexc.ProfilePortInfoInvalidPortNo()
+
+ return {'datapath_id': dpid, 'port_no': port_no}
+
+ def _process_portbindings_portinfo_create(self, context, port_data, port):
+ """Add portinfo according to bindings:profile in create_port().
+
+ :param context: neutron api request context
+ :param port_data: port attributes passed in PUT request
+ :param port: port attributes to be returned
+ """
+ profile = port_data.get(portbindings.PROFILE)
+ # If portbindings.PROFILE is None, unspecified or an empty dict
+ # it is regarded that portbinding.PROFILE is not set.
+ profile_set = attrs.is_attr_set(profile) and profile
+ if profile_set:
+ portinfo = self._validate_portinfo(profile)
+ portinfo['mac'] = port['mac_address']
+ ndb.add_portinfo(context.session, port['id'], **portinfo)
+ else:
+ portinfo = None
+ self._extend_port_dict_binding_portinfo(port, portinfo)
+
+ def _process_portbindings_portinfo_update(self, context, port_data, port):
+ """Update portinfo according to bindings:profile in update_port().
+
+ :param context: neutron api request context
+ :param port_data: port attributes passed in PUT request
+ :param port: port attributes to be returned
+ :returns: 'ADD', 'MOD', 'DEL' or None
+ """
+ if portbindings.PROFILE not in port_data:
+ return
+ profile = port_data.get(portbindings.PROFILE)
+ # If binding:profile is None or an empty dict,
+ # it means binding:.profile needs to be cleared.
+ # TODO(amotoki): Allow Make None in binding:profile in
+ # the API layer. See LP bug #1220011.
+ profile_set = attrs.is_attr_set(profile) and profile
+ cur_portinfo = ndb.get_portinfo(context.session, port['id'])
+ if profile_set:
+ portinfo = self._validate_portinfo(profile)
+ portinfo_changed = 'ADD'
+ if cur_portinfo:
+ if (portinfo['datapath_id'] == cur_portinfo.datapath_id and
+ portinfo['port_no'] == cur_portinfo.port_no):
+ return
+ ndb.del_portinfo(context.session, port['id'])
+ portinfo_changed = 'MOD'
+ portinfo['mac'] = port['mac_address']
+ ndb.add_portinfo(context.session, port['id'], **portinfo)
+ elif cur_portinfo:
+ portinfo_changed = 'DEL'
+ portinfo = None
+ ndb.del_portinfo(context.session, port['id'])
+ self._extend_port_dict_binding_portinfo(port, portinfo)
+ return portinfo_changed
+
+ def extend_port_dict_binding(self, port_res, port_db):
+ super(NECPluginV2, self).extend_port_dict_binding(port_res, port_db)
+ self._extend_port_dict_binding_portinfo(port_res, port_db.portinfo)
+
+ def _process_portbindings_create(self, context, port_data, port):
+ super(NECPluginV2, self)._process_portbindings_create_and_update(
+ context, port_data, port)
+ self._process_portbindings_portinfo_create(context, port_data, port)
+
+ def _process_portbindings_update(self, context, port_data, port):
+ super(NECPluginV2, self)._process_portbindings_create_and_update(
+ context, port_data, port)
+ portinfo_changed = self._process_portbindings_portinfo_update(
+ context, port_data, port)
+ return portinfo_changed
+
def create_port(self, context, port):
"""Create a new port entry on DB, then try to activate it."""
LOG.debug(_("NECPluginV2.create_port() called, port=%s ."), port)
self._ensure_default_security_group_on_port(context, port)
sgids = self._get_security_groups_on_port(context, port)
port = super(NECPluginV2, self).create_port(context, port)
- self._process_portbindings_create_and_update(context,
- port_data,
- port)
+ self._process_portbindings_create(context, port_data, port)
self._process_port_create_security_group(
context, port, sgids)
self.notify_security_groups_member_updated(context, port)
return self.activate_port_if_ready(context, port)
+ def _update_ofc_port_if_required(self, context, old_port, new_port,
+ portinfo_changed):
+ def get_ofport_exist(port):
+ return (port['admin_state_up'] and
+ bool(port.get(portbindings.PROFILE)))
+
+ # Determine it is required to update OFC port
+ need_add = False
+ need_del = False
+ need_packet_filter_update = False
+
+ old_ofport_exist = get_ofport_exist(old_port)
+ new_ofport_exist = get_ofport_exist(new_port)
+
+ if old_port['admin_state_up'] != new_port['admin_state_up']:
+ if new_port['admin_state_up']:
+ need_add |= new_ofport_exist
+ else:
+ need_del |= old_ofport_exist
+
+ if portinfo_changed:
+ if portinfo_changed in ['DEL', 'MOD']:
+ need_del |= old_ofport_exist
+ if portinfo_changed in ['ADD', 'MOD']:
+ need_add |= new_ofport_exist
+ need_packet_filter_update |= True
+
+ # Update OFC port if required
+ if need_del:
+ self.deactivate_port(context, new_port)
+ if need_packet_filter_update:
+ self.deactivate_packet_filters_by_port(context, id)
+ if need_add:
+ if need_packet_filter_update:
+ self.activate_packet_filters_by_port(context, id)
+ self.activate_port_if_ready(context, new_port)
+
def update_port(self, context, id, port):
"""Update port, and handle packetfilters associated with the port.
with context.session.begin(subtransactions=True):
old_port = super(NECPluginV2, self).get_port(context, id)
new_port = super(NECPluginV2, self).update_port(context, id, port)
- self._process_portbindings_create_and_update(context,
- port['port'],
- new_port)
+ portinfo_changed = self._process_portbindings_update(
+ context, port['port'], new_port)
need_port_update_notify = self.update_security_group_on_port(
context, id, port, old_port, new_port)
if need_port_update_notify:
self.notifier.port_update(context, new_port)
- changed = (old_port['admin_state_up'] != new_port['admin_state_up'])
- if changed:
- if new_port['admin_state_up']:
- new_port = self.activate_port_if_ready(context, new_port)
- else:
- new_port = self.deactivate_port(context, new_port)
-
+ self._update_ofc_port_if_required(context, old_port, new_port,
+ portinfo_changed)
return new_port
def delete_port(self, context, id, l3_port_check=True):
"port_added message (port_id=%s)."), id)
continue
ndb.del_portinfo(session, id)
- ndb.add_portinfo(session, id, datapath_id, p['port_no'],
- mac=p.get('mac', ''))
port = self._get_port(rpc_context, id)
if port:
+ ndb.add_portinfo(session, id, datapath_id, p['port_no'],
+ mac=p.get('mac', ''))
# NOTE: Make sure that packet filters on this port exist while
# the port is active to avoid unexpected packet transfer.
if portinfo:
# under the License.
# @author: Ryota MIBU
+import contextlib
import random
from neutron.common import constants as q_const
-from neutron.db import api as db_api
from neutron.openstack.common import uuidutils
from neutron.plugins.nec.common import exceptions as nexc
from neutron.plugins.nec.db import api as ndb
from neutron.plugins.nec.db import models as nmodels # noqa
-from neutron.tests import base
+from neutron.tests.unit.nec import test_nec_plugin
-class NECPluginV2DBTestBase(base.BaseTestCase):
+class NECPluginV2DBTestBase(test_nec_plugin.NecPluginV2TestCase):
"""Class conisting of NECPluginV2 DB unit tests."""
def setUp(self):
"""Setup for tests."""
super(NECPluginV2DBTestBase, self).setUp()
- ndb.initialize()
- self.session = db_api.get_session()
- self.addCleanup(ndb.clear_db)
+ self.session = self.context.session
def get_ofc_item_random_params(self):
"""create random parameters for ofc_item test."""
none = uuidutils.generate_uuid()
return ofc_id, neutron_id, none
- def get_portinfo_random_params(self):
- """create random parameters for portinfo test."""
- port_id = uuidutils.generate_uuid()
- datapath_id = hex(random.randint(0, 0xffffffff))
- port_no = random.randint(1, 100)
- vlan_id = random.randint(q_const.MIN_VLAN_TAG, q_const.MAX_VLAN_TAG)
- mac = ':'.join(["%02x" % random.randint(0, 0xff) for x in range(6)])
- none = uuidutils.generate_uuid()
- return port_id, datapath_id, port_no, vlan_id, mac, none
+ @contextlib.contextmanager
+ def portinfo_random_params(self):
+ with self.port() as port:
+ params = {'port_id': port['port']['id'],
+ 'datapath_id': hex(random.randint(0, 0xffffffff)),
+ 'port_no': random.randint(1, 100),
+ 'vlan_id': random.randint(q_const.MIN_VLAN_TAG,
+ q_const.MAX_VLAN_TAG),
+ 'mac': ':'.join(["%02x" % random.randint(0, 0xff)
+ for x in range(6)])
+ }
+ yield params
class NECPluginV2DBTest(NECPluginV2DBTestBase):
'ofc_tenant', o)
self.assertEqual(None, tenant_none)
+ def _compare_portinfo(self, portinfo, expected):
+ self.assertEqual(portinfo.id, expected['port_id'])
+ self.assertEqual(portinfo.datapath_id, expected['datapath_id'])
+ self.assertEqual(portinfo.port_no, expected['port_no'])
+ self.assertEqual(portinfo.vlan_id, expected['vlan_id'])
+ self.assertEqual(portinfo.mac, expected['mac'])
+
+ def _add_portinfo(self, session, params):
+ return ndb.add_portinfo(session, params['port_id'],
+ params['datapath_id'], params['port_no'],
+ params['vlan_id'], params['mac'])
+
def testd_add_portinfo(self):
"""test add portinfo."""
- i, d, p, v, m, n = self.get_portinfo_random_params()
- portinfo = ndb.add_portinfo(self.session, i, d, p, v, m)
- self.assertEqual(portinfo.id, i)
- self.assertEqual(portinfo.datapath_id, d)
- self.assertEqual(portinfo.port_no, p)
- self.assertEqual(portinfo.vlan_id, v)
- self.assertEqual(portinfo.mac, m)
-
- exception_raised = False
- try:
- ndb.add_portinfo(self.session, i, d, p, v, m)
- except nexc.NECDBException:
- exception_raised = True
- self.assertTrue(exception_raised)
+ with self.portinfo_random_params() as params:
+ portinfo = self._add_portinfo(self.session, params)
+ self._compare_portinfo(portinfo, params)
+
+ exception_raised = False
+ try:
+ self._add_portinfo(self.session, params)
+ except nexc.NECDBException:
+ exception_raised = True
+ self.assertTrue(exception_raised)
def teste_get_portinfo(self):
"""test get portinfo."""
- i, d, p, v, m, n = self.get_portinfo_random_params()
- ndb.add_portinfo(self.session, i, d, p, v, m)
- portinfo = ndb.get_portinfo(self.session, i)
- self.assertEqual(portinfo.id, i)
- self.assertEqual(portinfo.datapath_id, d)
- self.assertEqual(portinfo.port_no, p)
- self.assertEqual(portinfo.vlan_id, v)
- self.assertEqual(portinfo.mac, m)
-
- portinfo_none = ndb.get_portinfo(self.session, n)
- self.assertEqual(None, portinfo_none)
+ with self.portinfo_random_params() as params:
+ self._add_portinfo(self.session, params)
+ portinfo = ndb.get_portinfo(self.session, params['port_id'])
+ self._compare_portinfo(portinfo, params)
+
+ nonexist_id = uuidutils.generate_uuid()
+ portinfo_none = ndb.get_portinfo(self.session, nonexist_id)
+ self.assertEqual(None, portinfo_none)
def testf_del_portinfo(self):
"""test delete portinfo."""
- i, d, p, v, m, n = self.get_portinfo_random_params()
- ndb.add_portinfo(self.session, i, d, p, v, m)
- portinfo = ndb.get_portinfo(self.session, i)
- self.assertEqual(portinfo.id, i)
- ndb.del_portinfo(self.session, i)
- portinfo_none = ndb.get_portinfo(self.session, i)
- self.assertEqual(None, portinfo_none)
+ with self.portinfo_random_params() as params:
+ self._add_portinfo(self.session, params)
+ portinfo = ndb.get_portinfo(self.session, params['port_id'])
+ self.assertEqual(portinfo.id, params['port_id'])
+ ndb.del_portinfo(self.session, params['port_id'])
+ portinfo_none = ndb.get_portinfo(self.session, params['port_id'])
+ self.assertEqual(None, portinfo_none)
class NECPluginV2DBOldMappingTest(NECPluginV2DBTestBase):
from neutron.common import topics
from neutron import context
from neutron.db import db_base_plugin_v2
-from neutron.extensions import portbindings
from neutron import manager
from neutron.plugins.nec.common import exceptions as nexc
from neutron.plugins.nec.db import api as ndb
from neutron.plugins.nec import nec_plugin
-from neutron.tests.unit import _test_extension_portbindings as test_bindings
from neutron.tests.unit.nec import fake_ofc_manager
from neutron.tests.unit import test_db_plugin as test_plugin
-from neutron.tests.unit import test_security_groups_rpc as test_sg_rpc
PLUGIN_NAME = 'neutron.plugins.nec.nec_plugin.NECPluginV2'
class TestNecPortsV2(test_plugin.TestPortsV2, NecPluginV2TestCase):
-
- VIF_TYPE = portbindings.VIF_TYPE_OVS
- HAS_PORT_FILTER = True
+ pass
class TestNecNetworksV2(test_plugin.TestNetworksV2, NecPluginV2TestCase):
pass
-class TestNecPortBinding(test_bindings.PortBindingsTestCase,
- NecPluginV2TestCase):
- VIF_TYPE = portbindings.VIF_TYPE_OVS
- HAS_PORT_FILTER = True
- FIREWALL_DRIVER = test_sg_rpc.FIREWALL_HYBRID_DRIVER
-
- def setUp(self):
- test_sg_rpc.set_firewall_driver(self.FIREWALL_DRIVER)
- super(TestNecPortBinding, self).setUp()
-
-
-class TestNecPortBindingNoSG(TestNecPortBinding):
- HAS_PORT_FILTER = False
- FIREWALL_DRIVER = test_sg_rpc.FIREWALL_NOOP_DRIVER
-
-
class TestNecPortsV2Callback(NecPluginV2TestCase):
def _get_portinfo(self, port_id):
self.assertEqual(self.ofc.delete_ofc_port.call_count, 1)
self.assertIsNone(self._get_portinfo(port_id))
- # The port is expected to delete when exiting with-clause.
+ # The port and portinfo is expected to delete when exiting with-clause.
+ self.assertEqual(self.ofc.delete_ofc_port.call_count, 1)
+ self.assertIsNone(self._get_portinfo(port_id))
if not portinfo_delete_first:
- self.assertEqual(self.ofc.delete_ofc_port.call_count, 1)
- self.assertIsNotNone(self._get_portinfo(port_id))
self.rpcapi_update_ports(removed=[port_id])
# Ensure port deletion is called once.
def test_portinfo_added_unknown_port(self):
portinfo = {'id': 'dummy-p1', 'port_no': 123}
self.rpcapi_update_ports(added=[portinfo])
- self.assertIsNotNone(ndb.get_portinfo(self.context.session,
- 'dummy-p1'))
+ self.assertIsNone(ndb.get_portinfo(self.context.session,
+ 'dummy-p1'))
self.assertEqual(self.ofc.exists_ofc_port.call_count, 0)
self.assertEqual(self.ofc.create_ofc_port.call_count, 0)
# No OFC operations are expected.
self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
self.assertEqual(self.ofc.delete_ofc_port.call_count, 1)
- self.assertEqual(ndb.get_portinfo(self.context.session,
- port_id).port_no, 456)
+ self.assertIsNone(ndb.get_portinfo(self.context.session, port_id))
def test_portinfo_change(self):
self._test_portinfo_change()
# under the License.
# @author: Ryota MIBU
+import mock
+
from neutron import context
from neutron.openstack.common import uuidutils
from neutron.plugins.nec.common import config
from neutron.tests import base
+class FakePortInfo(object):
+ def __init__(self, id, datapath_id, port_no=0,
+ vlan_id=65535, mac='00:11:22:33:44:55'):
+ self.data = {'id': id, 'datapath_id': datapath_id,
+ 'port_no': port_no, 'vlan_id': vlan_id, 'mac': mac}
+
+ def __getattr__(self, name):
+ if name in self.fields:
+ return self[name]
+ else:
+ raise AttributeError(name)
+
+
class OFCManagerTestBase(base.BaseTestCase):
"""Class conisting of OFCManager unit tests."""
self.addCleanup(ndb.clear_db)
self.ofc = ofc_manager.OFCManager()
self.ctx = context.get_admin_context()
+ self.addCleanup(mock.patch.stopall)
def get_random_params(self):
"""create random parameters for portinfo test."""
self.ofc.delete_ofc_network(self.ctx, n, {'tenant_id': t})
self.assertFalse(ndb.get_ofc_item(self.ctx.session, 'ofc_network', n))
+ def _mock_get_portinfo(self, port_id, datapath_id='0xabc', port_no=1):
+ get_portinfo = mock.patch.object(ndb, 'get_portinfo').start()
+ fake_portinfo = FakePortInfo(id=port_id, datapath_id=datapath_id,
+ port_no=port_no)
+ get_portinfo.return_value = fake_portinfo
+ return get_portinfo
+
def testg_create_ofc_port(self):
"""test create ofc_port."""
t, n, p, f, none = self.get_random_params()
self.ofc.create_ofc_tenant(self.ctx, t)
self.ofc.create_ofc_network(self.ctx, t, n)
- ndb.add_portinfo(self.ctx.session, p, "0xabc", 1, 65535,
- "00:11:22:33:44:55")
self.assertFalse(ndb.get_ofc_item(self.ctx.session, 'ofc_port', p))
+ get_portinfo = self._mock_get_portinfo(p)
port = {'tenant_id': t, 'network_id': n}
self.ofc.create_ofc_port(self.ctx, p, port)
self.assertTrue(ndb.get_ofc_item(self.ctx.session, 'ofc_port', p))
port = ndb.get_ofc_item(self.ctx.session, 'ofc_port', p)
self.assertEqual(port.ofc_id, "ofc-" + p[:-4])
+ get_portinfo.assert_called_once_with(mock.ANY, p)
def testh_exists_ofc_port(self):
"""test exists_ofc_port."""
t, n, p, f, none = self.get_random_params()
self.ofc.create_ofc_tenant(self.ctx, t)
self.ofc.create_ofc_network(self.ctx, t, n)
- ndb.add_portinfo(self.ctx.session, p, "0xabc", 2, 65535,
- "00:12:22:33:44:55")
self.assertFalse(self.ofc.exists_ofc_port(self.ctx, p))
+ get_portinfo = self._mock_get_portinfo(p)
port = {'tenant_id': t, 'network_id': n}
self.ofc.create_ofc_port(self.ctx, p, port)
self.assertTrue(self.ofc.exists_ofc_port(self.ctx, p))
+ get_portinfo.assert_called_once_with(mock.ANY, p)
def testi_delete_ofc_port(self):
"""test delete ofc_port."""
t, n, p, f, none = self.get_random_params()
self.ofc.create_ofc_tenant(self.ctx, t)
self.ofc.create_ofc_network(self.ctx, t, n)
- ndb.add_portinfo(self.ctx.session, p, "0xabc", 3, 65535,
- "00:13:22:33:44:55")
+ get_portinfo = self._mock_get_portinfo(p)
port = {'tenant_id': t, 'network_id': n}
self.ofc.create_ofc_port(self.ctx, p, port)
self.assertTrue(ndb.get_ofc_item(self.ctx.session, 'ofc_port', p))
self.ofc.delete_ofc_port(self.ctx, p, port)
self.assertFalse(ndb.get_ofc_item(self.ctx.session, 'ofc_port', p))
+ get_portinfo.assert_called_once_with(mock.ANY, p)
def testj_create_ofc_packet_filter(self):
"""test create ofc_filter."""
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 NEC Corporation
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Akihiro Motoki, NEC Corporation
+
+from testtools import matchers
+from webob import exc
+
+from neutron.common import exceptions as q_exc
+from neutron import context
+from neutron.extensions import portbindings
+from neutron.tests.unit import _test_extension_portbindings as test_bindings
+from neutron.tests.unit.nec import test_nec_plugin
+from neutron.tests.unit import test_security_groups_rpc as test_sg_rpc
+
+
+class TestNecPortBinding(test_bindings.PortBindingsTestCase,
+ test_nec_plugin.NecPluginV2TestCase):
+ VIF_TYPE = portbindings.VIF_TYPE_OVS
+ HAS_PORT_FILTER = True
+ FIREWALL_DRIVER = test_sg_rpc.FIREWALL_HYBRID_DRIVER
+
+ def setUp(self):
+ test_sg_rpc.set_firewall_driver(self.FIREWALL_DRIVER)
+ super(TestNecPortBinding, self).setUp()
+
+
+class TestNecPortBindingNoSG(TestNecPortBinding):
+ HAS_PORT_FILTER = False
+ FIREWALL_DRIVER = test_sg_rpc.FIREWALL_NOOP_DRIVER
+
+
+class TestNecPortBindingHost(
+ test_bindings.PortBindingsHostTestCaseMixin,
+ test_nec_plugin.NecPluginV2TestCase):
+ pass
+
+
+class TestNecPortBindingPortInfo(test_nec_plugin.NecPluginV2TestCase):
+ def _get_portinfo(self, datapath_id=None, port_no=None, prefix=None):
+ if datapath_id is None:
+ datapath_id = '0xabc'
+ if port_no is None:
+ port_no = 1
+ if prefix is None:
+ prefix = 'portinfo:'
+ return {prefix + 'datapath_id': datapath_id,
+ prefix + 'port_no': port_no}
+
+ def _check_response_portbinding_profile(self, port, datapath_id=None,
+ port_no=None):
+ expected = self._get_portinfo(datapath_id, port_no, prefix='')
+ profile = port[portbindings.PROFILE]
+ self.assertEqual(len(profile), 2)
+ self.assertEqual(profile['portinfo:datapath_id'],
+ expected['datapath_id'])
+ self.assertEqual(profile['portinfo:port_no'],
+ expected['port_no'])
+
+ def _check_response_portbinding_no_profile(self, port):
+ self.assertIn('status', port)
+ self.assertNotIn(portbindings.PROFILE, port)
+
+ def _get_non_admin_context(self):
+ return context.Context(user_id=None,
+ tenant_id=self._tenant_id,
+ is_admin=False,
+ read_deleted="no")
+
+ def test_port_create_portinfo(self):
+ profile_arg = {portbindings.PROFILE: self._get_portinfo()}
+ with self.port(arg_list=(portbindings.PROFILE,),
+ **profile_arg) as port:
+ port_id = port['port']['id']
+ # Check a response of create_port
+ self._check_response_portbinding_profile(port['port'])
+ self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
+ # Check a response of get_port
+ ctx = context.get_admin_context()
+ port = self._show('ports', port_id, neutron_context=ctx)['port']
+ self._check_response_portbinding_profile(port)
+ # By default user is admin - now test non admin user
+ ctx = self._get_non_admin_context()
+ non_admin_port = self._show(
+ 'ports', port_id, neutron_context=ctx)['port']
+ self._check_response_portbinding_no_profile(non_admin_port)
+ # port-update with non admin user should fail
+ self._update('ports', port_id,
+ {'port': profile_arg},
+ expected_code=404,
+ neutron_context=ctx)
+
+ def test_port_update_portinfo(self):
+ profile_arg = {portbindings.PROFILE: self._get_portinfo()}
+ with self.port() as port:
+ self.assertEqual(self.ofc.create_ofc_port.call_count, 0)
+ port_id = port['port']['id']
+ # Check a response of create_port
+ self._check_response_portbinding_no_profile(port['port'])
+ # Check a response of update_port
+ ctx = context.get_admin_context()
+ port = self._update('ports', port_id, {'port': profile_arg},
+ neutron_context=ctx)['port']
+ self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
+ self._check_response_portbinding_profile(port)
+ port = self._show('ports', port_id, neutron_context=ctx)['port']
+ self._check_response_portbinding_profile(port)
+
+ def test_port_update_portinfo_detail(self):
+ with self.port() as port:
+ self.assertEqual(self.ofc.create_ofc_port.call_count, 0)
+ self.assertEqual(self.ofc.delete_ofc_port.call_count, 0)
+ port_id = port['port']['id']
+ ctx = context.get_admin_context()
+
+ # add portinfo
+ profile_arg = {portbindings.PROFILE: self._get_portinfo()}
+ port = self._update('ports', port_id, {'port': profile_arg},
+ neutron_context=ctx)['port']
+ self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
+ self.assertEqual(self.ofc.delete_ofc_port.call_count, 0)
+
+ # portinfo unchanged
+ port = self._update('ports', port_id, {'port': profile_arg},
+ neutron_context=ctx)['port']
+ self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
+ self.assertEqual(self.ofc.delete_ofc_port.call_count, 0)
+
+ # modify portinfo
+ profile_arg = {portbindings.PROFILE:
+ self._get_portinfo(datapath_id='0x1234567890',
+ port_no=99)}
+ port = self._update('ports', port_id, {'port': profile_arg},
+ neutron_context=ctx)['port']
+ self.assertEqual(self.ofc.create_ofc_port.call_count, 2)
+ self.assertEqual(self.ofc.delete_ofc_port.call_count, 1)
+
+ # delete portinfo
+ profile_arg = {portbindings.PROFILE: {}}
+ port = self._update('ports', port_id, {'port': profile_arg},
+ neutron_context=ctx)['port']
+ self.assertEqual(self.ofc.create_ofc_port.call_count, 2)
+ self.assertEqual(self.ofc.delete_ofc_port.call_count, 2)
+
+ def test_port_create_portinfo_with_empty_dict(self):
+ profile_arg = {portbindings.PROFILE: {}}
+ with self.port(arg_list=(portbindings.PROFILE,),
+ **profile_arg) as port:
+ port_id = port['port']['id']
+
+ # Check a response of create_port
+ self._check_response_portbinding_no_profile(port['port'])
+ self.assertEqual(self.ofc.create_ofc_port.call_count, 0)
+ # add portinfo
+ ctx = context.get_admin_context()
+ profile_arg = {portbindings.PROFILE: self._get_portinfo()}
+ port = self._update('ports', port_id, {'port': profile_arg},
+ neutron_context=ctx)['port']
+ self._check_response_portbinding_profile(port)
+ self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
+ self.assertEqual(self.ofc.delete_ofc_port.call_count, 0)
+
+ def test_port_create_portinfo_non_admin(self):
+ with self.network(set_context=True, tenant_id='test') as net1:
+ with self.subnet(network=net1) as subnet1:
+ profile_arg = {portbindings.PROFILE: self._get_portinfo()}
+ try:
+ with self.port(subnet=subnet1,
+ expected_res_status=403,
+ arg_list=(portbindings.PROFILE,),
+ set_context=True, tenant_id='test',
+ **profile_arg):
+ pass
+ except exc.HTTPClientError:
+ pass
+ self.assertEqual(self.ofc.create_ofc_port.call_count, 0)
+
+ def test_port_update_portinfo_non_admin(self):
+ profile_arg = {portbindings.PROFILE: self._get_portinfo()}
+ with self.network() as net1:
+ with self.subnet(network=net1) as subnet1:
+ with self.port(subnet=subnet1) as port:
+ # By default user is admin - now test non admin user
+ # Note that 404 is returned when prohibit by policy.
+ # See comment for PolicyNotAuthorized except clause
+ # in update() in neutron.api.v2.base.Controller.
+ port_id = port['port']['id']
+ ctx = self._get_non_admin_context()
+ port = self._update('ports', port_id,
+ {'port': profile_arg},
+ expected_code=404,
+ neutron_context=ctx)
+ self.assertEqual(self.ofc.create_ofc_port.call_count, 0)
+
+ def test_port_create_portinfo_validation_called(self):
+ # Check validate_portinfo is called.
+ profile_arg = {portbindings.PROFILE:
+ {'portinfo:datapath_id': '0xabc',
+ 'portinfo:port_no': 0xffff + 1}}
+ try:
+ with self.port(arg_list=(portbindings.PROFILE,),
+ expected_res_status=400,
+ **profile_arg):
+ pass
+ except exc.HTTPClientError:
+ pass
+
+
+class TestNecPortBindingValidatePortInfo(test_nec_plugin.NecPluginV2TestCase):
+
+ def test_validate_portinfo_ok(self):
+ profile = {'portinfo:datapath_id': '0x1234567890abcdef',
+ 'portinfo:port_no': 123}
+ portinfo = self.plugin._validate_portinfo(profile)
+ self.assertEqual(portinfo['datapath_id'], '0x1234567890abcdef')
+ self.assertEqual(portinfo['port_no'], 123)
+
+ def test_validate_portinfo_ok_without_0x(self):
+ profile = {'portinfo:datapath_id': '1234567890abcdef',
+ 'portinfo:port_no': 123}
+ portinfo = self.plugin._validate_portinfo(profile)
+ self.assertEqual(portinfo['datapath_id'], '0x1234567890abcdef')
+ self.assertEqual(portinfo['port_no'], 123)
+
+ def _test_validate_exception(self, profile, expected_msg):
+ e = self.assertRaises(q_exc.InvalidInput,
+ self.plugin._validate_portinfo, profile)
+ self.assertThat(str(e), matchers.StartsWith(expected_msg))
+
+ def test_validate_portinfo_dict_validation(self):
+ expected_msg = ("Invalid input for operation: "
+ "Validation of dictionary's keys failed.")
+
+ profile = {'portinfo:port_no': 123}
+ self._test_validate_exception(profile, expected_msg)
+
+ profile = {'portinfo:datapath_id': '0xabcdef'}
+ self._test_validate_exception(profile, expected_msg)
+
+ def test_validate_portinfo_negative_port_number(self):
+ profile = {'portinfo:datapath_id': '0x1234567890abcdef',
+ 'portinfo:port_no': -1}
+ expected_msg = ("Invalid input for operation: "
+ "'-1' should be non-negative.")
+ self._test_validate_exception(profile, expected_msg)
+
+ def test_validate_portinfo_invalid_datapath_id(self):
+ expected_msg = ("Invalid input for operation: "
+ "portinfo:datapath_id should be a hex string")
+
+ # non hexidecimal datapath_id
+ profile = {'portinfo:datapath_id': 'INVALID',
+ 'portinfo:port_no': 123}
+ self._test_validate_exception(profile, expected_msg)
+
+ # Too big datapath_id
+ profile = {'portinfo:datapath_id': '0x10000000000000000',
+ 'portinfo:port_no': 123}
+ self._test_validate_exception(profile, expected_msg)
+
+ def test_validate_portinfo_too_big_port_number(self):
+ profile = {'portinfo:datapath_id': '0x1234567890abcdef',
+ 'portinfo:port_no': 65536}
+ expected_msg = ("Invalid input for operation: "
+ "portinfo:port_no should be [0:65535]")
+ self._test_validate_exception(profile, expected_msg)