Commit f8965f96 authored by Alexandre's avatar Alexandre
Browse files

Split do_test routine

parent 2441c466
......@@ -205,6 +205,14 @@ class Connection:
else:
self.family = 0
def do_test(self, qname, qtype=rtype):
# Routine doing one actual test. Returns a tuple, first member is a
# result (boolean indicating success for DoT, HTTP status code for
# DoH), second member is a DNS message (or a string if there is an
# error), third member is the size of the DNS message (or None if no
# proper response).
pass
class ConnectionDoT(Connection):
def __init__(self, server, servername=None, connect=None, forceIPv4=False, forceIPv6=False,
......@@ -260,6 +268,20 @@ class ConnectionDoT(Connection):
self.session.shutdown()
self.session.close()
def do_test(self, qname, qtype=rtype):
message = dns.message.make_query(qname, dns.rdatatype.from_text(qtype), use_edns=True, want_dnssec=True)
messagew = message.to_wire()
length = len(messagew)
id = message.id
n = self.session.send(length.to_bytes(2, byteorder='big') + messagew)
buf = self.session.recv(2)
received = int.from_bytes(buf, byteorder='big')
buf = self.session.recv(received)
response = dns.message.from_wire(buf)
if response.id != message.id:
raise Exception("The ID in the answer does not match the one in the query")
return (True, response, received)
class ConnectionDoH(Connection):
def __init__(self, server, servername=None, connect=None, forceIPv4=False, forceIPv6=False,
......@@ -298,42 +320,24 @@ class ConnectionDoH(Connection):
def end(self):
self.curl.close()
# Routine doing one actual test. Returns a tuple, first member is a
# result (boolean indicating success for DoT, HTTP status code for
# DoH), second member is a DNS message (or a string if there is an
# error), third member is the size of the DNS message (or None if no
# proper response).
def do_test(connection, qname, qtype=rtype):
message = dns.message.make_query(qname, dns.rdatatype.from_text(qtype), use_edns=True, want_dnssec=True)
size = None
if connection.dot:
messagew = message.to_wire()
length = len(messagew)
id = message.id
n = connection.session.send(length.to_bytes(2, byteorder='big') + messagew)
buf = connection.session.recv(2)
received = int.from_bytes(buf, byteorder='big')
buf = connection.session.recv(received)
response = dns.message.from_wire(buf)
if response.id != message.id:
raise Exception("The ID in the answer does not match the one in the query")
return (True, response, received)
else: # DoH
def do_test(self, qname, qtype=rtype):
message = dns.message.make_query(qname, dns.rdatatype.from_text(qtype), use_edns=True, want_dnssec=True)
size = None
message.id = 0 # DoH requests that
if connection.post:
connection.curl.setopt(connection.curl.URL, connection.server)
if self.post:
self.curl.setopt(self.curl.URL, self.server)
data = message.to_wire()
connection.curl.setopt(pycurl.POSTFIELDS, data)
self.curl.setopt(pycurl.POSTFIELDS, data)
else:
dns_req = base64.urlsafe_b64encode(message.to_wire()).decode('UTF8').rstrip('=')
connection.curl.setopt(connection.curl.URL, connection.server + ("?dns=%s" % dns_req))
self.curl.setopt(self.curl.URL, self.server + ("?dns=%s" % dns_req))
buffer = io.BytesIO()
connection.curl.setopt(connection.curl.WRITEDATA, buffer)
connection.curl.perform()
rcode = connection.curl.getinfo(pycurl.RESPONSE_CODE)
self.curl.setopt(self.curl.WRITEDATA, buffer)
self.curl.perform()
rcode = self.curl.getinfo(pycurl.RESPONSE_CODE)
ok = True
if rcode == 200:
ctype = connection.curl.getinfo(pycurl.CONTENT_TYPE)
ctype = self.curl.getinfo(pycurl.CONTENT_TYPE)
if ctype != "application/dns-message":
response = "Content type of the response (\"%s\") invalid" % ctype
ok = False
......@@ -521,7 +525,7 @@ for i in range (0, tests):
rtype = 'AAAA'
else:
(name, rtype) = line.split()
(rcode, msg, size) = do_test(conn, name, rtype)
(rcode, msg, size) = conn.do_test(name, rtype)
if (dot and rcode) or (not dot and rcode == 200):
if not monitoring:
print(msg)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment