# quota_health_monitors = -1
# Number of routers allowed per tenant. A negative value means unlimited.
-# quota_router = 10
+# quota_router = 10
# Number of floating IPs allowed per tenant. A negative value means unlimited.
# quota_floatingip = 50
# service_provider=LOADBALANCER:NetScaler:neutron.services.loadbalancer.drivers.netscaler.netscaler_driver.NetScalerPluginDriver
# Uncomment the following line (and comment out the OpenSwan VPN line) to enable Cisco's VPN driver.
# service_provider=VPN:cisco:neutron.services.vpn.service_drivers.cisco_ipsec.CiscoCsrIPsecVPNDriver:default
+# Uncomment the line below to use Embrane heleos as Load Balancer service provider.
+# service_provider=LOADBALANCER:Embrane:neutron.services.loadbalancer.drivers.embrane.driver.EmbraneLbaas:default
#netscaler_ncc_uri = https://ncc_server.acme.org/ncc/v1/api
#netscaler_ncc_username = admin
#netscaler_ncc_password = secret
+
+[heleoslb]
+#esm_mgmt =
+#admin_username =
+#admin_password =
+#lb_image =
+#inband_id =
+#oob_id =
+#mgmt_id =
+#dummy_utif_id =
+#resource_pool_id =
+#async_requests =
+#lb_flavor = small
+#sync_interval = 60
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2014 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+"""embrane_lbaas_driver
+
+Revision ID: 33dd0a9fa487
+Revises: 19180cf98af6
+Create Date: 2014-02-25 00:15:35.567111
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '33dd0a9fa487'
+down_revision = '19180cf98af6'
+
+# Change to ['*'] if this migration applies to all plugins
+
+migration_for_plugins = [
+ 'neutron.services.loadbalancer.plugin.LoadBalancerPlugin'
+]
+
+from alembic import op
+import sqlalchemy as sa
+
+from neutron.db import migration
+
+
+def upgrade(active_plugins=None, options=None):
+ if not migration.should_run(active_plugins, migration_for_plugins):
+ return
+
+ op.create_table(
+ u'embrane_pool_port',
+ sa.Column(u'pool_id', sa.String(length=36), nullable=False),
+ sa.Column(u'port_id', sa.String(length=36), nullable=False),
+ sa.ForeignKeyConstraint(['pool_id'], [u'pools.id'],
+ name=u'embrane_pool_port_ibfk_1'),
+ sa.ForeignKeyConstraint(['port_id'], [u'ports.id'],
+ name=u'embrane_pool_port_ibfk_2'),
+ sa.PrimaryKeyConstraint(u'pool_id'))
+
+
+def downgrade(active_plugins=None, options=None):
+ if not migration.should_run(active_plugins, migration_for_plugins):
+ return
+
+ op.drop_table(u'embrane_pool_port')
def dispatch_l3(self, d_context, args=(), kwargs={}):
item = d_context.item
event = d_context.event
- q_context = d_context.q_context
+ n_context = d_context.n_context
chain = d_context.chain
item_id = item["id"]
self.sync_items[item_id] = (queue.Queue(),)
first_run = True
self.sync_items[item_id][0].put(
- ctx.OperationContext(event, q_context, item, chain, f,
+ ctx.OperationContext(event, n_context, item, chain, f,
args, kwargs))
t = None
if first_run:
try:
dva_state = operation_context.function(
plugin._esm_api,
- operation_context.q_context.tenant_id,
+ operation_context.n_context.tenant_id,
operation_context.item,
*operation_context.args,
**operation_context.kwargs)
if transient_state:
if transient_state == p_con.Status.DELETED:
current_state = plugin._delete_router(
- operation_context.q_context,
+ operation_context.n_context,
operation_context.item["id"])
# Error state cannot be reverted
elif transient_state != p_con.Status.ERROR:
current_state = plugin._update_neutron_state(
- operation_context.q_context,
+ operation_context.n_context,
operation_context.item,
transient_state)
except Exception:
def __init__(self, event, item, neutron_context, chain=None):
self.event = event
self.item = item
- self.q_context = neutron_context
+ self.n_context = neutron_context
self.chain = chain
class EmbranePluginException(neutron_exec.NeutronException):
message = _("An unexpected error occurred:%(err_msg)s")
+
+
+class UnsupportedException(EmbranePluginException):
+ message = _("%(err_msg)s")
--- /dev/null
+Embrane LBaaS Driver
+
+This DRIVER interfaces OpenStack Neutron with Embrane's heleos platform,
+Load Balancing appliances for cloud environments.
+
+L2 connectivity is leveraged by one of the supported existing plugins.
+
+For more details on use, configuration and implementation please refer to:
+https://wiki.openstack.org/wiki/Neutron/LBaaS/EmbraneDriver
\ No newline at end of file
--- /dev/null
+# Copyright 2014 Embrane, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Ivar Lazzaro, Embrane, Inc. ivar@embrane.com
+
+from eventlet import greenthread
+from eventlet import queue
+from heleosapi import exceptions as h_exc
+
+from neutron.openstack.common import log as logging
+from neutron.plugins.embrane.common import contexts as ctx
+from neutron.services.loadbalancer.drivers.embrane.agent import lb_operations
+from neutron.services.loadbalancer.drivers.embrane import constants as econ
+
+LOG = logging.getLogger(__name__)
+
+
+class Dispatcher(object):
+ def __init__(self, driver, async=True):
+ self._async = async
+ self._driver = driver
+ self.sync_items = dict()
+ self.handlers = lb_operations.handlers
+
+ def dispatch_lb(self, d_context, *args, **kwargs):
+ item = d_context.item
+ event = d_context.event
+ n_context = d_context.n_context
+ chain = d_context.chain
+
+ item_id = item["id"]
+ if event in self.handlers:
+ for f in self.handlers[event]:
+ first_run = False
+ if item_id not in self.sync_items:
+ self.sync_items[item_id] = [queue.Queue()]
+ first_run = True
+ self.sync_items[item_id][0].put(
+ ctx.OperationContext(event, n_context, item, chain, f,
+ args, kwargs))
+ if first_run:
+ t = greenthread.spawn(self._consume_lb,
+ item_id,
+ self.sync_items[item_id][0],
+ self._driver,
+ self._async)
+ self.sync_items[item_id].append(t)
+ if not self._async:
+ t = self.sync_items[item_id][1]
+ t.wait()
+
+ def _consume_lb(self, sync_item, sync_queue, driver, a_sync):
+ current_state = None
+ while True:
+ try:
+ if current_state == econ.DELETED:
+ del self.sync_items[sync_item]
+ return
+ try:
+ operation_context = sync_queue.get(
+ block=a_sync,
+ timeout=econ.QUEUE_TIMEOUT)
+ except queue.Empty:
+ del self.sync_items[sync_item]
+ return
+
+ (operation_context.chain and
+ operation_context.chain.execute_all())
+
+ transient_state = None
+ try:
+ transient_state = operation_context.function(
+ driver, operation_context.n_context,
+ operation_context.item, *operation_context.args,
+ **operation_context.kwargs)
+ except (h_exc.PendingDva, h_exc.DvaNotFound,
+ h_exc.BrokenInterface, h_exc.DvaCreationFailed,
+ h_exc.BrokenDva, h_exc.ConfigurationFailed) as ex:
+ LOG.warning(econ.error_map[type(ex)], ex.message)
+ except h_exc.DvaDeleteFailed as ex:
+ LOG.warning(econ.error_map[type(ex)], ex.message)
+ transient_state = econ.DELETED
+ finally:
+ # if the returned transient state is None, no operations
+ # are required on the DVA status
+ if transient_state == econ.DELETED:
+ current_state = driver._delete_vip(
+ operation_context.n_context,
+ operation_context.item)
+ # Error state cannot be reverted
+ else:
+ driver._update_vip_graph_state(
+ operation_context.n_context,
+ operation_context.item)
+ except Exception:
+ LOG.exception(_('Unhandled exception occurred'))
--- /dev/null
+# Copyright 2014 Embrane, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Ivar Lazzaro, Embrane, Inc. ivar@embrane.com
+
+from functools import wraps
+
+from heleosapi import exceptions as h_exc
+
+from neutron.openstack.common import log as logging
+from neutron.services.loadbalancer import constants as lcon
+from neutron.services.loadbalancer.drivers.embrane import constants as econ
+
+LOG = logging.getLogger(__name__)
+handlers = {}
+
+
+def handler(event, handler):
+ def wrap(f):
+ if event not in handler.keys():
+ handler[event] = [f]
+ else:
+ handler[event].append(f)
+
+ @wraps(f)
+ def wrapped_f(*args, **kwargs):
+ return f(*args, **kwargs)
+
+ return wrapped_f
+
+ return wrap
+
+
+@handler(econ.Events.CREATE_VIP, handlers)
+def _provision_load_balancer(driver, context, vip, flavor,
+ vip_utif_info, vip_ip_allocation_info,
+ pool_utif_info=None,
+ pool_ip_allocation_info=None,
+ pool=None, members=None,
+ monitors=None):
+ api = driver._heleos_api
+ tenant_id = context.tenant_id
+ admin_state = vip["admin_state_up"]
+ # Architectural configuration
+ api.create_load_balancer(tenant_id=tenant_id,
+ router_id=vip["id"],
+ name=vip["name"],
+ flavor=flavor,
+ up=False)
+ api.grow_interface(vip_utif_info, False, tenant_id, vip["id"])
+ if pool:
+ api.grow_interface(pool_utif_info, False, tenant_id,
+ vip["id"])
+
+ # Logical configuration
+ api.allocate_address(vip["id"], True, vip_ip_allocation_info)
+ if pool:
+ api.allocate_address(vip["id"], True, pool_ip_allocation_info)
+ dva = api.configure_load_balancer(vip["id"], admin_state,
+ vip, pool,
+ monitors, members)
+ return api.extract_dva_state(dva)
+
+
+@handler(econ.Events.UPDATE_VIP, handlers)
+def _update_load_balancer(driver, context, vip,
+ old_pool_id=None, old_port_id=None,
+ removed_ip=None, pool_utif_info=None,
+ pool_ip_allocation_info=None,
+ new_pool=None, members=None,
+ monitors=None):
+ api = driver._heleos_api
+ tenant_id = context.tenant_id
+ admin_state = vip["admin_state_up"]
+
+ if old_pool_id:
+ # Architectural Changes
+ api.de_allocate_address(vip['id'], False, old_port_id, removed_ip)
+ api.shrink_interface(tenant_id, vip["id"], False, old_port_id)
+ api.grow_interface(pool_utif_info, False, tenant_id, vip["id"])
+ # Configuration Changes
+ api.allocate_address(vip["id"], True, pool_ip_allocation_info)
+ api.replace_pool(vip["id"], True, vip, old_pool_id,
+ new_pool, monitors, members)
+
+ api.update_vservice(vip["id"], True, vip)
+ # Dva update
+ dva = api.update_dva(tenant_id, vip["id"], vip["name"],
+ admin_state, description=vip["description"])
+
+ return api.extract_dva_state(dva)
+
+
+@handler(econ.Events.DELETE_VIP, handlers)
+def _delete_load_balancer(driver, context, vip):
+ try:
+ driver._heleos_api.delete_dva(context.tenant_id, vip['id'])
+ except h_exc.DvaNotFound:
+ LOG.warning(_('The load balancer %s had no physical representation, '
+ 'likely already deleted'), vip['id'])
+ return econ.DELETED
+
+
+@handler(econ.Events.UPDATE_POOL, handlers)
+def _update_server_pool(driver, context, vip, pool,
+ monitors=None):
+ api = driver._heleos_api
+ cookie = ((vip.get('session_persistence') or {}).get('type') ==
+ lcon.SESSION_PERSISTENCE_HTTP_COOKIE)
+ return api.extract_dva_state(api.update_pool(vip['id'],
+ vip['admin_state_up'],
+ pool, cookie, monitors))
+
+
+@handler(econ.Events.ADD_OR_UPDATE_MEMBER, handlers)
+def _add_or_update_pool_member(driver, context, vip, member, protocol):
+ api = driver._heleos_api
+ return api.extract_dva_state(api.update_backend_server(
+ vip['id'], vip['admin_state_up'], member, protocol))
+
+
+@handler(econ.Events.REMOVE_MEMBER, handlers)
+def _remove_member_from_pool(driver, context, vip, member):
+ api = driver._heleos_api
+ return api.extract_dva_state(api.remove_pool_member(vip['id'],
+ vip['admin_state_up'],
+ member))
+
+
+@handler(econ.Events.DELETE_MEMBER, handlers)
+def _delete_member(driver, context, vip, member):
+ with context.session.begin(subtransactions=True):
+ api = driver._heleos_api
+ dva = api.delete_backend_server(vip['id'], vip['admin_state_up'],
+ member)
+ driver._delete_member(context, member)
+ return api.extract_dva_state(dva)
+
+
+@handler(econ.Events.ADD_POOL_HM, handlers)
+def _create_pool_hm(driver, context, vip, hm, pool_id):
+ api = driver._heleos_api
+ return api.extract_dva_state(api.add_pool_monitor(
+ vip['id'], vip['admin_state_up'], hm, pool_id))
+
+
+@handler(econ.Events.UPDATE_POOL_HM, handlers)
+def _update_pool_hm(driver, context, vip, hm, pool_id):
+ api = driver._heleos_api
+ return api.extract_dva_state(api.update_pool_monitor(
+ vip['id'], vip['admin_state_up'], hm, pool_id))
+
+
+@handler(econ.Events.DELETE_POOL_HM, handlers)
+def _delete_pool_hm(driver, context, vip, hm, pool_id):
+ with context.session.begin(subtransactions=True):
+ api = driver._heleos_api
+ dva = api.add_pool_monitor(vip['id'], vip['admin_state_up'],
+ hm, pool_id)
+ driver._delete_pool_hm(context, hm, pool_id)
+ return api.extract_dva_state(dva)
+
+
+@handler(econ.Events.POLL_GRAPH, handlers)
+def _poll_graph(driver, context, vip):
+ api = driver._heleos_api
+ return api.extract_dva_state(api.get_dva(vip['id']))
--- /dev/null
+# Copyright 2014 Embrane, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Ivar Lazzaro, Embrane, Inc. ivar@embrane.com
+
+from oslo.config import cfg
+
+# User may want to use LB service together with the L3 plugin, but using
+# different resources. The service will inherit the configuration from the
+# L3 heleos plugin if present and not overridden.
+heleos_opts = [
+ cfg.StrOpt('esm_mgmt',
+ help=_('ESM management root address')),
+ cfg.StrOpt('admin_username', default=None,
+ help=_('ESM admin username.')),
+ cfg.StrOpt('admin_password', default=None,
+ secret=True,
+ help=_('ESM admin password.')),
+ cfg.StrOpt('lb_image', default=None,
+ help=_('Load Balancer image id (Embrane LB)')),
+ cfg.StrOpt('inband_id', default=None,
+ help=_('In band Security Zone id for LBs')),
+ cfg.StrOpt('oob_id', default=None,
+ help=_('Out of band Security Zone id for LBs')),
+ cfg.StrOpt('mgmt_id', default=None,
+ help=_('Management Security Zone id for LBs')),
+ cfg.StrOpt('dummy_utif_id', default=None,
+ help=_('Dummy user traffic Security Zone id for LBs')),
+ cfg.StrOpt('resource_pool_id', default=None,
+ help=_('Shared resource pool id')),
+ cfg.StrOpt('lb_flavor', default="small",
+ help=_('choose LB image flavor to use, accepted values: small, '
+ 'medium')),
+ cfg.IntOpt('sync_interval', default=60,
+ help=_('resource synchronization interval in seconds')),
+ cfg.BoolOpt('async_requests', default=None,
+ help=_('Define if the requests have '
+ 'run asynchronously or not')),
+]
+
+cfg.CONF.register_opts(heleos_opts, 'heleoslb')
--- /dev/null
+# Copyright 2014 Embrane, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Ivar Lazzaro, Embrane, Inc. ivar@embrane.com
+
+from heleosapi import constants as h_con
+from heleosapi import exceptions as h_exc
+
+from neutron.plugins.common import constants as ccon
+
+DELETED = 'DELETED' # not visible status
+QUEUE_TIMEOUT = 300
+BACK_SUB_LIMIT = 6
+
+
+class BackendActions:
+ UPDATE = 'update'
+ GROW = 'grow'
+ REMOVE = 'remove'
+ SHRINK = 'shrink'
+
+
+class Events:
+ CREATE_VIP = 'create_vip'
+ UPDATE_VIP = 'update_vip'
+ DELETE_VIP = 'delete_vip'
+ UPDATE_POOL = 'update_pool'
+ UPDATE_MEMBER = 'update_member'
+ ADD_OR_UPDATE_MEMBER = 'add_or_update_member'
+ REMOVE_MEMBER = 'remove_member'
+ DELETE_MEMBER = 'delete_member'
+ POLL_GRAPH = 'poll_graph'
+ ADD_POOL_HM = "create_pool_hm"
+ UPDATE_POOL_HM = "update_pool_hm"
+ DELETE_POOL_HM = "delete_pool_hm"
+
+
+_DVA_PENDING_ERROR_MSG = _('Dva is pending for the following reason: %s')
+_DVA_NOT_FOUNT_ERROR_MSG = _('%s, '
+ 'probably was cancelled through the heleos UI')
+_DVA_BROKEN_ERROR_MSG = _('Dva seems to be broken for reason %s')
+_DVA_CREATION_FAILED_ERROR_MSG = _('Dva creation failed reason %s')
+_DVA_CREATION_PENDING_ERROR_MSG = _('Dva creation is in pending state '
+ 'for reason %s')
+_CFG_FAILED_ERROR_MSG = _('Dva configuration failed for reason %s')
+_DVA_DEL_FAILED_ERROR_MSG = _('Failed to delete the backend '
+ 'load balancer for reason %s. Please remove '
+ 'it manually through the heleos UI')
+NO_MEMBER_SUBNET_WARN = _('No subnet is associated to member %s (required '
+ 'to identify the proper load balancer port)')
+
+error_map = {h_exc.PendingDva: _DVA_PENDING_ERROR_MSG,
+ h_exc.DvaNotFound: _DVA_NOT_FOUNT_ERROR_MSG,
+ h_exc.BrokenDva: _DVA_BROKEN_ERROR_MSG,
+ h_exc.DvaCreationFailed: _DVA_CREATION_FAILED_ERROR_MSG,
+ h_exc.DvaCreationPending: _DVA_CREATION_PENDING_ERROR_MSG,
+ h_exc.ConfigurationFailed: _CFG_FAILED_ERROR_MSG,
+ h_exc.DvaDeleteFailed: _DVA_DEL_FAILED_ERROR_MSG}
+
+state_map = {h_con.DvaState.POWER_ON: ccon.ACTIVE,
+ None: ccon.ERROR,
+ DELETED: DELETED}
--- /dev/null
+# Copyright 2014 Embrane, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Ivar Lazzaro, Embrane, Inc. ivar@embrane.com
+
+import neutron.db.api as db
+from neutron.db import models_v2 as nmodel
+from neutron.services.loadbalancer.drivers.embrane import models
+
+
+def initialize():
+ db.configure_db()
+
+
+def add_pool_port(context, pool_id, port_id):
+ session = context.session
+ with session.begin(subtransactions=True):
+ pool_port = models.PoolPort()
+ pool_port.pool_id = pool_id
+ pool_port.port_id = port_id
+ session.add(pool_port)
+
+
+def get_pool_port(context, pool_id):
+ return (context.session.query(models.PoolPort).filter_by(pool_id=pool_id).
+ first())
+
+
+def delete_pool_backend(context, pool_id):
+ session = context.session
+ backend = (session.query(models.PoolPort).filter_by(
+ pool_id=pool_id))
+ for b in backend:
+ delete_pool_port(context, b)
+
+
+def delete_pool_port(context, backend_port):
+ session = context.session
+ with session.begin(subtransactions=True):
+ port = (session.query(nmodel.Port).filter_by(
+ id=backend_port['port_id'])).first()
+ if port:
+ session.delete(backend_port)
+ session.delete(port)
--- /dev/null
+# Copyright 2014 Embrane, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Ivar Lazzaro, Embrane, Inc. ivar@embrane.com
+
+from heleosapi import backend_operations as h_op
+from heleosapi import constants as h_con
+from heleosapi import info as h_info
+from oslo.config import cfg
+
+from neutron.api.v2 import attributes
+from neutron.common import exceptions as n_exc
+from neutron.db.loadbalancer import loadbalancer_db as ldb
+from neutron.extensions import loadbalancer as lb_ext
+from neutron.openstack.common import log as logging
+from neutron.plugins.common import constants as pcon
+from neutron.plugins.embrane.common import contexts as embrane_ctx
+from neutron.plugins.embrane.common import exceptions as h_exc
+from neutron.plugins.embrane.common import utils
+from neutron.services.loadbalancer import constants as lbcon
+from neutron.services.loadbalancer.drivers import abstract_driver
+from neutron.services.loadbalancer.drivers.embrane.agent import dispatcher
+from neutron.services.loadbalancer.drivers.embrane import config # noqa
+from neutron.services.loadbalancer.drivers.embrane import constants as econ
+from neutron.services.loadbalancer.drivers.embrane import db as edb
+from neutron.services.loadbalancer.drivers.embrane import poller
+
+LOG = logging.getLogger(__name__)
+conf = cfg.CONF.heleoslb
+confh = {}
+
+try:
+ confh = cfg.CONF.heleos
+except cfg.NoSuchOptError:
+ pass
+
+
+def get_conf(x):
+ try:
+ return conf.get(x) or confh.get(x)
+ except cfg.NoSuchOptError:
+ return
+
+
+class EmbraneLbaas(abstract_driver.LoadBalancerAbstractDriver):
+ def __init__(self, plugin):
+ edb.initialize()
+ config_esm_mgmt = get_conf('esm_mgmt')
+ config_admin_username = get_conf('admin_username')
+ config_admin_password = get_conf('admin_password')
+ config_lb_image_id = get_conf('lb_image')
+ config_security_zones = {h_con.SzType.IB: get_conf('inband_id'),
+ h_con.SzType.OOB: get_conf('oob_id'),
+ h_con.SzType.MGMT: get_conf('mgmt_id'),
+ h_con.SzType.DUMMY: get_conf('dummy_utif_id')}
+ config_resource_pool = get_conf('resource_pool_id')
+ self._heleos_api = h_op.BackendOperations(
+ esm_mgmt=config_esm_mgmt,
+ admin_username=config_admin_username,
+ admin_password=config_admin_password,
+ lb_image_id=config_lb_image_id,
+ security_zones=config_security_zones,
+ resource_pool=config_resource_pool)
+ self._dispatcher = dispatcher.Dispatcher(
+ self, get_conf("async_requests"))
+ self.plugin = plugin
+ poll_interval = conf.get('sync_interval')
+ if poll_interval > 0:
+ self._loop_call = poller.Poller(self)
+ self._loop_call.start_polling(conf.get('sync_interval'))
+ self._flavor = get_conf('lb_flavor')
+
+ def _validate_vip(self, vip):
+ if vip.get('connection_limit') and vip['connection_limit'] != -1:
+ raise h_exc.UnsupportedException(
+ err_msg=_('Connection limit is not supported by Embrane LB'))
+ persistance = vip.get('session_persistence')
+ if (persistance and persistance.get('type') ==
+ lbcon.SESSION_PERSISTENCE_APP_COOKIE):
+ p_type = vip['session_persistence']['type']
+ raise h_exc.UnsupportedException(
+ err_msg=_('Session persistence %s '
+ 'not supported by Embrane LBaaS') % p_type)
+
+ def _delete_vip(self, context, vip):
+ with context.session.begin(subtransactions=True):
+ self.plugin._delete_db_vip(context, vip['id'])
+ return econ.DELETED
+
+ def _delete_member(self, context, member):
+ self.plugin._delete_db_member(context, member['id'])
+
+ def _delete_pool_hm(self, context, health_monitor, pool_id):
+ self.plugin._delete_db_pool_health_monitor(context,
+ health_monitor['id'],
+ pool_id)
+
+ def _update_vip_graph_state(self, context, vip):
+ self._heleos_api.update_vip_status(vip)
+ self.plugin.update_status(context, ldb.Vip, vip['id'],
+ vip['status'])
+ if vip['status'] != pcon.ERROR:
+ pool = self.plugin.get_pool(context, vip['pool_id'])
+ pool_members = pool['members']
+ # Manages possible manual changes and monitor actions
+ self._heleos_api.update_pool_status(vip['id'], pool)
+ self._heleos_api.update_members_status(vip['id'], pool['id'],
+ pool_members)
+ self.plugin.update_status(context, ldb.Pool, pool['id'],
+ pool['status'])
+ for member in pool_members:
+ self.plugin.update_status(context, ldb.Member,
+ member['id'], member['status'])
+
+ def _create_backend_port(self, context, db_pool):
+ try:
+ subnet = self.plugin._core_plugin.get_subnet(context,
+ db_pool["subnet_id"])
+ except n_exc.SubnetNotFound:
+ LOG.warning(_("Subnet assigned to pool %s doesn't exist, "
+ "backend port can't be created"), db_pool['id'])
+ return
+
+ fixed_ip = {'subnet_id': subnet['id'],
+ 'fixed_ips': attributes.ATTR_NOT_SPECIFIED}
+
+ port_data = {
+ 'tenant_id': db_pool['tenant_id'],
+ 'name': 'pool-' + db_pool['id'],
+ 'network_id': subnet['network_id'],
+ 'mac_address': attributes.ATTR_NOT_SPECIFIED,
+ 'admin_state_up': False,
+ 'device_id': '',
+ 'device_owner': '',
+ 'fixed_ips': [fixed_ip]
+ }
+
+ port = self.plugin._core_plugin.create_port(context,
+ {'port': port_data})
+ return edb.add_pool_port(context, db_pool['id'], port['id'])
+
+ def _retrieve_utif_info(self, context, neutron_port):
+ network = self.plugin._core_plugin.get_network(
+ context, neutron_port['network_id'])
+ result = h_info.UtifInfo(network.get('provider:segmentation_id'),
+ network['name'],
+ network['id'],
+ False,
+ network['tenant_id'],
+ neutron_port['id'],
+ neutron_port['mac_address'],
+ network.get('provider:network_type'))
+ return result
+
+ def create_vip(self, context, vip):
+ self._validate_vip(vip)
+ db_vip = self.plugin.populate_vip_graph(context, vip)
+ vip_port = self.plugin._core_plugin._get_port(context,
+ db_vip['port_id'])
+ vip_utif_info = self._retrieve_utif_info(context, vip_port)
+ vip_ip_allocation_info = utils.retrieve_ip_allocation_info(
+ context, vip_port)
+ vip_ip_allocation_info.is_gw = True
+ db_pool = pool_utif_info = pool_ip_allocation_info = None
+ members = monitors = []
+ if db_vip['pool_id']:
+ db_pool = self.plugin.get_pool(
+ context, db_vip['pool_id'])
+ pool_port = edb.get_pool_port(context, db_pool["id"])
+ if pool_port:
+ db_port = self.plugin._core_plugin._get_port(
+ context, pool_port["port_id"])
+ pool_utif_info = self._retrieve_utif_info(context, db_port)
+ pool_ip_allocation_info = utils.retrieve_ip_allocation_info(
+ context, db_port)
+ members = self.plugin.get_members(
+ context, filters={'id': db_pool['members']})
+ monitors = self.plugin.get_members(
+ context, filters={'id': db_pool['health_monitors']})
+ self._dispatcher.dispatch_lb(
+ embrane_ctx.DispatcherContext(econ.Events.CREATE_VIP,
+ db_vip, context, None),
+ self._flavor, vip_utif_info, vip_ip_allocation_info,
+ pool_utif_info, pool_ip_allocation_info, db_pool, members,
+ monitors)
+
+ def update_vip(self, context, old_vip, vip):
+ new_pool = old_port_id = removed_ip = None
+ new_pool_utif = new_pool_ip_allocation = None
+ old_pool = {}
+ members = monitors = []
+ if old_vip['pool_id'] != vip['pool_id']:
+ new_pool = self.plugin.get_pool(
+ context, vip['pool_id'])
+ members = self.plugin.get_members(
+ context, filters={'id': new_pool['members']})
+ monitors = self.plugin.get_members(
+ context, filters={'id': new_pool['health_monitors']})
+ new_pool_port = edb.get_pool_port(context, new_pool["id"])
+ if new_pool_port:
+ db_port = self.plugin._core_plugin._get_port(
+ context, new_pool_port["port_id"])
+ new_pool_utif = self._retrieve_utif_info(context, db_port)
+ new_pool_ip_allocation = utils.retrieve_ip_allocation_info(
+ context, db_port)
+ old_pool = self.plugin.get_pool(
+ context, old_vip['pool_id'])
+ old_pool_port = edb.get_pool_port(context, old_pool["id"])
+ if old_pool_port:
+ old_port = self.plugin._core_plugin._get_port(
+ context, old_pool_port['port_id'])
+ # remove that subnet ip
+ removed_ip = old_port['fixed_ips'][0]['ip_address']
+ old_port_id = old_port['id']
+
+ self._dispatcher.dispatch_lb(
+ embrane_ctx.DispatcherContext(econ.Events.UPDATE_VIP, vip,
+ context, None),
+ old_pool.get('id'), old_port_id, removed_ip, new_pool_utif,
+ new_pool_ip_allocation, new_pool, members, monitors)
+
+ def delete_vip(self, context, vip):
+ db_vip = self.plugin.populate_vip_graph(context, vip)
+ self._dispatcher.dispatch_lb(
+ embrane_ctx.DispatcherContext(
+ econ.Events.DELETE_VIP, db_vip, context, None))
+
+ def create_pool(self, context, pool):
+ if pool['subnet_id']:
+ self._create_backend_port(context, pool)
+
+ def update_pool(self, context, old_pool, pool):
+ with context.session.begin(subtransactions=True):
+ if old_pool['vip_id']:
+ try:
+ db_vip = self.plugin._get_resource(
+ context, ldb.Vip, old_pool['vip_id'])
+ except lb_ext.VipNotFound:
+ return
+ monitors = self.plugin.get_members(
+ context, filters={'id': old_pool['health_monitors']})
+ self._dispatcher.dispatch_lb(
+ embrane_ctx.DispatcherContext(econ.Events.UPDATE_POOL,
+ db_vip, context, None),
+ pool, monitors)
+
+ def delete_pool(self, context, pool):
+ edb.delete_pool_backend(context, pool['id'])
+ self.plugin._delete_db_pool(context, pool['id'])
+
+ def create_member(self, context, member):
+ db_pool = self.plugin.get_pool(context, member['pool_id'])
+ if db_pool['vip_id']:
+ db_vip = self.plugin._get_resource(context, ldb.Vip,
+ db_pool['vip_id'])
+ self._dispatcher.dispatch_lb(
+ embrane_ctx.DispatcherContext(
+ econ.Events.ADD_OR_UPDATE_MEMBER, db_vip, context, None),
+ member, db_pool['protocol'])
+
+ def update_member(self, context, old_member, member):
+ db_pool = self.plugin.get_pool(context, member['pool_id'])
+ if member['pool_id'] != old_member['pool_id']:
+ old_pool = self.plugin.get_pool(context, old_member['pool_id'])
+ if old_pool['vip_id']:
+ db_vip = self.plugin._get_resource(context, ldb.Vip,
+ old_pool['vip_id'])
+ self._dispatcher.dispatch_lb(
+ embrane_ctx.DispatcherContext(
+ econ.Events.REMOVE_MEMBER, db_vip, context, None),
+ old_member)
+ if db_pool['vip_id']:
+ db_vip = self.plugin._get_resource(
+ context, ldb.Vip, db_pool['vip_id'])
+ self._dispatcher.dispatch_lb(
+ embrane_ctx.DispatcherContext(
+ econ.Events.ADD_OR_UPDATE_MEMBER, db_vip, context, None),
+ member, db_pool['protocol'])
+
+ def delete_member(self, context, member):
+ db_pool = self.plugin.get_pool(context, member['pool_id'])
+ if db_pool['vip_id']:
+ db_vip = self.plugin._get_resource(context, ldb.Vip,
+ db_pool['vip_id'])
+ self._dispatcher.dispatch_lb(
+ embrane_ctx.DispatcherContext(
+ econ.Events.DELETE_MEMBER, db_vip, context, None),
+ member)
+ else:
+ self._delete_member(context, member)
+
+ def stats(self, context, pool_id):
+ return {'bytes_in': 0,
+ 'bytes_out': 0,
+ 'active_connections': 0,
+ 'total_connections': 0}
+
+ def create_pool_health_monitor(self, context, health_monitor, pool_id):
+ db_pool = self.plugin.get_pool(context, pool_id)
+ # API call only if vip exists
+ if db_pool['vip_id']:
+ db_vip = self.plugin._get_resource(context, ldb.Vip,
+ db_pool['vip_id'])
+ self._dispatcher.dispatch_lb(
+ embrane_ctx.DispatcherContext(
+ econ.Events.ADD_POOL_HM, db_vip, context, None),
+ health_monitor, pool_id)
+
+ def update_pool_health_monitor(self, context, old_health_monitor,
+ health_monitor, pool_id):
+ db_pool = self.plugin.get_pool(context, pool_id)
+ if db_pool['vip_id']:
+ db_vip = self.plugin._get_resource(context, ldb.Vip,
+ db_pool['vip_id'])
+ self._dispatcher.dispatch_lb(
+ embrane_ctx.DispatcherContext(
+ econ.Events.UPDATE_POOL_HM, db_vip, context, None),
+ health_monitor, pool_id)
+
+ def delete_pool_health_monitor(self, context, health_monitor, pool_id):
+ db_pool = self.plugin.get_pool(context, pool_id)
+ if db_pool['vip_id']:
+ db_vip = self.plugin._get_resource(context, ldb.Vip,
+ db_pool['vip_id'])
+ self._dispatcher.dispatch_lb(
+ embrane_ctx.DispatcherContext(
+ econ.Events.DELETE_POOL_HM, db_vip, context, None),
+ health_monitor, pool_id)
+ else:
+ self._delete_pool_hm(context, health_monitor, pool_id)
--- /dev/null
+# Copyright 2014 Embrane, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Ivar Lazzaro, Embrane, Inc. ivar@embrane.com
+
+import sqlalchemy as sql
+
+from neutron.db.models_v2 import model_base
+
+
+class PoolPort(model_base.BASEV2):
+ """Represents the connection between pools and ports."""
+ __tablename__ = 'embrane_pool_port'
+
+ pool_id = sql.Column(sql.String(36), sql.ForeignKey('pools.id'),
+ primary_key=True)
+ port_id = sql.Column(sql.String(36), sql.ForeignKey('ports.id'),
+ nullable=False)
--- /dev/null
+# Copyright 2014 Embrane, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Ivar Lazzaro, Embrane, Inc. ivar@embrane.com
+
+from heleosapi import exceptions as h_exc
+
+from neutron import context
+from neutron.db.loadbalancer import loadbalancer_db as ldb
+from neutron.db import servicetype_db as sdb
+from neutron.openstack.common import log as logging
+from neutron.openstack.common import loopingcall
+from neutron.plugins.common import constants as ccon
+from neutron.plugins.embrane.common import contexts as embrane_ctx
+from neutron.services.loadbalancer.drivers.embrane import constants as econ
+
+LOG = logging.getLogger(__name__)
+skip_states = [ccon.PENDING_CREATE,
+ ccon.PENDING_DELETE,
+ ccon.PENDING_UPDATE,
+ ccon.ERROR]
+
+
+class Poller(object):
+ def __init__(self, driver):
+ self.dispatcher = driver._dispatcher
+ service_type_manager = sdb.ServiceTypeManager.get_instance()
+ self.provider = (service_type_manager.get_service_providers(
+ None, filters={
+ 'service_type': [ccon.LOADBALANCER],
+ 'driver': ['neutron.services.loadbalancer.drivers.'
+ 'embrane.driver.EmbraneLbaas']}))[0]['name']
+
+ def start_polling(self, interval):
+ loop_call = loopingcall.FixedIntervalLoopingCall(self._run)
+ loop_call.start(interval=interval)
+ return loop_call
+
+ def _run(self):
+ ctx = context.get_admin_context()
+ try:
+ self.synchronize_vips(ctx)
+ except h_exc.PollingException as e:
+ LOG.exception(_('Unhandled exception occurred'), e)
+
+ def synchronize_vips(self, ctx):
+ session = ctx.session
+ vips = session.query(ldb.Vip).join(
+ sdb.ProviderResourceAssociation,
+ sdb.ProviderResourceAssociation.resource_id ==
+ ldb.Vip.pool_id).filter(
+ sdb.ProviderResourceAssociation.provider_name == self.provider)
+ # No need to check pending states
+ for vip in vips:
+ if vip['status'] not in skip_states:
+ self.dispatcher.dispatch_lb(
+ d_context=embrane_ctx.DispatcherContext(
+ econ.Events.POLL_GRAPH, vip, ctx, None),
+ args=())
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Embrane, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Ivar Lazzaro, Embrane, Inc.
+
+import sys
+
+import mock
+from oslo.config import cfg
+
+from neutron.services.loadbalancer.drivers.embrane import config # noqa
+from neutron.tests import base
+
+sys.modules["heleosapi"] = mock.Mock()
+
+
+class ConfigurationTest(base.BaseTestCase):
+
+ def test_defaults(self):
+ self.assertEqual('small', cfg.CONF.heleoslb.lb_flavor)
+ self.assertEqual(60, cfg.CONF.heleoslb.sync_interval)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Embrane, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Ivar Lazzaro, Embrane, Inc. ivar@embrane.com
+
+import sys
+
+import mock
+sys.modules["heleosapi"] = mock.Mock()
+from oslo.config import cfg
+
+from neutron import context
+from neutron.openstack.common.db import exception as n_exc
+from neutron.services.loadbalancer.drivers.embrane import config # noqa
+from neutron.services.loadbalancer.drivers.embrane import constants as h_con
+from neutron.services.loadbalancer.drivers.embrane import db as h_db
+from neutron.tests.unit.db.loadbalancer import test_db_loadbalancer
+
+
+EMBRANE_PROVIDER = ('LOADBALANCER:lbaas:neutron.services.'
+ 'loadbalancer.drivers.embrane.driver.'
+ 'EmbraneLbaas:default')
+
+
+class TestLoadBalancerPluginBase(
+ test_db_loadbalancer.LoadBalancerPluginDbTestCase):
+
+ def setUp(self):
+ cfg.CONF.set_override('admin_password', "admin123", 'heleoslb')
+ cfg.CONF.set_override('sync_interval', 0, 'heleoslb')
+
+ super(TestLoadBalancerPluginBase, self).setUp(
+ lbaas_provider=EMBRANE_PROVIDER)
+ self.driver = self.plugin.drivers['lbaas']
+
+
+class TestLoadBalancerPlugin(test_db_loadbalancer.TestLoadBalancer,
+ TestLoadBalancerPluginBase):
+
+ def test_create_vip_with_session_persistence_with_app_cookie(self):
+ self.skip("App cookie persistence not supported.")
+
+ def test_pool_port(self):
+ with self.port(no_delete=True) as port:
+ with self.pool() as pool:
+ h_db.add_pool_port(context.get_admin_context(),
+ pool['pool']['id'], port['port']['id'])
+ pool_port = h_db.get_pool_port(context.get_admin_context(),
+ pool['pool']['id'])
+ self.assertIsNotNone(pool_port)
+ pool_port = h_db.get_pool_port(context.get_admin_context(),
+ pool['pool']['id'])
+ self.assertIsNone(pool_port)
+
+ def test_create_pool_port_no_port(self):
+ with self.pool() as pool:
+ self.assertRaises(n_exc.DBError,
+ h_db.add_pool_port,
+ context.get_admin_context(),
+ pool['pool']['id'], None)
+
+ def test_lb_operations_handlers(self):
+ h = self.driver._dispatcher.handlers
+ self.assertIsNotNone(h[h_con.Events.ADD_OR_UPDATE_MEMBER])
+ self.assertIsNotNone(h[h_con.Events.CREATE_VIP])
+ self.assertIsNotNone(h[h_con.Events.DELETE_MEMBER])
+ self.assertIsNotNone(h[h_con.Events.DELETE_VIP])
+ self.assertIsNotNone(h[h_con.Events.POLL_GRAPH])
+ self.assertIsNotNone(h[h_con.Events.REMOVE_MEMBER])
+ self.assertIsNotNone(h[h_con.Events.UPDATE_POOL])
+ self.assertIsNotNone(h[h_con.Events.UPDATE_VIP])
+ self.assertIsNotNone(h[h_con.Events.UPDATE_POOL_HM])
+ self.assertIsNotNone(h[h_con.Events.DELETE_POOL_HM])
+ self.assertIsNotNone(h[h_con.Events.ADD_POOL_HM])