# License for the specific language governing permissions and limitations
# under the License.
-import time
-
+from networking_odl.ml2 import mech_driver
from oslo.config import cfg
-from oslo.serialization import jsonutils
-from oslo.utils import excutils
-import requests
from neutron.common import constants as n_const
-from neutron.common import exceptions as n_exc
-from neutron.common import utils
from neutron.extensions import portbindings
from neutron.openstack.common import log
from neutron.plugins.common import constants
cfg.CONF.register_opts(odl_opts, "ml2_odl")
-def try_del(d, keys):
- """Ignore key errors when deleting from a dictionary."""
- for key in keys:
- try:
- del d[key]
- except KeyError:
- pass
-
-
-class OpendaylightAuthError(n_exc.NeutronException):
- message = '%(msg)s'
-
-
-class JsessionId(requests.auth.AuthBase):
-
- """Attaches the JSESSIONID and JSESSIONIDSSO cookies to an HTTP Request.
-
- If the cookies are not available or when the session expires, a new
- set of cookies are obtained.
- """
-
- def __init__(self, url, username, password):
- """Initialization function for JsessionId."""
-
- # NOTE(kmestery) The 'limit' paramater is intended to limit how much
- # data is returned from ODL. This is not implemented in the Hydrogen
- # release of OpenDaylight, but will be implemented in the Helium
- # timeframe. Hydrogen will silently ignore this value.
- self.url = str(url) + '/' + ODL_NETWORKS + '?limit=1'
- self.username = username
- self.password = password
- self.auth_cookies = None
- self.last_request = None
- self.expired = None
- self.session_timeout = cfg.CONF.ml2_odl.session_timeout * 60
- self.session_deadline = 0
-
- def obtain_auth_cookies(self):
- """Make a REST call to obtain cookies for ODL authenticiation."""
-
- try:
- r = requests.get(self.url, auth=(self.username, self.password))
- r.raise_for_status()
- except requests.exceptions.HTTPError as e:
- raise OpendaylightAuthError(msg="Failed to authenticate with "
- "OpenDaylight: %s" % e)
- except requests.exceptions.Timeout as e:
- raise OpendaylightAuthError(msg="Authentication Timed Out: %s" % e)
-
- jsessionid = r.cookies.get('JSESSIONID')
- jsessionidsso = r.cookies.get('JSESSIONIDSSO')
- if jsessionid and jsessionidsso:
- self.auth_cookies = dict(JSESSIONID=jsessionid,
- JSESSIONIDSSO=jsessionidsso)
-
- def __call__(self, r):
- """Verify timestamp for Tomcat session timeout."""
-
- if time.time() > self.session_deadline:
- self.obtain_auth_cookies()
- self.session_deadline = time.time() + self.session_timeout
- r.prepare_cookies(self.auth_cookies)
- return r
-
-
class OpenDaylightMechanismDriver(api.MechanismDriver):
"""Mechanism Driver for OpenDaylight.
exposed by ODL is slightly different from the API exposed by NCS,
but the general concepts are the same.
"""
- auth = None
- out_of_sync = True
def initialize(self):
self.url = cfg.CONF.ml2_odl.url
for opt in required_opts:
if not getattr(self, opt):
raise cfg.RequiredOptError(opt, 'ml2_odl')
- self.auth = JsessionId(self.url, self.username, self.password)
self.vif_type = portbindings.VIF_TYPE_OVS
self.vif_details = {portbindings.CAP_PORT_FILTER: True}
+ self.odl_drv = mech_driver.OpenDaylightDriver()
# Postcommit hooks are used to trigger synchronization.
def create_network_postcommit(self, context):
- self.synchronize('create', ODL_NETWORKS, context)
+ self.odl_drv.synchronize('create', ODL_NETWORKS, context)
def update_network_postcommit(self, context):
- self.synchronize('update', ODL_NETWORKS, context)
+ self.odl_drv.synchronize('update', ODL_NETWORKS, context)
def delete_network_postcommit(self, context):
- self.synchronize('delete', ODL_NETWORKS, context)
+ self.odl_drv.synchronize('delete', ODL_NETWORKS, context)
def create_subnet_postcommit(self, context):
- self.synchronize('create', ODL_SUBNETS, context)
+ self.odl_drv.synchronize('create', ODL_SUBNETS, context)
def update_subnet_postcommit(self, context):
- self.synchronize('update', ODL_SUBNETS, context)
+ self.odl_drv.synchronize('update', ODL_SUBNETS, context)
def delete_subnet_postcommit(self, context):
- self.synchronize('delete', ODL_SUBNETS, context)
+ self.odl_drv.synchronize('delete', ODL_SUBNETS, context)
def create_port_postcommit(self, context):
- self.synchronize('create', ODL_PORTS, context)
+ self.odl_drv.synchronize('create', ODL_PORTS, context)
def update_port_postcommit(self, context):
- self.synchronize('update', ODL_PORTS, context)
+ self.odl_drv.synchronize('update', ODL_PORTS, context)
def delete_port_postcommit(self, context):
- self.synchronize('delete', ODL_PORTS, context)
-
- def synchronize(self, operation, object_type, context):
- """Synchronize ODL with Neutron following a configuration change."""
- if self.out_of_sync:
- self.sync_full(context)
- else:
- self.sync_single_resource(operation, object_type, context)
-
- @staticmethod
- def filter_create_network_attributes(network, context):
- """Filter out network attributes not required for a create."""
- try_del(network, ['status', 'subnets'])
-
- @staticmethod
- def filter_create_subnet_attributes(subnet, context):
- """Filter out subnet attributes not required for a create."""
- pass
-
- @classmethod
- def filter_create_port_attributes(cls, port, context):
- """Filter out port attributes not required for a create."""
- cls.add_security_groups(port, context)
- # TODO(kmestery): Converting to uppercase due to ODL bug
- # https://bugs.opendaylight.org/show_bug.cgi?id=477
- port['mac_address'] = port['mac_address'].upper()
- try_del(port, ['status'])
-
- def sync_resources(self, collection_name, context):
- """Sync objects from Neutron over to OpenDaylight.
-
- This will handle syncing networks, subnets, and ports from Neutron to
- OpenDaylight. It also filters out the requisite items which are not
- valid for create API operations.
- """
- to_be_synced = []
- dbcontext = context._plugin_context
- obj_getter = getattr(context._plugin, 'get_%s' % collection_name)
- resources = obj_getter(dbcontext)
- for resource in resources:
- try:
- urlpath = collection_name + '/' + resource['id']
- self.sendjson('get', urlpath, None)
- except requests.exceptions.HTTPError as e:
- with excutils.save_and_reraise_exception() as ctx:
- if e.response.status_code == requests.codes.not_found:
- attr_filter = self.create_object_map[collection_name]
- attr_filter(resource, context)
- to_be_synced.append(resource)
- ctx.reraise = False
- key = collection_name[:-1] if len(to_be_synced) == 1 else (
- collection_name)
- self.sendjson('post', collection_name, {key: to_be_synced})
-
- @utils.synchronized('odl-sync-full')
- def sync_full(self, context):
- """Resync the entire database to ODL.
-
- Transition to the in-sync state on success.
- Note: we only allow a single thread in here at a time.
- """
- if not self.out_of_sync:
- return
- for collection_name in [ODL_NETWORKS, ODL_SUBNETS, ODL_PORTS]:
- self.sync_resources(collection_name, context)
- self.out_of_sync = False
-
- @staticmethod
- def filter_update_network_attributes(network, context):
- """Filter out network attributes for an update operation."""
- try_del(network, ['id', 'status', 'subnets', 'tenant_id'])
-
- @staticmethod
- def filter_update_subnet_attributes(subnet, context):
- """Filter out subnet attributes for an update operation."""
- try_del(subnet, ['id', 'network_id', 'ip_version', 'cidr',
- 'allocation_pools', 'tenant_id'])
-
- @classmethod
- def filter_update_port_attributes(cls, port, context):
- """Filter out port attributes for an update operation."""
- cls.add_security_groups(port, context)
- try_del(port, ['network_id', 'id', 'status', 'mac_address',
- 'tenant_id', 'fixed_ips'])
-
- def sync_single_resource(self, operation, object_type, context):
- """Sync over a single resource from Neutron to OpenDaylight.
-
- Handle syncing a single operation over to OpenDaylight, and correctly
- filter attributes out which are not required for the requisite
- operation (create or update) being handled.
- """
- try:
- obj_id = context.current['id']
- if operation == 'delete':
- self.sendjson('delete', object_type + '/' + obj_id, None)
- else:
- if operation == 'create':
- urlpath = object_type
- method = 'post'
- attr_filter = self.create_object_map[object_type]
- elif operation == 'update':
- urlpath = object_type + '/' + obj_id
- method = 'put'
- attr_filter = self.update_object_map[object_type]
- resource = context.current.copy()
- attr_filter(resource, context)
- self.sendjson(method, urlpath, {object_type[:-1]: resource})
- except Exception:
- with excutils.save_and_reraise_exception():
- self.out_of_sync = True
-
- @staticmethod
- def add_security_groups(port, context):
- """Populate the 'security_groups' field with entire records."""
- dbcontext = context._plugin_context
- groups = [context._plugin.get_security_group(dbcontext, sg)
- for sg in port['security_groups']]
- port['security_groups'] = groups
-
- def sendjson(self, method, urlpath, obj):
- """Send json to the OpenDaylight controller."""
- headers = {'Content-Type': 'application/json'}
- data = jsonutils.dumps(obj, indent=2) if obj else None
- url = '/'.join([self.url, urlpath])
- LOG.debug("Sending METHOD (%(method)s) URL (%(url)s) JSON (%(obj)s)",
- {'method': method, 'url': url, 'obj': obj})
- r = requests.request(method, url=url,
- headers=headers, data=data,
- auth=self.auth, timeout=self.timeout)
- r.raise_for_status()
+ self.odl_drv.synchronize('delete', ODL_PORTS, context)
def bind_port(self, context):
LOG.debug("Attempting to bind port %(port)s on "
network_type = segment[api.NETWORK_TYPE]
return network_type in [constants.TYPE_LOCAL, constants.TYPE_GRE,
constants.TYPE_VXLAN, constants.TYPE_VLAN]
-
-
-OpenDaylightMechanismDriver.create_object_map = {
- ODL_NETWORKS: OpenDaylightMechanismDriver.filter_create_network_attributes,
- ODL_SUBNETS: OpenDaylightMechanismDriver.filter_create_subnet_attributes,
- ODL_PORTS: OpenDaylightMechanismDriver.filter_create_port_attributes}
-
-OpenDaylightMechanismDriver.update_object_map = {
- ODL_NETWORKS: OpenDaylightMechanismDriver.filter_update_network_attributes,
- ODL_SUBNETS: OpenDaylightMechanismDriver.filter_update_subnet_attributes,
- ODL_PORTS: OpenDaylightMechanismDriver.filter_update_port_attributes}
-# Copyright (c) 2013-2014 OpenStack Foundation
+# Copyright (c) 2013-2015 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# License for the specific language governing permissions and limitations
# under the License.
-import mock
-from oslo.serialization import jsonutils
-import requests
+import sys
-from neutron.plugins.common import constants
-from neutron.plugins.ml2 import config as config
-from neutron.plugins.ml2 import driver_api as api
-from neutron.plugins.ml2.drivers import mechanism_odl
-from neutron.plugins.ml2 import plugin
-from neutron.tests import base
+import mock
+from neutron import context
from neutron.tests.unit.ml2 import test_ml2_plugin as test_plugin
-from neutron.tests.unit import testlib_api
-
-PLUGIN_NAME = 'neutron.plugins.ml2.plugin.Ml2Plugin'
-
-
-class OpenDaylightTestCase(test_plugin.Ml2PluginV2TestCase):
- _mechanism_drivers = ['logger', 'opendaylight']
-
- def setUp(self):
- # Set URL/user/pass so init doesn't throw a cfg required error.
- # They are not used in these tests since sendjson is overwritten.
- config.cfg.CONF.set_override('url', 'http://127.0.0.1:9999', 'ml2_odl')
- config.cfg.CONF.set_override('username', 'someuser', 'ml2_odl')
- config.cfg.CONF.set_override('password', 'somepass', 'ml2_odl')
-
- super(OpenDaylightTestCase, self).setUp()
- self.port_create_status = 'DOWN'
- self.mech = mechanism_odl.OpenDaylightMechanismDriver()
- mechanism_odl.OpenDaylightMechanismDriver.sendjson = (
- self.check_sendjson)
-
- def check_sendjson(self, method, urlpath, obj, ignorecodes=[]):
- self.assertFalse(urlpath.startswith("http://"))
-
-
-class OpenDayLightMechanismConfigTests(testlib_api.SqlTestCase):
-
- def _set_config(self, url='http://127.0.0.1:9999', username='someuser',
- password='somepass'):
- config.cfg.CONF.set_override('mechanism_drivers',
- ['logger', 'opendaylight'],
- 'ml2')
- config.cfg.CONF.set_override('url', url, 'ml2_odl')
- config.cfg.CONF.set_override('username', username, 'ml2_odl')
- config.cfg.CONF.set_override('password', password, 'ml2_odl')
-
- def _test_missing_config(self, **kwargs):
- self._set_config(**kwargs)
- self.assertRaises(config.cfg.RequiredOptError,
- plugin.Ml2Plugin)
-
- def test_valid_config(self):
- self._set_config()
- plugin.Ml2Plugin()
-
- def test_missing_url_raises_exception(self):
- self._test_missing_config(url=None)
-
- def test_missing_username_raises_exception(self):
- self._test_missing_config(username=None)
-
- def test_missing_password_raises_exception(self):
- self._test_missing_config(password=None)
-
-
-class OpenDaylightMechanismTestBasicGet(test_plugin.TestMl2BasicGet,
- OpenDaylightTestCase):
- pass
-
-
-class OpenDaylightMechanismTestNetworksV2(test_plugin.TestMl2NetworksV2,
- OpenDaylightTestCase):
- pass
-
-
-class OpenDaylightMechanismTestSubnetsV2(test_plugin.TestMl2SubnetsV2,
- OpenDaylightTestCase):
- pass
-
-
-class OpenDaylightMechanismTestPortsV2(test_plugin.TestMl2PortsV2,
- OpenDaylightTestCase):
- pass
-
-class AuthMatcher(object):
- def __eq__(self, obj):
- return (obj.username == config.cfg.CONF.ml2_odl.username and
- obj.password == config.cfg.CONF.ml2_odl.password)
+ODL_NETWORKS = 'networks'
+ODL_SUBNETS = 'subnets'
+ODL_PORTS = 'ports'
-class DataMatcher(object):
+with mock.patch.dict(sys.modules,
+ {'networking_odl': mock.Mock(),
+ 'networking_odl.ml2': mock.Mock()}):
+ from neutron.plugins.ml2.drivers import mechanism_odl
- def __init__(self, operation, object_type, context):
- self._data = context.current.copy()
- self._object_type = object_type
- filter_map = getattr(mechanism_odl.OpenDaylightMechanismDriver,
- '%s_object_map' % operation)
- attr_filter = filter_map["%ss" % object_type]
- attr_filter(self._data, context)
- def __eq__(self, s):
- data = jsonutils.loads(s)
- return self._data == data[self._object_type]
-
-
-class OpenDaylightMechanismDriverTestCase(base.BaseTestCase):
+class TestODLShim(test_plugin.Ml2PluginV2TestCase):
def setUp(self):
- super(OpenDaylightMechanismDriverTestCase, self).setUp()
- config.cfg.CONF.set_override('mechanism_drivers',
- ['logger', 'opendaylight'], 'ml2')
- config.cfg.CONF.set_override('url', 'http://127.0.0.1:9999', 'ml2_odl')
- config.cfg.CONF.set_override('username', 'someuser', 'ml2_odl')
- config.cfg.CONF.set_override('password', 'somepass', 'ml2_odl')
- self.mech = mechanism_odl.OpenDaylightMechanismDriver()
- self.mech.initialize()
-
- @staticmethod
- def _get_mock_network_operation_context():
- current = {'status': 'ACTIVE',
- 'subnets': [],
- 'name': 'net1',
- 'provider:physical_network': None,
- 'admin_state_up': True,
- 'tenant_id': 'test-tenant',
- 'provider:network_type': 'local',
- 'router:external': False,
- 'shared': False,
- 'id': 'd897e21a-dfd6-4331-a5dd-7524fa421c3e',
- 'provider:segmentation_id': None}
- context = mock.Mock(current=current)
- return context
-
- @staticmethod
- def _get_mock_subnet_operation_context():
- current = {'ipv6_ra_mode': None,
- 'allocation_pools': [{'start': '10.0.0.2',
- 'end': '10.0.1.254'}],
- 'host_routes': [],
- 'ipv6_address_mode': None,
- 'cidr': '10.0.0.0/23',
- 'id': '72c56c48-e9b8-4dcf-b3a7-0813bb3bd839',
- 'name': '',
- 'enable_dhcp': True,
- 'network_id': 'd897e21a-dfd6-4331-a5dd-7524fa421c3e',
- 'tenant_id': 'test-tenant',
- 'dns_nameservers': [],
- 'gateway_ip': '10.0.0.1',
- 'ip_version': 4,
- 'shared': False}
- context = mock.Mock(current=current)
- return context
-
- @staticmethod
- def _get_mock_port_operation_context():
- current = {'status': 'DOWN',
- 'binding:host_id': '',
- 'allowed_address_pairs': [],
- 'device_owner': 'fake_owner',
- 'binding:profile': {},
- 'fixed_ips': [],
- 'id': '72c56c48-e9b8-4dcf-b3a7-0813bb3bd839',
- 'security_groups': ['2f9244b4-9bee-4e81-bc4a-3f3c2045b3d7'],
- 'device_id': 'fake_device',
- 'name': '',
- 'admin_state_up': True,
- 'network_id': 'c13bba05-eb07-45ba-ace2-765706b2d701',
- 'tenant_id': 'bad_tenant_id',
- 'binding:vif_details': {},
- 'binding:vnic_type': 'normal',
- 'binding:vif_type': 'unbound',
- 'mac_address': '12:34:56:78:21:b6'}
- context = mock.Mock(current=current)
- context._plugin.get_security_group = mock.Mock(return_value={})
- return context
-
- @classmethod
- def _get_mock_operation_context(cls, object_type):
- getter = getattr(cls, '_get_mock_%s_operation_context' % object_type)
- return getter()
-
- _status_code_msgs = {
- 200: '',
- 201: '',
- 204: '',
- 400: '400 Client Error: Bad Request',
- 401: '401 Client Error: Unauthorized',
- 403: '403 Client Error: Forbidden',
- 404: '404 Client Error: Not Found',
- 409: '409 Client Error: Conflict',
- 501: '501 Server Error: Not Implemented',
- 503: '503 Server Error: Service Unavailable',
- }
-
- @classmethod
- def _get_mock_request_response(cls, status_code):
- response = mock.Mock(status_code=status_code)
- response.raise_for_status = mock.Mock() if status_code < 400 else (
- mock.Mock(side_effect=requests.exceptions.HTTPError(
- cls._status_code_msgs[status_code])))
- return response
-
- def _test_single_operation(self, method, context, status_code,
- exc_class=None, *args, **kwargs):
- self.mech.out_of_sync = False
- request_response = self._get_mock_request_response(status_code)
- with mock.patch('requests.request',
- return_value=request_response) as mock_method:
- if exc_class is not None:
- self.assertRaises(exc_class, method, context)
- else:
- method(context)
- mock_method.assert_called_once_with(
- headers={'Content-Type': 'application/json'}, auth=AuthMatcher(),
- timeout=config.cfg.CONF.ml2_odl.timeout, *args, **kwargs)
-
- def _test_create_resource_postcommit(self, object_type, status_code,
- exc_class=None):
- method = getattr(self.mech, 'create_%s_postcommit' % object_type)
- context = self._get_mock_operation_context(object_type)
- url = '%s/%ss' % (config.cfg.CONF.ml2_odl.url, object_type)
- kwargs = {'url': url,
- 'data': DataMatcher('create', object_type, context)}
- self._test_single_operation(method, context, status_code, exc_class,
- 'post', **kwargs)
-
- def _test_update_resource_postcommit(self, object_type, status_code,
- exc_class=None):
- method = getattr(self.mech, 'update_%s_postcommit' % object_type)
- context = self._get_mock_operation_context(object_type)
- url = '%s/%ss/%s' % (config.cfg.CONF.ml2_odl.url, object_type,
- context.current['id'])
- kwargs = {'url': url,
- 'data': DataMatcher('update', object_type, context)}
- self._test_single_operation(method, context, status_code, exc_class,
- 'put', **kwargs)
-
- def _test_delete_resource_postcommit(self, object_type, status_code,
- exc_class=None):
- method = getattr(self.mech, 'delete_%s_postcommit' % object_type)
- context = self._get_mock_operation_context(object_type)
- url = '%s/%ss/%s' % (config.cfg.CONF.ml2_odl.url, object_type,
- context.current['id'])
- kwargs = {'url': url, 'data': None}
- self._test_single_operation(method, context, status_code, exc_class,
- 'delete', **kwargs)
+ super(TestODLShim, self).setUp()
+ self.context = context.get_admin_context()
+ self.plugin = mock.Mock()
+ self.driver = mechanism_odl.OpenDaylightMechanismDriver()
+ self.driver.odl_drv = mock.Mock()
def test_create_network_postcommit(self):
- self._test_create_resource_postcommit('network',
- requests.codes.created)
- for status_code in (requests.codes.bad_request,
- requests.codes.unauthorized):
- self._test_create_resource_postcommit(
- 'network', status_code, requests.exceptions.HTTPError)
+ self.driver.create_network_postcommit(self.context)
+ self.driver.odl_drv.synchronize.assert_called_with('create',
+ ODL_NETWORKS,
+ self.context)
- def test_create_subnet_postcommit(self):
- self._test_create_resource_postcommit('subnet', requests.codes.created)
- for status_code in (requests.codes.bad_request,
- requests.codes.unauthorized,
- requests.codes.forbidden,
- requests.codes.not_found,
- requests.codes.conflict,
- requests.codes.not_implemented):
- self._test_create_resource_postcommit(
- 'subnet', status_code, requests.exceptions.HTTPError)
+ def test_update_network_postcommit(self):
+ self.driver.update_network_postcommit(self.context)
+ self.driver.odl_drv.synchronize.assert_called_with('update',
+ ODL_NETWORKS,
+ self.context)
- def test_create_port_postcommit(self):
- self._test_create_resource_postcommit('port', requests.codes.created)
- for status_code in (requests.codes.bad_request,
- requests.codes.unauthorized,
- requests.codes.forbidden,
- requests.codes.not_found,
- requests.codes.conflict,
- requests.codes.not_implemented,
- requests.codes.service_unavailable):
- self._test_create_resource_postcommit(
- 'port', status_code, requests.exceptions.HTTPError)
+ def test_delete_network_postcommit(self):
+ self.driver.delete_network_postcommit(self.context)
+ self.driver.odl_drv.synchronize.assert_called_with('delete',
+ ODL_NETWORKS,
+ self.context)
- def test_update_network_postcommit(self):
- self._test_update_resource_postcommit('network', requests.codes.ok)
- for status_code in (requests.codes.bad_request,
- requests.codes.forbidden,
- requests.codes.not_found):
- self._test_update_resource_postcommit(
- 'network', status_code, requests.exceptions.HTTPError)
+ def test_create_subnet_postcommit(self):
+ self.driver.create_subnet_postcommit(self.context)
+ self.driver.odl_drv.synchronize.assert_called_with('create',
+ ODL_SUBNETS,
+ self.context)
def test_update_subnet_postcommit(self):
- self._test_update_resource_postcommit('subnet', requests.codes.ok)
- for status_code in (requests.codes.bad_request,
- requests.codes.unauthorized,
- requests.codes.forbidden,
- requests.codes.not_found,
- requests.codes.not_implemented):
- self._test_update_resource_postcommit(
- 'subnet', status_code, requests.exceptions.HTTPError)
+ self.driver.update_subnet_postcommit(self.context)
+ self.driver.odl_drv.synchronize.assert_called_with('update',
+ ODL_SUBNETS,
+ self.context)
- def test_update_port_postcommit(self):
- self._test_update_resource_postcommit('port', requests.codes.ok)
- for status_code in (requests.codes.bad_request,
- requests.codes.unauthorized,
- requests.codes.forbidden,
- requests.codes.not_found,
- requests.codes.conflict,
- requests.codes.not_implemented):
- self._test_update_resource_postcommit(
- 'port', status_code, requests.exceptions.HTTPError)
+ def test_delete_subnet_postcommit(self):
+ self.driver.delete_subnet_postcommit(self.context)
+ self.driver.odl_drv.synchronize.assert_called_with('delete',
+ ODL_SUBNETS,
+ self.context)
- def test_delete_network_postcommit(self):
- self._test_delete_resource_postcommit('network',
- requests.codes.no_content)
- for status_code in (requests.codes.unauthorized,
- requests.codes.not_found,
- requests.codes.conflict):
- self._test_delete_resource_postcommit(
- 'network', status_code, requests.exceptions.HTTPError)
+ def test_create_port_postcommit(self):
+ self.driver.create_port_postcommit(self.context)
+ self.driver.odl_drv.synchronize.assert_called_with('create',
+ ODL_PORTS,
+ self.context)
- def test_delete_subnet_postcommit(self):
- self._test_delete_resource_postcommit('subnet',
- requests.codes.no_content)
- for status_code in (requests.codes.unauthorized,
- requests.codes.not_found,
- requests.codes.conflict,
- requests.codes.not_implemented):
- self._test_delete_resource_postcommit(
- 'subnet', status_code, requests.exceptions.HTTPError)
+ def test_update_port_postcommit(self):
+ self.driver.update_port_postcommit(self.context)
+ self.driver.odl_drv.synchronize.assert_called_with('update',
+ ODL_PORTS,
+ self.context)
def test_delete_port_postcommit(self):
- self._test_delete_resource_postcommit('port',
- requests.codes.no_content)
- for status_code in (requests.codes.unauthorized,
- requests.codes.forbidden,
- requests.codes.not_found,
- requests.codes.not_implemented):
- self._test_delete_resource_postcommit(
- 'port', status_code, requests.exceptions.HTTPError)
-
- def test_check_segment(self):
- """Validate the check_segment call."""
- segment = {'api.NETWORK_TYPE': ""}
- segment[api.NETWORK_TYPE] = constants.TYPE_LOCAL
- self.assertTrue(self.mech.check_segment(segment))
- segment[api.NETWORK_TYPE] = constants.TYPE_FLAT
- self.assertFalse(self.mech.check_segment(segment))
- segment[api.NETWORK_TYPE] = constants.TYPE_VLAN
- self.assertTrue(self.mech.check_segment(segment))
- segment[api.NETWORK_TYPE] = constants.TYPE_GRE
- self.assertTrue(self.mech.check_segment(segment))
- segment[api.NETWORK_TYPE] = constants.TYPE_VXLAN
- self.assertTrue(self.mech.check_segment(segment))
- # Validate a network type not currently supported
- segment[api.NETWORK_TYPE] = 'mpls'
- self.assertFalse(self.mech.check_segment(segment))
+ self.driver.delete_port_postcommit(self.context)
+ self.driver.odl_drv.synchronize.assert_called_with('delete',
+ ODL_PORTS,
+ self.context)