from neutron.agent import rpc as agent_rpc
from neutron.common import config
from neutron.common import constants as n_const
+from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils
from neutron import context
from neutron.openstack.common.cache import cache
from neutron.openstack.common import excutils
+from neutron.openstack.common.gettextutils import _LW
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron import wsgi
LOG = logging.getLogger(__name__)
+class MetadataPluginAPI(n_rpc.RpcProxy):
+ """Agent-side RPC (stub) for agent-to-plugin interaction.
+
+ API version history:
+ 1.0 - Initial version.
+ """
+
+ BASE_RPC_API_VERSION = '1.0'
+
+ def __init__(self, topic):
+ super(MetadataPluginAPI, self).__init__(
+ topic=topic, default_version=self.BASE_RPC_API_VERSION)
+
+ def get_ports(self, context, filters):
+ return self.call(context,
+ self.make_msg('get_ports',
+ filters=filters))
+
+
class MetadataProxyHandler(object):
OPTS = [
cfg.StrOpt('admin_user',
else:
self._cache = False
+ self.plugin_rpc = MetadataPluginAPI(topics.PLUGIN)
+ self.context = context.get_admin_context_without_session()
+ # Use RPC by default
+ self.use_rpc = True
+
def _get_neutron_client(self):
qclient = client.Client(
username=self.conf.admin_user,
'Please try your request again.')
return webob.exc.HTTPInternalServerError(explanation=unicode(msg))
+ def _get_ports_from_server(self, router_id=None, ip_address=None,
+ networks=None):
+ """Either get ports from server by RPC or fallback to neutron client"""
+ filters = self._get_port_filters(router_id, ip_address, networks)
+ if self.use_rpc:
+ try:
+ return self.plugin_rpc.get_ports(self.context, filters)
+ except (n_rpc.RPCException, AttributeError):
+ # TODO(obondarev): remove fallback once RPC is proven
+ # to work fine with metadata agent (K or L release at most)
+ LOG.warning(_LW('Server does not support metadata RPC, '
+ 'fallback to using neutron client'))
+ self.use_rpc = False
+
+ return self._get_ports_using_client(filters)
+
+ def _get_port_filters(self, router_id=None, ip_address=None,
+ networks=None):
+ filters = {}
+ if router_id:
+ filters['device_id'] = [router_id]
+ filters['device_owner'] = [
+ n_const.DEVICE_OWNER_ROUTER_INTF,
+ n_const.DEVICE_OWNER_DVR_INTERFACE]
+ if ip_address:
+ filters['fixed_ips'] = {'ip_address': [ip_address]}
+ if networks:
+ filters['network_id'] = networks
+
+ return filters
+
@utils.cache_method_results
def _get_router_networks(self, router_id):
"""Find all networks connected to given router."""
- qclient = self._get_neutron_client()
-
- internal_ports = qclient.list_ports(
- device_id=router_id,
- device_owner=[n_const.DEVICE_OWNER_ROUTER_INTF,
- n_const.DEVICE_OWNER_DVR_INTERFACE])['ports']
- self.auth_info = qclient.get_auth_info()
+ internal_ports = self._get_ports_from_server(router_id=router_id)
return tuple(p['network_id'] for p in internal_ports)
@utils.cache_method_results
searched for
"""
- qclient = self._get_neutron_client()
- all_ports = qclient.list_ports(
- fixed_ips=['ip_address=%s' % remote_address])['ports']
-
- self.auth_info = qclient.get_auth_info()
- networks = set(networks)
- return [p for p in all_ports if p['network_id'] in networks]
+ return self._get_ports_from_server(networks=networks,
+ ip_address=remote_address)
+
+ def _get_ports_using_client(self, filters):
+ # reformat filters for neutron client
+ if 'device_id' in filters:
+ filters['device_id'] = filters['device_id'][0]
+ if 'fixed_ips' in filters:
+ filters['fixed_ips'] = [
+ 'ip_address=%s' % filters['fixed_ips']['ip_address'][0]]
+
+ client = self._get_neutron_client()
+ ports = client.list_ports(**filters)
+ self.auth_info = client.get_auth_info()
+ return ports['ports']
def _get_ports(self, remote_address, network_id=None, router_id=None):
"""Search for all ports that contain passed ip address and belongs to
--- /dev/null
+# Copyright (c) 2014 OpenStack Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from neutron.common import rpc as n_rpc
+from neutron import manager
+
+
+class MetadataRpcCallback(n_rpc.RpcCallback):
+ """Metadata agent RPC callback in plugin implementations."""
+
+ # 1.0 MetadataPluginAPI BASE_RPC_API_VERSION
+ RPC_API_VERSION = '1.0'
+
+ @property
+ def plugin(self):
+ if not hasattr(self, '_plugin'):
+ self._plugin = manager.NeutronManager.get_plugin()
+ return self._plugin
+
+ def get_ports(self, context, filters):
+ return self.plugin.get_ports(context, filters=filters)
from neutron.api import extensions as neutron_extensions
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc
+from neutron.api.rpc.handlers import metadata_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.common import constants as const
from neutron.common import exceptions
)
self.endpoints = [securitygroups_rpc.SecurityGroupServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(),
- agents_db.AgentExtRpcCallback()]
+ agents_db.AgentExtRpcCallback(),
+ metadata_rpc.MetadataRpcCallback()]
self.conn.create_consumer(self.topic, self.endpoints,
fanout=False)
# Consume from all consumers in threads
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import l3_rpc
+from neutron.api.rpc.handlers import metadata_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.common import constants as q_const
from neutron.common import rpc as n_rpc
securitygroups_rpc.SecurityGroupServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(),
l3_rpc.L3RpcCallback(),
- agents_db.AgentExtRpcCallback()]
+ agents_db.AgentExtRpcCallback(),
+ metadata_rpc.MetadataRpcCallback()]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
# Consume from all consumers in threads
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc
+from neutron.api.rpc.handlers import metadata_rpc
from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.common import exceptions as n_exc
self.service_topics = {svc_constants.CORE: topics.PLUGIN}
self.conn = n_rpc.create_connection(new=True)
self.endpoints = [dhcp_rpc.DhcpRpcCallback(),
- agents_db.AgentExtRpcCallback()]
+ agents_db.AgentExtRpcCallback(),
+ metadata_rpc.MetadataRpcCallback()]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import l3_rpc
+from neutron.api.rpc.handlers import metadata_rpc
from neutron.api.v2 import attributes
from neutron.common import exceptions as n_exc
from neutron.common import rpc as n_rpc
self.endpoints = [rpc_callbacks.HyperVRpcCallbacks(self.notifier),
dhcp_rpc.DhcpRpcCallback(),
l3_rpc.L3RpcCallback(),
- agents_db.AgentExtRpcCallback()]
+ agents_db.AgentExtRpcCallback(),
+ metadata_rpc.MetadataRpcCallback()]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
# Consume from all consumers in threads
from webob import exc as w_exc
from neutron.api.rpc.handlers import dhcp_rpc
+from neutron.api.rpc.handlers import metadata_rpc
from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.common import exceptions as n_exc
self.topic = topics.PLUGIN
self.conn = n_rpc.create_connection(new=True)
self.endpoints = [dhcp_rpc.DhcpRpcCallback(),
- agents_db.AgentExtRpcCallback()]
+ agents_db.AgentExtRpcCallback(),
+ metadata_rpc.MetadataRpcCallback()]
self.conn.create_consumer(self.topic, self.endpoints,
fanout=False)
# Consume from all consumers in threads
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import dvr_rpc
+from neutron.api.rpc.handlers import metadata_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.api.v2 import attributes
from neutron.common import constants as const
securitygroups_rpc.SecurityGroupServerRpcCallback(),
dvr_rpc.DVRServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(),
- agents_db.AgentExtRpcCallback()]
+ agents_db.AgentExtRpcCallback(),
+ metadata_rpc.MetadataRpcCallback()]
self.topic = topics.PLUGIN
self.conn = n_rpc.create_connection(new=True)
self.conn.create_consumer(self.topic, self.endpoints,
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import l3_rpc
+from neutron.api.rpc.handlers import metadata_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.api.v2 import attributes
from neutron.common import constants as q_const
securitygroups_rpc.SecurityGroupServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(),
l3_rpc.L3RpcCallback(),
- agents_db.AgentExtRpcCallback()]
+ agents_db.AgentExtRpcCallback(),
+ metadata_rpc.MetadataRpcCallback()]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
# Consume from all consumers in threads
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import l3_rpc
+from neutron.api.rpc.handlers import metadata_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.api.v2 import attributes as attrs
from neutron.common import constants as const
dhcp_rpc.DhcpRpcCallback(),
l3_rpc.L3RpcCallback(),
self.callback_sg,
- agents_db.AgentExtRpcCallback()]
+ agents_db.AgentExtRpcCallback(),
+ metadata_rpc.MetadataRpcCallback()]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
# Consume from all consumers in threads
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import l3_rpc
+from neutron.api.rpc.handlers import metadata_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.common import constants as q_const
from neutron.common import exceptions as nexception
self.endpoints = [securitygroups_rpc.SecurityGroupServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(),
l3_rpc.L3RpcCallback(),
- agents_db.AgentExtRpcCallback()]
+ agents_db.AgentExtRpcCallback(),
+ metadata_rpc.MetadataRpcCallback()]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import l3_rpc
+from neutron.api.rpc.handlers import metadata_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.common import constants as q_const
from neutron.common import exceptions as n_exc
self.endpoints = [RyuRpcCallbacks(self.ofp_api_host),
securitygroups_rpc.SecurityGroupServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(),
- l3_rpc.L3RpcCallback()]
+ l3_rpc.L3RpcCallback(),
+ metadata_rpc.MetadataRpcCallback()]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
self.conn.consume_in_threads()
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc
+from neutron.api.rpc.handlers import metadata_rpc
from neutron.common import constants as const
from neutron.common import rpc as n_rpc
from neutron.common import topics
self.topic = topics.PLUGIN
self.conn = n_rpc.create_connection(new=True)
self.endpoints = [dhcp_rpc.DhcpRpcCallback(),
- agents_db.AgentExtRpcCallback()]
+ agents_db.AgentExtRpcCallback(),
+ metadata_rpc.MetadataRpcCallback()]
self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
notifier or dhcp_rpc_agent_api.DhcpAgentNotifyAPI())
cache_url = 'memory://?default_ttl=5'
-class TestMetadataProxyHandlerCache(base.BaseTestCase):
+class TestMetadataProxyHandlerBase(base.BaseTestCase):
+ fake_conf = FakeConf
+
+ def setUp(self):
+ super(TestMetadataProxyHandlerBase, self).setUp()
+ self.log_p = mock.patch.object(agent, 'LOG')
+ self.log = self.log_p.start()
+ self.handler = agent.MetadataProxyHandler(self.fake_conf)
+ self.handler.plugin_rpc = mock.Mock()
+ self.handler.context = mock.Mock()
+
+
+class TestMetadataProxyHandlerRpc(TestMetadataProxyHandlerBase):
+ def test_get_port_filters(self):
+ router_id = 'test_router_id'
+ ip = '1.2.3.4'
+ networks = ('net_id1', 'net_id2')
+ expected = {'device_id': [router_id],
+ 'device_owner': EXPECTED_OWNER_ROUTERS,
+ 'network_id': networks,
+ 'fixed_ips': {'ip_address': [ip]}}
+ actual = self.handler._get_port_filters(router_id, ip, networks)
+ self.assertEqual(expected, actual)
+
+ def test_get_router_networks(self):
+ router_id = 'router-id'
+ expected = ('network_id1', 'network_id2')
+ ports = [{'network_id': 'network_id1', 'something': 42},
+ {'network_id': 'network_id2', 'something_else': 32}]
+ self.handler.plugin_rpc.get_ports.return_value = ports
+ networks = self.handler._get_router_networks(router_id)
+ self.assertEqual(expected, networks)
+
+ def test_get_ports_for_remote_address(self):
+ ip = '1.1.1.1'
+ networks = ('network_id1', 'network_id2')
+ expected = [{'port_id': 'port_id1'},
+ {'port_id': 'port_id2'}]
+ self.handler.plugin_rpc.get_ports.return_value = expected
+ ports = self.handler._get_ports_for_remote_address(ip, networks)
+ self.assertEqual(expected, ports)
+
+ def test_get_ports_using_rpc_fallback_to_client(self):
+ ip = '1.1.1.1'
+ networks = ('network_id1', 'network_id2')
+ self.handler.plugin_rpc.get_ports.side_effect = AttributeError
+ with mock.patch('neutronclient.v2_0.client.Client') as neutron_client:
+ mock_list_ports = neutron_client.return_value.list_ports
+ expected_ports = {'ports': ['expected_port']}
+ mock_list_ports.return_value = expected_ports
+ ports = self.handler._get_ports_from_server(ip_address=ip,
+ networks=networks)
+ self.assertEqual(expected_ports['ports'], ports)
+
+
+class TestMetadataProxyHandlerCache(TestMetadataProxyHandlerBase):
fake_conf = FakeConfCache
def setUp(self):
super(TestMetadataProxyHandlerCache, self).setUp()
self.qclient_p = mock.patch('neutronclient.v2_0.client.Client')
self.qclient = self.qclient_p.start()
-
- self.log_p = mock.patch.object(agent, 'LOG')
- self.log = self.log_p.start()
-
- self.handler = agent.MetadataProxyHandler(self.fake_conf)
+ self.handler.use_rpc = False
def test_call(self):
req = mock.Mock()
self.assertEqual(
1, self.qclient.return_value.list_ports.call_count)
- def test_get_ports_for_remote_address(self):
- remote_address = 'remote_address'
- networks = ('network_id1', 'network_id3')
- fixed_ips = ["ip_address=%s" % remote_address]
- expected_ports = [{'network_id': 'network_id1', 'something': 42}]
- mock_list_ports = self.qclient.return_value.list_ports
- mock_list_ports.return_value = {'ports': [{'network_id': 'network_id1',
- 'something': 42},
- {'network_id': 'network_id2',
- 'something_else': 64}]}
- ports = self.handler._get_ports_for_remote_address(remote_address,
- networks)
- mock_list_ports.assert_called_once_with(fixed_ips=fixed_ips)
- self.assertEqual(expected_ports, ports)
-
def _get_ports_for_remote_address_cache_hit_helper(self):
remote_address = 'remote_address'
networks = ('net1', 'net2')
'something': 42}]}
self.handler._get_ports_for_remote_address(remote_address, networks)
mock_list_ports.assert_called_once_with(
- fixed_ips=fixed_ips)
+ network_id=networks, fixed_ips=fixed_ips)
self.assertEqual(1, mock_list_ports.call_count)
self.handler._get_ports_for_remote_address(remote_address,
networks)
expected.extend([
new_qclient_call,
mock.call().list_ports(
- fixed_ips=['ip_address=192.168.1.1']),
+ network_id=networks, fixed_ips=['ip_address=192.168.1.1']),
mock.call().get_auth_info()
])
),
mock.call().get_auth_info(),
cached_qclient_call,
- mock.call().list_ports(fixed_ips=['ip_address=192.168.1.10']),
+ mock.call().list_ports(network_id=('net1',),
+ fixed_ips=['ip_address=192.168.1.10']),
mock.call().get_auth_info(),
]