--- /dev/null
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron.common import rpc as n_rpc
+from neutron import manager
+
+
+# TODO(amotoki): Move security group RPC API and agent callback
+# from securitygroups_rpc.py.
+
+
+class SecurityGroupServerRpcCallback(n_rpc.RpcCallback):
+ """Callback for SecurityGroup agent RPC in plugin implementations.
+
+ Subclass which inherits this class must implement get_port_from_device().
+ """
+
+ # API version history:
+ # 1.1 - Initial version
+
+ # NOTE: RPC_API_VERSION must not be overridden in subclasses
+ # to keep RPC API version consistent across plugins.
+ RPC_API_VERSION = '1.1'
+
+ @property
+ def plugin(self):
+ return manager.NeutronManager.get_plugin()
+
+ def security_group_rules_for_devices(self, context, **kwargs):
+ """Callback method to return security group rules for each port.
+
+ also convert remote_group_id rule
+ to source_ip_prefix and dest_ip_prefix rule
+
+ :params devices: list of devices
+ :returns: port correspond to the devices with security group rules
+ """
+ devices = kwargs.get('devices')
+
+ ports = {}
+ for device in devices:
+ port = self.plugin.get_port_from_device(device)
+ if not port:
+ continue
+ if port['device_owner'].startswith('network:'):
+ continue
+ ports[port['id']] = port
+ return self.plugin.security_group_rules_for_ports(context, ports)
class SecurityGroupServerRpcMixin(sg_db.SecurityGroupDbMixin):
+ """Mixin class to add agent-based security group implementation."""
+
+ def get_port_from_device(self, device):
+ """Get port dict from device name on an agent.
+
+ Subclass must provide this method.
+
+ :param device: device name which identifies a port on the agent side.
+ What is specified in "device" depends on a plugin agent implementation.
+ For example, it is a port ID in OVS agent and netdev name in Linux
+ Bridge agent.
+ :return: port dict returned by DB plugin get_port(). In addition,
+ it must contain the following fields in the port dict returned.
+ - device
+ - security_groups
+ - security_group_rules,
+ - security_group_source_groups
+ - fixed_ips
+ """
+ raise NotImplementedError(_("%s must implement get_port_from_device.")
+ % self.__class__.__name__)
def create_security_group_rule(self, context, security_group_rule):
bulk_rule = {'security_group_rules': [security_group_rule]}
self.notifier.security_groups_member_updated(
context, port.get(ext_sg.SECURITYGROUPS))
-
-class SecurityGroupServerRpcCallbackMixin(object):
- """A mix-in that enable SecurityGroup agent support in plugin
- implementations.
- """
-
- def security_group_rules_for_devices(self, context, **kwargs):
- """Return security group rules for each port.
-
- also convert remote_group_id rule
- to source_ip_prefix and dest_ip_prefix rule
-
- :params devices: list of devices
- :returns: port correspond to the devices with security group rules
- """
- devices = kwargs.get('devices')
-
- ports = {}
- for device in devices:
- port = self.get_port_from_device(device)
- if not port:
- continue
- if port['device_owner'].startswith('network:'):
- continue
- ports[port['id']] = port
- return self._security_group_rules_for_ports(context, ports)
-
def _select_rules_for_ports(self, context, ports):
if not ports:
return []
self._add_ingress_ra_rule(port, ips_ra)
self._add_ingress_dhcp_rule(port, ips_dhcp)
- def _security_group_rules_for_ports(self, context, ports):
+ def security_group_rules_for_ports(self, context, ports):
rules_in_db = self._select_rules_for_ports(context, ports)
for (binding, rule_in_db) in rules_in_db:
port_id = binding['port_id']
from neutron.api import extensions as neutron_extensions
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc
+from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.common import constants as const
from neutron.common import exceptions
from neutron.common import rpc as n_rpc
from neutron.db import l3_db
from neutron.db import models_v2
from neutron.db import securitygroups_db as sg_db
-from neutron.db import securitygroups_rpc_base as sg_rpc_base
+from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.extensions import allowedaddresspairs as addr_pair
from neutron.extensions import external_net
from neutron.extensions import extra_dhcp_opt as edo_ext
topic=self.topic_port_update)
-class RestProxyCallbacks(n_rpc.RpcCallback,
- sg_rpc_base.SecurityGroupServerRpcCallbackMixin):
-
- RPC_API_VERSION = '1.1'
+class SecurityGroupServerRpcMixin(sg_db_rpc.SecurityGroupServerRpcMixin):
def get_port_from_device(self, device):
port_id = re.sub(r"^tap", "", device)
addr_pair_db.AllowedAddressPairsMixin,
extradhcpopt_db.ExtraDhcpOptMixin,
agentschedulers_db.DhcpAgentSchedulerDbMixin,
- sg_rpc_base.SecurityGroupServerRpcMixin):
+ SecurityGroupServerRpcMixin):
_supported_extension_aliases = ["external-net", "router", "binding",
"router_rules", "extra_dhcp_opt", "quotas",
self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
self._dhcp_agent_notifier
)
- self.endpoints = [RestProxyCallbacks(),
+ self.endpoints = [securitygroups_rpc.SecurityGroupServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(),
agents_db.AgentExtRpcCallback()]
self.conn.create_consumer(self.topic, self.endpoints,
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import l3_rpc
+from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.common import constants as q_const
from neutron.common import rpc as n_rpc
from neutron.common import topics
PLUGIN_VERSION = 0.88
AGENT_OWNER_PREFIX = "network:"
NOS_DRIVER = 'neutron.plugins.brocade.nos.nosdriver.NOSdriver'
+TAP_PREFIX_LEN = 3
SWITCH_OPTS = [cfg.StrOpt('address', default='',
help=_('The address of the host to SSH to')),
cfg.CONF.register_opts(PHYSICAL_INTERFACE_OPTS, "PHYSICAL_INTERFACE")
-class BridgeRpcCallbacks(n_rpc.RpcCallback,
- sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
+class BridgeRpcCallbacks(n_rpc.RpcCallback):
"""Agent callback."""
RPC_API_VERSION = '1.2'
# history
# 1.1 Support Security Group RPC
# 1.2 Support get_devices_details_list
- TAP_PREFIX_LEN = 3
-
- @classmethod
- def get_port_from_device(cls, device):
- """Get port from the brocade specific db."""
-
- # TODO(shh) context is not being passed as
- # an argument to this function;
- #
- # need to be fixed in:
- # file: neutron/db/securtygroups_rpc_base.py
- # function: securitygroup_rules_for_devices()
- # which needs to pass context to us
-
- # Doing what other plugins are doing
- session = db.get_session()
- port = brocade_db.get_port_from_device(
- session, device[cls.TAP_PREFIX_LEN:])
-
- # TODO(shiv): need to extend the db model to include device owners
- # make it appears that the device owner is of type network
- if port:
- port['device'] = device
- port['device_owner'] = AGENT_OWNER_PREFIX
- port['binding:vif_type'] = 'bridge'
- return port
def get_device_details(self, rpc_context, **kwargs):
"""Agent requests device details."""
device = kwargs.get('device')
LOG.debug(_("Device %(device)s details requested from %(agent_id)s"),
{'device': device, 'agent_id': agent_id})
- port = brocade_db.get_port(rpc_context, device[self.TAP_PREFIX_LEN:])
+ port = brocade_db.get_port(rpc_context, device[TAP_PREFIX_LEN:])
if port:
entry = {'device': device,
'vlan_id': port.vlan_id,
return entry
+class SecurityGroupServerRpcMixin(sg_db_rpc.SecurityGroupServerRpcMixin):
+
+ @classmethod
+ def get_port_from_device(cls, device):
+ """Get port from the brocade specific db."""
+
+ # TODO(shh) context is not being passed as
+ # an argument to this function;
+ #
+ # need to be fixed in:
+ # file: neutron/db/securtygroups_rpc_base.py
+ # function: securitygroup_rules_for_devices()
+ # which needs to pass context to us
+
+ # Doing what other plugins are doing
+ session = db.get_session()
+ port = brocade_db.get_port_from_device(
+ session, device[TAP_PREFIX_LEN:])
+
+ # TODO(shiv): need to extend the db model to include device owners
+ # make it appears that the device owner is of type network
+ if port:
+ port['device'] = device
+ port['device_owner'] = AGENT_OWNER_PREFIX
+ port['binding:vif_type'] = 'bridge'
+ return port
+
+
class AgentNotifierApi(n_rpc.RpcProxy,
sg_rpc.SecurityGroupAgentRpcApiMixin):
"""Agent side of the linux bridge rpc API.
class BrocadePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
external_net_db.External_net_db_mixin,
extraroute_db.ExtraRoute_db_mixin,
- sg_db_rpc.SecurityGroupServerRpcMixin,
+ SecurityGroupServerRpcMixin,
l3_agentschedulers_db.L3AgentSchedulerDbMixin,
agentschedulers_db.DhcpAgentSchedulerDbMixin,
portbindings_base.PortBindingBaseMixin):
is_admin=False)
self.conn = n_rpc.create_connection(new=True)
self.endpoints = [BridgeRpcCallbacks(),
+ securitygroups_rpc.SecurityGroupServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(),
l3_rpc.L3RpcCallback(),
agents_db.AgentExtRpcCallback()]
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import l3_rpc
+from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.api.v2 import attributes
from neutron.common import constants as q_const
from neutron.common import exceptions as n_exc
LOG = logging.getLogger(__name__)
+# Device names start with "tap"
+TAP_PREFIX_LEN = 3
-class LinuxBridgeRpcCallbacks(n_rpc.RpcCallback,
- sg_db_rpc.SecurityGroupServerRpcCallbackMixin
- ):
+
+class LinuxBridgeRpcCallbacks(n_rpc.RpcCallback):
# history
# 1.1 Support Security Group RPC
# 1.2 Support get_devices_details_list
RPC_API_VERSION = '1.2'
- # Device names start with "tap"
- TAP_PREFIX_LEN = 3
-
- @classmethod
- def get_port_from_device(cls, device):
- port = db.get_port_from_device(device[cls.TAP_PREFIX_LEN:])
- if port:
- port['device'] = device
- return port
def get_device_details(self, rpc_context, **kwargs):
"""Agent requests device details."""
device = kwargs.get('device')
LOG.debug(_("Device %(device)s details requested from %(agent_id)s"),
{'device': device, 'agent_id': agent_id})
- port = self.get_port_from_device(device)
+ plugin = manager.NeutronManager.get_plugin()
+ port = plugin.get_port_from_device(device)
if port:
binding = db.get_network_binding(db_api.get_session(),
port['network_id'])
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
host = kwargs.get('host')
- port = self.get_port_from_device(device)
LOG.debug(_("Device %(device)s no longer exists on %(agent_id)s"),
{'device': device, 'agent_id': agent_id})
plugin = manager.NeutronManager.get_plugin()
+ port = plugin.get_port_from_device(device)
if port:
entry = {'device': device,
'exists': True}
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
host = kwargs.get('host')
- port = self.get_port_from_device(device)
LOG.debug(_("Device %(device)s up on %(agent_id)s"),
{'device': device, 'agent_id': agent_id})
plugin = manager.NeutronManager.get_plugin()
+ port = plugin.get_port_from_device(device)
if port:
if (host and
not plugin.get_port_host(rpc_context, port['id']) == host):
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = n_rpc.create_connection(new=True)
self.endpoints = [LinuxBridgeRpcCallbacks(),
+ securitygroups_rpc.SecurityGroupServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(),
l3_rpc.L3RpcCallback(),
agents_db.AgentExtRpcCallback()]
self.notifier.port_update(context, port,
binding.physical_network,
binding.vlan_id)
+
+ @classmethod
+ def get_port_from_device(cls, device):
+ port = db.get_port_from_device(device[TAP_PREFIX_LEN:])
+ if port:
+ port['device'] = device
+ return port
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import dvr_rpc
+from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.api.v2 import attributes
from neutron.common import constants as const
from neutron.common import exceptions as exc
from neutron.openstack.common import jsonutils
from neutron.openstack.common import lockutils
from neutron.openstack.common import log
+from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants as service_constants
from neutron.plugins.ml2.common import exceptions as ml2_exc
from neutron.plugins.ml2 import config # noqa
# providernet.py?
TYPE_MULTI_SEGMENT = 'multi-segment'
+TAP_DEVICE_PREFIX = 'tap'
+TAP_DEVICE_PREFIX_LENGTH = 3
+
class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
dvr_mac_db.DVRDbMixin,
def start_rpc_listeners(self):
self.endpoints = [rpc.RpcCallbacks(self.notifier, self.type_manager),
+ securitygroups_rpc.SecurityGroupServerRpcCallback(),
dvr_rpc.DVRServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(),
agents_db.AgentExtRpcCallback()]
else:
port_host = db.get_port_binding_host(port_id)
return (port_host == host)
+
+ def get_port_from_device(self, device):
+ port_id = self._device_to_port_id(device)
+ port = db.get_port_and_sgs(port_id)
+ if port:
+ port['device'] = device
+ return port
+
+ def _device_to_port_id(self, device):
+ # REVISIT(rkukura): Consider calling into MechanismDrivers to
+ # process device names, or having MechanismDrivers supply list
+ # of device prefixes to strip.
+ if device.startswith(TAP_DEVICE_PREFIX):
+ return device[TAP_DEVICE_PREFIX_LENGTH:]
+ else:
+ # REVISIT(irenab): Consider calling into bound MD to
+ # handle the get_device_details RPC, then remove the 'else' clause
+ if not uuidutils.is_uuid_like(device):
+ port = db.get_port_from_device_mac(device)
+ if port:
+ return port.id
+ return device
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils
-from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.extensions import portbindings
from neutron import manager
from neutron.openstack.common import log
-from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants as service_constants
-from neutron.plugins.ml2 import db
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2.drivers import type_tunnel
# REVISIT(kmestery): Allow the type and mechanism drivers to supply the
LOG = log.getLogger(__name__)
-TAP_DEVICE_PREFIX = 'tap'
-TAP_DEVICE_PREFIX_LENGTH = 3
-
class RpcCallbacks(n_rpc.RpcCallback,
- sg_db_rpc.SecurityGroupServerRpcCallbackMixin,
type_tunnel.TunnelRpcCallbackMixin):
RPC_API_VERSION = '1.3'
self.setup_tunnel_callback_mixin(notifier, type_manager)
super(RpcCallbacks, self).__init__()
- @classmethod
- def _device_to_port_id(cls, device):
- # REVISIT(rkukura): Consider calling into MechanismDrivers to
- # process device names, or having MechanismDrivers supply list
- # of device prefixes to strip.
- if device.startswith(TAP_DEVICE_PREFIX):
- return device[TAP_DEVICE_PREFIX_LENGTH:]
- else:
- # REVISIT(irenab): Consider calling into bound MD to
- # handle the get_device_details RPC, then remove the 'else' clause
- if not uuidutils.is_uuid_like(device):
- port = db.get_port_from_device_mac(device)
- if port:
- return port.id
- return device
-
- @classmethod
- def get_port_from_device(cls, device):
- port_id = cls._device_to_port_id(device)
- port = db.get_port_and_sgs(port_id)
- if port:
- port['device'] = device
- return port
-
def get_device_details(self, rpc_context, **kwargs):
"""Agent requests device details."""
agent_id = kwargs.get('agent_id')
LOG.debug("Device %(device)s details requested by agent "
"%(agent_id)s with host %(host)s",
{'device': device, 'agent_id': agent_id, 'host': host})
- port_id = self._device_to_port_id(device)
plugin = manager.NeutronManager.get_plugin()
+ port_id = plugin._device_to_port_id(device)
port_context = plugin.get_bound_port_context(rpc_context,
port_id,
host)
"%(agent_id)s"),
{'device': device, 'agent_id': agent_id})
plugin = manager.NeutronManager.get_plugin()
- port_id = self._device_to_port_id(device)
+ port_id = plugin._device_to_port_id(device)
port_exists = True
if (host and not plugin.port_bound_to_host(rpc_context,
port_id, host)):
LOG.debug(_("Device %(device)s up at agent %(agent_id)s"),
{'device': device, 'agent_id': agent_id})
plugin = manager.NeutronManager.get_plugin()
- port_id = self._device_to_port_id(device)
+ port_id = plugin._device_to_port_id(device)
if (host and not plugin.port_bound_to_host(rpc_context,
port_id, host)):
LOG.debug(_("Device %(device)s not bound to the"
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import l3_rpc
+from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.api.v2 import attributes
from neutron.common import constants as q_const
from neutron.common import exceptions as n_exc
LOG = logging.getLogger(__name__)
+#to be compatible with Linux Bridge Agent on Network Node
+TAP_PREFIX_LEN = 3
+
class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
external_net_db.External_net_db_mixin,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = n_rpc.create_connection(new=True)
self.endpoints = [rpc_callbacks.MlnxRpcCallbacks(),
+ securitygroups_rpc.SecurityGroupServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(),
l3_rpc.L3RpcCallback(),
agents_db.AgentExtRpcCallback()]
# now that we've left db transaction, we are safe to notify
self.notify_routers_updated(context, router_ids)
self.notify_security_groups_member_updated(context, port)
+
+ @classmethod
+ def get_port_from_device(cls, device):
+ """Get port according to device.
+
+ To maintain compatibility with Linux Bridge L2 Agent for DHCP/L3
+ services get device either by linux bridge plugin
+ device name convention or by mac address
+ """
+ port = db.get_port_from_device(device[TAP_PREFIX_LEN:])
+ if port:
+ port['device'] = device
+ else:
+ port = db.get_port_from_device_mac(device)
+ if port:
+ port['device'] = device
+ return port
from neutron.common import constants as q_const
from neutron.common import rpc as n_rpc
from neutron.db import api as db_api
-from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.openstack.common import log as logging
from neutron.plugins.mlnx.db import mlnx_db_v2 as db
LOG = logging.getLogger(__name__)
-class MlnxRpcCallbacks(n_rpc.RpcCallback,
- sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
+class MlnxRpcCallbacks(n_rpc.RpcCallback):
# History
# 1.1 Support Security Group RPC
# 1.2 Support get_devices_details_list
RPC_API_VERSION = '1.2'
- #to be compatible with Linux Bridge Agent on Network Node
- TAP_PREFIX_LEN = 3
-
- @classmethod
- def get_port_from_device(cls, device):
- """Get port according to device.
-
- To maintain compatibility with Linux Bridge L2 Agent for DHCP/L3
- services get device either by linux bridge plugin
- device name convention or by mac address
- """
- port = db.get_port_from_device(device[cls.TAP_PREFIX_LEN:])
- if port:
- port['device'] = device
- else:
- port = db.get_port_from_device_mac(device)
- if port:
- port['device'] = device
- return port
-
def get_device_details(self, rpc_context, **kwargs):
"""Agent requests device details."""
agent_id = kwargs.get('agent_id')
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import l3_rpc
+from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.api.v2 import attributes as attrs
from neutron.common import constants as const
from neutron.common import exceptions as n_exc
LOG = logging.getLogger(__name__)
+class SecurityGroupServerRpcMixin(sg_db_rpc.SecurityGroupServerRpcMixin):
+
+ @staticmethod
+ def get_port_from_device(device):
+ port = ndb.get_port_from_device(device)
+ if port:
+ port['device'] = device
+ LOG.debug("NECPluginV2.get_port_from_device() called, "
+ "device=%(device)s => %(ret)s.",
+ {'device': device, 'ret': port})
+ return port
+
+
class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
external_net_db.External_net_db_mixin,
nec_router.RouterMixin,
- sg_db_rpc.SecurityGroupServerRpcMixin,
+ SecurityGroupServerRpcMixin,
agentschedulers_db.DhcpAgentSchedulerDbMixin,
nec_router.L3AgentSchedulerDbMixin,
packet_filter.PacketFilterMixin,
)
# NOTE: callback_sg is referred to from the sg unit test.
- self.callback_sg = SecurityGroupServerRpcCallback()
+ self.callback_sg = securitygroups_rpc.SecurityGroupServerRpcCallback()
self.endpoints = [
NECPluginV2RPCCallbacks(self.safe_reference),
dhcp_rpc.DhcpRpcCallback(),
topic=self.topic_port_update)
-class SecurityGroupServerRpcCallback(
- n_rpc.RpcCallback,
- sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
-
- RPC_API_VERSION = sg_rpc.SG_RPC_VERSION
-
- @staticmethod
- def get_port_from_device(device):
- port = ndb.get_port_from_device(device)
- if port:
- port['device'] = device
- LOG.debug(_("NECPluginV2RPCCallbacks.get_port_from_device() called, "
- "device=%(device)s => %(ret)s."),
- {'device': device, 'ret': port})
- return port
-
-
class NECPluginV2RPCCallbacks(n_rpc.RpcCallback):
RPC_API_VERSION = '1.0'
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import l3_rpc
+from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.common import constants as q_const
from neutron.common import exceptions as nexception
from neutron.common import rpc as n_rpc
IPv6 = 6
-class NVSDPluginRpcCallbacks(n_rpc.RpcCallback,
- sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
-
- RPC_API_VERSION = '1.1'
+class SecurityGroupServerRpcMixin(sg_db_rpc.SecurityGroupServerRpcMixin):
@staticmethod
def get_port_from_device(device):
external_net_db.External_net_db_mixin,
l3_gwmode_db.L3_NAT_db_mixin,
portbindings_base.PortBindingBaseMixin,
- sg_db_rpc.SecurityGroupServerRpcMixin):
+ SecurityGroupServerRpcMixin):
"""L2 Virtual Network Plugin.
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
l3_rpc_agent_api.L3AgentNotifyAPI()
)
- self.endpoints = [NVSDPluginRpcCallbacks(),
+ self.endpoints = [securitygroups_rpc.SecurityGroupServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(),
l3_rpc.L3RpcCallback(),
agents_db.AgentExtRpcCallback()]
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import l3_rpc
+from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.api.v2 import attributes
from neutron.common import constants as q_const
from neutron.common import exceptions as n_exc
LOG = logging.getLogger(__name__)
-class OVSRpcCallbacks(n_rpc.RpcCallback,
- sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
+class OVSRpcCallbacks(n_rpc.RpcCallback):
# history
# 1.0 Initial version
self.notifier = notifier
self.tunnel_type = tunnel_type
- @classmethod
- def get_port_from_device(cls, device):
- port = ovs_db_v2.get_port_from_device(device)
- if port:
- port['device'] = device
- return port
-
def get_device_details(self, rpc_context, **kwargs):
"""Agent requests device details."""
agent_id = kwargs.get('agent_id')
return entry
+class SecurityGroupServerRpcMixin(sg_db_rpc.SecurityGroupServerRpcMixin):
+
+ @classmethod
+ def get_port_from_device(cls, device):
+ port = ovs_db_v2.get_port_from_device(device)
+ if port:
+ port['device'] = device
+ return port
+
+
class AgentNotifierApi(n_rpc.RpcProxy,
sg_rpc.SecurityGroupAgentRpcApiMixin):
'''Agent side of the openvswitch rpc API.
external_net_db.External_net_db_mixin,
extraroute_db.ExtraRoute_db_mixin,
l3_gwmode_db.L3_NAT_db_mixin,
- sg_db_rpc.SecurityGroupServerRpcMixin,
+ SecurityGroupServerRpcMixin,
l3_agentschedulers_db.L3AgentSchedulerDbMixin,
agentschedulers_db.DhcpAgentSchedulerDbMixin,
portbindings_db.PortBindingMixin,
l3_rpc_agent_api.L3AgentNotifyAPI()
)
self.endpoints = [OVSRpcCallbacks(self.notifier, self.tunnel_type),
+ securitygroups_rpc.SecurityGroupServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(),
l3_rpc.L3RpcCallback(),
agents_db.AgentExtRpcCallback()]
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import l3_rpc
+from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.common import constants as q_const
from neutron.common import exceptions as n_exc
from neutron.common import rpc as n_rpc
LOG = logging.getLogger(__name__)
-class RyuRpcCallbacks(n_rpc.RpcCallback,
- sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
+class SecurityGroupServerRpcMixin(sg_db_rpc.SecurityGroupServerRpcMixin):
+
+ @classmethod
+ def get_port_from_device(cls, device):
+ port = db_api_v2.get_port_from_device(device)
+ if port:
+ port['device'] = device
+ return port
+
+
+class RyuRpcCallbacks(n_rpc.RpcCallback):
RPC_API_VERSION = '1.1'
LOG.debug(_("get_ofp_rest_api: %s"), self.ofp_rest_api_addr)
return self.ofp_rest_api_addr
- @classmethod
- def get_port_from_device(cls, device):
- port = db_api_v2.get_port_from_device(device)
- if port:
- port['device'] = device
- return port
-
class AgentNotifierApi(n_rpc.RpcProxy,
sg_rpc.SecurityGroupAgentRpcApiMixin):
external_net_db.External_net_db_mixin,
extraroute_db.ExtraRoute_db_mixin,
l3_gwmode_db.L3_NAT_db_mixin,
- sg_db_rpc.SecurityGroupServerRpcMixin,
+ SecurityGroupServerRpcMixin,
portbindings_base.PortBindingBaseMixin):
_supported_extension_aliases = ["external-net", "router", "ext-gw-mode",
self.conn = n_rpc.create_connection(new=True)
self.notifier = AgentNotifierApi(topics.AGENT)
self.endpoints = [RyuRpcCallbacks(self.ofp_api_host),
+ securitygroups_rpc.SecurityGroupServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(),
l3_rpc.L3RpcCallback()]
for svc_topic in self.service_topics.values():
self.startHttpPatch()
-class TestSecServerRpcCallBack(test_sg_rpc.SGServerRpcCallBackMixinTestCase,
+class TestSecServerRpcCallBack(test_sg_rpc.SGServerRpcCallBackTestCase,
RestProxySecurityGroupsTestCase):
pass
# See the License for the specific language governing permissions and
# limitations under the License.
-import contextlib
-
import mock
from oslo.config import cfg
self.callbacks = lb_neutron_plugin.LinuxBridgeRpcCallbacks()
def test_update_device_down(self):
- with contextlib.nested(
- mock.patch.object(self.callbacks, "get_port_from_device",
- return_value=None),
- mock.patch.object(manager.NeutronManager, "get_plugin")
- ) as (gpfd, gp):
+ with mock.patch.object(manager.NeutronManager, "get_plugin") as gp:
+ plugin = gp.return_value
+ plugin.get_port_from_device.return_value = None
self.assertEqual(
self.callbacks.update_device_down("fake_context",
agent_id="123",
host="host"),
{'device': 'device', 'exists': False}
)
- gpfd.return_value = {'id': 'fakeid',
- 'status': q_const.PORT_STATUS_ACTIVE}
+ plugin.get_port_from_device.return_value = {
+ 'id': 'fakeid',
+ 'status': q_const.PORT_STATUS_ACTIVE}
self.assertEqual(
self.callbacks.update_device_down("fake_context",
agent_id="123",
)
def test_update_device_up(self):
- with contextlib.nested(
- mock.patch.object(self.callbacks, "get_port_from_device",
- return_value=None),
- mock.patch.object(manager.NeutronManager, "get_plugin")
- ) as (gpfd, gp):
- gpfd.return_value = {'id': 'fakeid',
- 'status': q_const.PORT_STATUS_ACTIVE}
+ with mock.patch.object(manager.NeutronManager, "get_plugin") as gp:
+ plugin = gp.return_value
+ plugin.get_port_from_device.return_value = {
+ 'id': 'fakeid',
+ 'status': q_const.PORT_STATUS_ACTIVE}
self.callbacks.update_device_up("fake_context",
agent_id="123",
device="device",
host="host")
- gpfd.assert_called_once_with('device')
+ plugin.get_port_from_device.assert_called_once_with('device')
}
def _test_update_device_up(self, extensions, kwargs):
- with mock.patch.object(self.callbacks, '_device_to_port_id'):
+ with mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin'
+ '._device_to_port_id'):
type(self.l3plugin).supported_extension_aliases = (
mock.PropertyMock(return_value=extensions))
self.callbacks.update_device_up(mock.ANY, **kwargs)
req.get_response(self.api))
port_id = res['port']['id']
plugin = manager.NeutronManager.get_plugin()
- callbacks = plugin.endpoints[0]
- port_dict = callbacks.get_port_from_device(port_id)
+ port_dict = plugin.get_port_from_device(port_id)
self.assertEqual(port_id, port_dict['id'])
self.assertEqual([security_group_id],
port_dict[ext_sg.SECURITYGROUPS])
def test_security_group_get_port_from_device_with_no_port(self):
plugin = manager.NeutronManager.get_plugin()
- port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id')
+ port_dict = plugin.get_port_from_device('bad_device_id')
self.assertIsNone(port_dict)
class TestMl2SGServerRpcCallBack(
Ml2SecurityGroupsTestCase,
- test_sg_rpc.SGServerRpcCallBackMixinTestCase):
+ test_sg_rpc.SGServerRpcCallBackTestCase):
pass
class TestMl2SGServerRpcCallBackXML(
Ml2SecurityGroupsTestCase,
- test_sg_rpc.SGServerRpcCallBackMixinTestCaseXML):
+ test_sg_rpc.SGServerRpcCallBackTestCaseXML):
pass
class TestNecSGServerRpcCallBack(
- test_sg_rpc.SGServerRpcCallBackMixinTestCase,
+ test_sg_rpc.SGServerRpcCallBackTestCase,
NecSecurityGroupsTestCase):
pass
class TestNecSGServerRpcCallBackXML(
- test_sg_rpc.SGServerRpcCallBackMixinTestCaseXML,
+ test_sg_rpc.SGServerRpcCallBackTestCaseXML,
NecSecurityGroupsTestCase):
pass
req.get_response(self.api))
plugin = manager.NeutronManager.get_plugin()
- port_dict = plugin.callback_sg.get_port_from_device(port_id)
+ port_dict = plugin.get_port_from_device(port_id)
self.assertEqual(port_id, port_dict['id'])
self.assertEqual([sg_id],
port_dict[ext_sg.SECURITYGROUPS])
class TestOneConvergenceSGServerRpcCallBack(
OneConvergenceSecurityGroupsTestCase,
- test_sg_rpc.SGServerRpcCallBackMixinTestCase):
-
+ test_sg_rpc.SGServerRpcCallBackTestCase):
pass
class TestOneConvergenceSGServerRpcCallBackXML(
OneConvergenceSecurityGroupsTestCase,
- test_sg_rpc.SGServerRpcCallBackMixinTestCaseXML):
-
+ test_sg_rpc.SGServerRpcCallBackTestCaseXML):
pass
req.get_response(self.api))
port_id = res['port']['id']
plugin = manager.NeutronManager.get_plugin()
- callbacks = plugin.endpoints[0]
- port_dict = callbacks.get_port_from_device(port_id)
+ port_dict = plugin.get_port_from_device(port_id)
self.assertEqual(port_id, port_dict['id'])
self.assertEqual([security_group_id],
port_dict[ext_sg.SECURITYGROUPS])
def test_security_group_get_port_from_device_with_no_port(self):
plugin = manager.NeutronManager.get_plugin()
- port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id')
+ port_dict = plugin.get_port_from_device('bad_device_id')
self.assertIsNone(port_dict)
class TestOpenvswitchSGServerRpcCallBack(
OpenvswitchSecurityGroupsTestCase,
- test_sg_rpc.SGServerRpcCallBackMixinTestCase):
+ test_sg_rpc.SGServerRpcCallBackTestCase):
pass
class TestOpenvswitchSGServerRpcCallBackXML(
OpenvswitchSecurityGroupsTestCase,
- test_sg_rpc.SGServerRpcCallBackMixinTestCaseXML):
+ test_sg_rpc.SGServerRpcCallBackTestCaseXML):
pass
req.get_response(self.api))
port_id = res['port']['id']
plugin = manager.NeutronManager.get_plugin()
- callbacks = plugin.endpoints[0]
- port_dict = callbacks.get_port_from_device(port_id)
+ port_dict = plugin.get_port_from_device(port_id)
self.assertEqual(port_id, port_dict['id'])
self.assertEqual([security_group_id],
port_dict[ext_sg.SECURITYGROUPS])
def test_security_group_get_port_from_device_with_no_port(self):
plugin = manager.NeutronManager.get_plugin()
- port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id')
+ port_dict = plugin.get_port_from_device('bad_device_id')
self.assertIsNone(port_dict)
req.get_response(self.api))
port_id = res['port']['id']
plugin = manager.NeutronManager.get_plugin()
- port_dict = plugin.endpoints[0].get_port_from_device(port_id)
+ port_dict = plugin.get_port_from_device(port_id)
self.assertEqual(port_id, port_dict['id'])
self.assertEqual([security_group_id],
port_dict[ext_sg.SECURITYGROUPS])
def test_security_group_get_port_from_device_with_no_port(self):
plugin = manager.NeutronManager.get_plugin()
- port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id')
+ port_dict = plugin.get_port_from_device('bad_device_id')
self.assertIsNone(port_dict)
from neutron.agent.linux import iptables_manager
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
+from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.common import constants as const
from neutron.common import ipv6_utils as ipv6
from neutron.common import rpc as n_rpc
'IPv6_LLA': 'fe80::123'}
-class FakeSGCallback(sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
+TEST_PLUGIN_CLASS = ('neutron.tests.unit.test_security_groups_rpc.'
+ 'SecurityGroupRpcTestPlugin')
+
+
+class SecurityGroupRpcTestPlugin(test_sg.SecurityGroupTestPlugin,
+ sg_db_rpc.SecurityGroupServerRpcMixin):
+ def __init__(self):
+ super(SecurityGroupRpcTestPlugin, self).__init__()
+ self.notifier = mock.Mock()
+ self.devices = {}
+
+ def create_port(self, context, port):
+ result = super(SecurityGroupRpcTestPlugin,
+ self).create_port(context, port)
+ self.devices[result['id']] = result
+ self.notify_security_groups_member_updated(context, result)
+ return result
+
+ def update_port(self, context, id, port):
+ original_port = self.get_port(context, id)
+ updated_port = super(SecurityGroupRpcTestPlugin,
+ self).update_port(context, id, port)
+ self.devices[id] = updated_port
+ self.update_security_group_on_port(
+ context, id, port, original_port, updated_port)
+
+ def delete_port(self, context, id):
+ port = self.get_port(context, id)
+ super(SecurityGroupRpcTestPlugin, self).delete_port(context, id)
+ self.notify_security_groups_member_updated(context, port)
+ del self.devices[id]
+
def get_port_from_device(self, device):
device = self.devices.get(device)
if device:
return device
-class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
+class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase):
def setUp(self, plugin=None):
+ plugin = plugin or TEST_PLUGIN_CLASS
cfg.CONF.set_default('firewall_driver',
'neutron.agent.firewall.NoopFirewallDriver',
group='SECURITYGROUP')
- super(SGServerRpcCallBackMixinTestCase, self).setUp(plugin)
- self.rpc = FakeSGCallback()
+ super(SGServerRpcCallBackTestCase, self).setUp(plugin)
+ self.notifier = manager.NeutronManager.get_plugin().notifier
+ self.rpc = securitygroups_rpc.SecurityGroupServerRpcCallback()
def _test_security_group_port(self, device_owner, gw_ip,
cidr, ip_version, ip_address):
gateway_ip=gw_ip,
cidr=cidr,
ip_version=ip_version) as subnet:
- with mock.patch.object(
- self.notifier,
- 'security_groups_provider_updated') as mock_notifier:
- kwargs = {
- 'fixed_ips': [{'subnet_id': subnet['subnet']['id'],
- 'ip_address': ip_address}]}
- if device_owner:
- kwargs['device_owner'] = device_owner
- res = self._create_port(
- self.fmt, net['network']['id'], **kwargs)
- res = self.deserialize(self.fmt, res)
- port_id = res['port']['id']
- if device_owner == const.DEVICE_OWNER_ROUTER_INTF:
- data = {'port': {'fixed_ips': []}}
- req = self.new_update_request('ports', data, port_id)
- res = self.deserialize(self.fmt,
- req.get_response(self.api))
- self._delete('ports', port_id)
- return mock_notifier
+ kwargs = {
+ 'fixed_ips': [{'subnet_id': subnet['subnet']['id'],
+ 'ip_address': ip_address}]}
+ if device_owner:
+ kwargs['device_owner'] = device_owner
+ res = self._create_port(
+ self.fmt, net['network']['id'], **kwargs)
+ res = self.deserialize(self.fmt, res)
+ port_id = res['port']['id']
+ if device_owner == const.DEVICE_OWNER_ROUTER_INTF:
+ data = {'port': {'fixed_ips': []}}
+ req = self.new_update_request('ports', data, port_id)
+ res = self.deserialize(self.fmt,
+ req.get_response(self.api))
+ self._delete('ports', port_id)
def test_notify_security_group_ipv6_gateway_port_added(self):
- if getattr(self, "notifier", None) is None:
- self.skipTest("Notifier mock is not set so security group "
- "RPC calls can't be tested")
-
- mock_notifier = self._test_security_group_port(
+ self._test_security_group_port(
const.DEVICE_OWNER_ROUTER_INTF,
'2001:0db8::1',
'2001:0db8::/64',
6,
'2001:0db8::1')
- self.assertTrue(mock_notifier.called)
+ self.assertTrue(self.notifier.security_groups_provider_updated.called)
def test_notify_security_group_ipv6_normal_port_added(self):
- if getattr(self, "notifier", None) is None:
- self.skipTest("Notifier mock is not set so security group "
- "RPC calls can't be tested")
- mock_notifier = self._test_security_group_port(
+ self._test_security_group_port(
None,
'2001:0db8::1',
'2001:0db8::/64',
6,
'2001:0db8::3')
- self.assertFalse(mock_notifier.called)
+ self.assertFalse(self.notifier.security_groups_provider_updated.called)
def test_notify_security_group_ipv4_dhcp_port_added(self):
- if getattr(self, "notifier", None) is None:
- self.skipTest("Notifier mock is not set so security group "
- "RPC calls can't be tested")
- mock_notifier = self._test_security_group_port(
+ self._test_security_group_port(
const.DEVICE_OWNER_DHCP,
'192.168.1.1',
'192.168.1.0/24',
4,
'192.168.1.2')
- self.assertTrue(mock_notifier.called)
+ self.assertTrue(self.notifier.security_groups_provider_updated.called)
def test_notify_security_group_ipv4_gateway_port_added(self):
- if getattr(self, "notifier", None) is None:
- self.skipTest("Notifier mock is not set so security group "
- "RPC calls can't be tested")
- mock_notifier = self._test_security_group_port(
+ self._test_security_group_port(
const.DEVICE_OWNER_ROUTER_INTF,
'192.168.1.1',
'192.168.1.0/24',
4,
'192.168.1.1')
- self.assertFalse(mock_notifier.called)
+ self.assertFalse(self.notifier.security_groups_provider_updated.called)
def test_notify_security_group_ipv4_normal_port_added(self):
- if getattr(self, "notifier", None) is None:
- self.skipTest("Notifier mock is not set so security group "
- "RPC calls can't be tested")
- mock_notifier = self._test_security_group_port(
+ self._test_security_group_port(
None,
'192.168.1.1',
'192.168.1.0/24',
4,
'192.168.1.3')
- self.assertFalse(mock_notifier.called)
+ self.assertFalse(self.notifier.security_groups_provider_updated.called)
def test_security_group_rules_for_devices_ipv4_ingress(self):
fake_prefix = FAKE_PREFIX[const.IPv4]
with self.network() as n:
- with contextlib.nested(self.subnet(n),
- self.security_group()) as (subnet_v4,
- sg1):
+ with contextlib.nested(
+ self.subnet(n),
+ self.security_group()) as (subnet_v4, sg1):
sg1_id = sg1['security_group']['id']
rule1 = self._build_security_group_rule(
sg1_id,
self._delete('ports', port_id2)
-class SGServerRpcCallBackMixinTestCaseXML(SGServerRpcCallBackMixinTestCase):
+class SGServerRpcCallBackTestCaseXML(SGServerRpcCallBackTestCase):
fmt = 'xml'