import hmac
import httplib2
-from neutronclient.v2_0 import client
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
def __init__(self, conf):
self.conf = conf
- self.auth_info = {}
if self.conf.cache_url:
self._cache = cache.get_cache(self.conf.cache_url)
else:
self.plugin_rpc = MetadataPluginAPI(topics.PLUGIN)
self.context = context.get_admin_context_without_session()
- # Use RPC by default
- self.use_rpc = True
-
- def _get_neutron_client(self):
- params = {
- 'username': self.conf.admin_user,
- 'password': self.conf.admin_password,
- 'tenant_name': self.conf.admin_tenant_name,
- 'auth_url': self.conf.auth_url,
- 'auth_strategy': self.conf.auth_strategy,
- 'region_name': self.conf.auth_region,
- 'token': self.auth_info.get('auth_token'),
- 'insecure': self.conf.auth_insecure,
- 'ca_cert': self.conf.auth_ca_cert,
- }
- if self.conf.endpoint_url:
- params['endpoint_url'] = self.conf.endpoint_url
- else:
- params['endpoint_url'] = self.auth_info.get('endpoint_url')
- params['endpoint_type'] = self.conf.endpoint_type
-
- return client.Client(**params)
@webob.dec.wsgify(RequestClass=webob.Request)
def __call__(self, req):
def _get_ports_from_server(self, router_id=None, ip_address=None,
networks=None):
- """Either get ports from server by RPC or fallback to neutron client"""
+ """Get ports from server."""
filters = self._get_port_filters(router_id, ip_address, networks)
- if self.use_rpc:
- try:
- return self.plugin_rpc.get_ports(self.context, filters)
- except (oslo_messaging.MessagingException, AttributeError):
- # TODO(obondarev): remove fallback once RPC is proven
- # to work fine with metadata agent (K or L release at most)
- LOG.warning(_LW('Server does not support metadata RPC, '
- 'fallback to using neutron client'))
- self.use_rpc = False
-
- return self._get_ports_using_client(filters)
+ return self.plugin_rpc.get_ports(self.context, filters)
def _get_port_filters(self, router_id=None, ip_address=None,
networks=None):
return self._get_ports_from_server(networks=networks,
ip_address=remote_address)
- def _get_ports_using_client(self, filters):
- # reformat filters for neutron client
- if 'device_id' in filters:
- filters['device_id'] = filters['device_id'][0]
- if 'fixed_ips' in filters:
- filters['fixed_ips'] = [
- 'ip_address=%s' % filters['fixed_ips']['ip_address'][0]]
-
- client = self._get_neutron_client()
- ports = client.list_ports(**filters)
- self.auth_info = client.get_auth_info()
- return ports['ports']
-
def _get_ports(self, remote_address, network_id=None, router_id=None):
"""Search for all ports that contain passed ip address and belongs to
given network.
from neutron.agent.metadata import agent
from neutron.agent.metadata import config
from neutron.agent import metadata_agent
-from neutron.common import constants
+from neutron.common import constants as n_const
from neutron.common import utils
from neutron.tests import base
-from neutronclient.v2_0 import client
class FakeConf(object):
- admin_user = 'neutron'
- admin_password = 'password'
- admin_tenant_name = 'tenant'
- auth_url = 'http://127.0.0.1'
- auth_strategy = 'keystone'
- auth_region = 'region'
- auth_insecure = False
auth_ca_cert = None
- endpoint_type = 'adminURL'
nova_metadata_ip = '9.9.9.9'
nova_metadata_port = 8775
metadata_proxy_shared_secret = 'secret'
nova_client_cert = 'nova_cert'
nova_client_priv_key = 'nova_priv_key'
cache_url = ''
- endpoint_url = None
class FakeConfCache(FakeConf):
cache_url = 'memory://?default_ttl=5'
-class FakeConfEndpoint(FakeConf):
- endpoint_url = 'http://127.0.0.0:8776'
-
-
-class TestNeutronClient(base.BaseTestCase):
- fake_conf = FakeConf
- expected_params = {
- 'username': 'neutron',
- 'region_name': 'region',
- 'ca_cert': None,
- 'tenant_name': 'tenant',
- 'insecure': False,
- 'token': None,
- 'endpoint_type': 'adminURL',
- 'auth_url': 'http://127.0.0.1',
- 'password': 'password',
- 'endpoint_url': None,
- 'auth_strategy': 'keystone',
- }
-
- def test_client_params(self):
- handler = agent.MetadataProxyHandler(self.fake_conf)
-
- with mock.patch.object(
- client.Client, "__init__", return_value=None) as mock_init:
- handler._get_neutron_client()
- mock_init.assert_called_once_with(**self.expected_params)
-
- def test_client_with_endpoint_url(self):
- fake_conf = FakeConfEndpoint
- handler = agent.MetadataProxyHandler(fake_conf)
-
- expected_params = self.expected_params.copy()
- del expected_params['endpoint_type']
- expected_params['endpoint_url'] = 'http://127.0.0.0:8776'
-
- with mock.patch.object(
- client.Client, "__init__", return_value=None) as mock_init:
- handler._get_neutron_client()
- mock_init.assert_called_once_with(**expected_params)
-
-
class TestMetadataProxyHandlerBase(base.BaseTestCase):
fake_conf = FakeConf
ip = '1.2.3.4'
networks = ('net_id1', 'net_id2')
expected = {'device_id': [router_id],
- 'device_owner': constants.ROUTER_INTERFACE_OWNERS,
+ 'device_owner': n_const.ROUTER_INTERFACE_OWNERS,
'network_id': networks,
'fixed_ips': {'ip_address': [ip]}}
actual = self.handler._get_port_filters(router_id, ip, networks)
ports = self.handler._get_ports_for_remote_address(ip, networks)
self.assertEqual(expected, ports)
- def test_get_ports_using_rpc_fallback_to_client(self):
- ip = '1.1.1.1'
- networks = ('network_id1', 'network_id2')
- self.handler.plugin_rpc.get_ports.side_effect = AttributeError
- with mock.patch('neutronclient.v2_0.client.Client') as neutron_client:
- mock_list_ports = neutron_client.return_value.list_ports
- expected_ports = {'ports': ['expected_port']}
- mock_list_ports.return_value = expected_ports
- ports = self.handler._get_ports_from_server(ip_address=ip,
- networks=networks)
- self.assertEqual(expected_ports['ports'], ports)
-
class TestMetadataProxyHandlerCache(TestMetadataProxyHandlerBase):
fake_conf = FakeConfCache
- def setUp(self):
- super(TestMetadataProxyHandlerCache, self).setUp()
- self.qclient_p = mock.patch('neutronclient.v2_0.client.Client')
- self.qclient = self.qclient_p.start()
- self.handler.use_rpc = False
-
def test_call(self):
req = mock.Mock()
with mock.patch.object(self.handler,
def test_get_router_networks(self):
router_id = 'router-id'
expected = ('network_id1', 'network_id2')
- ports = {'ports': [{'network_id': 'network_id1', 'something': 42},
- {'network_id': 'network_id2',
- 'something_else': 32}],
- 'not_used': [1, 2, 3]}
- mock_list_ports = self.qclient.return_value.list_ports
- mock_list_ports.return_value = ports
+ ports = [{'network_id': 'network_id1', 'something': 42},
+ {'network_id': 'network_id2', 'something_else': 32}]
+ mock_get_ports = self.handler.plugin_rpc.get_ports
+ mock_get_ports.return_value = ports
networks = self.handler._get_router_networks(router_id)
- mock_list_ports.assert_called_once_with(
- device_id=router_id,
- device_owner=constants.ROUTER_INTERFACE_OWNERS)
+ mock_get_ports.assert_called_once_with(
+ mock.ANY,
+ {'device_id': [router_id],
+ 'device_owner': n_const.ROUTER_INTERFACE_OWNERS})
self.assertEqual(expected, networks)
def _test_get_router_networks_twice_helper(self):
router_id = 'router-id'
- ports = {'ports': [{'network_id': 'network_id1', 'something': 42}],
- 'not_used': [1, 2, 3]}
+ ports = [{'network_id': 'network_id1', 'something': 42}]
expected_networks = ('network_id1',)
with mock.patch(
'oslo_utils.timeutils.utcnow_ts', return_value=0):
- mock_list_ports = self.qclient.return_value.list_ports
- mock_list_ports.return_value = ports
+ mock_get_ports = self.handler.plugin_rpc.get_ports
+ mock_get_ports.return_value = ports
networks = self.handler._get_router_networks(router_id)
- mock_list_ports.assert_called_once_with(
- device_id=router_id,
- device_owner=constants.ROUTER_INTERFACE_OWNERS)
+ mock_get_ports.assert_called_once_with(
+ mock.ANY,
+ {'device_id': [router_id],
+ 'device_owner': n_const.ROUTER_INTERFACE_OWNERS})
self.assertEqual(expected_networks, networks)
networks = self.handler._get_router_networks(router_id)
def test_get_router_networks_twice(self):
self._test_get_router_networks_twice_helper()
self.assertEqual(
- 1, self.qclient.return_value.list_ports.call_count)
+ 1, self.handler.plugin_rpc.get_ports.call_count)
def _get_ports_for_remote_address_cache_hit_helper(self):
remote_address = 'remote_address'
networks = ('net1', 'net2')
- fixed_ips = ["ip_address=%s" % remote_address]
- mock_list_ports = self.qclient.return_value.list_ports
- mock_list_ports.return_value = {'ports': [{'network_id': 'net1',
- 'something': 42}]}
+ mock_get_ports = self.handler.plugin_rpc.get_ports
+ mock_get_ports.return_value = [{'network_id': 'net1', 'something': 42}]
self.handler._get_ports_for_remote_address(remote_address, networks)
- mock_list_ports.assert_called_once_with(
- network_id=networks, fixed_ips=fixed_ips)
- self.assertEqual(1, mock_list_ports.call_count)
+ mock_get_ports.assert_called_once_with(
+ mock.ANY,
+ {'network_id': networks,
+ 'fixed_ips': {'ip_address': [remote_address]}}
+ )
+ self.assertEqual(1, mock_get_ports.call_count)
self.handler._get_ports_for_remote_address(remote_address,
networks)
def test_get_ports_for_remote_address_cache_hit(self):
self._get_ports_for_remote_address_cache_hit_helper()
self.assertEqual(
- 1, self.qclient.return_value.list_ports.call_count)
+ 1, self.handler.plugin_rpc.get_ports.call_count)
def test_get_ports_network_id(self):
network_id = 'network-id'
headers['X-Forwarded-For'] = remote_address
req = mock.Mock(headers=headers)
- def mock_list_ports(*args, **kwargs):
- return {'ports': list_ports_retval.pop(0)}
+ def mock_get_ports(*args, **kwargs):
+ return list_ports_retval.pop(0)
- self.qclient.return_value.list_ports.side_effect = mock_list_ports
- self.qclient.return_value.get_auth_info.return_value = {
- 'auth_token': None, 'endpoint_url': None}
+ self.handler.plugin_rpc.get_ports.side_effect = mock_get_ports
instance_id, tenant_id = self.handler._get_instance_and_tenant_id(req)
- new_qclient_call = mock.call(
- username=FakeConf.admin_user,
- tenant_name=FakeConf.admin_tenant_name,
- region_name=FakeConf.auth_region,
- auth_url=FakeConf.auth_url,
- password=FakeConf.admin_password,
- auth_strategy=FakeConf.auth_strategy,
- token=None,
- insecure=FakeConf.auth_insecure,
- ca_cert=FakeConf.auth_ca_cert,
- endpoint_url=None,
- endpoint_type=FakeConf.endpoint_type)
expected = []
if router_id:
- expected.extend([
- new_qclient_call,
- mock.call().list_ports(
- device_id=router_id,
- device_owner=constants.ROUTER_INTERFACE_OWNERS
- ),
- mock.call().get_auth_info()
- ])
-
- expected.extend([
- new_qclient_call,
- mock.call().list_ports(
- network_id=networks, fixed_ips=['ip_address=192.168.1.1']),
- mock.call().get_auth_info()
- ])
-
- self.qclient.assert_has_calls(expected)
+ expected.append(
+ mock.call(
+ mock.ANY,
+ {'device_id': [router_id],
+ 'device_owner': n_const.ROUTER_INTERFACE_OWNERS}
+ )
+ )
+
+ expected.append(
+ mock.call(
+ mock.ANY,
+ {'network_id': networks,
+ 'fixed_ips': {'ip_address': ['192.168.1.1']}}
+ )
+ )
+
+ self.handler.plugin_rpc.get_ports.assert_has_calls(expected)
return (instance_id, tenant_id)
(None, None)
)
- def test_auth_info_cache(self):
- router_id = 'the_id'
- list_ports = [
- [{'network_id': 'net1'}],
- [{'device_id': 'did', 'tenant_id': 'tid', 'network_id': 'net1'}]]
-
- def update_get_auth_info(*args, **kwargs):
- self.qclient.return_value.get_auth_info.return_value = {
- 'auth_token': 'token', 'endpoint_url': 'uri'}
- return {'ports': list_ports.pop(0)}
-
- self.qclient.return_value.list_ports.side_effect = update_get_auth_info
-
- new_qclient_call = mock.call(
- username=FakeConf.admin_user,
- tenant_name=FakeConf.admin_tenant_name,
- region_name=FakeConf.auth_region,
- auth_url=FakeConf.auth_url,
- password=FakeConf.admin_password,
- auth_strategy=FakeConf.auth_strategy,
- token=None,
- insecure=FakeConf.auth_insecure,
- ca_cert=FakeConf.auth_ca_cert,
- endpoint_url=None,
- endpoint_type=FakeConf.endpoint_type)
-
- cached_qclient_call = mock.call(
- username=FakeConf.admin_user,
- tenant_name=FakeConf.admin_tenant_name,
- region_name=FakeConf.auth_region,
- auth_url=FakeConf.auth_url,
- password=FakeConf.admin_password,
- auth_strategy=FakeConf.auth_strategy,
- token='token',
- insecure=FakeConf.auth_insecure,
- ca_cert=FakeConf.auth_ca_cert,
- endpoint_url='uri',
- endpoint_type=FakeConf.endpoint_type)
-
- headers = {'X-Forwarded-For': '192.168.1.10',
- 'X-Neutron-Router-ID': router_id}
- req = mock.Mock(headers=headers)
- self.handler._get_instance_and_tenant_id(req)
-
- expected = [
- new_qclient_call,
- mock.call().list_ports(
- device_id=router_id,
- device_owner=constants.ROUTER_INTERFACE_OWNERS
- ),
- mock.call().get_auth_info(),
- cached_qclient_call,
- mock.call().list_ports(network_id=('net1',),
- fixed_ips=['ip_address=192.168.1.10']),
- mock.call().get_auth_info(),
- ]
-
- self.qclient.assert_has_calls(expected)
-
def _proxy_request_test_helper(self, response_code=200, method='GET'):
hdrs = {'X-Forwarded-For': '8.8.8.8'}
body = 'body'
def test_get_router_networks_twice(self):
self._test_get_router_networks_twice_helper()
self.assertEqual(
- 2, self.qclient.return_value.list_ports.call_count)
+ 2, self.handler.plugin_rpc.get_ports.call_count)
def test_get_ports_for_remote_address_cache_hit(self):
self._get_ports_for_remote_address_cache_hit_helper()
self.assertEqual(
- 2, self.qclient.return_value.list_ports.call_count)
+ 2, self.handler.plugin_rpc.get_ports.call_count)
class TestUnixDomainMetadataProxy(base.BaseTestCase):