Commit 749902b2 authored by Stephane Bortzmeyer's avatar Stephane Bortzmeyer
Browse files

* [DoT] Validation of the host name in the certificate

* [DoH] Some bugs fixed (for instance with --POST)
parent 15dfc91e
......@@ -19,6 +19,7 @@ import urllib.parse
import time
import socket
import ctypes
import re
# Values that can be changed from the command line
dot = False # DoH by default
......@@ -31,6 +32,9 @@ tests = 1 # Number of repeated tests
ifile = None # Input file
delay = None
# Do not change these
re_host = re.compile(r'^([0-9a-z][0-9a-z-\.]*)|([0-9:]+)|([0-9\.])$')
def error(msg=None):
if msg is None:
msg = "Unknown error"
......@@ -42,9 +46,61 @@ def usage(msg=None):
print(msg,file=sys.stderr)
print("Usage: %s [-P] [-k] url domain-name [DNS type]" % sys.argv[0], file=sys.stderr)
def is_valid_hostname(name):
name = str(name.encode('idna').lower())
return re_host.search(name)
def is_valid_url(url):
try:
result = urllib.parse.urlparse(url) # A very poor validation, many
# errors (for instance whitespaces, IPv6 address litterals without
# brackets...) are ignored.
return (result.scheme=="https" and result.netloc != "")
except ValueError:
return False
def get_certificate_san(x509cert):
san = ""
ext_count = x509cert.get_extension_count()
for i in range(0, ext_count):
ext = x509cert.get_extension(i)
if "subjectAltName" in str(ext.get_short_name()):
san = str(ext)
return san
def validate_hostname(hostname, cert):
hostname = hostname.lower()
cn = cert.get_subject().commonName.lower()
if cn.startswith("*."): # Wildcard
(start, base) = cn.split("*.")
if hostname.endswith(base):
return True
else:
if hostname == cn:
return True
for alt_name in get_certificate_san(cert).split(", "):
if alt_name.startswith("DNS:"):
(start, base) = alt_name.split("DNS:")
base = base.lower()
if hostname == base:
return True
elif alt_name.startswith("IP Address:"):
(start, base) = alt_name.split("IP Address:")
base = base.lower()
if base.endswith("\n"):
base = base[:-1]
if hostname == base: # TODO better canonicaliztion of IP addresses with the netaddr module
return True
else:
pass # Ignore unknown alternative name types
return False
class Connection:
def __init__(self, server, dot=False, verbose=verbose, insecure=insecure, post=post, head=head):
# TODO test that server is a hostname is dot is True and an URL otherwise?
if dot and not is_valid_hostname(server):
error("DoT requires a host name, not \"%s\"" % server)
if not dot and not is_valid_url(url):
error("DoH requires a valid HTTPS URL, not \"%s\"" % server)
self.server = server
self.dot = dot
if not self.dot:
......@@ -53,17 +109,31 @@ class Connection:
self.verbose = verbose
self.insecure = insecure
if self.dot:
# TODO family-independant sockets
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
addrinfo = socket.getaddrinfo(server, 853)
# May be loop over the results of getaddrinfo, to test all the IP addresses?
self.sock = socket.socket(addrinfo[0][0], socket.SOCK_STREAM)
# With typical DoT servers, we *must* use TLS 1.2 (otherwise,
# do_handshake fails with "OpenSSL.SSL.SysCallError: (-1, 'Unexpected
# EOF')" Typical HTTP servers are more lax.
self.context = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_2_METHOD)
# TODO set_tlsext_host_name(name) for SNI?
if self.insecure:
self.context.set_verify(OpenSSL.SSL.VERIFY_NONE, lambda *x: True)
else:
self.context.set_default_verify_paths()
self.context.set_verify_depth(4) # Seems ignored
self.context.set_verify(OpenSSL.SSL.VERIFY_PEER | OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT | \
OpenSSL.SSL.VERIFY_CLIENT_ONCE,
lambda conn, cert, errno, depth, preverify_ok: preverify_ok)
self.session = OpenSSL.SSL.Connection(self.context, self.sock)
self.session.connect((self.server, 853))
self.session.do_handshake()
# TODO is certificate checked by default? If yes, disable it if insecure
self.cert = self.session.get_peer_certificate()
if not insecure:
valid = validate_hostname(server, self.cert)
if not valid:
error("Certificate error: \"%s\" is not in the certificate" % server)
# TODO validate with SPKI?
else: # DoH
self.curl = pycurl.Curl()
self.url = server
......@@ -90,10 +160,10 @@ class Connection:
self.curl.close()
# Routine doing one actual test. Returns a tuple, first member is a
# result (boolean for DoT, HTTP status code for DoH), second member is
# a DNS message (or a string if there is an error).
# result (boolean indicating success for DoT, HTTP status code for
# DoH), second member is a DNS message (or a string if there is an
# error).
def do_test(connection, qname, qtype=rtype):
print("Test of \"%s\" to %s" % (qname, connection))
message = dns.message.make_query(qname, dns.rdatatype.from_text(qtype))
if connection.dot:
# TODO Check what the Query ID is. Really random?
......@@ -108,6 +178,7 @@ def do_test(connection, qname, qtype=rtype):
else: # DoH
message.id = 0 # DoH requests that
if connection.post:
connection.curl.setopt(connection.curl.URL, connection.server)
data = message.to_wire()
connection.curl.setopt(pycurl.POSTFIELDS, data)
else:
......@@ -129,11 +200,13 @@ def do_test(connection, qname, qtype=rtype):
else:
response = "HEAD successful"
else:
ok = False
body = buffer.getvalue()
if len(body) == 0:
body = b"[No details]"
ok = False
buffer.close()
response = "[No details]"
else:
response = body
buffer.close()
return (rcode, response)
# Main program
......@@ -191,7 +264,7 @@ if ifile is None:
if len(args) == 3:
rtype = args[2]
ok = True
conn = Connection(url, dot=dot)
conn = Connection(url, dot=dot, verbose=verbose, insecure=insecure, post=post, head=head)
start = time.time()
if ifile is not None:
input = open(ifile)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment