import sys
sys.path.insert(0, os.getcwd())
-from quantum.plugins.services.agent_loadbalancer.agent import main
+from quantum.plugins.services.agent_loadbalancer.drivers.haproxy.agent import main
main()
AGENT = 'q-agent-notifier'
PLUGIN = 'q-plugin'
DHCP = 'q-dhcp-notifer'
-LOADBALANCER_PLUGIN = 'q-loadbalancer-plugin'
L3_AGENT = 'l3_agent'
DHCP_AGENT = 'dhcp_agent'
-LOADBALANCER_AGENT = 'loadbalancer_agent'
def get_topic_name(prefix, table, operation):
context.session.delete(vip)
if vip.port: # this is a Quantum port
self._core_plugin.delete_port(context, vip.port.id)
- context.session.flush()
def get_vip(self, context, id, fields=None):
vip = self._get_resource(context, Vip, id)
from quantum.agent.common import config
from quantum.agent.linux import interface
-from quantum.common import topics
from quantum.openstack.common.rpc import service as rpc_service
from quantum.openstack.common import service
-from quantum.plugins.services.agent_loadbalancer.agent import manager
-
+from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import (
+ agent_manager as manager,
+ plugin_driver
+)
OPTS = [
cfg.IntOpt(
mgr = manager.LbaasAgentManager(cfg.CONF)
svc = LbaasAgentService(
host=cfg.CONF.host,
- topic=topics.LOADBALANCER_AGENT,
+ topic=plugin_driver.TOPIC_LOADBALANCER_AGENT,
manager=mgr
)
service.launch(svc).wait()
from oslo.config import cfg
from quantum.agent.common import config
-from quantum.common import topics
from quantum import context
from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging
from quantum.openstack.common import periodic_task
-from quantum.plugins.services.agent_loadbalancer.agent import api
+from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import (
+ agent_api,
+ plugin_driver
+)
LOG = logging.getLogger(__name__)
NS_PREFIX = 'qlbaas-'
msg = _('Error importing loadbalancer device driver: %s')
raise SystemExit(msg % conf.device_driver)
ctx = context.get_admin_context_without_session()
- self.plugin_rpc = api.LbaasAgentApi(
- topics.LOADBALANCER_PLUGIN,
+ self.plugin_rpc = agent_api.LbaasAgentApi(
+ plugin_driver.TOPIC_PROCESS_ON_HOST,
ctx,
conf.host
)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Mark McClain, DreamHost
+
+import uuid
+
+from oslo.config import cfg
+
+from quantum.common import exceptions as q_exc
+from quantum.common import rpc as q_rpc
+from quantum.db.loadbalancer import loadbalancer_db
+from quantum.openstack.common import log as logging
+from quantum.openstack.common import rpc
+from quantum.openstack.common.rpc import proxy
+from quantum.plugins.common import constants
+from quantum.plugins.services.agent_loadbalancer.drivers import (
+ abstract_driver
+)
+
+LOG = logging.getLogger(__name__)
+
+ACTIVE_PENDING = (
+ constants.ACTIVE,
+ constants.PENDING_CREATE,
+ constants.PENDING_UPDATE
+)
+
+# topic name for this particular agent implementation
+TOPIC_PROCESS_ON_HOST = 'q-lbaas-process-on-host'
+TOPIC_LOADBALANCER_AGENT = 'lbaas_process_on_host_agent'
+
+
+class LoadBalancerCallbacks(object):
+ RPC_API_VERSION = '1.0'
+
+ def __init__(self, plugin):
+ self.plugin = plugin
+
+ def create_rpc_dispatcher(self):
+ return q_rpc.PluginRpcDispatcher([self])
+
+ def get_ready_devices(self, context, host=None):
+ with context.session.begin(subtransactions=True):
+ qry = (context.session.query(loadbalancer_db.Pool.id).
+ join(loadbalancer_db.Vip))
+
+ qry = qry.filter(loadbalancer_db.Vip.status.in_(ACTIVE_PENDING))
+ qry = qry.filter(loadbalancer_db.Pool.status.in_(ACTIVE_PENDING))
+ up = True # makes pep8 and sqlalchemy happy
+ qry = qry.filter(loadbalancer_db.Vip.admin_state_up == up)
+ qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up)
+ return [id for id, in qry]
+
+ def get_logical_device(self, context, pool_id=None, activate=True,
+ **kwargs):
+ with context.session.begin(subtransactions=True):
+ qry = context.session.query(loadbalancer_db.Pool)
+ qry = qry.filter_by(id=pool_id)
+ pool = qry.one()
+
+ if activate:
+ # set all resources to active
+ if pool.status in ACTIVE_PENDING:
+ pool.status = constants.ACTIVE
+
+ if pool.vip.status in ACTIVE_PENDING:
+ pool.vip.status = constants.ACTIVE
+
+ for m in pool.members:
+ if m.status in ACTIVE_PENDING:
+ m.status = constants.ACTIVE
+
+ for hm in pool.monitors:
+ if hm.healthmonitor.status in ACTIVE_PENDING:
+ hm.healthmonitor.status = constants.ACTIVE
+
+ if (pool.status != constants.ACTIVE
+ or pool.vip.status != constants.ACTIVE):
+ raise q_exc.Invalid(_('Expected active pool and vip'))
+
+ retval = {}
+ retval['pool'] = self.plugin._make_pool_dict(pool)
+ retval['vip'] = self.plugin._make_vip_dict(pool.vip)
+ retval['vip']['port'] = (
+ self.plugin._core_plugin._make_port_dict(pool.vip.port)
+ )
+ for fixed_ip in retval['vip']['port']['fixed_ips']:
+ fixed_ip['subnet'] = (
+ self.plugin._core_plugin.get_subnet(
+ context,
+ fixed_ip['subnet_id']
+ )
+ )
+ retval['members'] = [
+ self.plugin._make_member_dict(m)
+ for m in pool.members if m.status == constants.ACTIVE
+ ]
+ retval['healthmonitors'] = [
+ self.plugin._make_health_monitor_dict(hm.healthmonitor)
+ for hm in pool.monitors
+ if hm.healthmonitor.status == constants.ACTIVE
+ ]
+
+ return retval
+
+ def pool_destroyed(self, context, pool_id=None, host=None):
+ """Agent confirmation hook that a pool has been destroyed.
+
+ This method exists for subclasses to change the deletion
+ behavior.
+ """
+ pass
+
+ def plug_vip_port(self, context, port_id=None, host=None):
+ if not port_id:
+ return
+
+ try:
+ port = self.plugin._core_plugin.get_port(
+ context,
+ port_id
+ )
+ except q_exc.PortNotFound:
+ msg = _('Unable to find port %s to plug.')
+ LOG.debug(msg, port_id)
+ return
+
+ port['admin_state_up'] = True
+ port['device_owner'] = 'quantum:' + constants.LOADBALANCER
+ port['device_id'] = str(uuid.uuid5(uuid.NAMESPACE_DNS, str(host)))
+
+ self.plugin._core_plugin.update_port(
+ context,
+ port_id,
+ {'port': port}
+ )
+
+ def unplug_vip_port(self, context, port_id=None, host=None):
+ if not port_id:
+ return
+
+ try:
+ port = self.plugin._core_plugin.get_port(
+ context,
+ port_id
+ )
+ except q_exc.PortNotFound:
+ msg = _('Unable to find port %s to unplug. This can occur when '
+ 'the Vip has been deleted first.')
+ LOG.debug(msg, port_id)
+ return
+
+ port['admin_state_up'] = False
+ port['device_owner'] = ''
+ port['device_id'] = ''
+
+ try:
+ self.plugin._core_plugin.update_port(
+ context,
+ port_id,
+ {'port': port}
+ )
+
+ except q_exc.PortNotFound:
+ msg = _('Unable to find port %s to unplug. This can occur when '
+ 'the Vip has been deleted first.')
+ LOG.debug(msg, port_id)
+
+ def update_pool_stats(self, context, pool_id=None, stats=None, host=None):
+ # TODO(markmcclain): add stats collection
+ pass
+
+
+class LoadBalancerAgentApi(proxy.RpcProxy):
+ """Plugin side of plugin to agent RPC API."""
+
+ API_VERSION = '1.0'
+
+ def __init__(self, topic, host):
+ super(LoadBalancerAgentApi, self).__init__(topic, self.API_VERSION)
+ self.host = host
+
+ def reload_pool(self, context, pool_id):
+ return self.cast(
+ context,
+ self.make_msg('reload_pool', pool_id=pool_id, host=self.host),
+ topic=self.topic
+ )
+
+ def destroy_pool(self, context, pool_id):
+ return self.cast(
+ context,
+ self.make_msg('destroy_pool', pool_id=pool_id, host=self.host),
+ topic=self.topic
+ )
+
+ def modify_pool(self, context, pool_id):
+ return self.cast(
+ context,
+ self.make_msg('modify_pool', pool_id=pool_id, host=self.host),
+ topic=self.topic
+ )
+
+
+class HaproxyOnHostPluginDriver(abstract_driver.LoadBalancerAbstractDriver):
+ def __init__(self, plugin):
+ self.agent_rpc = LoadBalancerAgentApi(
+ TOPIC_LOADBALANCER_AGENT,
+ cfg.CONF.host
+ )
+ self.callbacks = LoadBalancerCallbacks(plugin)
+
+ self.conn = rpc.create_connection(new=True)
+ self.conn.create_consumer(
+ TOPIC_PROCESS_ON_HOST,
+ self.callbacks.create_rpc_dispatcher(),
+ fanout=False)
+ self.conn.consume_in_thread()
+ self.plugin = plugin
+
+ def create_vip(self, context, vip):
+ self.agent_rpc.reload_pool(context, vip['pool_id'])
+
+ def update_vip(self, context, old_vip, vip):
+ if vip['status'] in ACTIVE_PENDING:
+ self.agent_rpc.reload_pool(context, vip['pool_id'])
+ else:
+ self.agent_rpc.destroy_pool(context, vip['pool_id'])
+
+ def delete_vip(self, context, vip):
+ self.plugin._delete_db_vip(context, vip['id'])
+ self.agent_rpc.destroy_pool(context, vip['pool_id'])
+
+ def create_pool(self, context, pool):
+ # don't notify here because a pool needs a vip to be useful
+ pass
+
+ def update_pool(self, context, old_pool, pool):
+ if pool['status'] in ACTIVE_PENDING:
+ if pool['vip_id'] is not None:
+ self.agent_rpc.reload_pool(context, pool['id'])
+ else:
+ self.agent_rpc.destroy_pool(context, pool['id'])
+
+ def delete_pool(self, context, pool):
+ self.plugin._delete_db_pool(context, pool['id'])
+ self.agent_rpc.destroy_pool(context, pool['id'])
+
+ def create_member(self, context, member):
+ self.agent_rpc.modify_pool(context, member['pool_id'])
+
+ def update_member(self, context, old_member, member):
+ # member may change pool id
+ if member['pool_id'] != old_member['pool_id']:
+ self.agent_rpc.modify_pool(context, old_member['pool_id'])
+ self.agent_rpc.modify_pool(context, member['pool_id'])
+
+ def delete_member(self, context, member):
+ self.plugin._delete_db_member(context, member['id'])
+ self.agent_rpc.modify_pool(context, member['pool_id'])
+
+ def update_health_monitor(self, context, healthmon, pool_id):
+ # healthmon is unused here because agent will fetch what is necessary
+ self.agent_rpc.modify_pool(context, pool_id)
+
+ def delete_health_monitor(self, context, healthmon_id, pool_id):
+ # healthmon_id is not used in this driver
+ self.agent_rpc.modify_pool(context, pool_id)
+
+ def create_pool_health_monitor(self, context, healthmon, pool_id):
+ # healthmon is not used here
+ self.agent_rpc.modify_pool(context, pool_id)
+
+ def delete_pool_health_monitor(self, context, health_monitor, pool_id):
+ self.plugin._delete_db_pool_health_monitor(
+ context, health_monitor['id'], pool_id
+ )
+
+ # healthmon_id is not used here
+ self.agent_rpc.modify_pool(context, pool_id)
+
+ def create_health_monitor(self, context, health_monitor):
+ pass
+
+ def stats(self, context, pool_id):
+ pass
+++ /dev/null
-#
-# Copyright 2013 Radware LTD.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-# @author: Avishay Balderman, Radware
-
-from oslo.config import cfg
-
-from quantum.db import api as qdbapi
-from quantum.db.loadbalancer import loadbalancer_db
-from quantum.openstack.common import importutils
-from quantum.openstack.common import log as logging
-from quantum.plugins.common import constants
-
-LOG = logging.getLogger(__name__)
-
-DEFAULT_DRIVER = ("quantum.plugins.services.agent_loadbalancer"
- ".drivers.noop"
- ".noop_driver.NoopLbaaSDriver")
-
-lbaas_plugin_opts = [
- cfg.StrOpt('driver_fqn',
- default=DEFAULT_DRIVER,
- help=_('LBaaS driver Fully Qualified Name'))
-]
-
-cfg.CONF.register_opts(lbaas_plugin_opts, "LBAAS")
-
-
-class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
-
- """Implementation of the Quantum Loadbalancer Service Plugin.
-
- This class manages the workflow of LBaaS request/response.
- Most DB related works are implemented in class
- loadbalancer_db.LoadBalancerPluginDb.
- """
- supported_extension_aliases = ["lbaas"]
-
- def __init__(self):
- """Initialization for the loadbalancer service plugin."""
-
- qdbapi.register_models()
- self.driver = importutils.import_object(
- cfg.CONF.LBAAS.driver_fqn, self)
-
- def get_plugin_type(self):
- return constants.LOADBALANCER
-
- def get_plugin_description(self):
- return "Quantum LoadBalancer Service Plugin"
-
- def create_vip(self, context, vip):
- v = super(LoadBalancerPlugin, self).create_vip(context, vip)
- self.driver.create_vip(context, v)
- return v
-
- def update_vip(self, context, id, vip):
- if 'status' not in vip['vip']:
- vip['vip']['status'] = constants.PENDING_UPDATE
- old_vip = self.get_vip(context, id)
- v = super(LoadBalancerPlugin, self).update_vip(context, id, vip)
- self.driver.update_vip(context, old_vip, v)
- return v
-
- def _delete_db_vip(self, context, id):
- super(LoadBalancerPlugin, self).delete_vip(context, id)
-
- def delete_vip(self, context, id):
- self.update_status(context, loadbalancer_db.Vip,
- id, constants.PENDING_DELETE)
- v = self.get_vip(context, id)
- self.driver.delete_vip(context, v)
-
- def create_pool(self, context, pool):
- p = super(LoadBalancerPlugin, self).create_pool(context, pool)
- self.driver.create_pool(context, p)
- return p
-
- def update_pool(self, context, id, pool):
- if 'status' not in pool['pool']:
- pool['pool']['status'] = constants.PENDING_UPDATE
- old_pool = self.get_pool(context, id)
- p = super(LoadBalancerPlugin, self).update_pool(context, id, pool)
- self.driver.update_pool(context, old_pool, p)
- return p
-
- def _delete_db_pool(self, context, id):
- super(LoadBalancerPlugin, self).delete_pool(context, id)
-
- def delete_pool(self, context, id):
- self.update_status(context, loadbalancer_db.Pool,
- id, constants.PENDING_DELETE)
- p = self.get_pool(context, id)
- self.driver.delete_pool(context, p)
-
- def create_member(self, context, member):
- m = super(LoadBalancerPlugin, self).create_member(context, member)
- self.driver.create_member(context, m)
- return m
-
- def update_member(self, context, id, member):
- if 'status' not in member['member']:
- member['member']['status'] = constants.PENDING_UPDATE
- old_member = self.get_member(context, id)
- m = super(LoadBalancerPlugin, self).update_member(context, id, member)
- self.driver.update_member(context, old_member, m)
- return m
-
- def _delete_db_member(self, context, id):
- super(LoadBalancerPlugin, self).delete_member(context, id)
-
- def delete_member(self, context, id):
- self.update_status(context, loadbalancer_db.Member,
- id, constants.PENDING_DELETE)
- m = self.get_member(context, id)
- self.driver.delete_member(context, m)
-
- def create_health_monitor(self, context, health_monitor):
- hm = super(LoadBalancerPlugin, self).create_health_monitor(
- context,
- health_monitor
- )
- self.driver.create_health_monitor(context, hm)
- return hm
-
- def update_health_monitor(self, context, id, health_monitor):
- if 'status' not in health_monitor['health_monitor']:
- health_monitor['health_monitor']['status'] = (
- constants.PENDING_UPDATE
- )
- old_hm = self.get_health_monitor(context, id)
- hm = super(LoadBalancerPlugin, self).update_health_monitor(
- context,
- id,
- health_monitor
- )
-
- with context.session.begin(subtransactions=True):
- qry = context.session.query(
- loadbalancer_db.PoolMonitorAssociation
- )
- qry = qry.filter_by(monitor_id=hm['id'])
- assocs = qry.all()
- for assoc in assocs:
- self.driver.update_health_monitor(context, old_hm, hm, assoc)
- return hm
-
- def _delete_db_pool_health_monitor(self, context, hm_id, pool_id):
- super(LoadBalancerPlugin, self).delete_pool_health_monitor(context,
- hm_id,
- pool_id)
-
- def delete_health_monitor(self, context, id):
- with context.session.begin(subtransactions=True):
- qry = context.session.query(
- loadbalancer_db.PoolMonitorAssociation
- )
- qry = qry.filter_by(monitor_id=id)
- assocs = qry.all()
- hm = self.get_health_monitor(context, id)
- for assoc in assocs:
- self.driver.delete_pool_health_monitor(context,
- hm,
- assoc['pool_id'])
-
- def create_pool_health_monitor(self, context, health_monitor, pool_id):
- retval = super(LoadBalancerPlugin, self).create_pool_health_monitor(
- context,
- health_monitor,
- pool_id
- )
- # open issue: PoolMonitorAssociation has no status field
- # so we cant set the status to pending and let the driver
- # set the real status of the association
- self.driver.create_pool_health_monitor(
- context, health_monitor, pool_id)
- return retval
-
- def delete_pool_health_monitor(self, context, id, pool_id):
- hm = self.get_health_monitor(context, id)
- self.driver.delete_pool_health_monitor(
- context, hm, pool_id)
-
- def stats(self, context, pool_id):
- stats_data = self.driver.stats(context, pool_id)
- # if we get something from the driver -
- # update the db and return the value from db
- # else - return what we have in db
- if stats_data:
- super(LoadBalancerPlugin, self)._update_pool_stats(
- context,
- pool_id,
- stats_data
- )
- return super(LoadBalancerPlugin, self).stats(context,
- pool_id)
-
- def populate_vip_graph(self, context, vip):
- """Populate the vip with: pool, members, healthmonitors."""
-
- pool = self.get_pool(context, vip['pool_id'])
- vip['pool'] = pool
- vip['members'] = [
- self.get_member(context, member_id)
- for member_id in pool['members']]
- vip['health_monitors'] = [
- self.get_health_monitor(context, hm_id)
- for hm_id in pool['health_monitors']]
- return vip
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2013 OpenStack Foundation.
-# All Rights Reserved.
+#
+# Copyright 2013 Radware LTD.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-
-import uuid
+#
+# @author: Avishay Balderman, Radware
from oslo.config import cfg
-from quantum.common import exceptions as q_exc
-from quantum.common import rpc as q_rpc
-from quantum.common import topics
from quantum.db import api as qdbapi
from quantum.db.loadbalancer import loadbalancer_db
+from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging
-from quantum.openstack.common import rpc
-from quantum.openstack.common.rpc import proxy
from quantum.plugins.common import constants
LOG = logging.getLogger(__name__)
-ACTIVE_PENDING = (
- constants.ACTIVE,
- constants.PENDING_CREATE,
- constants.PENDING_UPDATE
-)
-
-
-class LoadBalancerCallbacks(object):
- RPC_API_VERSION = '1.0'
-
- def __init__(self, plugin):
- self.plugin = plugin
-
- def create_rpc_dispatcher(self):
- return q_rpc.PluginRpcDispatcher([self])
-
- def get_ready_devices(self, context, host=None):
- with context.session.begin(subtransactions=True):
- qry = (context.session.query(loadbalancer_db.Pool.id).
- join(loadbalancer_db.Vip))
-
- qry = qry.filter(loadbalancer_db.Vip.status.in_(ACTIVE_PENDING))
- qry = qry.filter(loadbalancer_db.Pool.status.in_(ACTIVE_PENDING))
- up = True # makes pep8 and sqlalchemy happy
- qry = qry.filter(loadbalancer_db.Vip.admin_state_up == up)
- qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up)
- return [id for id, in qry]
-
- def get_logical_device(self, context, pool_id=None, activate=True,
- **kwargs):
- with context.session.begin(subtransactions=True):
- qry = context.session.query(loadbalancer_db.Pool)
- qry = qry.filter_by(id=pool_id)
- pool = qry.one()
-
- if activate:
- # set all resources to active
- if pool.status in ACTIVE_PENDING:
- pool.status = constants.ACTIVE
-
- if pool.vip.status in ACTIVE_PENDING:
- pool.vip.status = constants.ACTIVE
-
- for m in pool.members:
- if m.status in ACTIVE_PENDING:
- m.status = constants.ACTIVE
-
- for hm in pool.monitors:
- if hm.healthmonitor.status in ACTIVE_PENDING:
- hm.healthmonitor.status = constants.ACTIVE
-
- if (pool.status != constants.ACTIVE
- or pool.vip.status != constants.ACTIVE):
- raise q_exc.Invalid(_('Expected active pool and vip'))
-
- retval = {}
- retval['pool'] = self.plugin._make_pool_dict(pool)
- retval['vip'] = self.plugin._make_vip_dict(pool.vip)
- retval['vip']['port'] = (
- self.plugin._core_plugin._make_port_dict(pool.vip.port)
- )
- for fixed_ip in retval['vip']['port']['fixed_ips']:
- fixed_ip['subnet'] = (
- self.plugin._core_plugin.get_subnet(
- context,
- fixed_ip['subnet_id']
- )
- )
- retval['members'] = [
- self.plugin._make_member_dict(m)
- for m in pool.members if m.status == constants.ACTIVE
- ]
- retval['healthmonitors'] = [
- self.plugin._make_health_monitor_dict(hm.healthmonitor)
- for hm in pool.monitors
- if hm.healthmonitor.status == constants.ACTIVE
- ]
-
- return retval
-
- def pool_destroyed(self, context, pool_id=None, host=None):
- """Agent confirmation hook that a pool has been destroyed.
-
- This method exists for subclasses to change the deletion
- behavior.
- """
- pass
-
- def plug_vip_port(self, context, port_id=None, host=None):
- if not port_id:
- return
-
- try:
- port = self.plugin._core_plugin.get_port(
- context,
- port_id
- )
- except q_exc.PortNotFound:
- msg = _('Unable to find port %s to plug.')
- LOG.debug(msg, port_id)
- return
-
- port['admin_state_up'] = True
- port['device_owner'] = 'quantum:' + constants.LOADBALANCER
- port['device_id'] = str(uuid.uuid5(uuid.NAMESPACE_DNS, str(host)))
-
- self.plugin._core_plugin.update_port(
- context,
- port_id,
- {'port': port}
- )
-
- def unplug_vip_port(self, context, port_id=None, host=None):
- if not port_id:
- return
-
- try:
- port = self.plugin._core_plugin.get_port(
- context,
- port_id
- )
- except q_exc.PortNotFound:
- msg = _('Unable to find port %s to unplug. This can occur when '
- 'the Vip has been deleted first.')
- LOG.debug(msg, port_id)
- return
-
- port['admin_state_up'] = False
- port['device_owner'] = ''
- port['device_id'] = ''
-
- try:
- self.plugin._core_plugin.update_port(
- context,
- port_id,
- {'port': port}
- )
-
- except q_exc.PortNotFound:
- msg = _('Unable to find port %s to unplug. This can occur when '
- 'the Vip has been deleted first.')
- LOG.debug(msg, port_id)
-
- def update_pool_stats(self, context, pool_id=None, stats=None, host=None):
- # TODO(markmcclain): add stats collection
- pass
-
+DEFAULT_DRIVER = ("quantum.plugins.services.agent_loadbalancer"
+ ".drivers.haproxy"
+ ".plugin_driver.HaproxyOnHostPluginDriver")
-class LoadBalancerAgentApi(proxy.RpcProxy):
- """Plugin side of plugin to agent RPC API."""
+lbaas_plugin_opts = [
+ cfg.StrOpt('driver_fqn',
+ default=DEFAULT_DRIVER,
+ help=_('LBaaS driver Fully Qualified Name'))
+]
- API_VERSION = '1.0'
-
- def __init__(self, topic, host):
- super(LoadBalancerAgentApi, self).__init__(topic, self.API_VERSION)
- self.host = host
-
- def reload_pool(self, context, pool_id):
- return self.cast(
- context,
- self.make_msg('reload_pool', pool_id=pool_id, host=self.host),
- topic=self.topic
- )
-
- def destroy_pool(self, context, pool_id):
- return self.cast(
- context,
- self.make_msg('destroy_pool', pool_id=pool_id, host=self.host),
- topic=self.topic
- )
-
- def modify_pool(self, context, pool_id):
- return self.cast(
- context,
- self.make_msg('modify_pool', pool_id=pool_id, host=self.host),
- topic=self.topic
- )
+cfg.CONF.register_opts(lbaas_plugin_opts, "LBAAS")
class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
supported_extension_aliases = ["lbaas"]
def __init__(self):
- """Do the initialization for the loadbalancer service plugin here."""
- qdbapi.register_models()
+ """Initialization for the loadbalancer service plugin."""
- self.callbacks = LoadBalancerCallbacks(self)
+ qdbapi.register_models()
+ self._load_drivers()
- self.conn = rpc.create_connection(new=True)
- self.conn.create_consumer(
- topics.LOADBALANCER_PLUGIN,
- self.callbacks.create_rpc_dispatcher(),
- fanout=False)
- self.conn.consume_in_thread()
+ def _load_drivers(self):
+ """Loads plugin-driver from configuration.
- self.agent_rpc = LoadBalancerAgentApi(
- topics.LOADBALANCER_AGENT,
- cfg.CONF.host
- )
+ That method will later leverage service type framework
+ """
+ try:
+ self.driver = importutils.import_object(
+ cfg.CONF.LBAAS.driver_fqn, self)
+ except ImportError:
+ LOG.exception(_("Error loading LBaaS driver %s"),
+ cfg.CONF.LBAAS.driver_fqn)
def get_plugin_type(self):
return constants.LOADBALANCER
return "Quantum LoadBalancer Service Plugin"
def create_vip(self, context, vip):
- vip['vip']['status'] = constants.PENDING_CREATE
v = super(LoadBalancerPlugin, self).create_vip(context, vip)
- self.agent_rpc.reload_pool(context, v['pool_id'])
+ self.driver.create_vip(context, v)
return v
def update_vip(self, context, id, vip):
if 'status' not in vip['vip']:
vip['vip']['status'] = constants.PENDING_UPDATE
+ old_vip = self.get_vip(context, id)
v = super(LoadBalancerPlugin, self).update_vip(context, id, vip)
- if v['status'] in ACTIVE_PENDING:
- self.agent_rpc.reload_pool(context, v['pool_id'])
- else:
- self.agent_rpc.destroy_pool(context, v['pool_id'])
+ self.driver.update_vip(context, old_vip, v)
return v
- def delete_vip(self, context, id):
- vip = self.get_vip(context, id)
+ def _delete_db_vip(self, context, id):
+ # proxy the call until plugin inherits from DBPlugin
super(LoadBalancerPlugin, self).delete_vip(context, id)
- self.agent_rpc.destroy_pool(context, vip['pool_id'])
+
+ def delete_vip(self, context, id):
+ self.update_status(context, loadbalancer_db.Vip,
+ id, constants.PENDING_DELETE)
+ v = self.get_vip(context, id)
+ self.driver.delete_vip(context, v)
def create_pool(self, context, pool):
p = super(LoadBalancerPlugin, self).create_pool(context, pool)
- # don't notify here because a pool needs a vip to be useful
+ self.driver.create_pool(context, p)
return p
def update_pool(self, context, id, pool):
if 'status' not in pool['pool']:
pool['pool']['status'] = constants.PENDING_UPDATE
+ old_pool = self.get_pool(context, id)
p = super(LoadBalancerPlugin, self).update_pool(context, id, pool)
- if p['status'] in ACTIVE_PENDING:
- if p['vip_id'] is not None:
- self.agent_rpc.reload_pool(context, p['id'])
- else:
- self.agent_rpc.destroy_pool(context, p['id'])
+ self.driver.update_pool(context, old_pool, p)
return p
- def delete_pool(self, context, id):
+ def _delete_db_pool(self, context, id):
+ # proxy the call until plugin inherits from DBPlugin
super(LoadBalancerPlugin, self).delete_pool(context, id)
- self.agent_rpc.destroy_pool(context, id)
+
+ def delete_pool(self, context, id):
+ self.update_status(context, loadbalancer_db.Pool,
+ id, constants.PENDING_DELETE)
+ p = self.get_pool(context, id)
+ self.driver.delete_pool(context, p)
def create_member(self, context, member):
m = super(LoadBalancerPlugin, self).create_member(context, member)
- self.agent_rpc.modify_pool(context, m['pool_id'])
+ self.driver.create_member(context, m)
return m
def update_member(self, context, id, member):
if 'status' not in member['member']:
member['member']['status'] = constants.PENDING_UPDATE
+ old_member = self.get_member(context, id)
m = super(LoadBalancerPlugin, self).update_member(context, id, member)
- self.agent_rpc.modify_pool(context, m['pool_id'])
+ self.driver.update_member(context, old_member, m)
return m
+ def _delete_db_member(self, context, id):
+ # proxy the call until plugin inherits from DBPlugin
+ super(LoadBalancerPlugin, self).delete_member(context, id)
+
def delete_member(self, context, id):
+ self.update_status(context, loadbalancer_db.Member,
+ id, constants.PENDING_DELETE)
m = self.get_member(context, id)
- super(LoadBalancerPlugin, self).delete_member(context, id)
- self.agent_rpc.modify_pool(context, m['pool_id'])
+ self.driver.delete_member(context, m)
+
+ def create_health_monitor(self, context, health_monitor):
+ # no PENDING_CREATE status sinse healthmon is shared DB object
+ hm = super(LoadBalancerPlugin, self).create_health_monitor(
+ context,
+ health_monitor
+ )
+ self.driver.create_health_monitor(context, hm)
+ return hm
def update_health_monitor(self, context, id, health_monitor):
if 'status' not in health_monitor['health_monitor']:
health_monitor['health_monitor']['status'] = (
constants.PENDING_UPDATE
)
+ old_hm = self.get_health_monitor(context, id)
hm = super(LoadBalancerPlugin, self).update_health_monitor(
context,
id,
with context.session.begin(subtransactions=True):
qry = context.session.query(
loadbalancer_db.PoolMonitorAssociation
- )
- qry = qry.filter_by(monitor_id=hm['id'])
-
+ ).filter_by(monitor_id=hm['id'])
for assoc in qry:
- self.agent_rpc.modify_pool(context, assoc['pool_id'])
+ self.driver.update_health_monitor(context, old_hm, hm, assoc)
return hm
+ def _delete_db_pool_health_monitor(self, context, hm_id, pool_id):
+ super(LoadBalancerPlugin, self).delete_pool_health_monitor(context,
+ hm_id,
+ pool_id)
+
def delete_health_monitor(self, context, id):
with context.session.begin(subtransactions=True):
+ hm = self.get_health_monitor(context, id)
qry = context.session.query(
loadbalancer_db.PoolMonitorAssociation
- )
- qry = qry.filter_by(monitor_id=id)
-
- pool_ids = [a['pool_id'] for a in qry]
- super(LoadBalancerPlugin, self).delete_health_monitor(context, id)
- for pid in pool_ids:
- self.agent_rpc.modify_pool(context, pid)
+ ).filter_by(monitor_id=id)
+ for assoc in qry:
+ self.driver.delete_pool_health_monitor(context,
+ hm,
+ assoc['pool_id'])
def create_pool_health_monitor(self, context, health_monitor, pool_id):
retval = super(LoadBalancerPlugin, self).create_pool_health_monitor(
health_monitor,
pool_id
)
- self.agent_rpc.modify_pool(context, pool_id)
-
+ # open issue: PoolMonitorAssociation has no status field
+ # so we cant set the status to pending and let the driver
+ # set the real status of the association
+ self.driver.create_pool_health_monitor(
+ context, health_monitor, pool_id)
return retval
def delete_pool_health_monitor(self, context, id, pool_id):
- retval = super(LoadBalancerPlugin, self).delete_pool_health_monitor(
- context,
- id,
- pool_id
- )
- self.agent_rpc.modify_pool(context, pool_id)
-
- return retval
+ hm = self.get_health_monitor(context, id)
+ self.driver.delete_pool_health_monitor(
+ context, hm, pool_id)
+
+ def stats(self, context, pool_id):
+ stats_data = self.driver.stats(context, pool_id)
+ # if we get something from the driver -
+ # update the db and return the value from db
+ # else - return what we have in db
+ if stats_data:
+ super(LoadBalancerPlugin, self)._update_pool_stats(
+ context,
+ pool_id,
+ stats_data
+ )
+ return super(LoadBalancerPlugin, self).stats(context,
+ pool_id)
+
+ def populate_vip_graph(self, context, vip):
+ """Populate the vip with: pool, members, healthmonitors."""
+
+ pool = self.get_pool(context, vip['pool_id'])
+ vip['pool'] = pool
+ vip['members'] = [
+ self.get_member(context, member_id)
+ for member_id in pool['members']]
+ vip['health_monitors'] = [
+ self.get_health_monitor(context, hm_id)
+ for hm_id in pool['health_monitors']]
+ return vip
DB_CORE_PLUGIN_KLASS = 'quantum.db.db_base_plugin_v2.QuantumDbPluginV2'
DB_LB_PLUGIN_KLASS = (
"quantum.plugins.services.agent_loadbalancer."
- "lbaas_plugin.LoadBalancerPlugin"
+ "plugin.LoadBalancerPlugin"
)
ROOTDIR = os.path.dirname(__file__) + '../../../..'
ETCDIR = os.path.join(ROOTDIR, 'etc')
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-#
-# Copyright 2013 New Dream Network, LLC (DreamHost)
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-# @author: Mark McClain, DreamHost
import mock
from oslo.config import cfg
-from quantum.plugins.services.agent_loadbalancer import agent
+from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import agent
from quantum.tests import base
import mock
-from quantum.plugins.services.agent_loadbalancer.agent import manager
+from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import (
+ agent_manager as manager
+)
from quantum.tests import base
self.mock_importer = mock.patch.object(manager, 'importutils').start()
rpc_mock_cls = mock.patch(
- 'quantum.plugins.services.agent_loadbalancer.agent.api'
- '.LbaasAgentApi'
+ 'quantum.plugins.services.agent_loadbalancer.drivers'
+ '.haproxy.agent_api.LbaasAgentApi'
).start()
self.mgr = manager.LbaasAgentManager(mock_conf)
import mock
-from quantum.plugins.services.agent_loadbalancer.agent import api
+from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import (
+ agent_api as api
+)
from quantum.tests import base
# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2013 OpenStack Foundation.
-# All Rights Reserved.
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
from quantum import context
from quantum.db.loadbalancer import loadbalancer_db as ldb
from quantum import manager
-from quantum.openstack.common import importutils
from quantum.openstack.common import uuidutils
from quantum.plugins.common import constants
-from quantum.plugins.services.agent_loadbalancer import plugin
+from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import (
+ plugin_driver
+)
from quantum.tests import base
from quantum.tests.unit.db.loadbalancer import test_db_loadbalancer
# we need access to loaded plugins to modify models
loaded_plugins = manager.QuantumManager().get_service_plugins()
- # TODO(avishayb) - below is a little hack that helps the
- # test to pass :-)
- # the problem is the code below assumes the existance of 'callbacks'
- # on the plugin. So the bypass is to load the plugin that has
- # the callbacks as a member.The hack will be removed once we will
- # have one lbaas plugin. (we currently have 2 - (Grizzly and Havana))
- hack = True
- if hack:
- HACK_KLASS = (
- "quantum.plugins.services.agent_loadbalancer."
- "plugin.LoadBalancerPlugin"
- )
- self.plugin_instance = importutils.import_object(HACK_KLASS)
- else:
- self.plugin_instance = loaded_plugins[constants.LOADBALANCER]
- self.callbacks = self.plugin_instance.callbacks
+
+ self.plugin_instance = loaded_plugins[constants.LOADBALANCER]
class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
+ def setUp(self):
+ super(TestLoadBalancerCallbacks, self).setUp()
+
+ self.callbacks = plugin_driver.LoadBalancerCallbacks(
+ self.plugin_instance
+ )
+
def test_get_ready_devices(self):
with self.vip() as vip:
ready = self.callbacks.get_ready_devices(
super(TestLoadBalancerAgentApi, self).setUp()
self.addCleanup(mock.patch.stopall)
- self.api = plugin.LoadBalancerAgentApi('topic', 'host')
+ self.api = plugin_driver.LoadBalancerAgentApi('topic', 'host')
self.mock_cast = mock.patch.object(self.api, 'cast').start()
self.mock_msg = mock.patch.object(self.api, 'make_msg').start()
def test_modify_pool(self):
self._call_test_helper('modify_pool')
+
+
+class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
+ def setUp(self):
+ self.log = mock.patch.object(plugin_driver, 'LOG')
+ api_cls = mock.patch.object(plugin_driver,
+ 'LoadBalancerAgentApi').start()
+ super(TestLoadBalancerPluginNotificationWrapper, self).setUp()
+ self.mock_api = api_cls.return_value
+
+ self.addCleanup(mock.patch.stopall)
+
+ def test_create_vip(self):
+ with self.subnet() as subnet:
+ with self.pool(subnet=subnet) as pool:
+ with self.vip(pool=pool, subnet=subnet) as vip:
+ self.mock_api.reload_pool.assert_called_once_with(
+ mock.ANY,
+ vip['vip']['pool_id']
+ )
+
+ def test_update_vip(self):
+ with self.subnet() as subnet:
+ with self.pool(subnet=subnet) as pool:
+ with self.vip(pool=pool, subnet=subnet) as vip:
+ self.mock_api.reset_mock()
+ ctx = context.get_admin_context()
+ vip['vip'].pop('status')
+ new_vip = self.plugin_instance.update_vip(
+ ctx,
+ vip['vip']['id'],
+ vip
+ )
+
+ self.mock_api.reload_pool.assert_called_once_with(
+ mock.ANY,
+ vip['vip']['pool_id']
+ )
+
+ self.assertEqual(
+ new_vip['status'],
+ constants.PENDING_UPDATE
+ )
+
+ def test_delete_vip(self):
+ with self.subnet() as subnet:
+ with self.pool(subnet=subnet) as pool:
+ with self.vip(pool=pool, subnet=subnet, no_delete=True) as vip:
+ self.mock_api.reset_mock()
+ ctx = context.get_admin_context()
+ self.plugin_instance.delete_vip(ctx, vip['vip']['id'])
+ self.mock_api.destroy_pool.assert_called_once_with(
+ mock.ANY,
+ vip['vip']['pool_id']
+ )