also, use ofport.name as device identifiers.
sprinkle some TODO comments.
Implements: blueprint ofagent-port-monitor
Change-Id: I4995964f37729f954fec71c4a2e61e463a430b1a
from neutron.openstack.common import loopingcall
from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.common import constants as p_const
+from neutron.plugins.ofagent.agent import ports
from neutron.plugins.ofagent.common import config # noqa
from neutron.plugins.openvswitch.common import constants
self._report_state)
heartbeat.start(interval=report_interval)
+ @staticmethod
+ def _get_ofport_name(interface_id):
+ """Convert from neutron device id (uuid) to OpenFlow port name.
+
+ This needs to be synced with ML2 plugin's _device_to_port_id().
+
+ An assumption: The switch uses an OS's interface name as the
+ corresponding OpenFlow port name.
+ NOTE(yamamoto): While it's true for Open vSwitch, it isn't
+ necessarily true everywhere. For example, LINC uses something
+ like "LogicalSwitch0-Port2".
+ """
+ return "tap" + interface_id[0:11]
+
+ def _get_ports(self, br):
+ """Generate ports.Port instances for the given bridge."""
+ datapath = br.datapath
+ ofpp = datapath.ofproto_parser
+ msg = ofpp.OFPPortDescStatsRequest(datapath=datapath)
+ descs = ryu_api.send_msg(app=self.ryuapp, msg=msg,
+ reply_cls=ofpp.OFPPortDescStatsReply,
+ reply_multi=True)
+ for d in descs:
+ for p in d.body:
+ yield ports.Port.from_ofp_port(p)
+
+ def _get_ofport_names(self, br):
+ """Return a set of OpenFlow port names for the given bridge."""
+ return set(p.port_name for p in self._get_ports(br))
+
def get_net_uuid(self, vif_id):
for network_id, vlan_mapping in self.local_vlan_map.iteritems():
if vif_id in vlan_mapping.vif_ports:
# Even if full port details might be provided to this call,
# they are not used since there is no guarantee the notifications
# are processed in the same order as the relevant API requests
- self.updated_ports.add(port['id'])
+ self.updated_ports.add(self._get_ofport_name(port['id']))
LOG.debug(_("port_update received port %s"), port['id'])
def tunnel_update(self, context, **kwargs):
self.provision_local_vlan(net_uuid, network_type,
physical_network, segmentation_id)
lvm = self.local_vlan_map[net_uuid]
- lvm.vif_ports[port.vif_id] = port
+ lvm.vif_ports[port.port_name] = port
# Do not bind a port if it's already bound
cur_tag = self.int_br.db_get_val("Port", port.port_name, "tag")
if cur_tag != str(lvm.vlan):
br, physical_network, bridge, ip_wrapper)
def scan_ports(self, registered_ports, updated_ports=None):
- cur_ports = self.int_br.get_vif_port_set()
+ cur_ports = self._get_ofport_names(self.int_br)
self.int_br_device_count = len(cur_ports)
port_info = {'current': cur_ports}
if updated_ports is None:
The returned value is a set of port ids of the ports concerned by a
vlan tag loss.
"""
+ # TODO(yamamoto): stop using ovsdb
+ # an idea is to use metadata instead of tagged vlans.
+ # cf. blueprint ofagent-merge-bridges
port_tags = self.int_br.get_port_tag_dict()
changed_ports = set()
for lvm in self.local_vlan_map.values():
for port in registered_ports:
if (
port in lvm.vif_ports
- and lvm.vif_ports[port].port_name in port_tags
- and port_tags[lvm.vif_ports[port].port_name] != lvm.vlan
+ and port in port_tags
+ and port_tags[port] != lvm.vlan
):
LOG.info(
_("Port '%(port_name)s' has lost "
"its vlan tag '%(vlan_tag)d'!"),
- {'port_name': lvm.vif_ports[port].port_name,
+ {'port_name': port,
'vlan_tag': lvm.vlan}
)
changed_ports.add(port)
return changed_ports
def update_ancillary_ports(self, registered_ports):
+ # TODO(yamamoto): stop using ovsdb
+ # - do the same as scan_ports
+ # - or, find a way to update status of ancillary ports differently
+ # eg. let interface drivers mark ports up
ports = set()
for bridge in self.ancillary_brs:
ports |= bridge.get_vif_port_set()
# error condition of which operators should be aware
if not vif_port.ofport:
LOG.warn(_("VIF port: %s has no ofport configured, and might "
- "not be able to transmit"), vif_port.vif_id)
+ "not be able to transmit"), vif_port.port_name)
if admin_state_up:
self.port_bound(vif_port, network_id, network_type,
physical_network, segmentation_id)
def treat_devices_added_or_updated(self, devices):
resync = False
+ all_ports = dict((p.port_name, p) for p in self._get_ports())
for device in devices:
LOG.debug(_("Processing port %s"), device)
- port = self.int_br.get_vif_port_by_id(device)
- if not port:
+ if device not in all_ports:
# The port has disappeared and should not be processed
# There is no need to put the port DOWN in the plugin as
# it never went up in the first place
LOG.info(_("Port %s was not found on the integration bridge "
"and will therefore not be processed"), device)
continue
+ port = all_ports[device]
try:
details = self.plugin_rpc.get_device_details(self.context,
device,
self.iter_num = self.iter_num + 1
def daemon_loop(self):
+ # TODO(yamamoto): make polling logic stop using ovsdb monitor
+ # - make it a dumb periodic polling
+ # - or, monitor port status async messages
with polling.get_polling_manager(
self.minimize_polling,
self.root_helper,
--- /dev/null
+# Copyright (C) 2014 VA Linux Systems Japan K.K.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: YAMAMOTO Takashi, VA Linux Systems Japan K.K.
+
+
+class Port(object):
+ def __init__(self, port_name, ofport):
+ self.port_name = port_name
+ self.ofport = ofport
+
+ @classmethod
+ def from_ofp_port(cls, ofp_port):
+ """Convert from ryu OFPPort."""
+ return cls(port_name=ofp_port.name, ofport=ofp_port.port_no)
self._hist = []
def __getattr__(self, name):
- return name
+ return self._kwargs[name]
def __repr__(self):
args = map(repr, self._args)
class _Mod(object):
+ _cls_cache = {}
+
def __init__(self, name):
self._name = name
fullname = '%s.%s' % (self._name, name)
if '_' in name: # constants are named like OFPxxx_yyy_zzz
return _SimpleValue(fullname)
- return _mkcls(fullname)
+ try:
+ return self._cls_cache[fullname]
+ except KeyError:
+ pass
+ cls = _mkcls(fullname)
+ self._cls_cache[fullname] = cls
+ return cls
def __repr__(self):
return 'Mod(%s)' % (self._name,)
import testtools
from neutron.agent.linux import ip_lib
-from neutron.agent.linux import ovs_lib
from neutron.agent.linux import utils
from neutron.openstack.common import importutils
from neutron.plugins.common import constants as p_const
def test_port_dead_with_port_already_dead(self):
self._test_port_dead(self.mod_agent.DEAD_VLAN_TAG)
- def mock_scan_ports(self, vif_port_set=None, registered_ports=None,
+ def mock_scan_ports(self, port_set=None, registered_ports=None,
updated_ports=None, port_tags_dict=None):
port_tags_dict = port_tags_dict or {}
with contextlib.nested(
- mock.patch.object(self.agent.int_br, 'get_vif_port_set',
- return_value=vif_port_set),
+ mock.patch.object(self.agent, '_get_ofport_names',
+ return_value=port_set),
mock.patch.object(self.agent.int_br, 'get_port_tag_dict',
return_value=port_tags_dict)
):
self.assertEqual(expected, actual)
def test_update_ports_returns_lost_vlan_port(self):
- br = self.mod_agent.OVSBridge('br-int', 'fake_helper', self.ryuapp)
- mac = "ca:fe:de:ad:be:ef"
- port = ovs_lib.VifPort(1, 1, 1, mac, br)
+ port = mock.Mock(port_name='tap00000001-00', ofport=1)
lvm = self.mod_agent.LocalVLANMapping(
- 1, '1', None, 1, {port.vif_id: port})
+ vlan=1, network_type='1', physical_network=None, segmentation_id=1,
+ vif_ports={port.port_name: port})
local_vlan_map = {'1': lvm}
- vif_port_set = set([1, 3])
- registered_ports = set([1, 2])
- port_tags_dict = {1: []}
+ port_set = set(['tap00000001-00',
+ 'tap00000003-00'])
+ registered_ports = set(['tap00000001-00', 'tap00000002-00'])
+ port_tags_dict = {'tap00000001-00': []}
expected = dict(
- added=set([3]), current=vif_port_set,
- removed=set([2]), updated=set([1])
+ added=set(['tap00000003-00']),
+ current=set(['tap00000001-00', 'tap00000003-00']),
+ removed=set(['tap00000002-00']),
+ updated=set(['tap00000001-00'])
)
with mock.patch.dict(self.agent.local_vlan_map, local_vlan_map):
actual = self.mock_scan_ports(
- vif_port_set, registered_ports, port_tags_dict=port_tags_dict)
+ port_set, registered_ports, port_tags_dict=port_tags_dict)
self.assertEqual(expected, actual)
def test_treat_devices_added_returns_true_for_missing_device(self):
with contextlib.nested(
mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
side_effect=Exception()),
- mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
- return_value=mock.Mock())):
- self.assertTrue(self.agent.treat_devices_added_or_updated([{}]))
+ mock.patch.object(self.agent, '_get_ports',
+ return_value=[mock.Mock(port_name='xxx')])):
+ self.assertTrue(self.agent.treat_devices_added_or_updated(['xxx']))
def _mock_treat_devices_added_updated(self, details, port, func_name):
"""Mock treat devices added or updated.
with contextlib.nested(
mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
return_value=details),
- mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
- return_value=port),
+ mock.patch.object(self.agent, '_get_ports',
+ return_value=[port]),
mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
mock.patch.object(self.agent, func_name)
) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, func):
- self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
+ self.assertFalse(self.agent.treat_devices_added_or_updated(
+ [port.port_name]))
return func.called
def test_treat_devices_added_updated_ignores_invalid_ofport(self):
with contextlib.nested(
mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
return_value=fake_details_dict),
- mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
- return_value=mock.MagicMock()),
+ mock.patch.object(self.agent, '_get_ports',
+ return_value=[mock.Mock(port_name='xxx')]),
mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
mock.patch.object(self.agent, 'treat_vif_port')
) as (get_dev_fn, get_vif_func, upd_dev_up,
upd_dev_down, treat_vif_port):
- self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
+ self.assertFalse(self.agent.treat_devices_added_or_updated(
+ ['xxx']))
self.assertTrue(treat_vif_port.called)
self.assertTrue(upd_dev_down.called)
recl_fn.assert_called_with("123")
def test_port_update(self):
- port = {"id": "123",
+ port = {"id": "b1981919-f516-11e3-a8f4-08606e7f74e7",
"network_id": "124",
"admin_state_up": False}
self.agent.port_update("unused_context",
network_type="vlan",
segmentation_id="1",
physical_network="physnet")
- self.assertEqual(set(['123']), self.agent.updated_ports)
+ self.assertEqual(set(['tapb1981919-f5']), self.agent.updated_ports)
def test_setup_physical_bridges(self):
with contextlib.nested(
table_id=ofp.OFPTT_ALL)
sendmsg.assert_has_calls([mock.call(expected_msg)])
+ def test__get_ports(self):
+ ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser')
+ reply = [ofpp.OFPPortDescStatsReply(body=[ofpp.OFPPort(name='hoge',
+ port_no=8)])]
+ sendmsg = mock.Mock(return_value=reply)
+ self.mod_agent.ryu_api.send_msg = sendmsg
+ result = self.agent._get_ports(self.agent.int_br)
+ result = list(result) # convert generator to list.
+ self.assertEqual(1, len(result))
+ self.assertEqual('hoge', result[0].port_name)
+ self.assertEqual(8, result[0].ofport)
+ expected_msg = ofpp.OFPPortDescStatsRequest(
+ datapath=self.agent.int_br.datapath)
+ sendmsg.assert_has_calls([mock.call(app=self.agent.ryuapp,
+ msg=expected_msg, reply_cls=ofpp.OFPPortDescStatsReply,
+ reply_multi=True)])
+
+ def test__get_ofport_names(self):
+ names = ['p111', 'p222', 'p333']
+ ps = [mock.Mock(port_name=x, ofport=names.index(x)) for x in names]
+ with mock.patch.object(self.agent, '_get_ports',
+ return_value=ps) as _get_ports:
+ result = self.agent._get_ofport_names('hoge')
+ _get_ports.assert_called_once_with('hoge')
+ self.assertEqual(set(names), result)
+
class AncillaryBridgesTest(OFAAgentTestCase):
--- /dev/null
+# Copyright (C) 2014 VA Linux Systems Japan K.K.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: YAMAMOTO Takashi, VA Linux Systems Japan K.K.
+
+
+import mock
+
+from neutron.plugins.ofagent.agent import ports
+from neutron.tests import base
+
+
+class TestOFAgentPorts(base.BaseTestCase):
+ def test_port(self):
+ p1 = ports.Port(port_name='foo', ofport=999)
+ ryu_ofp_port = mock.Mock(port_no=999)
+ ryu_ofp_port.name = 'foo'
+ p2 = ports.Port.from_ofp_port(ofp_port=ryu_ofp_port)
+ self.assertEqual(p1.port_name, p2.port_name)
+ self.assertEqual(p1.ofport, p2.ofport)