import eventlet
from oslo.config import cfg
+from neutron.api.v2 import attributes
from neutron.common import log as call_log
from neutron import context
from neutron.db.loadbalancer import loadbalancer_db as lb_db
from neutron.extensions import loadbalancer
+from neutron.openstack.common import excutils
from neutron.openstack.common import jsonutils as json
from neutron.openstack.common import log as logging
from neutron.plugins.common import constants
"ha_network_name": "HA-Network",
"ha_ip_pool_name": "default",
"allocate_ha_vrrp": True,
- "allocate_ha_ips": True},
+ "allocate_ha_ips": True,
+ "twoleg_enabled": "_REPLACE_"},
help=_('l2_l3 workflow constructor params')),
cfg.DictOpt('l2_l3_setup_params',
default={"data_port": 1,
self.completion_handler_started = False
def create_vip(self, context, vip):
- LOG.debug(_('create_vip. vip: %s'), str(vip))
- extended_vip = self.plugin.populate_vip_graph(context, vip)
- LOG.debug(_('create_vip. extended_vip: %s'), str(extended_vip))
- network_id = self._get_vip_network_id(context, extended_vip)
- LOG.debug(_('create_vip. network_id: %s '), str(network_id))
- service_name = self._get_service(extended_vip['pool_id'], network_id)
- LOG.debug(_('create_vip. service_name: %s '), service_name)
- self._create_workflow(
- vip['pool_id'], self.l4_wf_name,
- {"service": service_name})
- self._update_workflow(
- vip['pool_id'],
- self.l4_action_name, extended_vip, context)
+ log_info = {'vip': vip,
+ 'extended_vip': 'NOT_ASSIGNED',
+ 'vip_network_id': 'NOT_ASSIGNED',
+ 'service_name': 'NOT_ASSIGNED',
+ 'pip_info': 'NOT_ASSIGNED'}
+ try:
+ extended_vip = self.plugin.populate_vip_graph(context, vip)
+ vip_network_id = self._get_vip_network_id(context, extended_vip)
+ pool_network_id = self._get_pool_network_id(context, extended_vip)
+ service_name = self._get_service(vip_network_id, pool_network_id)
+ log_info['extended_vip'] = extended_vip
+ log_info['vip_network_id'] = vip_network_id
+ log_info['service_name'] = service_name
+
+ self._create_workflow(
+ vip['pool_id'], self.l4_wf_name,
+ {"service": service_name})
+ # if VIP and PIP are different, we need an IP address for the PIP
+ # so create port on PIP's network and use its IP address
+ if vip_network_id != pool_network_id:
+ pip_address = self._create_port_for_pip(
+ context,
+ vip['tenant_id'],
+ _make_pip_name_from_vip(vip),
+ pool_network_id)
+ extended_vip['pip_address'] = pip_address
+ log_info['pip_info'] = 'pip_address: ' + pip_address
+ else:
+ extended_vip['pip_address'] = extended_vip['address']
+ log_info['pip_info'] = 'vip == pip: %(address)s' % extended_vip
+ self._update_workflow(
+ vip['pool_id'],
+ self.l4_action_name, extended_vip, context)
+
+ finally:
+ LOG.debug(_('vip: %(vip)s, '
+ 'extended_vip: %(extended_vip)s, '
+ 'network_id: %(vip_network_id)s, '
+ 'service_name: %(service_name)s, '
+ 'pip_info: %(pip_info)s'), log_info)
def update_vip(self, context, old_vip, vip):
extended_vip = self.plugin.populate_vip_graph(context, vip)
ids = params.pop('__ids__')
try:
+ # get neutron port id associated with the vip (present if vip and
+ # pip are different) and release it after workflow removed
+ port_filter = {
+ 'name': [_make_pip_name_from_vip(vip)],
+ }
+ ports = self.plugin._core_plugin.get_ports(context,
+ filters=port_filter)
+ if ports:
+ LOG.debug(_('Retrieved pip nport: %(port)r for '
+ 'vip: %(vip)s'), {'port': ports[0],
+ 'vip': vip['id']})
+
+ delete_pip_nport_function = self._get_delete_pip_nports(
+ context, ports)
+ else:
+ delete_pip_nport_function = None
+ LOG.debug(_('Found no pip nports associated with '
+ 'vip: %s'), vip['id'])
+
# removing the WF will cause deletion of the configuration from the
# device
- self._remove_workflow(ids, context)
+ self._remove_workflow(ids, context, delete_pip_nport_function)
+
except r_exc.RESTRequestFailure:
pool_id = extended_vip['pool_id']
LOG.exception(_('Failed to remove workflow %s. '
self.plugin.update_status(context, lb_db.Vip, ids['vip'],
constants.ERROR)
+ def _get_delete_pip_nports(self, context, ports):
+ def _delete_pip_nports(success):
+ if success:
+ for port in ports:
+ try:
+ self.plugin._core_plugin.delete_port(
+ context, port['id'])
+ LOG.debug(_('pip nport id: %s'), port['id'])
+ except Exception as exception:
+ # stop exception propagation, nport may have
+ # been deleted by other means
+ LOG.warning(_('pip nport delete failed: %r'),
+ exception)
+ return _delete_pip_nports
+
def create_pool(self, context, pool):
# nothing to do
pass
self.completion_handler.start()
self.completion_handler_started = True
+ def _get_pool_network_id(self, context, extended_vip):
+ subnet = self.plugin._core_plugin.get_subnet(
+ context, extended_vip['pool']['subnet_id'])
+ return subnet['network_id']
+
@call_log.log
def _update_workflow(self, wf_name, action,
wf_params, context,
self._start_completion_handling_thread()
self.queue.put_nowait(oper)
- def _remove_workflow(self, ids, context):
+ def _remove_workflow(self, ids, context, post_remove_function):
wf_name = ids['pool']
LOG.debug(_('Remove the workflow %s') % wf_name)
resource = '/api/workflow/%s' % (wf_name)
- response = _rest_wrapper(self.rest_client.call('DELETE', resource,
- None, None),
- [204, 202, 404])
- msg = response.get('message', None)
- if msg == "Not Found":
+ rest_return = self.rest_client.call('DELETE', resource, None, None)
+ response = _rest_wrapper(rest_return, [204, 202, 404])
+ if rest_return[RESP_STATUS] in [404]:
+ if post_remove_function:
+ try:
+ post_remove_function(True)
+ LOG.debug(_('Post-remove workflow function '
+ '%r completed'), post_remove_function)
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ LOG.exception(_('Post-remove workflow function '
+ '%r failed'), post_remove_function)
self.plugin._delete_db_vip(context, ids['vip'])
else:
- oper = OperationAttributes(response['uri'],
- ids,
- lb_db.Vip,
- ids['vip'],
- delete=True)
+ oper = OperationAttributes(
+ response['uri'],
+ ids,
+ lb_db.Vip,
+ ids['vip'],
+ delete=True,
+ post_op_function=post_remove_function)
LOG.debug(_('Pushing operation %s to the queue'), oper)
self._start_completion_handling_thread()
resource, None, None),
[202])
- def _get_service(self, pool_id, network_id):
+ def _get_service(self, vip_network_id, pool_network_id):
"""Get a service name.
if you can't find one,
- create a service and create l2_l2 WF.
+ create a service and create l2_l3 WF.
"""
if not self.workflow_templates_exists:
self._verify_workflow_templates()
- incoming_service_name = 'srv_' + network_id
+ if vip_network_id != pool_network_id:
+ networks_name = '%s_%s' % (vip_network_id, pool_network_id)
+ self.l2_l3_ctor_params["twoleg_enabled"] = True
+ else:
+ networks_name = vip_network_id
+ self.l2_l3_ctor_params["twoleg_enabled"] = False
+ incoming_service_name = 'srv_%s' % (networks_name,)
service_name = self._get_available_service(incoming_service_name)
if not service_name:
LOG.debug(
'Could not find a service named ' + incoming_service_name)
- service_name = self._create_service(pool_id, network_id)
+ service_name = self._create_service(vip_network_id,
+ pool_network_id)
self.l2_l3_ctor_params["service"] = incoming_service_name
- wf_name = 'l2_l3_' + network_id
+ wf_name = 'l2_l3_' + networks_name
self._create_workflow(
wf_name, self.l2_l3_wf_name, self.l2_l3_ctor_params)
self._update_workflow(
LOG.debug('A service named ' + service_name + ' was found.')
return service_name
- def _create_service(self, pool_id, network_id):
+ def _create_service(self, vip_network_id, pool_network_id):
"""create the service and provision it (async)."""
# 1) create the service
- service_name = 'srv_' + network_id
- resource = '/api/service?name=%s' % service_name
-
service = copy.deepcopy(self.service)
- service['primary']['network']['portgroups'] = [network_id]
+ if vip_network_id != pool_network_id:
+ service_name = 'srv_%s_%s' % (vip_network_id, pool_network_id)
+ service['primary']['network']['portgroups'] = [vip_network_id,
+ pool_network_id]
+ else:
+ service_name = 'srv_' + vip_network_id
+ service['primary']['network']['portgroups'] = [vip_network_id]
+ resource = '/api/service?name=%s' % service_name
response = _rest_wrapper(self.rest_client.call('POST', resource,
service,
raise r_exc.WorkflowMissing(workflow=wf)
self.workflow_templates_exists = True
+ def _create_port_for_pip(self, context, tenant_id, port_name, subnet):
+ """Creates port on subnet, returns that port's IP."""
+
+ # create port, we just want any IP allocated to the port based on the
+ # network id, so setting 'fixed_ips' to ATTR_NOT_SPECIFIED
+ port_data = {
+ 'tenant_id': tenant_id,
+ 'name': port_name,
+ 'network_id': subnet,
+ 'mac_address': attributes.ATTR_NOT_SPECIFIED,
+ 'admin_state_up': False,
+ 'device_id': '',
+ 'device_owner': 'neutron:' + constants.LOADBALANCER,
+ 'fixed_ips': attributes.ATTR_NOT_SPECIFIED
+ }
+ port = self.plugin._core_plugin.create_port(context,
+ {'port': port_data})
+ return port['fixed_ips'][0]['ip_address']
+
class vDirectRESTClient:
"""REST server proxy to Radware vDirect."""
class OperationAttributes:
- """Holds operation attributes."""
+ """Holds operation attributes.
+
+ The parameter 'post_op_function' (if supplied) is a function that takes
+ one boolean argument, specifying the success of the operation
+
+ """
def __init__(self,
operation_url,
object_graph,
lbaas_entity=None,
entity_id=None,
- delete=False):
+ delete=False,
+ post_op_function=None):
self.operation_url = operation_url
self.object_graph = object_graph
self.delete = delete
self.lbaas_entity = lbaas_entity
self.entity_id = entity_id
self.creation_time = time.time()
+ self.post_op_function = post_op_function
def __repr__(self):
items = ("%s = %r" % (k, v) for k, v in self.__dict__.items())
if db_status:
_update_vip_graph_status(self.plugin, oper, db_status)
+ OperationCompletionHandler._run_post_op_function(success, oper)
+
return completed
def run(self):
- oper = None
while not self.stoprequest.isSet():
try:
oper = self.queue.get(timeout=1)
m = _("Exception was thrown inside OperationCompletionHandler")
LOG.exception(m)
+ @staticmethod
+ def _run_post_op_function(success, oper):
+ if oper.post_op_function:
+ log_data = {'func': oper.post_op_function, 'oper': oper}
+ try:
+ oper.post_op_function(success)
+ LOG.debug(_('Post-operation function '
+ '%(func)r completed '
+ 'after operation %(oper)r'),
+ log_data)
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ LOG.exception(_('Post-operation function '
+ '%(func)r failed '
+ 'after operation %(oper)r'),
+ log_data)
+
def _rest_wrapper(response, success_codes=[202]):
"""Wrap a REST call and make sure a valid status is returned."""
return response[RESP_DATA]
+def _make_pip_name_from_vip(vip):
+ """Standard way of making PIP name based on VIP ID."""
+ return 'pip_' + vip['id']
+
+
def _update_vip_graph_status(plugin, oper, status):
"""Update the status
operation='Remove from DB', entity=oper.lbaas_entity
)
-TRANSLATION_DEFAULTS = {'session_persistence_type': 'SOURCE_IP',
+TRANSLATION_DEFAULTS = {'session_persistence_type': 'none',
'session_persistence_cookie_name': 'none',
'url_path': '/',
'http_method': 'GET',
_trans_prop_name(hm_property))].append(value)
ids = get_ids(extended_vip)
trans_vip['__ids__'] = ids
+ for key in ['pip_address']:
+ if key in extended_vip:
+ trans_vip[key] = extended_vip[key]
LOG.debug('Translated Vip graph: ' + str(trans_vip))
return trans_vip
def _delete_handler(resource):
- return 202, '', '', {'message': 'Not Found'}
+ return 404, '', '', {'message': 'Not Found'}
def _post_handler(resource, binary):
"""Test the rest call failure handling by Exception raising."""
with self.network(do_delete=False) as network:
with self.subnet(network=network, do_delete=False) as subnet:
- with self.pool(no_delete=True, provider='radware') as pool:
+ with self.pool(no_delete=True,
+ provider='radware',
+ subnet_id=subnet['subnet']['id']) as pool:
vip_data = {
'name': 'vip1',
'subnet_id': subnet['subnet']['id'],
def test_create_vip(self):
with self.subnet() as subnet:
- with self.pool(provider='radware') as pool:
+ with self.pool(provider='radware',
+ subnet_id=subnet['subnet']['id']) as pool:
vip_data = {
'name': 'vip1',
'subnet_id': subnet['subnet']['id'],
self.driver_rest_call_mock.assert_has_calls(
calls, any_order=True)
+ def test_create_vip_2_leg(self):
+ """Test creation of a VIP where Alteon VIP and PIP are different."""
+
+ with self.subnet(cidr='10.0.0.0/24') as subnet:
+ with self.subnet(cidr='10.0.1.0/24') as pool_sub:
+ with self.pool(provider='radware',
+ subnet_id=pool_sub['subnet']['id']) as pool:
+ vip_data = {
+ 'name': 'vip1',
+ 'subnet_id': subnet['subnet']['id'],
+ 'pool_id': pool['pool']['id'],
+ 'description': '',
+ 'protocol_port': 80,
+ 'protocol': 'HTTP',
+ 'connection_limit': -1,
+ 'admin_state_up': True,
+ 'status': constants.PENDING_CREATE,
+ 'tenant_id': self._tenant_id,
+ 'session_persistence': ''
+ }
+
+ vip = self.plugin_instance.create_vip(
+ context.get_admin_context(), {'vip': vip_data})
+ name_suffix = '%s_%s' % (subnet['subnet']['network_id'],
+ pool_sub['subnet']['network_id'])
+ # Test creation REST calls
+ calls = [
+ mock.call('GET', '/api/workflowTemplate', None, None),
+ mock.call('GET', '/api/service/srv_' + name_suffix,
+ None, None),
+ mock.call('POST', '/api/service?name=srv_' +
+ name_suffix, mock.ANY,
+ driver.CREATE_SERVICE_HEADER),
+ mock.call('POST', 'someuri',
+ None, driver.PROVISION_HEADER),
+ mock.call('GET', '/api/workflow/l2_l3_' + name_suffix,
+ None, None),
+ mock.call('POST', '/api/workflowTemplate/' +
+ 'openstack_l2_l3' +
+ '?name=l2_l3_' + name_suffix,
+ mock.ANY,
+ driver.TEMPLATE_HEADER),
+ mock.call('POST', '/api/workflow/l2_l3_' +
+ name_suffix + '/action/setup_l2_l3',
+ mock.ANY, driver.TEMPLATE_HEADER),
+ mock.call('GET', '/api/workflow/' +
+ pool['pool']['id'], None, None),
+ mock.call('POST', '/api/workflowTemplate/' +
+ 'openstack_l4' +
+ '?name=' + pool['pool']['id'],
+ mock.ANY,
+ driver.TEMPLATE_HEADER),
+ mock.call('POST', '/api/workflow/' +
+ pool['pool']['id'] + '/action/BaseCreate',
+ mock.ANY, driver.TEMPLATE_HEADER)
+ ]
+ self.driver_rest_call_mock.assert_has_calls(calls)
+ #Test DB
+ new_vip = self.plugin_instance.get_vip(
+ context.get_admin_context(),
+ vip['id']
+ )
+ self.assertEqual(new_vip['status'], constants.ACTIVE)
+
+ # Test that PIP neutron port was created
+ pip_port_filter = {
+ 'name': ['pip_' + vip['id']],
+ }
+ plugin = manager.NeutronManager.get_plugin()
+ num_ports = plugin.get_ports_count(
+ context.get_admin_context(), filters=pip_port_filter)
+ self.assertTrue(num_ports > 0)
+
+ # Delete VIP
+ self.plugin_instance.delete_vip(
+ context.get_admin_context(), vip['id'])
+
+ # Test deletion REST calls
+ calls = [
+ mock.call('DELETE', u'/api/workflow/' +
+ pool['pool']['id'], None, None)
+ ]
+ self.driver_rest_call_mock.assert_has_calls(calls)
+
def test_update_vip(self):
with self.subnet() as subnet:
- with self.pool(provider='radware', no_delete=True) as pool:
+ with self.pool(provider='radware',
+ no_delete=True,
+ subnet_id=subnet['subnet']['id']) as pool:
vip_data = {
'name': 'vip1',
'subnet_id': subnet['subnet']['id'],
self.plugin_instance.delete_vip(
context.get_admin_context(), vip['id'])
+ def test_update_vip_2_leg(self):
+ """Test update of a VIP where Alteon VIP and PIP are different."""
+
+ with self.subnet(cidr='10.0.0.0/24') as subnet:
+ with self.subnet(cidr='10.0.1.0/24') as pool_subnet:
+ with self.pool(provider='radware',
+ subnet_id=pool_subnet['subnet']['id']) as pool:
+ vip_data = {
+ 'name': 'vip1',
+ 'subnet_id': subnet['subnet']['id'],
+ 'pool_id': pool['pool']['id'],
+ 'description': '',
+ 'protocol_port': 80,
+ 'protocol': 'HTTP',
+ 'connection_limit': -1,
+ 'admin_state_up': True,
+ 'status': constants.PENDING_CREATE,
+ 'tenant_id': self._tenant_id,
+ 'session_persistence': ''
+ }
+
+ vip = self.plugin_instance.create_vip(
+ context.get_admin_context(), {'vip': vip_data})
+
+ self.plugin_instance.update_vip(
+ context.get_admin_context(),
+ vip['id'], {'vip': vip_data})
+
+ # Test REST calls
+ calls = [
+ mock.call('POST', '/api/workflow/' +
+ pool['pool']['id'] + '/action/BaseCreate',
+ mock.ANY, driver.TEMPLATE_HEADER),
+ ]
+ self.driver_rest_call_mock.assert_has_calls(calls)
+
+ updated_vip = self.plugin_instance.get_vip(
+ context.get_admin_context(), vip['id'])
+ self.assertEqual(updated_vip['status'], constants.ACTIVE)
+
+ # delete VIP
+ self.plugin_instance.delete_vip(
+ context.get_admin_context(), vip['id'])
+
def test_delete_vip_failure(self):
plugin = self.plugin_instance
with self.network(do_delete=False) as network:
with self.subnet(network=network, do_delete=False) as subnet:
- with self.pool(no_delete=True, provider='radware') as pool:
+ with self.pool(no_delete=True,
+ provider='radware',
+ subnet_id=subnet['subnet']['id']) as pool:
with contextlib.nested(
self.member(pool_id=pool['pool']['id'],
no_delete=True),
def test_delete_vip(self):
with self.subnet() as subnet:
- with self.pool(provider='radware', no_delete=True) as pool:
+ with self.pool(provider='radware',
+ no_delete=True,
+ subnet_id=subnet['subnet']['id']) as pool:
vip_data = {
'name': 'vip1',
'subnet_id': subnet['subnet']['id'],
self.plugin_instance.get_vip,
context.get_admin_context(), vip['id'])
+ def test_delete_vip_2_leg(self):
+ """Test deletion of a VIP where Alteon VIP and PIP are different."""
+
+ self.driver_rest_call_mock.reset_mock()
+ with self.subnet(cidr='10.0.0.0/24') as subnet:
+ with self.subnet(cidr='10.0.1.0/24') as pool_subnet:
+ with self.pool(provider='radware',
+ no_delete=True,
+ subnet_id=pool_subnet['subnet']['id']) as pool:
+ vip_data = {
+ 'name': 'vip1',
+ 'subnet_id': subnet['subnet']['id'],
+ 'pool_id': pool['pool']['id'],
+ 'description': '',
+ 'protocol_port': 80,
+ 'protocol': 'HTTP',
+ 'connection_limit': -1,
+ 'admin_state_up': True,
+ 'status': constants.PENDING_CREATE,
+ 'tenant_id': self._tenant_id,
+ 'session_persistence': ''
+ }
+
+ vip = self.plugin_instance.create_vip(
+ context.get_admin_context(), {'vip': vip_data})
+
+ self.plugin_instance.delete_vip(
+ context.get_admin_context(), vip['id'])
+
+ calls = [
+ mock.call('DELETE', '/api/workflow/' +
+ pool['pool']['id'], None, None)
+ ]
+ self.driver_rest_call_mock.assert_has_calls(calls)
+
+ # Test that PIP neutron port was deleted
+ pip_port_filter = {
+ 'name': ['pip_' + vip['id']],
+ }
+ plugin = manager.NeutronManager.get_plugin()
+ num_ports = plugin.get_ports_count(
+ context.get_admin_context(), filters=pip_port_filter)
+ self.assertTrue(num_ports == 0)
+
+ self.assertRaises(loadbalancer.VipNotFound,
+ self.plugin_instance.get_vip,
+ context.get_admin_context(), vip['id'])
+
def test_update_pool(self):
with self.subnet():
with self.pool() as pool:
def test_delete_pool_with_vip(self):
with self.subnet() as subnet:
- with self.pool(provider='radware', no_delete=True) as pool:
+ with self.pool(provider='radware',
+ no_delete=True,
+ subnet_id=subnet['subnet']['id']) as pool:
with self.vip(pool=pool, subnet=subnet):
self.assertRaises(loadbalancer.PoolInUse,
self.plugin_instance.delete_pool,
def test_create_member_with_vip(self):
with self.subnet() as subnet:
- with self.pool(provider='radware') as p:
+ with self.pool(provider='radware',
+ subnet_id=subnet['subnet']['id']) as p:
with self.vip(pool=p, subnet=subnet):
with self.member(pool_id=p['pool']['id']):
calls = [
def test_update_member_with_vip(self):
with self.subnet() as subnet:
- with self.pool(provider='radware') as p:
+ with self.pool(provider='radware',
+ subnet_id=subnet['subnet']['id']) as p:
with self.member(pool_id=p['pool']['id']) as member:
with self.vip(pool=p, subnet=subnet):
self.plugin_instance.update_member(
def test_delete_member_with_vip(self):
with self.subnet() as subnet:
- with self.pool(provider='radware') as p:
+ with self.pool(provider='radware',
+ subnet_id=subnet['subnet']['id']) as p:
with self.member(pool_id=p['pool']['id'],
no_delete=True) as m:
with self.vip(pool=p, subnet=subnet):
def test_create_hm_with_vip(self):
with self.subnet() as subnet:
with self.health_monitor() as hm:
- with self.pool(provider='radware') as pool:
+ with self.pool(provider='radware',
+ subnet_id=subnet['subnet']['id']) as pool:
with self.vip(pool=pool, subnet=subnet):
self.plugin_instance.create_pool_health_monitor(
def test_delete_pool_hm_with_vip(self):
with self.subnet() as subnet:
with self.health_monitor(no_delete=True) as hm:
- with self.pool(provider='radware') as pool:
+ with self.pool(provider='radware',
+ subnet_id=subnet['subnet']['id']) as pool:
with self.vip(pool=pool, subnet=subnet):
self.plugin_instance.create_pool_health_monitor(
context.get_admin_context(),