# Based on the structure of the OpenVSwitch agent in the
# Neutron OpenVSwitch Plugin.
+import collections
import sys
import time
from six import moves
from neutron._i18n import _LE, _LI, _LW
+from neutron.agent.l2.extensions import manager as ext_manager
from neutron.agent.linux import bridge_lib
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
# history
# 1.1 Support Security Group RPC
# 1.3 Added param devices_to_update to security_groups_provider_updated
- target = oslo_messaging.Target(version='1.3')
+ # 1.4 Added support for network_update
+ target = oslo_messaging.Target(version='1.4')
def __init__(self, context, agent, sg_agent):
super(LinuxBridgeRpcCallbacks, self).__init__()
self.agent.updated_devices.add(tap_name)
LOG.debug("port_update RPC received for port: %s", port_id)
+ def network_update(self, context, **kwargs):
+ network_id = kwargs['network']['id']
+ LOG.debug("network_update message processed for network "
+ "%(network_id)s, with ports: %(ports)s",
+ {'network_id': network_id,
+ 'ports': self.agent.network_ports[network_id]})
+ for port_data in self.agent.network_ports[network_id]:
+ self.agent.updated_devices.add(port_data['device'])
+
def fdb_add(self, context, fdb_entries):
LOG.debug("fdb_add received")
for network_id, values in fdb_entries.items():
def start(self):
self.prevent_arp_spoofing = cfg.CONF.AGENT.prevent_arp_spoofing
self.setup_linux_bridge(self.bridge_mappings, self.interface_mappings)
- configurations = {'bridge_mappings': self.bridge_mappings,
- 'interface_mappings': self.interface_mappings}
+
+ # stores received port_updates and port_deletes for
+ # processing by the main loop
+ self.updated_devices = set()
+
+ # stores all configured ports on agent
+ self.network_ports = collections.defaultdict(list)
+ # flag to do a sync after revival
+ self.fullsync = False
+ self.context = context.get_admin_context_without_session()
+ self.setup_rpc(self.interface_mappings.values())
+ self.init_extension_manager(self.connection)
+
+ configurations = {
+ 'bridge_mappings': self.bridge_mappings,
+ 'interface_mappings': self.interface_mappings,
+ 'extensions': self.ext_manager.names()
+ }
if self.br_mgr.vxlan_mode != lconst.VXLAN_NONE:
configurations['tunneling_ip'] = self.br_mgr.local_ip
configurations['tunnel_types'] = [p_const.TYPE_VXLAN]
'agent_type': constants.AGENT_TYPE_LINUXBRIDGE,
'start_flag': True}
- # stores received port_updates for processing by the main loop
- self.updated_devices = set()
- # flag to do a sync after revival
- self.fullsync = False
- self.context = context.get_admin_context_without_session()
- self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
- self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
- self.sg_agent = sg_rpc.SecurityGroupAgentRpc(self.context,
- self.sg_plugin_rpc, defer_refresh_firewall=True)
- self.setup_rpc(self.interface_mappings.values())
+ report_interval = cfg.CONF.AGENT.report_interval
+ if report_interval:
+ heartbeat = loopingcall.FixedIntervalLoopingCall(
+ self._report_state)
+ heartbeat.start(interval=report_interval)
self.daemon_loop()
def stop(self, graceful=True):
LOG.error(_LE("Unable to obtain MAC address for unique ID. "
"Agent terminated!"))
exit(1)
+
+ self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
+ self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
+ self.sg_agent = sg_rpc.SecurityGroupAgentRpc(
+ self.context, self.sg_plugin_rpc, defer_refresh_firewall=True)
+
self.agent_id = '%s%s' % ('lb', (mac.replace(":", "")))
LOG.info(_LI("RPC agent_id: %s"), self.agent_id)
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.NETWORK, topics.DELETE],
+ [topics.NETWORK, topics.UPDATE],
[topics.SECURITY_GROUP, topics.UPDATE]]
+
if cfg.CONF.VXLAN.l2_population:
consumers.append([topics.L2POPULATION, topics.UPDATE])
self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
- report_interval = cfg.CONF.AGENT.report_interval
- if report_interval:
- heartbeat = loopingcall.FixedIntervalLoopingCall(
- self._report_state)
- heartbeat.start(interval=report_interval)
+
+ def init_extension_manager(self, connection):
+ ext_manager.register_opts(cfg.CONF)
+ self.ext_manager = (
+ ext_manager.AgentExtensionsManager(cfg.CONF))
+ self.ext_manager.initialize(
+ connection, lconst.EXTENSION_DRIVER_TYPE)
def setup_linux_bridge(self, bridge_mappings, interface_mappings):
self.br_mgr = LinuxBridgeManager(bridge_mappings, interface_mappings)
else:
ip_lib.IPDevice(tap_name).link.set_down()
+ def _clean_network_ports(self, device):
+ for netid, ports_list in self.network_ports.items():
+ for port_data in ports_list:
+ if device == port_data['device']:
+ ports_list.remove(port_data)
+ if ports_list == []:
+ self.network_ports.pop(netid)
+ return port_data['port_id']
+
+ def _update_network_ports(self, network_id, port_id, device):
+ self._clean_network_ports(device)
+ self.network_ports[network_id].append({
+ "port_id": port_id,
+ "device": device
+ })
+
def process_network_devices(self, device_info):
resync_a = False
resync_b = False
device,
self.agent_id,
cfg.CONF.host)
+ self._update_network_ports(device_details['network_id'],
+ device_details['port_id'],
+ device_details['device'])
+ self.ext_manager.handle_port(self.context, device_details)
else:
LOG.info(_LI("Device %s not defined on plugin"), device)
return False
LOG.info(_LI("Port %s updated."), device)
else:
LOG.debug("Device %s not defined on plugin", device)
+ port_id = self._clean_network_ports(device)
+ self.ext_manager.delete_port(self.context,
+ {'device': device,
+ 'port_id': port_id})
if self.prevent_arp_spoofing:
arp_protect.delete_arp_spoofing_protection(devices)
return resync
# License for the specific language governing permissions and limitations
# under the License.
+import collections
import sys
import mock
from neutron.tests import base
LOCAL_IP = '192.168.0.33'
+PORT_1 = 'abcdef01-12ddssdfds-fdsfsd'
DEVICE_1 = 'tapabcdef01-12'
+NETWORK_ID = '57653b20-ed5b-4ed0-a31d-06f84e3fd909'
BRIDGE_MAPPING_VALUE = 'br-eth2'
BRIDGE_MAPPINGS = {'physnet0': BRIDGE_MAPPING_VALUE}
INTERFACE_MAPPINGS = {'physnet1': 'eth1'}
FAKE_DEFAULT_DEV = mock.Mock()
FAKE_DEFAULT_DEV.name = 'eth1'
+PORT_DATA = {
+ "port_id": PORT_1,
+ "device": DEVICE_1
+}
class FakeIpLinkCommand(object):
agent = self.agent
agent._ensure_port_admin_state = mock.Mock()
devices = [DEVICE_1]
+ agent.network_ports[NETWORK_ID].append(PORT_DATA)
with mock.patch.object(agent.plugin_rpc,
"update_device_down") as fn_udd,\
mock.patch.object(agent.sg_agent,
- "remove_devices_filter") as fn_rdf:
+ "remove_devices_filter") as fn_rdf,\
+ mock.patch.object(agent.ext_manager,
+ "delete_port") as ext_mgr_delete_port:
fn_udd.return_value = {'device': DEVICE_1,
'exists': True}
with mock.patch.object(linuxbridge_neutron_agent.LOG,
self.assertFalse(resync)
self.assertTrue(fn_udd.called)
self.assertTrue(fn_rdf.called)
+ self.assertTrue(ext_mgr_delete_port.called)
+ self.assertTrue(
+ PORT_DATA not in agent.network_ports[NETWORK_ID]
+ )
def test_treat_devices_removed_with_not_existed_device(self):
agent = self.agent
devices = [DEVICE_1]
+ agent.network_ports[NETWORK_ID].append(PORT_DATA)
with mock.patch.object(agent.plugin_rpc,
"update_device_down") as fn_udd,\
mock.patch.object(agent.sg_agent,
- "remove_devices_filter") as fn_rdf:
+ "remove_devices_filter") as fn_rdf,\
+ mock.patch.object(agent.ext_manager,
+ "delete_port") as ext_mgr_delete_port:
fn_udd.return_value = {'device': DEVICE_1,
'exists': False}
with mock.patch.object(linuxbridge_neutron_agent.LOG,
self.assertFalse(resync)
self.assertTrue(fn_udd.called)
self.assertTrue(fn_rdf.called)
+ self.assertTrue(ext_mgr_delete_port.called)
+ self.assertTrue(
+ PORT_DATA not in agent.network_ports[NETWORK_ID]
+ )
def test_treat_devices_removed_failed(self):
agent = self.agent
devices = [DEVICE_1]
+ agent.network_ports[NETWORK_ID].append(PORT_DATA)
with mock.patch.object(agent.plugin_rpc,
"update_device_down") as fn_udd,\
mock.patch.object(agent.sg_agent,
- "remove_devices_filter") as fn_rdf:
+ "remove_devices_filter") as fn_rdf,\
+ mock.patch.object(agent.ext_manager,
+ "delete_port") as ext_mgr_delete_port:
fn_udd.side_effect = Exception()
resync = agent.treat_devices_removed(devices)
self.assertTrue(resync)
self.assertTrue(fn_udd.called)
self.assertTrue(fn_rdf.called)
+ self.assertTrue(ext_mgr_delete_port.called)
+ self.assertTrue(
+ PORT_DATA not in agent.network_ports[NETWORK_ID]
+ )
def _test_scan_devices(self, previous, updated,
fake_current, expected, sync):
self._test_scan_devices(previous, updated, fake_current, expected,
sync=False)
+ def test_scan_devices_updated_deleted_concurrently(self):
+ previous = {
+ 'current': set([1, 2]),
+ 'updated': set(),
+ 'added': set(),
+ 'removed': set()
+ }
+ # Device 2 disappeared.
+ fake_current = set([1])
+ # Device 2 got an concurrent update via network_update
+ updated = set([2])
+ expected = {
+ 'current': set([1]),
+ 'updated': set(),
+ 'added': set(),
+ 'removed': set([2])
+ }
+ self._test_scan_devices(
+ previous, updated, fake_current, expected, sync=False
+ )
+
def test_scan_devices_updated_on_sync(self):
previous = {'current': set([1, 2]),
'updated': set([1]),
'segmentation_id': 100,
'physical_network': 'physnet1',
'device_owner': constants.DEVICE_OWNER_NETWORK_PREFIX}
+ mock_port_data = {
+ 'port_id': mock_details['port_id'],
+ 'device': mock_details['device']
+ }
+ agent.ext_manager = mock.Mock()
agent.plugin_rpc = mock.Mock()
agent.plugin_rpc.get_devices_details_list.return_value = [mock_details]
agent.br_mgr = mock.Mock()
100, 'port123',
constants.DEVICE_OWNER_NETWORK_PREFIX)
self.assertTrue(agent.plugin_rpc.update_device_up.called)
+ self.assertTrue(agent.ext_manager.handle_port.called)
+ self.assertTrue(
+ mock_port_data in agent.network_ports[mock_details['network_id']]
+ )
def test_set_rpc_timeout(self):
self.agent.stop()
def test_ensure_port_admin_state_down(self):
self._test_ensure_port_admin_state(False)
+ def test_update_network_ports(self):
+ port_1_data = PORT_DATA
+ NETWORK_2_ID = 'fake_second_network'
+ port_2_data = {
+ 'port_id': 'fake_port_2',
+ 'device': 'fake_port_2_device_name'
+ }
+ self.agent.network_ports[NETWORK_ID].append(
+ port_1_data
+ )
+ self.agent.network_ports[NETWORK_ID].append(
+ port_2_data
+ )
+ #check update port:
+ self.agent._update_network_ports(
+ NETWORK_2_ID, port_2_data['port_id'], port_2_data['device']
+ )
+ self.assertTrue(
+ port_2_data not in self.agent.network_ports[NETWORK_ID]
+ )
+ self.assertTrue(
+ port_2_data in self.agent.network_ports[NETWORK_2_ID]
+ )
+
+ def test_clean_network_ports(self):
+ port_1_data = PORT_DATA
+ port_2_data = {
+ 'port_id': 'fake_port_2',
+ 'device': 'fake_port_2_device_name'
+ }
+ self.agent.network_ports[NETWORK_ID].append(
+ port_1_data
+ )
+ self.agent.network_ports[NETWORK_ID].append(
+ port_2_data
+ )
+ #check removing port from network when other ports are still there:
+ cleaned_port_id = self.agent._clean_network_ports(DEVICE_1)
+ self.assertTrue(
+ NETWORK_ID in self.agent.network_ports.keys()
+ )
+ self.assertTrue(
+ port_1_data not in self.agent.network_ports[NETWORK_ID]
+ )
+ self.assertTrue(
+ port_2_data in self.agent.network_ports[NETWORK_ID]
+ )
+ self.assertEqual(cleaned_port_id, PORT_1)
+ #and now remove last port from network:
+ cleaned_port_id = self.agent._clean_network_ports(
+ port_2_data['device']
+ )
+ self.assertTrue(
+ NETWORK_ID not in self.agent.network_ports.keys()
+ )
+ self.assertEqual(cleaned_port_id, port_2_data['port_id'])
+
class TestLinuxBridgeManager(base.BaseTestCase):
def setUp(self):
segment.network_type = 'vxlan'
segment.segmentation_id = 1
self.br_mgr.network_map['net_id'] = segment
+ self.updated_devices = set()
+ self.network_ports = collections.defaultdict(list)
self.lb_rpc = linuxbridge_neutron_agent.LinuxBridgeRpcCallbacks(
object(),
mock_net = mock.Mock()
mock_net.physical_network = None
- self.lb_rpc.agent.br_mgr.network_map = {'123': mock_net}
+ self.lb_rpc.agent.br_mgr.network_map = {NETWORK_ID: mock_net}
with mock.patch.object(self.lb_rpc.agent.br_mgr,
"get_bridge_name") as get_br_fn,\
mock.patch.object(self.lb_rpc.agent.br_mgr,
"delete_bridge") as del_fn:
get_br_fn.return_value = "br0"
- self.lb_rpc.network_delete("anycontext", network_id="123")
- get_br_fn.assert_called_with("123")
+ self.lb_rpc.network_delete("anycontext", network_id=NETWORK_ID)
+ get_br_fn.assert_called_with(NETWORK_ID)
del_fn.assert_called_with("br0")
+ def test_port_update(self):
+ port = {'id': PORT_1}
+ self.lb_rpc.port_update(context=None, port=port)
+ self.assertEqual(set([DEVICE_1]), self.lb_rpc.agent.updated_devices)
+
+ def test_network_update(self):
+ updated_network = {'id': NETWORK_ID}
+ self.lb_rpc.agent.network_ports = {
+ NETWORK_ID: [PORT_DATA]
+ }
+ self.lb_rpc.network_update(context=None, network=updated_network)
+ self.assertEqual(set([DEVICE_1]), self.lb_rpc.agent.updated_devices)
+
def test_network_delete_with_existed_brq(self):
mock_net = mock.Mock()
mock_net.physical_network = 'physnet0'