Commit d2162789 authored by Stephane Bortzmeyer's avatar Stephane Bortzmeyer
Browse files

* Add DoT support. Closes #10

* Big refactoring for #9
parent be608fa4
#!/usr/bin/env python3
# http://pycurl.io/docs/latest
import pycurl
# http://www.dnspython.org/
import dns.message
# Octobre 2019: the Python GnuTLS bindongs don't work with Python 3. So we use OpenSSL.
# https://www.pyopenssl.org/
# https://pyopenssl.readthedocs.io/
import OpenSSL
import io
import sys
import base64
import getopt
import urllib.parse
import time
import socket
import ctypes
# Values that can be changed from the command line
dot = False # DoH by default
post = False
verbose = False
insecure = False
head = False
rtype = 'AAAA'
tests = 1 # Number of repeated tests
ifile = None # Input file
delay = None
def error(msg=None):
if msg is None:
msg = "Unknown error"
print(msg,file=sys.stderr)
sys.exit(1)
def usage(msg=None):
if msg:
print(msg,file=sys.stderr)
print("Usage: %s [-P] [-k] url domain-name [DNS type]" % sys.argv[0], file=sys.stderr)
class Connection:
def __init__(self, server, dot=False, verbose=verbose, insecure=insecure, post=post, head=head):
# TODO test that server is a hostname is dot is True and an URL otherwise?
self.server = server
self.dot = dot
if not self.dot:
self.post = post
self.head = head
self.verbose = verbose
self.insecure = insecure
if self.dot:
# TODO family-independant sockets
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# With typical DoT servers, we *must* use TLS 1.2 (otherwise,
# do_handshake fails with "OpenSSL.SSL.SysCallError: (-1, 'Unexpected
# EOF')" Typical HTTP servers are more lax.
self.context = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_2_METHOD)
self.session = OpenSSL.SSL.Connection(self.context, self.sock)
self.session.connect((self.server, 853))
self.session.do_handshake()
self.cert = self.session.get_peer_certificate()
else: # DoH
self.curl = pycurl.Curl()
self.url = server
# Does not work if pycurl was not compiled with nghttp2 (recent Debian
# packages are OK) https://github.com/pycurl/pycurl/issues/477
self.curl.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2)
if self.verbose:
self.curl.setopt(self.curl.VERBOSE, True)
if self.insecure:
self.curl.setopt(pycurl.SSL_VERIFYPEER, False)
self.curl.setopt(pycurl.SSL_VERIFYHOST, False)
def __str__(self):
return self.server
def end(self):
if self.dot:
self.session.shutdown()
self.session.close()
else: # DoH
pass # TODO
# Routine doing one actual test. Returns a tuple, first member is a
# result (boolean for DoT, HTTP status code for DoH), second member is
# a DNS message (or a string if there is an error).
def do_test(connection, qname, qtype=rtype):
print("Test of \"%s\" to %s" % (qname, connection))
message = dns.message.make_query(qname, dns.rdatatype.from_text(qtype))
if connection.dot:
# TODO Check what the Query ID is. Really random?
messagew = message.to_wire()
length = len(messagew)
n = connection.session.send(length.to_bytes(2, byteorder='big') + messagew)
buf = connection.session.recv(2)
received = int.from_bytes(buf, byteorder='big')
buf = connection.session.recv(received)
response = dns.message.from_wire(buf) # TODO check the Query ID
return (True, response)
else: # DoH
message.id = 0 # DoH requests that
if connection.post:
data = message.to_wire()
connection.curl.setopt(pycurl.POSTFIELDS, data)
else:
dns_req = base64.urlsafe_b64encode(message.to_wire()).decode('UTF8').rstrip('=')
connection.curl.setopt(connection.curl.URL, connection.server + ("?dns=%s" % dns_req))
buffer = io.BytesIO()
connection.curl.setopt(connection.curl.WRITEDATA, buffer)
connection.curl.perform()
rcode = connection.curl.getinfo(pycurl.RESPONSE_CODE)
ok = True
if rcode == 200:
if not head:
body = buffer.getvalue()
try:
response = dns.message.from_wire(body)
except dns.message.TrailingJunk: # Not DNS.
response = "ERROR Not proper DNS data \"%s\"" % body
ok = False
else:
response = "HEAD successful"
else:
body = buffer.getvalue()
if len(body) == 0:
body = b"[No details]"
ok = False
buffer.close()
return (rcode, response)
c = Connection("https://doh.bortzmeyer.fr/")
(code, msg) = do_test(c, "www.afnic.fr")
print("%s: %s" % (code, msg))
c.end()
c = Connection("dot.bortzmeyer.fr", dot=True)
(ok, msg) = do_test(c, "www.afnic.fr")
print("%s: %s" % (ok, msg))
c.end()
sys.exit(0)
# Main program
name = None
message = None
try:
optlist, args = getopt.getopt (sys.argv[1:], "hvPker:f:d:",
["help", "verbose", "head", "insecure", "POST", "repeat=", "file=", "delay="])
for option, value in optlist:
if option == "--help" or option == "-h":
usage()
sys.exit(0)
elif option == "--verbose" or option == "-v":
verbose = True
elif option == "--head" or option == "-e":
head = True
elif option == "--insecure" or option == "-k":
insecure = True
elif option == "--POST" or option == "-P":
post = True
elif option == "--repeat" or option == "-r":
tests = int(value)
if tests <= 1:
error("--repeat needs a value > 1")
elif option == "--delay" or option == "-d":
delay = float(value)
if delay <= 0:
error("--delay needs a value > 0")
elif option == "--file" or option == "-f":
ifile = value
else:
error("Unknown option %s" % option)
except getopt.error as reason:
usage(reason)
sys.exit(1)
if tests <= 1 and delay is not None:
error("--delay makes no sense if there is no repetition")
if post and head:
usage("POST or HEAD but not both")
sys.exit(1)
if ifile is None and (len(args) != 2 and len(args) != 3):
usage("Wrong number of arguments")
sys.exit(1)
if ifile is not None and len(args) != 1:
usage("Wrong number of arguments (if --file is used, do not indicate the domain name)")
sys.exit(1)
url = args[0]
if ifile is None:
name = args[1]
if len(args) == 3:
rtype = args[2]
c = pycurl.Curl()
if ifile is None:
message = dns.message.make_query(name, dns.rdatatype.from_text(rtype))
message.id = 0 # DoH requests that
if head:
c.setopt(pycurl.NOBODY, True)
if post:
c.setopt(c.URL, url)
c.setopt(pycurl.POST, True)
if ifile is None:
data = message.to_wire()
c.setopt(pycurl.POSTFIELDS, data)
else:
if ifile is None:
dns_req = base64.urlsafe_b64encode(message.to_wire()).decode('UTF8').rstrip('=')
c.setopt(c.URL, url + ("?dns=%s" % dns_req))
c.setopt(pycurl.HTTPHEADER, ["Content-type: application/dns-message"])
# libcurl sets HTTP persistence automatically, thus handling the case if tests > 1
if verbose:
c.setopt(c.VERBOSE, True)
if insecure:
c.setopt(pycurl.SSL_VERIFYPEER, False)
c.setopt(pycurl.SSL_VERIFYHOST, False)
# Does not work if pycurl was not compiled with nghttp2 (recent Debian
# packages are OK) https://github.com/pycurl/pycurl/issues/477
c.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2)
ok = True
start = time.time()
if ifile is not None:
input = open(ifile)
for i in range (0, tests):
if tests > 1:
print("\nTest %i" % i)
if ifile is not None:
line = input.readline()
if line[:-1] == "":
error("Not enough data in %s for the %i tests" % (ifile, tests))
if line.find(' ') == -1:
name = line[:-1]
rtype = 'AAAA'
else:
(name, rtype) = line.split()
message = dns.message.make_query(name, dns.rdatatype.from_text(rtype))
message.id = 0 # DoH requests that
if post:
data = message.to_wire()
c.setopt(pycurl.POSTFIELDS, data)
else:
dns_req = base64.urlsafe_b64encode(message.to_wire()).decode('UTF8').rstrip('=')
c.setopt(c.URL, url + ("?dns=%s" % dns_req))
buffer = io.BytesIO()
c.setopt(c.WRITEDATA, buffer)
c.perform()
rcode = c.getinfo(pycurl.RESPONSE_CODE)
if rcode == 200:
if not head:
body = buffer.getvalue()
try:
response = dns.message.from_wire(body)
except dns.message.TrailingJunk: # Not DNS.
response = "ERROR Not proper DNS data \"%s\"" % body
ok = False
print(response)
else:
print("HEAD request successful")
else:
body = buffer.getvalue()
if len(body) == 0:
body = b"[No details]"
print("HTTP error %i: %s" % (rcode, body[0:1000].decode()), file=sys.stderr)
ok = False
buffer.close()
if tests > 1 and i == 0:
start2 = time.time()
if delay is not None:
time.sleep(delay)
c.close()
stop = time.time()
if tests > 1:
extra = ", %.2f ms/request if we ignore the first one" % ((stop-start2)*1000/(tests-1))
else:
extra = ""
print("\nTotal elapsed time: %.2f seconds (%.2f ms/request %s)" % (stop-start, (stop-start)*1000/tests, extra))
if ifile is not None:
input.close()
if ok:
sys.exit(0)
else:
sys.exit(1)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment