self.sg_agent.security_groups_provider_updated()
-class SecurityGroupAgentRpcMixin(object):
- """A mix-in that enable SecurityGroup agent
- support in agent implementations.
- """
+class SecurityGroupAgentRpc(object):
+ """Enables SecurityGroup agent support in agent implementations."""
+
+ def __init__(self, context, plugin_rpc, root_helper,
+ defer_refresh_firewall=False):
+ self.context = context
+ self.plugin_rpc = plugin_rpc
+ self.root_helper = root_helper
+ self.init_firewall(defer_refresh_firewall)
def init_firewall(self, defer_refresh_firewall=False):
firewall_driver = cfg.CONF.SECURITYGROUP.firewall_driver
return False
-class SecurityGroupAgent(sg_rpc.SecurityGroupAgentRpcMixin):
- def __init__(self, context, plugin_rpc, root_helper):
- self.context = context
- self.plugin_rpc = plugin_rpc
- self.root_helper = root_helper
- self.init_firewall()
-
-
class RestProxyAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
target = messaging.Target(version='1.1')
super(RestProxyAgent, self).__init__()
self.polling_interval = polling_interval
self._setup_rpc()
- self.sg_agent = SecurityGroupAgent(self.context,
- self.sg_plugin_rpc,
- root_helper)
+ self.sg_agent = sg_rpc.SecurityGroupAgentRpc(self.context,
+ self.sg_plugin_rpc,
+ root_helper)
if vs == 'ivs':
self.int_br = IVSBridge(integ_br, root_helper)
else:
config.register_agent_state_opts_helper(cfg.CONF)
-class HyperVSecurityAgent(sg_rpc.SecurityGroupAgentRpcMixin):
-
- def __init__(self, context, plugin_rpc):
- super(HyperVSecurityAgent, self).__init__()
- self.context = context
- self.plugin_rpc = plugin_rpc
+class HyperVSecurityAgent(sg_rpc.SecurityGroupAgentRpc):
+ def __init__(self, context, plugin_rpc, root_helper):
+ super(HyperVSecurityAgent, self).__init__(context, plugin_rpc,
+ root_helper)
if sg_rpc.is_firewall_enabled():
- self.init_firewall()
self._setup_rpc()
def _setup_rpc(self):
# Set RPC API version to 1.1 by default.
target = messaging.Target(version='1.1')
- def __init__(self):
+ def __init__(self, root_helper):
super(HyperVNeutronAgent, self).__init__()
self._utils = utilsfactory.get_hypervutils()
self._polling_interval = CONF.AGENT.polling_interval
self._network_vswitch_map = {}
self._port_metric_retries = {}
self._set_agent_state()
- self._setup_rpc()
+ self._setup_rpc(root_helper)
def _set_agent_state(self):
self.agent_state = {
except Exception:
LOG.exception(_LE("Failed reporting state!"))
- def _setup_rpc(self):
+ def _setup_rpc(self, root_helper):
self.agent_id = 'hyperv_%s' % platform.node()
self.topic = topics.AGENT
self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
consumers)
self.sec_groups_agent = HyperVSecurityAgent(
- self.context, self.sg_plugin_rpc)
+ self.context, self.sg_plugin_rpc, root_helper)
report_interval = CONF.AGENT.report_interval
if report_interval:
heartbeat = loopingcall.FixedIntervalLoopingCall(
common_config.init(sys.argv[1:])
common_config.setup_logging()
- plugin = HyperVNeutronAgent()
+ root_helper = cfg.CONF.AGENT.root_helper
+ plugin = HyperVNeutronAgent(root_helper)
# Start everything.
LOG.info(_LI("Agent initialized successfully, now running... "))
getattr(self, method)(context, values)
-class LinuxBridgeSecurityGroupAgent(sg_rpc.SecurityGroupAgentRpcMixin):
- def __init__(self, context, plugin_rpc, root_helper):
- self.context = context
- self.plugin_rpc = plugin_rpc
- self.root_helper = root_helper
- self.init_firewall()
-
-
class LinuxBridgeNeutronAgentRPC(object):
def __init__(self, interface_mappings, polling_interval,
self.context = context.get_admin_context_without_session()
self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
- self.sg_agent = LinuxBridgeSecurityGroupAgent(self.context,
+ self.sg_agent = sg_rpc.SecurityGroupAgentRpc(self.context,
self.sg_plugin_rpc, self.root_helper)
self.setup_rpc(interface_mappings.values())
port['mac_address'])
-class MlnxEswitchSecurityGroupAgent(sg_rpc.SecurityGroupAgentRpcMixin):
- def __init__(self, context, plugin_rpc, root_helper):
- self.context = context
- self.plugin_rpc = plugin_rpc
- self.root_helper = root_helper
- self.init_firewall()
-
-
class MlnxEswitchNeutronAgent(object):
def __init__(self, interface_mapping, root_helper):
self.context = context.get_admin_context_without_session()
self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
- self.sg_agent = MlnxEswitchSecurityGroupAgent(self.context,
+ self.sg_agent = sg_rpc.SecurityGroupAgentRpc(self.context,
self.sg_plugin_rpc, root_helper)
self._setup_rpc()
self.sg_agent = sg_agent
-class SecurityGroupAgentRpc(sg_rpc.SecurityGroupAgentRpcMixin):
-
- def __init__(self, context):
- self.context = context
- self.plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
- self.init_firewall()
-
-
class NECNeutronAgent(object):
def __init__(self, integ_br, root_helper, polling_interval):
'agent_type': q_const.AGENT_TYPE_NEC,
'start_flag': True}
- self.setup_rpc()
+ self.setup_rpc(root_helper)
- def setup_rpc(self):
+ def setup_rpc(self, root_helper):
self.host = socket.gethostname()
self.agent_id = 'nec-q-agent.%s' % self.host
LOG.info(_LI("RPC agent_id: %s"), self.agent_id)
self.plugin_rpc = NECPluginApi(topics.PLUGIN)
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
- self.sg_agent = SecurityGroupAgentRpc(self.context)
+ self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
+ self.sg_agent = sg_rpc.SecurityGroupAgentRpc(self.context,
+ self.sg_plugin_rpc, root_helper)
# RPC network init
# Handle updates from service
self.get_datapath(retry_max)
-class OFASecurityGroupAgent(sg_rpc.SecurityGroupAgentRpcMixin):
- def __init__(self, context, plugin_rpc, root_helper):
- self.context = context
- self.plugin_rpc = plugin_rpc
- self.root_helper = root_helper
- self.init_firewall(defer_refresh_firewall=True)
-
-
class OFANeutronAgentRyuApp(app_manager.RyuApp):
OFP_VERSIONS = [ryu_ofp13.OFP_VERSION]
self.dont_fragment = cfg.CONF.AGENT.dont_fragment
# Security group agent support
- self.sg_agent = OFASecurityGroupAgent(self.context,
- self.sg_plugin_rpc,
- self.root_helper)
+ self.sg_agent = sg_rpc.SecurityGroupAgentRpc(self.context,
+ self.sg_plugin_rpc, self.root_helper,
+ defer_refresh_firewall=True)
# Initialize iteration counter
self.iter_num = 0
self.sg_agent = sg_agent
-class SecurityGroupAgentRpc(sg_rpc.SecurityGroupAgentRpcMixin):
-
- def __init__(self, context, root_helper):
- self.context = context
-
- self.plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
- self.root_helper = root_helper
- self.init_firewall()
-
-
class NVSDNeutronAgent(object):
# history
# 1.0 Initial version
self.topic = topics.AGENT
self.context = n_context.get_admin_context_without_session()
- self.sg_agent = SecurityGroupAgentRpc(self.context,
- self.root_helper)
+ self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
+ self.sg_agent = sg_rpc.SecurityGroupAgentRpc(self.context,
+ self.sg_plugin_rpc,
+ self.root_helper)
# RPC network init
# Handle updates from service
pass
-class OVSSecurityGroupAgent(sg_rpc.SecurityGroupAgentRpcMixin):
- def __init__(self, context, plugin_rpc, root_helper):
- self.context = context
- self.plugin_rpc = plugin_rpc
- self.root_helper = root_helper
- self.init_firewall(defer_refresh_firewall=True)
-
-
class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
l2population_rpc.L2populationRpcCallBackTunnelMixin,
dvr_rpc.DVRAgentRpcCallbackMixin):
self.ancillary_brs = self.setup_ancillary_bridges(integ_br, tun_br)
# Security group agent support
- self.sg_agent = OVSSecurityGroupAgent(self.context,
- self.sg_plugin_rpc,
- root_helper)
+ self.sg_agent = sg_rpc.SecurityGroupAgentRpc(self.context,
+ self.sg_plugin_rpc, root_helper, defer_refresh_firewall=True)
+
# Initialize iteration counter
self.iter_num = 0
self.run_daemon_loop = True
LOG.debug("port_update RPC received for port: %s", port['id'])
-class SriovNicSwitchSecurityGroupAgent(sg_rpc.SecurityGroupAgentRpcMixin):
- def __init__(self, context, plugin_rpc, root_helper):
- self.context = context
- self.plugin_rpc = plugin_rpc
- self.root_helper = root_helper
- self.init_firewall()
-
-
class SriovNicSwitchAgent(object):
def __init__(self, physical_devices_mappings, exclude_devices,
polling_interval, root_helper):
self.context = context.get_admin_context_without_session()
self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
- self.sg_agent = SriovNicSwitchSecurityGroupAgent(self.context,
+ self.sg_agent = sg_rpc.SecurityGroupAgentRpc(self.context,
self.sg_plugin_rpc, self.root_helper)
self._setup_rpc()
# Initialize iteration counter
CONTEXT = 'neutron.context'
CONSUMERCREATE = 'neutron.agent.rpc.create_consumers'
SGRPC = 'neutron.agent.securitygroups_rpc'
-SGAGENT = 'neutron.plugins.bigswitch.agent.restproxy_agent.SecurityGroupAgent'
+SGAGENT = 'neutron.agent.securitygroups_rpc.SecurityGroupAgentRpc'
AGENTMOD = 'neutron.plugins.bigswitch.agent.restproxy_agent'
NEUTRONCFG = 'neutron.common.config'
PLCONFIG = 'neutron.plugins.bigswitch.config'
self.ovsbridge = mock.patch(OVSBRIDGE).start()
self.context = mock.patch(CONTEXT).start()
self.rpc = mock.patch(CONSUMERCREATE).start()
- self.sg_rpc = mock.patch(SGRPC).start()
self.sg_agent = mock.patch(SGAGENT).start()
+ self.sg_rpc = mock.patch(SGRPC).start()
def mock_agent(self):
mock_context = mock.Mock(return_value='abc')
cfg.CONF.set_default('firewall_driver',
'neutron.agent.firewall.NoopFirewallDriver',
group='SECURITYGROUP')
- self.agent = hyperv_neutron_agent.HyperVNeutronAgent()
+ self.agent = hyperv_neutron_agent.HyperVNeutronAgent(None)
self.agent.plugin_rpc = mock.Mock()
self.agent.sec_groups_agent = mock.MagicMock()
self.agent.context = mock.Mock()
import testtools
from neutron.agent.linux import ovs_lib
+from neutron.agent import securitygroups_rpc as sg_rpc
+from neutron.common import topics
from neutron.extensions import securitygroup as ext_sg
from neutron.plugins.oneconvergence.agent import nvsd_neutron_agent
from neutron.tests import base
'polling_interval': 5}
context = mock.Mock()
self.agent = nvsd_neutron_agent.NVSDNeutronAgent(**kwargs)
- self.sg_agent = nvsd_neutron_agent.SecurityGroupAgentRpc(
- context, 'dummy_wrapper')
+ sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
+ self.sg_agent = sg_rpc.SecurityGroupAgentRpc(context,
+ sg_plugin_rpc, 'dummy_wrapper')
self.callback_nvsd = nvsd_neutron_agent.NVSDAgentRpcCallback(
context, self.agent, self.sg_agent)
self.loopingcall = loopingcall
class SecurityGroupAgentRpcTestCaseForNoneDriver(base.BaseTestCase):
def test_init_firewall_with_none_driver(self):
set_enable_security_groups(False)
- agent = sg_rpc.SecurityGroupAgentRpcMixin()
- agent.plugin_rpc = mock.Mock()
- agent.context = None
- agent.init_firewall()
+ agent = sg_rpc.SecurityGroupAgentRpc(
+ context=None, plugin_rpc=mock.Mock(), root_helper=None)
self.assertEqual(agent.firewall.__class__.__name__,
'NoopFirewallDriver')
def setUp(self, defer_refresh_firewall=False):
super(BaseSecurityGroupAgentRpcTestCase, self).setUp()
set_firewall_driver(FIREWALL_NOOP_DRIVER)
- self.agent = sg_rpc.SecurityGroupAgentRpcMixin()
- self.agent.context = None
+ self.agent = sg_rpc.SecurityGroupAgentRpc(
+ context=None, plugin_rpc=mock.Mock(), root_helper='sudo',
+ defer_refresh_firewall=defer_refresh_firewall)
mock.patch('neutron.agent.linux.iptables_manager').start()
- self.agent.root_helper = 'sudo'
- self.agent.plugin_rpc = mock.Mock()
- self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall)
self.default_firewall = self.agent.firewall
self.firewall = mock.Mock()
firewall_object = firewall_base.FirewallDriver()
cfg.CONF.set_override('enable_ipset', False, group='SECURITYGROUP')
cfg.CONF.set_override('comment_iptables_rules', False, group='AGENT')
- self.agent = sg_rpc.SecurityGroupAgentRpcMixin()
- self.agent.context = None
-
- self.root_helper = 'sudo'
- self.agent.root_helper = 'sudo'
self.rpc = mock.Mock()
- self.agent.plugin_rpc = self.rpc
+ self.agent = sg_rpc.SecurityGroupAgentRpc(
+ context=None, plugin_rpc=self.rpc, root_helper='sudo',
+ defer_refresh_firewall=defer_refresh_firewall)
+ self.root_helper = 'sudo'
if test_rpc_v1_1:
self.rpc.security_group_info_for_devices.side_effect = (
messaging.UnsupportedVersion('1.2'))
- self.agent.init_firewall(
- defer_refresh_firewall=defer_refresh_firewall)
self.iptables = self.agent.firewall.iptables
# TODO(jlibosva) Get rid of mocking iptables execute and mock out
# firewall instead