Commit 103fd2f1 authored by Gaël Berthaud-Müller's avatar Gaël Berthaud-Müller

make authority resolution check parents

parent 0919a81f
......@@ -50,6 +50,7 @@ KEY_TYPES = {
def check_dns_sec(domain):
# NOTE: this is broken
try:
domain_authority = resolver.query(domain, 'SOA')
response = resolver.query(domain_authority, 'NS')
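Review bot: The added `# NOTE: this is broken` does not say what is broken or what a caller of check_dns_sec gets back meanwhile, which matters now that the second hunk stops calling it from the authority lookup. A comment that names the failure mode and the tracking reference would make the state explicit, e.g. `# NOTE: broken — <describe the failing step>; not called from get_identity_authority until <ISSUE-PLACEHOLDER> is fixed`. The body of check_dns_sec continues outside the hunk.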
......@@ -79,34 +80,50 @@ def check_dns_sec(domain):
# raise Unauthorized('DNSSEC check failed for {}'.format(domain))
def get_identity_authority(domain):
# TODO: Resolution of authority should continue for parent domains according to spec
hostname = '_oidc.{}.'.format(domain)
def _get_identity_authority(hostname):
print('Resolving "{}"'.format(hostname))
try:
dns = resolver.query(hostname, 'TXT')
# enforce strict DNSSEC policy here
check_dns_sec(domain)
for txt in dns:
value = str(txt).replace('"', '')
print('Checking TXT record "{}"'.format(value))
if not value.startswith('v=OID1;'):
continue
for item in value.split(';'):
if item.startswith('iau=') or item.startswith('iss='):
return item[4:]
except Timeout:
print('Timeout. Failed to resolve "{}"'.format(hostname))
raise Unauthorized('Timeout. Failed to resolve "{}"'.format(hostname))
except NXDOMAIN or YXDOMAIN:
print('Failed to resolve "{}"'.format(hostname))
raise Unauthorized('Failed to resolve "{}"'.format(hostname))
except NoAnswer:
print('Failed to find TXT records for "{}"'.format(hostname))
raise Unauthorized('Failed to find TXT records for "{}"'.format(hostname))
except NoNameservers:
print('No nameservers avalaible to dig "{}"'.format(hostname))
raise Unauthorized('No nameservers avalaible to dig "{}"'.format(hostname))
dns = resolver.resolve(hostname, 'TXT')
# TODO: enforce strict DNSSEC policy here
#check_dns_sec(hostname)
for txt in dns:
value = str(txt).replace('"', '')
print('Checking TXT record "{}"'.format(value))
if not value.startswith('v=OID1;'):
continue
for item in value.split(';'):
if item.startswith('iau=') or item.startswith('iss='):
return item[4:]
def get_identity_authority(domain):
labels = list(dns.name.from_text(domain).labels)
labels.reverse()
parent = labels[0]
labels = labels[1::]
domains = []
for label in labels:
parent = b'%s.%s' % (label, parent)
domains.append(parent)
domains.reverse()
for domain in domains:
try:
hostname = '_oidc.{}'.format(domain.decode('utf8'))
if authority := _get_identity_authority(hostname):
return authority
except Timeout:
print('Timeout. Failed to resolve "{}"'.format(hostname))
except NXDOMAIN or YXDOMAIN:
print('Failed to resolve "{}"'.format(hostname))
except NoAnswer:
print('Failed to find TXT records for "{}"'.format(hostname))
raise Unauthorized('Failed to find TXT records for "{}"'.format(hostname))
except NoNameservers:
print('No nameservers avalaible to dig "{}"'.format(hostname))
raise Unauthorized('No nameservers avalaible to dig "{}"'.format(hostname))
print('No suitable TXT DNS entry found for {}'.format(domain))
raise Unauthorized('No suitable TXT DNS entry found for {}'.format(domain))
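Review bot: The DNSSEC enforcement the old body performed is now commented out at `#check_dns_sec(hostname)`, so the `iau=`/`iss=` value that designates the identity authority is taken from an unauthenticated TXT answer while the first hunk only marks check_dns_sec as broken; if the check cannot be repaired in this commit, the TODO should say what tracks it and callers should be told the authority is unverified. `except NXDOMAIN or YXDOMAIN:` in the new loop evaluates to `except NXDOMAIN:`, so a YXDOMAIN response propagates uncaught; it should read `except (NXDOMAIN, YXDOMAIN):`. The loop rebinds `domain` to each bytes candidate, so the final `'No suitable TXT DNS entry found for {}'` message reports the last candidate (the top-level label, printed as `b'...'`) rather than the caller's domain; renaming the loop variable, e.g. `for candidate in domains:`, fixes both the lookup and the message. The walk builds `domains` all the way up to the top-level label, so a `_oidc` TXT record published at a public-suffix level would be accepted for every name beneath it — worth confirming against the spec the removed TODO refers to, and bounding the walk if the spec does not intend that. It is not clear from the rendered page whether `_get_identity_authority` keeps its own `except` handlers that raise Unauthorized; if it does, Timeout and NXDOMAIN are converted before the outer `except Timeout` / `except NXDOMAIN` can continue to the parent label, which would defeat the parent resolution this commit adds.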