+# coding: utf-8
+"""Tests for the eventlet.support.greendns module"""
+
+import os
+import socket
+import tempfile
+import time
+
+import tests
+from tests import mock
+try:
+ import dns.rdatatype
+ import dns.rdtypes.IN.A
+ import dns.rdtypes.IN.AAAA
+ import dns.resolver
+ import dns.reversename
+ import dns.rrset
+ from eventlet.support import greendns
+ greendns_available = True
+except ImportError:
+ greendns_available = False
+ greendns = mock.Mock()
+
+
+def greendns_requirement(_f):
+ """We want to skip tests if greendns is not installed.
+ """
+ return greendns_available
+
+
+class TestHostsResolver(tests.LimitedTestCase):
+
+ def _make_host_resolver(self):
+ """Returns a HostResolver instance
+
+ The hosts file will be empty but accessible as a py.path.local
+ instance using the ``hosts`` attribute.
+ """
+ hosts = tempfile.NamedTemporaryFile()
+ hr = greendns.HostsResolver(fname=hosts.name)
+ hr.hosts = hosts
+ hr._last_stat = 0
+ return hr
+
+ @tests.skip_unless(greendns_requirement)
+ def test_default_fname(self):
+ hr = greendns.HostsResolver()
+ assert os.path.exists(hr.fname)
+
+ @tests.skip_unless(greendns_requirement)
+ def test_readlines_lines(self):
+ hr = self._make_host_resolver()
+ hr.hosts.write(b'line0\n')
+ hr.hosts.flush()
+ assert hr._readlines() == ['line0']
+ hr._last_stat = 0
+ hr.hosts.write(b'line1\n')
+ hr.hosts.flush()
+ assert hr._readlines() == ['line0', 'line1']
+ hr._last_stat = 0
+ hr.hosts.write(b'#comment0\nline0\n #comment1\nline1')
+ assert hr._readlines() == ['line0', 'line1']
+
+ @tests.skip_unless(greendns_requirement)
+ def test_readlines_missing_file(self):
+ hr = self._make_host_resolver()
+ hr.hosts.close()
+ hr._last_stat = 0
+ assert hr._readlines() == []
+
+ @tests.skip_unless(greendns_requirement)
+ def test_load_no_contents(self):
+ hr = self._make_host_resolver()
+ hr._load()
+ assert not hr._v4
+ assert not hr._v6
+ assert not hr._aliases
+
+ @tests.skip_unless(greendns_requirement)
+ def test_load_v4_v6_cname_aliases(self):
+ hr = self._make_host_resolver()
+ hr.hosts.write(b'1.2.3.4 v4.example.com v4\n'
+ b'dead:beef::1 v6.example.com v6\n')
+ hr.hosts.flush()
+ hr._load()
+ assert hr._v4 == {'v4.example.com': '1.2.3.4', 'v4': '1.2.3.4'}
+ assert hr._v6 == {'v6.example.com': 'dead:beef::1',
+ 'v6': 'dead:beef::1'}
+ assert hr._aliases == {'v4': 'v4.example.com',
+ 'v6': 'v6.example.com'}
+
+ @tests.skip_unless(greendns_requirement)
+ def test_load_v6_link_local(self):
+ hr = self._make_host_resolver()
+ hr.hosts.write(b'fe80:: foo\n'
+ b'fe80:dead:beef::1 bar\n')
+ hr.hosts.flush()
+ hr._load()
+ assert not hr._v4
+ assert not hr._v6
+
+ @tests.skip_unless(greendns_requirement)
+ def test_query_A(self):
+ hr = self._make_host_resolver()
+ hr._v4 = {'v4.example.com': '1.2.3.4'}
+ ans = hr.query('v4.example.com')
+ assert ans[0].address == '1.2.3.4'
+
+ @tests.skip_unless(greendns_requirement)
+ def test_query_ans_types(self):
+ # This assumes test_query_A above succeeds
+ hr = self._make_host_resolver()
+ hr._v4 = {'v4.example.com': '1.2.3.4'}
+ hr._last_stat = time.time()
+ ans = hr.query('v4.example.com')
+ assert isinstance(ans, greendns.dns.resolver.Answer)
+ assert ans.response is None
+ assert ans.qname == dns.name.from_text('v4.example.com')
+ assert ans.rdtype == dns.rdatatype.A
+ assert ans.rdclass == dns.rdataclass.IN
+ assert ans.canonical_name == dns.name.from_text('v4.example.com')
+ assert ans.expiration
+ assert isinstance(ans.rrset, dns.rrset.RRset)
+ assert ans.rrset.rdtype == dns.rdatatype.A
+ assert ans.rrset.rdclass == dns.rdataclass.IN
+ ttl = greendns.HOSTS_TTL
+ assert ttl - 1 <= ans.rrset.ttl <= ttl + 1
+ rr = ans.rrset[0]
+ assert isinstance(rr, greendns.dns.rdtypes.IN.A.A)
+ assert rr.rdtype == dns.rdatatype.A
+ assert rr.rdclass == dns.rdataclass.IN
+ assert rr.address == '1.2.3.4'
+
+ @tests.skip_unless(greendns_requirement)
+ def test_query_AAAA(self):
+ hr = self._make_host_resolver()
+ hr._v6 = {'v6.example.com': 'dead:beef::1'}
+ ans = hr.query('v6.example.com', dns.rdatatype.AAAA)
+ assert ans[0].address == 'dead:beef::1'
+
+ @tests.skip_unless(greendns_requirement)
+ def test_query_unknown_raises(self):
+ hr = self._make_host_resolver()
+ with tests.assert_raises(greendns.dns.resolver.NoAnswer):
+ hr.query('example.com')
+
+ @tests.skip_unless(greendns_requirement)
+ def test_query_unknown_no_raise(self):
+ hr = self._make_host_resolver()
+ ans = hr.query('example.com', raise_on_no_answer=False)
+ assert isinstance(ans, greendns.dns.resolver.Answer)
+ assert ans.response is None
+ assert ans.qname == dns.name.from_text('example.com')
+ assert ans.rdtype == dns.rdatatype.A
+ assert ans.rdclass == dns.rdataclass.IN
+ assert ans.canonical_name == dns.name.from_text('example.com')
+ assert ans.expiration
+ assert isinstance(ans.rrset, greendns.dns.rrset.RRset)
+ assert ans.rrset.rdtype == dns.rdatatype.A
+ assert ans.rrset.rdclass == dns.rdataclass.IN
+ assert len(ans.rrset) == 0
+
+ @tests.skip_unless(greendns_requirement)
+ def test_query_CNAME(self):
+ hr = self._make_host_resolver()
+ hr._aliases = {'host': 'host.example.com'}
+ ans = hr.query('host', dns.rdatatype.CNAME)
+ assert ans[0].target == dns.name.from_text('host.example.com')
+ assert str(ans[0].target) == 'host.example.com.'
+
+ @tests.skip_unless(greendns_requirement)
+ def test_query_unknown_type(self):
+ hr = self._make_host_resolver()
+ with tests.assert_raises(greendns.dns.resolver.NoAnswer):
+ hr.query('example.com', dns.rdatatype.MX)
+
+ @tests.skip_unless(greendns_requirement)
+ def test_getaliases(self):
+ hr = self._make_host_resolver()
+ hr._aliases = {'host': 'host.example.com',
+ 'localhost': 'host.example.com'}
+ res = set(hr.getaliases('host'))
+ assert res == set(['host.example.com', 'localhost'])
+
+ @tests.skip_unless(greendns_requirement)
+ def test_getaliases_unknown(self):
+ hr = self._make_host_resolver()
+ assert hr.getaliases('host.example.com') == []
+
+ @tests.skip_unless(greendns_requirement)
+ def test_getaliases_fqdn(self):
+ hr = self._make_host_resolver()
+ hr._aliases = {'host': 'host.example.com'}
+ res = set(hr.getaliases('host.example.com'))
+ assert res == set(['host'])
+
+
+def _make_mock_base_resolver():
+ """A mocked base resolver class"""
+ class RR(object):
+ pass
+
+ class Resolver(object):
+ aliases = ['cname.example.com']
+ raises = None
+ rr = RR()
+
+ def query(self, *args, **kwargs):
+ self.args = args
+ self.kwargs = kwargs
+ if self.raises:
+ raise self.raises()
+ if hasattr(self, 'rrset'):
+ rrset = self.rrset
+ else:
+ rrset = [self.rr]
+ return greendns.HostsAnswer('foo', 1, 1, rrset, False)
+
+ def getaliases(self, *args, **kwargs):
+ return self.aliases
+
+ return Resolver
+
+
+class TestProxyResolver(tests.LimitedTestCase):
+
+ @tests.skip_unless(greendns_requirement)
+ def test_clear(self):
+ rp = greendns.ResolverProxy()
+ resolver = rp._resolver
+ rp.clear()
+ assert rp._resolver != resolver
+
+ @tests.skip_unless(greendns_requirement)
+ def _make_mock_hostsresolver(self):
+ """A mocked HostsResolver"""
+ base_resolver = _make_mock_base_resolver()
+ base_resolver.rr.address = '1.2.3.4'
+ return base_resolver()
+
+ @tests.skip_unless(greendns_requirement)
+ def _make_mock_resolver(self):
+ """A mocked Resolver"""
+ base_resolver = _make_mock_base_resolver()
+ base_resolver.rr.address = '5.6.7.8'
+ return base_resolver()
+
+ @tests.skip_unless(greendns_requirement)
+ def test_hosts(self):
+ hostsres = self._make_mock_hostsresolver()
+ rp = greendns.ResolverProxy(hostsres)
+ ans = rp.query('host.example.com')
+ assert ans[0].address == '1.2.3.4'
+
+ @tests.skip_unless(greendns_requirement)
+ def test_hosts_noanswer(self):
+ hostsres = self._make_mock_hostsresolver()
+ res = self._make_mock_resolver()
+ rp = greendns.ResolverProxy(hostsres)
+ rp._resolver = res
+ hostsres.raises = greendns.dns.resolver.NoAnswer
+ ans = rp.query('host.example.com')
+ assert ans[0].address == '5.6.7.8'
+
+ @tests.skip_unless(greendns_requirement)
+ def test_resolver(self):
+ res = self._make_mock_resolver()
+ rp = greendns.ResolverProxy()
+ rp._resolver = res
+ ans = rp.query('host.example.com')
+ assert ans[0].address == '5.6.7.8'
+
+ @tests.skip_unless(greendns_requirement)
+ def test_noanswer(self):
+ res = self._make_mock_resolver()
+ rp = greendns.ResolverProxy()
+ rp._resolver = res
+ res.raises = greendns.dns.resolver.NoAnswer
+ with tests.assert_raises(greendns.dns.resolver.NoAnswer):
+ rp.query('host.example.com')
+
+ @tests.skip_unless(greendns_requirement)
+ def test_nxdomain(self):
+ res = self._make_mock_resolver()
+ rp = greendns.ResolverProxy()
+ rp._resolver = res
+ res.raises = greendns.dns.resolver.NXDOMAIN
+ with tests.assert_raises(greendns.dns.resolver.NXDOMAIN):
+ rp.query('host.example.com')
+
+ @tests.skip_unless(greendns_requirement)
+ def test_noanswer_hosts(self):
+ hostsres = self._make_mock_hostsresolver()
+ res = self._make_mock_resolver()
+ rp = greendns.ResolverProxy(hostsres)
+ rp._resolver = res
+ hostsres.raises = greendns.dns.resolver.NoAnswer
+ res.raises = greendns.dns.resolver.NoAnswer
+ with tests.assert_raises(greendns.dns.resolver.NoAnswer):
+ rp.query('host.example.com')
+
+ def _make_mock_resolver_aliases(self):
+
+ class RR(object):
+ target = 'host.example.com'
+
+ class Resolver(object):
+ call_count = 0
+ exc_type = greendns.dns.resolver.NoAnswer
+
+ def query(self, *args, **kwargs):
+ self.args = args
+ self.kwargs = kwargs
+ self.call_count += 1
+ if self.call_count < 2:
+ return greendns.HostsAnswer(args[0], 1, 5, [RR()], False)
+ else:
+ raise self.exc_type()
+
+ return Resolver()
+
+ @tests.skip_unless(greendns_requirement)
+ def test_getaliases(self):
+ aliases_res = self._make_mock_resolver_aliases()
+ rp = greendns.ResolverProxy()
+ rp._resolver = aliases_res
+ aliases = set(rp.getaliases('alias.example.com'))
+ assert aliases == set(['host.example.com'])
+
+ @tests.skip_unless(greendns_requirement)
+ def test_getaliases_fqdn(self):
+ aliases_res = self._make_mock_resolver_aliases()
+ rp = greendns.ResolverProxy()
+ rp._resolver = aliases_res
+ rp._resolver.call_count = 1
+ assert rp.getaliases('host.example.com') == []
+
+ @tests.skip_unless(greendns_requirement)
+ def test_getaliases_nxdomain(self):
+ aliases_res = self._make_mock_resolver_aliases()
+ rp = greendns.ResolverProxy()
+ rp._resolver = aliases_res
+ rp._resolver.call_count = 1
+ rp._resolver.exc_type = greendns.dns.resolver.NXDOMAIN
+ assert rp.getaliases('host.example.com') == []
+
+
+class TestResolve(tests.LimitedTestCase):
+
+ def setUp(self):
+ base_resolver = _make_mock_base_resolver()
+ base_resolver.rr.address = '1.2.3.4'
+ self._old_resolver = greendns.resolver
+ greendns.resolver = base_resolver()
+
+ def tearDown(self):
+ greendns.resolver = self._old_resolver
+
+ @tests.skip_unless(greendns_requirement)
+ def test_A(self):
+ ans = greendns.resolve('host.example.com', socket.AF_INET)
+ assert ans[0].address == '1.2.3.4'
+ assert greendns.resolver.args == ('host.example.com', dns.rdatatype.A)
+
+ @tests.skip_unless(greendns_requirement)
+ def test_AAAA(self):
+ greendns.resolver.rr.address = 'dead:beef::1'
+ ans = greendns.resolve('host.example.com', socket.AF_INET6)
+ assert ans[0].address == 'dead:beef::1'
+ assert greendns.resolver.args == ('host.example.com', dns.rdatatype.AAAA)
+
+ @tests.skip_unless(greendns_requirement)
+ def test_unknown_rdtype(self):
+ with tests.assert_raises(socket.gaierror):
+ greendns.resolve('host.example.com', socket.AF_INET6 + 1)
+
+ @tests.skip_unless(greendns_requirement)
+ def test_timeout(self):
+ greendns.resolver.raises = greendns.dns.exception.Timeout
+ with tests.assert_raises(socket.gaierror):
+ greendns.resolve('host.example.com')
+
+ @tests.skip_unless(greendns_requirement)
+ def test_exc(self):
+ greendns.resolver.raises = greendns.dns.exception.DNSException
+ with tests.assert_raises(socket.gaierror):
+ greendns.resolve('host.example.com')
+
+ @tests.skip_unless(greendns_requirement)
+ def test_noraise_noanswer(self):
+ greendns.resolver.rrset = None
+ ans = greendns.resolve('example.com', raises=False)
+ assert not ans.rrset
+
+ @tests.skip_unless(greendns_requirement)
+ def test_noraise_nxdomain(self):
+ greendns.resolver.raises = greendns.dns.resolver.NXDOMAIN
+ ans = greendns.resolve('example.com', raises=False)
+ assert not ans.rrset
+
+
+class TestResolveCname(tests.LimitedTestCase):
+
+ def setUp(self):
+ base_resolver = _make_mock_base_resolver()
+ base_resolver.rr.target = 'cname.example.com'
+ self._old_resolver = greendns.resolver
+ greendns.resolver = base_resolver()
+
+ def tearDown(self):
+ greendns.resolver = self._old_resolver
+
+ @tests.skip_unless(greendns_requirement)
+ def test_success(self):
+ cname = greendns.resolve_cname('alias.example.com')
+ assert cname == 'cname.example.com'
+
+ @tests.skip_unless(greendns_requirement)
+ def test_timeout(self):
+ greendns.resolver.raises = greendns.dns.exception.Timeout
+ with tests.assert_raises(socket.gaierror):
+ greendns.resolve_cname('alias.example.com')
+
+ @tests.skip_unless(greendns_requirement)
+ def test_nodata(self):
+ greendns.resolver.raises = greendns.dns.exception.DNSException
+ with tests.assert_raises(socket.gaierror):
+ greendns.resolve_cname('alias.example.com')
+
+ @tests.skip_unless(greendns_requirement)
+ def test_no_answer(self):
+ greendns.resolver.raises = greendns.dns.resolver.NoAnswer
+ assert greendns.resolve_cname('host.example.com') == 'host.example.com'
+
+
+def _make_mock_resolve():
+ """A stubbed out resolve function
+
+ This monkeypatches the greendns.resolve() function with a mock.
+ You must give it answers by calling .add().
+ """
+
+ class MockAnswer(list):
+ pass
+
+ class MockResolve(object):
+
+ def __init__(self):
+ self.answers = {}
+
+ def __call__(self, name, family=socket.AF_INET, raises=True):
+ qname = dns.name.from_text(name)
+ try:
+ rrset = self.answers[name][family]
+ except KeyError:
+ if raises:
+ raise greendns.dns.resolver.NoAnswer()
+ rrset = dns.rrset.RRset(qname, 1, 1)
+ ans = MockAnswer()
+ ans.qname = qname
+ ans.rrset = rrset
+ ans.extend(rrset.items)
+ return ans
+
+ def add(self, name, addr):
+ """Add an address to a name and family"""
+ try:
+ rdata = dns.rdtypes.IN.A.A(dns.rdataclass.IN,
+ dns.rdatatype.A, addr)
+ family = socket.AF_INET
+ except (socket.error, dns.exception.SyntaxError):
+ rdata = dns.rdtypes.IN.AAAA.AAAA(dns.rdataclass.IN,
+ dns.rdatatype.AAAA, addr)
+ family = socket.AF_INET6
+ family_dict = self.answers.setdefault(name, {})
+ rrset = family_dict.get(family)
+ if not rrset:
+ family_dict[family] = rrset = dns.rrset.RRset(
+ dns.name.from_text(name), rdata.rdclass, rdata.rdtype)
+ rrset.add(rdata)
+
+ resolve = MockResolve()
+ return resolve
+
+
+class TestGetaddrinfo(tests.LimitedTestCase):
+
+ def _make_mock_resolve_cname(self):
+ """A stubbed out cname function"""
+
+ class ResolveCname(object):
+ qname = None
+ cname = 'cname.example.com'
+
+ def __call__(self, host):
+ self.qname = host
+ return self.cname
+
+ resolve_cname = ResolveCname()
+ return resolve_cname
+
+ def setUp(self):
+ self._old_resolve = greendns.resolve
+ self._old_resolve_cname = greendns.resolve_cname
+ self._old_orig_getaddrinfo = greendns.socket.getaddrinfo
+
+ def tearDown(self):
+ greendns.resolve = self._old_resolve
+ greendns.resolve_cname = self._old_resolve_cname
+ greendns.socket.getaddrinfo = self._old_orig_getaddrinfo
+
+ @tests.skip_unless(greendns_requirement)
+ def test_getaddrinfo(self):
+ greendns.resolve = _make_mock_resolve()
+ greendns.resolve.add('example.com', '127.0.0.2')
+ greendns.resolve.add('example.com', '::1')
+ res = greendns.getaddrinfo('example.com', 'ssh')
+ addr = ('127.0.0.2', 22)
+ tcp = (socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP, addr)
+ udp = (socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP, addr)
+ addr = ('::1', 22, 0, 0)
+ tcp6 = (socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP, addr)
+ udp6 = (socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP, addr)
+ filt_res = [ai[:3] + (ai[4],) for ai in res]
+ assert tcp in filt_res
+ assert udp in filt_res
+ assert tcp6 in filt_res
+ assert udp6 in filt_res
+
+ @tests.skip_unless(greendns_requirement)
+ def test_getaddrinfo_idn(self):
+ greendns.resolve = _make_mock_resolve()
+ idn_name = u'евентлет.com'
+ greendns.resolve.add(idn_name.encode('idna').decode('ascii'), '127.0.0.2')
+ res = greendns.getaddrinfo(idn_name, 'ssh')
+ addr = ('127.0.0.2', 22)
+ tcp = (socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP, addr)
+ udp = (socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP, addr)
+ filt_res = [ai[:3] + (ai[4],) for ai in res]
+ assert tcp in filt_res
+ assert udp in filt_res
+
+ @tests.skip_unless(greendns_requirement)
+ def test_getaddrinfo_inet(self):
+ greendns.resolve = _make_mock_resolve()
+ greendns.resolve.add('example.com', '127.0.0.2')
+ res = greendns.getaddrinfo('example.com', 'ssh', socket.AF_INET)
+ addr = ('127.0.0.2', 22)
+ tcp = (socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP, addr)
+ udp = (socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP, addr)
+ assert tcp in [ai[:3] + (ai[4],) for ai in res]
+ assert udp in [ai[:3] + (ai[4],) for ai in res]
+
+ @tests.skip_unless(greendns_requirement)
+ def test_getaddrinfo_inet6(self):
+ greendns.resolve = _make_mock_resolve()
+ greendns.resolve.add('example.com', '::1')
+ res = greendns.getaddrinfo('example.com', 'ssh', socket.AF_INET6)
+ addr = ('::1', 22, 0, 0)
+ tcp = (socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP, addr)
+ udp = (socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP, addr)
+ assert tcp in [ai[:3] + (ai[4],) for ai in res]
+ assert udp in [ai[:3] + (ai[4],) for ai in res]
+
+ @tests.skip_unless(greendns_requirement)
+ def test_getaddrinfo_only_a_ans(self):
+ greendns.resolve = _make_mock_resolve()
+ greendns.resolve.add('example.com', '1.2.3.4')
+ res = greendns.getaddrinfo('example.com', 0)
+ addr = [('1.2.3.4', 0)] * len(res)
+ assert addr == [ai[-1] for ai in res]
+
+ @tests.skip_unless(greendns_requirement)
+ def test_getaddrinfo_only_aaaa_ans(self):
+ greendns.resolve = _make_mock_resolve()
+ greendns.resolve.add('example.com', 'dead:beef::1')
+ res = greendns.getaddrinfo('example.com', 0)
+ addr = [('dead:beef::1', 0, 0, 0)] * len(res)
+ assert addr == [ai[-1] for ai in res]
+
+ @tests.skip_unless(greendns_requirement)
+ def test_canonname(self):
+ greendns.resolve = _make_mock_resolve()
+ greendns.resolve.add('host.example.com', '1.2.3.4')
+ greendns.resolve_cname = self._make_mock_resolve_cname()
+ res = greendns.getaddrinfo('host.example.com', 0,
+ 0, 0, 0, socket.AI_CANONNAME)
+ assert res[0][3] == 'cname.example.com'
+
+ @tests.skip_unless(greendns_requirement)
+ def test_host_none(self):
+ res = greendns.getaddrinfo(None, 80)
+ for addr in set(ai[-1] for ai in res):
+ assert addr in [('127.0.0.1', 80), ('::1', 80, 0, 0)]
+
+ @tests.skip_unless(greendns_requirement)
+ def test_host_none_passive(self):
+ res = greendns.getaddrinfo(None, 80, 0, 0, 0, socket.AI_PASSIVE)
+ for addr in set(ai[-1] for ai in res):
+ assert addr in [('0.0.0.0', 80), ('::', 80, 0, 0)]
+
+ @tests.skip_unless(greendns_requirement)
+ def test_v4mapped(self):
+ greendns.resolve = _make_mock_resolve()
+ greendns.resolve.add('example.com', '1.2.3.4')
+ res = greendns.getaddrinfo('example.com', 80,
+ socket.AF_INET6, 0, 0, socket.AI_V4MAPPED)
+ addrs = set(ai[-1] for ai in res)
+ assert addrs == set([('::ffff:1.2.3.4', 80, 0, 0)])
+
+ @tests.skip_unless(greendns_requirement)
+ def test_v4mapped_all(self):
+ greendns.resolve = _make_mock_resolve()
+ greendns.resolve.add('example.com', '1.2.3.4')
+ greendns.resolve.add('example.com', 'dead:beef::1')
+ res = greendns.getaddrinfo('example.com', 80, socket.AF_INET6, 0, 0,
+ socket.AI_V4MAPPED | socket.AI_ALL)
+ addrs = set(ai[-1] for ai in res)
+ for addr in addrs:
+ assert addr in [('::ffff:1.2.3.4', 80, 0, 0),
+ ('dead:beef::1', 80, 0, 0)]
+
+ @tests.skip_unless(greendns_requirement)
+ def test_numericserv(self):
+ greendns.resolve = _make_mock_resolve()
+ greendns.resolve.add('example.com', '1.2.3.4')
+ with tests.assert_raises(socket.gaierror):
+ greendns.getaddrinfo('example.com', 'www', 0, 0, 0, socket.AI_NUMERICSERV)
+
+ @tests.skip_unless(greendns_requirement)
+ def test_numerichost(self):
+ greendns.resolve = _make_mock_resolve()
+ greendns.resolve.add('example.com', '1.2.3.4')
+ with tests.assert_raises(socket.gaierror):
+ greendns.getaddrinfo('example.com', 80, 0, 0, 0, socket.AI_NUMERICHOST)
+
+ @tests.skip_unless(greendns_requirement)
+ def test_noport(self):
+ greendns.resolve = _make_mock_resolve()
+ greendns.resolve.add('example.com', '1.2.3.4')
+ ai = greendns.getaddrinfo('example.com', None)
+ assert ai[0][-1][1] == 0
+
+ @tests.skip_unless(greendns_requirement)
+ def test_AI_ADDRCONFIG(self):
+ # When the users sets AI_ADDRCONFIG but only has an IPv4
+ # address configured we will iterate over the results, but the
+ # call for the IPv6 address will fail rather then return an
+ # empty list. In that case we should catch the exception and
+ # only return the ones which worked.
+ def getaddrinfo(addr, port, family, socktype, proto, aiflags):
+ if addr == '127.0.0.1':
+ return [(socket.AF_INET, 1, 0, '', ('127.0.0.1', 0))]
+ elif addr == '::1' and aiflags & socket.AI_ADDRCONFIG:
+ raise socket.error(socket.EAI_ADDRFAMILY,
+ 'Address family for hostname not supported')
+ elif addr == '::1' and not aiflags & socket.AI_ADDRCONFIG:
+ return [(socket.AF_INET6, 1, 0, '', ('::1', 0, 0, 0))]
+ greendns.socket.getaddrinfo = getaddrinfo
+ greendns.resolve = _make_mock_resolve()
+ greendns.resolve.add('localhost', '127.0.0.1')
+ greendns.resolve.add('localhost', '::1')
+ res = greendns.getaddrinfo('localhost', None,
+ 0, 0, 0, socket.AI_ADDRCONFIG)
+ assert res == [(socket.AF_INET, 1, 0, '', ('127.0.0.1', 0))]
+
+ @tests.skip_unless(greendns_requirement)
+ def test_AI_ADDRCONFIG_noaddr(self):
+ # If AI_ADDRCONFIG is used but there is no address we need to
+ # get an exception, not an empty list.
+ def getaddrinfo(addr, port, family, socktype, proto, aiflags):
+ raise socket.error(socket.EAI_ADDRFAMILY,
+ 'Address family for hostname not supported')
+ greendns.socket.getaddrinfo = getaddrinfo
+ greendns.resolve = _make_mock_resolve()
+ try:
+ greendns.getaddrinfo('::1', None, 0, 0, 0, socket.AI_ADDRCONFIG)
+ except socket.error as e:
+ assert e.errno == socket.EAI_ADDRFAMILY
+
+
+class TestIsIpAddr(tests.LimitedTestCase):
+
+ @tests.skip_unless(greendns_requirement)
+ def test_isv4(self):
+ assert greendns.is_ipv4_addr('1.2.3.4')
+
+ @tests.skip_unless(greendns_requirement)
+ def test_isv4_false(self):
+ assert not greendns.is_ipv4_addr('260.0.0.0')
+
+ @tests.skip_unless(greendns_requirement)
+ def test_isv6(self):
+ assert greendns.is_ipv6_addr('dead:beef::1')
+
+ @tests.skip_unless(greendns_requirement)
+ def test_isv6_invalid(self):
+ assert not greendns.is_ipv6_addr('foobar::1')
+
+ @tests.skip_unless(greendns_requirement)
+ def test_v4(self):
+ assert greendns.is_ip_addr('1.2.3.4')
+
+ @tests.skip_unless(greendns_requirement)
+ def test_v4_illegal(self):
+ assert not greendns.is_ip_addr('300.0.0.1')
+
+ @tests.skip_unless(greendns_requirement)
+ def test_v6_addr(self):
+ assert greendns.is_ip_addr('::1')
+
+ @tests.skip_unless(greendns_requirement)
+ def test_isv4_none(self):
+ assert not greendns.is_ipv4_addr(None)
+
+ @tests.skip_unless(greendns_requirement)
+ def test_isv6_none(self):
+ assert not greendns.is_ipv6_addr(None)
+
+ @tests.skip_unless(greendns_requirement)
+ def test_none(self):
+ assert not greendns.is_ip_addr(None)
+
+
+class TestGethostbyname(tests.LimitedTestCase):
+
+ def setUp(self):
+ self._old_resolve = greendns.resolve
+ greendns.resolve = _make_mock_resolve()
+
+ def tearDown(self):
+ greendns.resolve = self._old_resolve
+
+ @tests.skip_unless(greendns_requirement)
+ def test_ipaddr(self):
+ assert greendns.gethostbyname('1.2.3.4') == '1.2.3.4'
+
+ @tests.skip_unless(greendns_requirement)
+ def test_name(self):
+ greendns.resolve.add('host.example.com', '1.2.3.4')
+ assert greendns.gethostbyname('host.example.com') == '1.2.3.4'
+
+
+class TestGetaliases(tests.LimitedTestCase):
+
+ def _make_mock_resolver(self):
+ base_resolver = _make_mock_base_resolver()
+ resolver = base_resolver()
+ resolver.aliases = ['cname.example.com']
+ return resolver
+
+ def setUp(self):
+ self._old_resolver = greendns.resolver
+ greendns.resolver = self._make_mock_resolver()
+
+ def tearDown(self):
+ greendns.resolver = self._old_resolver
+
+ @tests.skip_unless(greendns_requirement)
+ def test_getaliases(self):
+ assert greendns.getaliases('host.example.com') == ['cname.example.com']
+
+
+class TestGethostbyname_ex(tests.LimitedTestCase):
+
+ def _make_mock_getaliases(self):
+
+ class GetAliases(object):
+ aliases = ['cname.example.com']
+
+ def __call__(self, *args, **kwargs):
+ return self.aliases
+
+ getaliases = GetAliases()
+ return getaliases
+
+ def setUp(self):
+ self._old_resolve = greendns.resolve
+ greendns.resolve = _make_mock_resolve()
+ self._old_getaliases = greendns.getaliases
+
+ def tearDown(self):
+ greendns.resolve = self._old_resolve
+ greendns.getaliases = self._old_getaliases
+
+ @tests.skip_unless(greendns_requirement)
+ def test_ipaddr(self):
+ res = greendns.gethostbyname_ex('1.2.3.4')
+ assert res == ('1.2.3.4', [], ['1.2.3.4'])
+
+ @tests.skip_unless(greendns_requirement)
+ def test_name(self):
+ greendns.resolve.add('host.example.com', '1.2.3.4')
+ greendns.getaliases = self._make_mock_getaliases()
+ greendns.getaliases.aliases = []
+ res = greendns.gethostbyname_ex('host.example.com')
+ assert res == ('host.example.com', [], ['1.2.3.4'])
+
+ @tests.skip_unless(greendns_requirement)
+ def test_multiple_addrs(self):
+ greendns.resolve.add('host.example.com', '1.2.3.4')
+ greendns.resolve.add('host.example.com', '1.2.3.5')
+ greendns.getaliases = self._make_mock_getaliases()
+ greendns.getaliases.aliases = []
+ res = greendns.gethostbyname_ex('host.example.com')
+ assert res == ('host.example.com', [], ['1.2.3.4', '1.2.3.5'])
+
+
+def test_reverse_name():
+ tests.run_isolated('greendns_from_address_203.py')