# (StrOpt) Type of Network Interface to allocate for VM:
-# direct or hosdev according to libvirt terminology
+# mlnx_direct or hostdev according to libvirt terminology
-# vnic_type = direct
+# vnic_type = mlnx_direct
# (StrOpt) Eswitch daemon end point connection url
# daemon_endpoint = 'tcp://127.0.0.1:5001'
[agent]
# Agent's polling interval in seconds
# polling_interval = 2
+
+# (BoolOpt) Enable server RPC compatibility with old (pre-Havana)
+# agents.
+#
+# rpc_support_old_agents = True
AGENT_TYPE_NEC = 'NEC plugin agent'
AGENT_TYPE_L3 = 'L3 agent'
AGENT_TYPE_LOADBALANCER = 'Loadbalancer agent'
+AGENT_TYPE_MLNX = 'Mellanox plugin agent'
L2_AGENT_TOPIC = 'N/A'
PAGINATION_INFINITE = 'infinite'
from oslo.config import cfg
from neutron.agent import rpc as agent_rpc
+from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import config as logging_config
from neutron.common import constants as q_constants
from neutron.common import topics
from neutron import context
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
+from neutron.openstack.common.rpc import common as rpc_common
from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.mlnx.agent import utils
from neutron.plugins.mlnx.common import config # noqa
net_map = self.network_map[network_id]
net_map['ports'].append({'port_id': port_id, 'port_mac': port_mac})
- if network_type == constants.TYPE_VLAN:
- LOG.info(_('Binding VLAN ID %(seg_id)s'
+ if network_type in (constants.TYPE_VLAN,
+ constants.TYPE_IB):
+ LOG.info(_('Binding Segmentation ID %(seg_id)s '
'to eSwitch for vNIC mac_address %(mac)s'),
{'seg_id': seg_id,
'mac': port_mac})
seg_id,
port_mac)
self.utils.port_up(physical_network, port_mac)
- elif network_type == constants.TYPE_IB:
- LOG.debug(_('Network Type IB currently not supported'))
else:
LOG.error(_('Unsupported network type %s'), network_type)
if network_type == constants.TYPE_VLAN:
LOG.debug(_("creating VLAN Network"))
elif network_type == constants.TYPE_IB:
- LOG.debug(_("currently IB network provisioning is not supported"))
+ LOG.debug(_("creating IB Network"))
else:
LOG.error(_("Unknown network type %(network_type) "
"for network %(network_id)"),
self.network_map[network_id] = data
-class MlnxEswitchRpcCallbacks():
+class MlnxEswitchRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
# Set RPC API version to 1.0 by default.
- RPC_API_VERSION = '1.0'
+ # history
+ # 1.1 Support Security Group RPC
+ RPC_API_VERSION = '1.1'
- def __init__(self, context, eswitch):
+ def __init__(self, context, agent):
self.context = context
- self.eswitch = eswitch
+ self.agent = agent
+ self.eswitch = agent.eswitch
+ self.sg_agent = agent
def network_delete(self, context, **kwargs):
LOG.debug(_("network_delete received"))
def port_update(self, context, **kwargs):
LOG.debug(_("port_update received"))
port = kwargs.get('port')
- vlan_id = kwargs.get('vlan_id')
- physical_network = kwargs.get('physical_network')
net_type = kwargs.get('network_type')
+ segmentation_id = kwargs.get('segmentation_id')
+ if not segmentation_id:
+ # compatibility with pre-Havana RPC vlan_id encoding
+ segmentation_id = kwargs.get('vlan_id')
+ physical_network = kwargs.get('physical_network')
net_id = port['network_id']
if self.eswitch.vnic_port_exists(port['mac_address']):
- if port['admin_state_up']:
- self.eswitch.port_up(net_id,
- net_type,
- physical_network,
- vlan_id,
- port['id'],
- port['mac_address'])
- else:
- self.eswitch.port_down(net_id,
- physical_network,
- port['mac_address'])
+ if 'security_groups' in port:
+ self.sg_agent.refresh_firewall()
+ try:
+ if port['admin_state_up']:
+ self.eswitch.port_up(net_id,
+ net_type,
+ physical_network,
+ segmentation_id,
+ port['id'],
+ port['mac_address'])
+ # update plugin about port status
+ self.agent.plugin_rpc.update_device_up(self.context,
+ port['mac_address'],
+ self.agent.agent_id)
+ else:
+ self.eswitch.port_down(net_id,
+ physical_network,
+ port['mac_address'])
+ # update plugin about port status
+ self.agent.plugin_rpc.update_device_down(
+ self.context,
+ port['mac_address'],
+ self.agent.agent_id)
+ except rpc_common.Timeout:
+ LOG.error(_("RPC timeout while updating port %s"), port['id'])
else:
LOG.debug(_("No port %s defined on agent."), port['id'])
return dispatcher.RpcDispatcher([self])
-class MlnxEswitchNeutronAgent(object):
+class MlnxEswitchPluginApi(agent_rpc.PluginApi,
+ sg_rpc.SecurityGroupServerRpcApiMixin):
+ pass
+
+
+class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
# Set RPC API version to 1.0 by default.
- RPC_API_VERSION = '1.0'
+ # RPC_API_VERSION = '1.0'
def __init__(self, interface_mapping):
self._polling_interval = cfg.CONF.AGENT.polling_interval
'host': cfg.CONF.host,
'topic': q_constants.L2_AGENT_TOPIC,
'configurations': interface_mapping,
- 'agent_type': 'eSwitch agent',
+ 'agent_type': q_constants.AGENT_TYPE_MLNX,
'start_flag': True}
self._setup_rpc()
+ self.init_firewall()
def _setup_eswitches(self, interface_mapping):
daemon = cfg.CONF.ESWITCH.daemon_endpoint
def _setup_rpc(self):
self.agent_id = 'mlnx-agent.%s' % socket.gethostname()
+ LOG.info(_("RPC agent_id: %s"), self.agent_id)
+
self.topic = topics.AGENT
- self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
+ self.plugin_rpc = MlnxEswitchPluginApi(topics.PLUGIN)
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
# RPC network init
self.context = context.get_admin_context_without_session()
# Handle updates from service
- self.callbacks = MlnxEswitchRpcCallbacks(self.context, self.eswitch)
+ self.callbacks = MlnxEswitchRpcCallbacks(self.context,
+ self)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
- [topics.NETWORK, topics.DELETE]]
+ [topics.NETWORK, topics.DELETE],
+ [topics.SECURITY_GROUP, topics.UPDATE]]
self.connection = agent_rpc.create_consumers(self.dispatcher,
self.topic,
consumers)
def process_network_ports(self, port_info):
resync_a = False
resync_b = False
- if 'added' in port_info:
+ if port_info.get('added'):
LOG.debug(_("ports added!"))
resync_a = self.treat_devices_added(port_info['added'])
- if 'removed' in port_info:
+ if port_info.get('removed'):
LOG.debug(_("ports removed!"))
resync_b = self.treat_devices_removed(port_info['removed'])
# If one of the above operations fails => resync with plugin
continue
if dev_details['exists']:
LOG.info(_("Port %s updated."), device)
- self.eswitch.port_release(device)
else:
LOG.debug(_("Device %s not defined on plugin"), device)
+ self.eswitch.port_release(device)
return resync
def daemon_loop(self):
port_info = self.update_ports(ports)
# notify plugin about port deltas
if port_info:
- LOG.debug(_("Agent loop has new devices!"))
+ LOG.debug(_("Agent loop processing devices"))
# If treat devices fails - must resync with plugin
sync = self.process_network_ports(port_info)
ports = port_info['current']
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+from oslo.config import cfg
+from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import topics
from neutron.openstack.common import log as logging
from neutron.openstack.common.rpc import proxy
LOG = logging.getLogger(__name__)
-class AgentNotifierApi(proxy.RpcProxy):
+class AgentNotifierApi(proxy.RpcProxy,
+ sg_rpc.SecurityGroupAgentRpcApiMixin):
"""Agent side of the Embedded Switch RPC API.
API version history:
1.0 - Initial version.
+        1.1 - Added the security group update notifications
+              provided by SecurityGroupAgentRpcApiMixin.
"""
- BASE_RPC_API_VERSION = '1.0'
+ BASE_RPC_API_VERSION = '1.1'
def __init__(self, topic):
super(AgentNotifierApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
+ self.topic = topic
self.topic_network_delete = topics.get_topic_name(topic,
topics.NETWORK,
topics.DELETE)
def port_update(self, context, port, physical_network,
network_type, vlan_id):
LOG.debug(_("Sending update port message"))
- self.fanout_cast(context,
- self.make_msg('port_update',
- port=port,
- physical_network=physical_network,
- network_type=network_type,
- vlan_id=vlan_id),
+ kwargs = {'port': port,
+ 'network_type': network_type,
+ 'physical_network': physical_network,
+ 'segmentation_id': vlan_id}
+ if cfg.CONF.AGENT.rpc_support_old_agents:
+ kwargs['vlan_id'] = vlan_id
+ msg = self.make_msg('port_update', **kwargs)
+ self.fanout_cast(context, msg,
topic=self.topic_port_update)
help=_("List of <physical_network>:<physical_interface>")),
cfg.StrOpt('vnic_type',
default=constants.VIF_TYPE_DIRECT,
- help=_("type of VM network interface: direct or hosdev")),
+ help=_("type of VM network interface: mlnx_direct or "
+ "hostdev")),
cfg.StrOpt('daemon_endpoint',
default='tcp://127.0.0.1:5001',
help=_('eswitch daemon end point')),
cfg.IntOpt('polling_interval', default=2,
help=_("The number of seconds the agent will wait between "
"polling for local device changes.")),
+ cfg.BoolOpt('rpc_support_old_agents', default=True,
+ help=_("Enable server RPC compatibility with old agents")),
]
TYPE_IB = 'ib'
TYPE_NONE = 'none'
-VIF_TYPE_DIRECT = 'direct'
+VIF_TYPE_DIRECT = 'mlnx_direct'
VIF_TYPE_HOSTDEV = 'hostdev'
VNIC_TYPE = 'vnic_type'
from neutron.common import exceptions as q_exc
import neutron.db.api as db
from neutron.db import models_v2
+from neutron.db import securitygroups_db as sg_db
+from neutron import manager
from neutron.openstack.common import log as logging
from neutron.plugins.mlnx.common import config # noqa
from neutron.plugins.mlnx.db import mlnx_models_v2
with session.begin(subtransactions=True):
entry = (session.query(mlnx_models_v2.SegmentationIdAllocation).
filter_by(allocated=False).
+ with_lockmode('update').
first())
if not entry:
raise q_exc.NoNetworkAvailable()
entry = (session.query(mlnx_models_v2.SegmentationIdAllocation).
filter_by(physical_network=physical_network,
segmentation_id=segmentation_id).
+ with_lockmode('update').
one())
if entry.allocated:
raise q_exc.VlanIdInUse(vlan_id=segmentation_id,
def get_network_binding(session, network_id):
- qry = session.query(mlnx_models_v2.NetworkBinding)
- qry = qry.filter_by(network_id=network_id)
- return qry.first()
+ return (session.query(mlnx_models_v2.NetworkBinding).
+ filter_by(network_id=network_id).first())
def add_port_profile_binding(session, port_id, vnic_type):
def get_port_profile_binding(session, port_id):
- qry = session.query(mlnx_models_v2.PortProfileBinding)
- return qry.filter_by(port_id=port_id).first()
+ return (session.query(mlnx_models_v2.PortProfileBinding).
+ filter_by(port_id=port_id).first())
def get_port_from_device(device):
"""Get port from database."""
LOG.debug(_("get_port_from_device() called"))
session = db.get_session()
- ports = session.query(models_v2.Port).all()
- for port in ports:
- if port['id'].startswith(device):
- return port
+ sg_binding_port = sg_db.SecurityGroupPortBinding.port_id
+
+ query = session.query(models_v2.Port,
+ sg_db.SecurityGroupPortBinding.security_group_id)
+ query = query.outerjoin(sg_db.SecurityGroupPortBinding,
+ models_v2.Port.id == sg_binding_port)
+ query = query.filter(models_v2.Port.id.startswith(device))
+ port_and_sgs = query.all()
+ if not port_and_sgs:
+ return
+ port = port_and_sgs[0][0]
+ plugin = manager.NeutronManager.get_plugin()
+ port_dict = plugin._make_port_dict(port)
+ port_dict['security_groups'] = [
+ sg_id for port_in_db, sg_id in port_and_sgs if sg_id
+ ]
+ port_dict['security_group_rules'] = []
+ port_dict['security_group_source_groups'] = []
+ port_dict['fixed_ips'] = [ip['ip_address']
+ for ip in port['fixed_ips']]
+ return port_dict
def get_port_from_device_mac(device_mac):
from oslo.config import cfg
from neutron.agent import securitygroups_rpc as sg_rpc
+from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
+from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.v2 import attributes
from neutron.common import constants as q_const
from neutron.common import exceptions as q_exc
from neutron.common import topics
from neutron.common import utils
-from neutron.db import agents_db
+from neutron.db import agentschedulers_db
from neutron.db import db_base_plugin_v2
-from neutron.db import l3_db
+from neutron.db import extraroute_db
+from neutron.db import l3_gwmode_db
+from neutron.db import portbindings_db
from neutron.db import quota_db # noqa
from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.extensions import portbindings
from neutron.extensions import providernet as provider
+from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.plugins.common import utils as plugin_utils
class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
- l3_db.L3_NAT_db_mixin,
- agents_db.AgentDbMixin,
- sg_db_rpc.SecurityGroupServerRpcMixin):
+ extraroute_db.ExtraRoute_db_mixin,
+ l3_gwmode_db.L3_NAT_db_mixin,
+ sg_db_rpc.SecurityGroupServerRpcMixin,
+ agentschedulers_db.L3AgentSchedulerDbMixin,
+ agentschedulers_db.DhcpAgentSchedulerDbMixin,
+ portbindings_db.PortBindingMixin):
"""Realization of Neutron API on Mellanox HCA embedded switch technology.
Current plugin provides embedded HCA Switch connectivity.
Code is based on the Linux Bridge plugin content to
support consistency with L3 & DHCP Agents.
+
+ A new VLAN is created for each network. An agent is relied upon
+ to perform the actual HCA configuration on each host.
+
+ The provider extension is also supported.
+
+    The port binding extension enables an external application to relay
+ information to and from the plugin.
"""
# This attribute specifies whether the plugin supports or not
# bulk operations. Name mangling is used in order to ensure it
# is qualified by class
__native_bulk_support = True
- _supported_extension_aliases = ["provider", "router", "binding",
- "agent", "quotas", "security-group"]
+ _supported_extension_aliases = ["provider", "router", "ext-gw-mode",
+ "binding", "quotas", "security-group",
+ "agent", "extraroute",
+ "l3_agent_scheduler",
+ "dhcp_agent_scheduler"]
@property
def supported_extension_aliases(self):
self._aliases = aliases
return self._aliases
- network_view = "extension:provider_network:view"
- network_set = "extension:provider_network:set"
- binding_view = "extension:port_binding:view"
- binding_set = "extension:port_binding:set"
-
def __init__(self):
"""Start Mellanox Neutron Plugin."""
db.initialize()
db.sync_network_states(self.network_vlan_ranges)
self._set_tenant_network_type()
self.vnic_type = cfg.CONF.ESWITCH.vnic_type
+ self.base_binding_dict = {
+ portbindings.VIF_TYPE: self.vnic_type,
+ portbindings.CAPABILITIES: {
+ portbindings.CAP_PORT_FILTER:
+ 'security-group' in self.supported_extension_aliases}}
self._setup_rpc()
+ self.network_scheduler = importutils.import_object(
+ cfg.CONF.network_scheduler_driver
+ )
+ self.router_scheduler = importutils.import_object(
+ cfg.CONF.router_scheduler_driver
+ )
LOG.debug(_("Mellanox Embedded Switch Plugin initialisation complete"))
def _setup_rpc(self):
# RPC support
self.topic = topics.PLUGIN
self.conn = rpc.create_connection(new=True)
- self.notifier = agent_notify_api.AgentNotifierApi(topics.AGENT)
self.callbacks = rpc_callbacks.MlnxRpcCallbacks()
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher,
fanout=False)
# Consume from all consumers in a thread
self.conn.consume_in_thread()
+ self.notifier = agent_notify_api.AgentNotifierApi(topics.AGENT)
+ self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
+ dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
+ )
+ self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
+ l3_rpc_agent_api.L3AgentNotify
+ )
def _parse_network_vlan_ranges(self):
try:
raise q_exc.InvalidInput(error_message=msg)
return physical_network
+ def _check_port_binding_for_net_type(self, vnic_type, net_type):
+ if net_type == constants.TYPE_VLAN:
+ return vnic_type in (constants.VIF_TYPE_DIRECT,
+ constants.VIF_TYPE_HOSTDEV)
+ elif net_type == constants.TYPE_IB:
+ return vnic_type == constants.VIF_TYPE_HOSTDEV
+ return False
+
def _process_port_binding_create(self, context, attrs):
binding_profile = attrs.get(portbindings.PROFILE)
binding_profile_set = attributes.is_attr_set(binding_profile)
+
+ net_binding = db.get_network_binding(context.session,
+ attrs.get('network_id'))
+ net_type = net_binding.network_type
+
if not binding_profile_set:
return self.vnic_type
if constants.VNIC_TYPE in binding_profile:
- req_vnic_type = binding_profile[constants.VNIC_TYPE]
- if req_vnic_type in (constants.VIF_TYPE_DIRECT,
- constants.VIF_TYPE_HOSTDEV):
- return req_vnic_type
+ vnic_type = binding_profile[constants.VNIC_TYPE]
+ if vnic_type in (constants.VIF_TYPE_DIRECT,
+ constants.VIF_TYPE_HOSTDEV):
+ if self._check_port_binding_for_net_type(vnic_type,
+ net_type):
+ self.base_binding_dict[portbindings.VIF_TYPE] = vnic_type
+ return vnic_type
+ else:
+ msg = (_("unsupported vnic type %(vnic_type)s "
+ "for network type %(net_type)s") %
+ {'vnic_type': vnic_type, 'net_type': net_type})
else:
msg = _("invalid vnic_type on port_create")
else:
network['network'])
session = context.session
with session.begin(subtransactions=True):
+ # set up default security groups
+ tenant_id = self._get_tenant_id_for_create(
+ context, network['network'])
+ self._ensure_default_security_group(context, tenant_id)
+
if not network_type:
# tenant network
network_type = self.tenant_network_type
return net
def update_network(self, context, net_id, network):
+ LOG.debug(_("update network"))
provider._raise_if_updates_provider_attributes(network['network'])
+
session = context.session
with session.begin(subtransactions=True):
net = super(MellanoxEswitchPlugin, self).update_network(context,
self._extend_network_dict_provider(context, net)
return self._fields(net, fields)
- def get_networks(self, context, filters=None, fields=None):
+ def get_networks(self, context, filters=None, fields=None,
+ sorts=None, limit=None, marker=None, page_reverse=False):
session = context.session
with session.begin(subtransactions=True):
- nets = super(MellanoxEswitchPlugin, self).get_networks(context,
- filters,
- None)
+ nets = super(MellanoxEswitchPlugin,
+ self).get_networks(context, filters, None, sorts,
+ limit, marker, page_reverse)
for net in nets:
self._extend_network_dict_provider(context, net)
- # TODO(rkukura): Filter on extended provider attributes.
- nets = self._filter_nets_l3(context, nets, filters)
+
return [self._fields(net, fields) for net in nets]
def _extend_port_dict_binding(self, context, port):
port['id'])
if port_binding:
port[portbindings.VIF_TYPE] = port_binding.vnic_type
- port[portbindings.CAPABILITIES] = {
- portbindings.CAP_PORT_FILTER:
- 'security-group' in self.supported_extension_aliases}
binding = db.get_network_binding(context.session,
port['network_id'])
fabric = binding.physical_network
def create_port(self, context, port):
LOG.debug(_("create_port with %s"), port)
- vnic_type = self._process_port_binding_create(context, port['port'])
- port = super(MellanoxEswitchPlugin, self).create_port(context, port)
- db.add_port_profile_binding(context.session, port['id'], vnic_type)
+ session = context.session
+ port_data = port['port']
+ with session.begin(subtransactions=True):
+ self._ensure_default_security_group_on_port(context, port)
+ sgids = self._get_security_groups_on_port(context, port)
+ # Set port status as 'DOWN'. This will be updated by agent
+ port['port']['status'] = q_const.PORT_STATUS_DOWN
+
+ vnic_type = self._process_port_binding_create(context,
+ port['port'])
+
+ port = super(MellanoxEswitchPlugin,
+ self).create_port(context, port)
+
+ self._process_portbindings_create_and_update(context,
+ port_data,
+ port)
+ db.add_port_profile_binding(context.session, port['id'], vnic_type)
+
+ self._process_port_create_security_group(
+ context, port, sgids)
+ self.notify_security_groups_member_updated(context, port)
return self._extend_port_dict_binding(context, port)
def get_port(self, context, id, fields=None):
- port = super(MellanoxEswitchPlugin, self).get_port(context, id, fields)
- return self._fields(self._extend_port_dict_binding(context, port),
- fields)
-
- def get_ports(self, context, filters=None, fields=None):
- ports = super(MellanoxEswitchPlugin, self).get_ports(
- context, filters, fields)
- return [self._fields(self._extend_port_dict_binding(context, port),
- fields) for port in ports]
+ port = super(MellanoxEswitchPlugin, self).get_port(context,
+ id,
+ fields)
+ self._extend_port_dict_binding(context, port)
+ return self._fields(port, fields)
+
+ def get_ports(self, context, filters=None, fields=None,
+ sorts=None, limit=None, marker=None, page_reverse=False):
+ res_ports = []
+ ports = super(MellanoxEswitchPlugin,
+ self).get_ports(context, filters, fields, sorts,
+ limit, marker, page_reverse)
+ for port in ports:
+ port = self._extend_port_dict_binding(context, port)
+ res_ports.append(self._fields(port, fields))
+ return res_ports
def update_port(self, context, port_id, port):
- original_port = super(MellanoxEswitchPlugin, self).get_port(context,
- port_id)
+ original_port = self.get_port(context, port_id)
session = context.session
+ need_port_update_notify = False
+
with session.begin(subtransactions=True):
- port = super(MellanoxEswitchPlugin, self).update_port(context,
- port_id,
- port)
- if original_port['admin_state_up'] != port['admin_state_up']:
+ updated_port = super(MellanoxEswitchPlugin, self).update_port(
+ context, port_id, port)
+ self._process_portbindings_create_and_update(context,
+ port['port'],
+ updated_port)
+ need_port_update_notify = self.update_security_group_on_port(
+ context, port_id, port, original_port, updated_port)
+
+ need_port_update_notify |= self.is_security_group_member_updated(
+ context, original_port, updated_port)
+
+ if original_port['admin_state_up'] != updated_port['admin_state_up']:
+ need_port_update_notify = True
+
+ if need_port_update_notify:
binding = db.get_network_binding(context.session,
- port['network_id'])
- self.notifier.port_update(context, port,
+ updated_port['network_id'])
+ self.notifier.port_update(context, updated_port,
binding.physical_network,
binding.network_type,
binding.segmentation_id)
- return self._extend_port_dict_binding(context, port)
+ return self._extend_port_dict_binding(context, updated_port)
def delete_port(self, context, port_id, l3_port_check=True):
# if needed, check to see if this is a port owned by
# an l3-router; if so, we should prevent deletion
session = context.session
with session.begin(subtransactions=True):
self.disassociate_floatingips(context, port_id)
+ port = self.get_port(context, port_id)
+ self._delete_port_security_group_bindings(context, port_id)
+ super(MellanoxEswitchPlugin, self).delete_port(context, port_id)
- return super(MellanoxEswitchPlugin, self).delete_port(context,
- port_id)
+ self.notify_security_groups_member_updated(context, port)
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+from oslo.config import cfg
from neutron.common import constants as q_const
from neutron.common import rpc as q_rpc
port['device'] = device
else:
port = db.get_port_from_device_mac(device)
+ if port:
+ port['device'] = device
return port
def get_device_details(self, rpc_context, **kwargs):
"""Agent requests device details."""
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
- LOG.debug("Device %s details requested from %s", device, agent_id)
+ LOG.debug(_("Device %(device)s details requested from %(agent_id)s"),
+ {'device': device, 'agent_id': agent_id})
port = self.get_port_from_device(device)
if port:
binding = db.get_network_binding(db_api.get_session(),
entry = {'device': device,
'physical_network': binding.physical_network,
'network_type': binding.network_type,
- 'vlan_id': binding.segmentation_id,
+ 'segmentation_id': binding.segmentation_id,
'network_id': port['network_id'],
'port_mac': port['mac_address'],
'port_id': port['id'],
'admin_state_up': port['admin_state_up']}
- # Set the port status to UP
- db.set_port_status(port['id'], q_const.PORT_STATUS_ACTIVE)
+ if cfg.CONF.AGENT.rpc_support_old_agents:
+ entry['vlan_id'] = binding.segmentation_id
+ new_status = (q_const.PORT_STATUS_ACTIVE if port['admin_state_up']
+ else q_const.PORT_STATUS_DOWN)
+ if port['status'] != new_status:
+ db.set_port_status(port['id'], new_status)
else:
entry = {'device': device}
LOG.debug("%s can not be found in database", device)
device = kwargs.get('device')
LOG.debug(_("Device %(device)s no longer exists on %(agent_id)s"),
{'device': device, 'agent_id': agent_id})
- port = db.get_port_from_device(device)
+ port = self.get_port_from_device(device)
if port:
entry = {'device': device,
'exists': True}
- # Set port status to DOWN
- db.set_port_status(port['id'], q_const.PORT_STATUS_DOWN)
+ if port['status'] != q_const.PORT_STATUS_DOWN:
+ # Set port status to DOWN
+ db.set_port_status(port['id'], q_const.PORT_STATUS_DOWN)
else:
entry = {'device': device,
'exists': False}
--- /dev/null
+# Copyright (c) 2013 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from neutron.tests.unit.mlnx import test_mlnx_plugin
+from neutron.tests.unit.openvswitch import test_agent_scheduler
+
+
+class MlnxAgentSchedulerTestCase(
+ test_agent_scheduler.OvsAgentSchedulerTestCase):
+ plugin_str = test_mlnx_plugin.PLUGIN_NAME
+
+
+class MlnxL3AgentNotifierTestCase(
+ test_agent_scheduler.OvsL3AgentNotifierTestCase):
+ plugin_str = test_mlnx_plugin.PLUGIN_NAME
+
+
+class MlnxDhcpAgentNotifierTestCase(
+ test_agent_scheduler.OvsDhcpAgentNotifierTestCase):
+ plugin_str = test_mlnx_plugin.PLUGIN_NAME
from neutron.plugins.mlnx.common import constants
from neutron.tests.unit import _test_extension_portbindings as test_bindings
from neutron.tests.unit import test_db_plugin as test_plugin
+from neutron.tests.unit import test_security_groups_rpc as test_sg_rpc
+
PLUGIN_NAME = ('neutron.plugins.mlnx.mlnx_plugin.MellanoxEswitchPlugin')
def setUp(self):
super(MlnxPluginV2TestCase, self).setUp(self._plugin_name)
+ self.port_create_status = 'DOWN'
class TestMlnxBasicGet(test_plugin.TestBasicGet, MlnxPluginV2TestCase):
test_bindings.PortBindingsTestCase):
VIF_TYPE = constants.VIF_TYPE_DIRECT
HAS_PORT_FILTER = False
+
+
+class TestMlnxPortBindingNoSG(TestMlnxPortBinding):
+ HAS_PORT_FILTER = False
+ FIREWALL_DRIVER = test_sg_rpc.FIREWALL_NOOP_DRIVER
+
+
+class TestMlnxPortBindingHost(
+ MlnxPluginV2TestCase,
+ test_bindings.PortBindingsHostTestCaseMixin):
+ pass
--- /dev/null
+# Copyright (c) 2013 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+import webob.exc
+
+from neutron.api.v2 import attributes
+from neutron.extensions import securitygroup as ext_sg
+from neutron.plugins.mlnx.db import mlnx_db_v2 as mlnx_db
+from neutron.tests.unit import test_extension_security_group as test_sg
+from neutron.tests.unit import test_security_groups_rpc as test_sg_rpc
+
+
+PLUGIN_NAME = ('neutron.plugins.mlnx.'
+ 'mlnx_plugin.MellanoxEswitchPlugin')
+AGENT_NAME = ('neutron.plugins.mlnx.'
+ 'agent.eswitch_neutron_agent.MlnxEswitchNeutronAgent')
+NOTIFIER = ('neutron.plugins.mlnx.'
+ 'agent_notify_api.AgentNotifierApi')
+
+
+class MlnxSecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase):
+ _plugin_name = PLUGIN_NAME
+
+ def setUp(self, plugin=None):
+ test_sg_rpc.set_firewall_driver(test_sg_rpc.FIREWALL_IPTABLES_DRIVER)
+ notifier_p = mock.patch(NOTIFIER)
+ notifier_cls = notifier_p.start()
+ self.notifier = mock.Mock()
+ notifier_cls.return_value = self.notifier
+ self._attribute_map_bk_ = {}
+ for item in attributes.RESOURCE_ATTRIBUTE_MAP:
+ self._attribute_map_bk_[item] = (attributes.
+ RESOURCE_ATTRIBUTE_MAP[item].
+ copy())
+ super(MlnxSecurityGroupsTestCase, self).setUp(PLUGIN_NAME)
+ self.addCleanup(mock.patch.stopall)
+
+ def tearDown(self):
+ attributes.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk_
+ super(MlnxSecurityGroupsTestCase, self).tearDown()
+
+
+class TestMlnxSecurityGroups(MlnxSecurityGroupsTestCase,
+ test_sg.TestSecurityGroups,
+ test_sg_rpc.SGNotificationTestMixin):
+ pass
+
+
+class TestMlnxSecurityGroupsXML(TestMlnxSecurityGroups):
+ fmt = 'xml'
+
+
+class TestMlnxSecurityGroupsDB(MlnxSecurityGroupsTestCase):
+ def test_security_group_get_port_from_device(self):
+ with self.network() as n:
+ with self.subnet(n):
+ with self.security_group() as sg:
+ security_group_id = sg['security_group']['id']
+ res = self._create_port(self.fmt, n['network']['id'])
+ port = self.deserialize(self.fmt, res)
+ fixed_ips = port['port']['fixed_ips']
+ data = {'port': {'fixed_ips': fixed_ips,
+ 'name': port['port']['name'],
+ ext_sg.SECURITYGROUPS:
+ [security_group_id]}}
+
+ req = self.new_update_request('ports', data,
+ port['port']['id'])
+ if res.status_int >= 400:
+ raise webob.exc.HTTPClientError(code=res.status_int)
+ res = self.deserialize(self.fmt,
+ req.get_response(self.api))
+ port_id = res['port']['id']
+ device_id = port_id[:8]
+ port_dict = mlnx_db.get_port_from_device(device_id)
+ self.assertEqual(port_id, port_dict['id'])
+ self.assertEqual([security_group_id],
+ port_dict[ext_sg.SECURITYGROUPS])
+ self.assertEqual([], port_dict['security_group_rules'])
+ self.assertEqual([fixed_ips[0]['ip_address']],
+ port_dict['fixed_ips'])
+ self._delete('ports', port['port']['id'])
+
+ def test_security_group_get_port_from_device_with_no_port(self):
+ port_dict = mlnx_db.get_port_from_device('bad_device_id')
+ self.assertEqual(None, port_dict)
+
+
+class TestMlnxSecurityGroupsDBXML(TestMlnxSecurityGroupsDB):
+ fmt = 'xml'
Unit Tests for Mellanox RPC (major reuse of linuxbridge rpc unit tests)
"""
+from oslo.config import cfg
import stubout
from neutron.agent import rpc as agent_rpc
class rpcApiTestCase(base.BaseTestCase):
- def _test_mlnx_api(self, rpcapi, topic, method, rpc_method, **kwargs):
+ def _test_mlnx_api(self, rpcapi, topic, method, rpc_method,
+ expected_msg=None, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if method == 'call' else None
- expected_msg = rpcapi.make_msg(method, **kwargs)
+ if not expected_msg:
+ expected_msg = rpcapi.make_msg(method, **kwargs)
expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION
if rpc_method == 'cast' and method == 'run_instance':
kwargs['call'] = False
retval = getattr(rpcapi, method)(ctxt, **kwargs)
- self.assertEqual(retval, expected_retval)
+ self.assertEqual(expected_retval, retval)
expected_args = [ctxt, topic, expected_msg]
for arg, expected_arg in zip(self.fake_args, expected_args):
- self.assertEqual(arg, expected_arg)
+ self.assertEqual(expected_arg, arg)
def test_delete_network(self):
rpcapi = agent_notify_api.AgentNotifierApi(topics.AGENT)
network_id='fake_request_spec')
def test_port_update(self):
+ cfg.CONF.set_override('rpc_support_old_agents', False, 'AGENT')
rpcapi = agent_notify_api.AgentNotifierApi(topics.AGENT)
+ expected_msg = rpcapi.make_msg('port_update',
+ port='fake_port',
+ network_type='vlan',
+ physical_network='fake_net',
+ segmentation_id='fake_vlan_id')
self._test_mlnx_api(rpcapi,
topics.get_topic_name(topics.AGENT,
topics.PORT,
topics.UPDATE),
'port_update', rpc_method='fanout_cast',
+ expected_msg=expected_msg,
+ port='fake_port',
+ network_type='vlan',
+ physical_network='fake_net',
+ vlan_id='fake_vlan_id')
+
+ def test_port_update_ib(self):
+ cfg.CONF.set_override('rpc_support_old_agents', False, 'AGENT')
+ rpcapi = agent_notify_api.AgentNotifierApi(topics.AGENT)
+ expected_msg = rpcapi.make_msg('port_update',
+ port='fake_port',
+ network_type='ib',
+ physical_network='fake_net',
+ segmentation_id='fake_vlan_id')
+ self._test_mlnx_api(rpcapi,
+ topics.get_topic_name(topics.AGENT,
+ topics.PORT,
+ topics.UPDATE),
+ 'port_update', rpc_method='fanout_cast',
+ expected_msg=expected_msg,
+ port='fake_port',
+ network_type='ib',
+ physical_network='fake_net',
+ vlan_id='fake_vlan_id')
+
+ def test_port_update_old_agent(self):
+ cfg.CONF.set_override('rpc_support_old_agents', True, 'AGENT')
+ rpcapi = agent_notify_api.AgentNotifierApi(topics.AGENT)
+ expected_msg = rpcapi.make_msg('port_update',
+ port='fake_port',
+ network_type='vlan',
+ physical_network='fake_net',
+ segmentation_id='fake_vlan_id',
+ vlan_id='fake_vlan_id')
+ self._test_mlnx_api(rpcapi,
+ topics.get_topic_name(topics.AGENT,
+ topics.PORT,
+ topics.UPDATE),
+ 'port_update', rpc_method='fanout_cast',
+ expected_msg=expected_msg,
port='fake_port',
network_type='vlan',
physical_network='fake_net',