# under the License.
from apicapi import apic_manager
-from apicapi import exceptions as exc
from keystoneclient.v2_0 import client as keyclient
import netaddr
from oslo.config import cfg
# Get port
port = context.current
# Check if a compute port
- if port.get('device_owner', '').startswith('compute'):
+ if context.host:
self._perform_path_port_operations(context, port)
- elif port.get('device_owner') == n_constants.DEVICE_OWNER_ROUTER_GW:
+ if port.get('device_owner') == n_constants.DEVICE_OWNER_ROUTER_GW:
self._perform_gw_port_operations(context, port)
- elif port.get('device_owner') == n_constants.DEVICE_OWNER_DHCP:
- self._perform_path_port_operations(context, port)
def _delete_contract(self, context):
port = context.current
self._delete_port_path(context, atenant_id, anetwork_id)
def _get_subnet_info(self, context, subnet):
- tenant_id = subnet['tenant_id']
- network_id = subnet['network_id']
- network = context._plugin.get_network(context._plugin_context,
- network_id)
- if not network.get('router:external'):
- cidr = netaddr.IPNetwork(subnet['cidr'])
- gateway_ip = '%s/%s' % (subnet['gateway_ip'], str(cidr.prefixlen))
-
- # Convert to APIC IDs
- tenant_id = self.name_mapper.tenant(context, tenant_id)
- network_id = self.name_mapper.network(context, network_id)
- return tenant_id, network_id, gateway_ip
+ if subnet['gateway_ip']:
+ tenant_id = subnet['tenant_id']
+ network_id = subnet['network_id']
+ network = context._plugin.get_network(context._plugin_context,
+ network_id)
+ if not network.get('router:external'):
+ cidr = netaddr.IPNetwork(subnet['cidr'])
+ gateway_ip = '%s/%s' % (subnet['gateway_ip'],
+ str(cidr.prefixlen))
+
+ # Convert to APIC IDs
+ tenant_id = self.name_mapper.tenant(context, tenant_id)
+ network_id = self.name_mapper.network(context, network_id)
+ return tenant_id, network_id, gateway_ip
@sync_init
def create_port_postcommit(self, context):
self._perform_port_operations(context)
- def update_port_precommit(self, context):
- orig = context.original
- curr = context.current
- if (orig['device_owner'] != curr['device_owner']
- or orig['device_id'] != curr['device_id']):
- raise exc.ApicOperationNotSupported(
- resource='Port', msg='Port device owner and id cannot be '
- 'changed.')
-
@sync_init
def update_port_postcommit(self, context):
self._perform_port_operations(context)
def delete_port_postcommit(self, context):
port = context.current
# Check if a compute port
- if port.get('device_owner', '').startswith('compute'):
+ if context.host:
self._delete_path_if_last(context)
- elif port.get('device_owner') == n_constants.DEVICE_OWNER_ROUTER_GW:
+ if port.get('device_owner') == n_constants.DEVICE_OWNER_ROUTER_GW:
self._delete_contract(context)
- elif port.get('device_owner') == n_constants.DEVICE_OWNER_DHCP:
- self._delete_path_if_last(context)
@sync_init
def create_network_postcommit(self, context):
TEST_SEGMENT1)
port_ctx = self._get_port_context(mocked.APIC_TENANT,
mocked.APIC_NETWORK,
- 'vm1', net_ctx, HOST_ID1)
+ 'vm1', net_ctx, HOST_ID1,
+ device_owner='any')
mgr = self.driver.apic_manager
self.driver.update_port_postcommit(port_ctx)
mgr.ensure_path_created_for_port.assert_called_once_with(
mocked.APIC_TENANT, mocked.APIC_NETWORK, HOST_ID1,
ENCAP, transaction='transaction')
+ def test_create_port_postcommit(self):
+ net_ctx = self._get_network_context(mocked.APIC_TENANT,
+ mocked.APIC_NETWORK,
+ TEST_SEGMENT1)
+ port_ctx = self._get_port_context(mocked.APIC_TENANT,
+ mocked.APIC_NETWORK,
+ 'vm1', net_ctx, HOST_ID1,
+ device_owner='any')
+ mgr = self.driver.apic_manager
+ self.driver.create_port_postcommit(port_ctx)
+ mgr.ensure_path_created_for_port.assert_called_once_with(
+ mocked.APIC_TENANT, mocked.APIC_NETWORK, HOST_ID1,
+ ENCAP, transaction='transaction')
+
+ def test_update_port_nobound_postcommit(self):
+ net_ctx = self._get_network_context(mocked.APIC_TENANT,
+ mocked.APIC_NETWORK,
+ TEST_SEGMENT1)
+ port_ctx = self._get_port_context(mocked.APIC_TENANT,
+ mocked.APIC_NETWORK,
+ 'vm1', net_ctx, None,
+ device_owner='any')
+ self.driver.update_port_postcommit(port_ctx)
+ mgr = self.driver.apic_manager
+ self.assertFalse(mgr.ensure_path_created_for_port.called)
+
+ def test_create_port_nobound_postcommit(self):
+ net_ctx = self._get_network_context(mocked.APIC_TENANT,
+ mocked.APIC_NETWORK,
+ TEST_SEGMENT1)
+ port_ctx = self._get_port_context(mocked.APIC_TENANT,
+ mocked.APIC_NETWORK,
+ 'vm1', net_ctx, None,
+ device_owner='any')
+ self.driver.create_port_postcommit(port_ctx)
+ mgr = self.driver.apic_manager
+ self.assertFalse(mgr.ensure_path_created_for_port.called)
+
def test_update_gw_port_postcommit(self):
net_ctx = self._get_network_context(mocked.APIC_TENANT,
mocked.APIC_NETWORK,
mocked.APIC_TENANT, mocked.APIC_NETWORK,
'%s/%s' % (SUBNET_GATEWAY, SUBNET_NETMASK))
+ def test_create_subnet_nogw_postcommit(self):
+ net_ctx = self._get_network_context(mocked.APIC_TENANT,
+ mocked.APIC_NETWORK,
+ TEST_SEGMENT1)
+ subnet_ctx = self._get_subnet_context(None,
+ SUBNET_CIDR,
+ net_ctx)
+ mgr = self.driver.apic_manager
+ self.driver.create_subnet_postcommit(subnet_ctx)
+ self.assertFalse(mgr.ensure_subnet_created_on_apic.called)
+
def _get_network_context(self, tenant_id, net_id, seg_id=None,
seg_type='vlan', external=False):
network = {'id': net_id,
return FakeSubnetContext(subnet, network)
def _get_port_context(self, tenant_id, net_id, vm_id, network, host,
- gw=False):
+ gw=False, device_owner='compute'):
port = {'device_id': vm_id,
- 'device_owner': 'compute',
+ 'device_owner': device_owner,
'binding:host_id': host,
'tenant_id': tenant_id,
'id': mocked.APIC_PORT,