Commit c6cfbd96 authored by Alexandre's avatar Alexandre
Browse files

[DoH] Use a request class with a curl handle

parent 879ba955
......@@ -166,6 +166,28 @@ def validate_hostname(hostname, cert):
return True
return False
class RequestDoH:
def __init__(self, qname, qtype=rtype):
self.message = dns.message.make_query(qname, dns.rdatatype.from_text(qtype), use_edns=True, want_dnssec=True)
self.message.id = 0 # DoH requests that
self.data = self.message.to_wire()
def create_handle(self, curl_opt):
self.handle = pycurl.Curl()
for opt, value in curl_opt.items():
try:
self.handle.setopt(opt, value)
except AttributeError:
pass # Probably an old version of libcurl, without CONNECT_TO. It appeared with 7.49.0
def close_handle(self):
self.handle.close()
def perform(self):
self.handle.perform()
class Connection:
def __init__(self, server, servername=None, connect=None, forceIPv4=False, forceIPv6=False,
dot=False, verbose=verbose, insecure=insecure, post=post, head=head):
......@@ -321,53 +343,60 @@ class ConnectionDoH(Connection):
verbose=verbose, insecure=insecure, post=post, head=head)
self.post = post
self.head = head
self.curl = pycurl.Curl()
self.url = server
# Does not work if pycurl was not compiled with nghttp2 (recent Debian
# packages are OK) https://github.com/pycurl/pycurl/issues/477
self.curl.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2)
self.curl.setopt(pycurl.HTTPHEADER, ["Content-type: application/dns-message"])
http_header = ["Content-type: application/dns-message"]
self.curl_opt = {}
self.curl_opt[pycurl.HTTP_VERSION] = pycurl.CURL_HTTP_VERSION_2 # Does not
# work if pycurl was not compiled with nghttp2 (recent Debian
# packages are OK) https://github.com/pycurl/pycurl/issues/477
self.curl_opt[pycurl.HTTPHEADER] = http_header
if self.verbose:
self.curl.setopt(self.curl.VERBOSE, True)
self.curl_opt[pycurl.VERBOSE] = True
if self.insecure:
self.curl.setopt(pycurl.SSL_VERIFYPEER, False)
self.curl.setopt(pycurl.SSL_VERIFYHOST, False)
if self.head:
self.curl.setopt(pycurl.NOBODY, True)
if self.post:
self.curl.setopt(pycurl.POST, True)
self.curl_opt[pycurl.SSL_VERIFYPEER] = False
self.curl_opt[pycurl.SSL_VERIFYHOST] = False
if forceIPv4:
self.curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)
self.curl_opt[pycurl.IPRESOLVE] = pycurl.IPRESOLVE_V4
if forceIPv6:
self.curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V6)
self.curl_opt[pycurl.IPRESOLVE] = pycurl.IPRESOLVE_V6
if connect is not None:
self.check_ip_address(connect)
try:
self.curl.setopt(pycurl.CONNECT_TO, ["::%s:" % self.repraddress,])
except AttributeError:
pass # Probably an old version of libcurl, without CONNECT_TO. It appeared with 7.49.0
self.curl_opt[pycurl.CONNECT_TO] = [f'::{self.repraddress}:443',]
def end(self):
self.curl.close()
self.request.close_handle()
def prepare_test_get(self, qname, qtype):
self.request = RequestDoH(qname, qtype)
dns_req = base64.urlsafe_b64encode(self.request.data).decode('UTF8').rstrip('=')
self.curl_opt[pycurl.URL] = self.server + ("?dns=%s" % dns_req)
def prepare_test_post(self, qname, qtype):
self.request = RequestDoH(qname, qtype)
self.curl_opt[pycurl.POST] = True
self.curl_opt[pycurl.POSTFIELDS] = self.request.data
self.curl_opt[pycurl.URL] = self.server
def prepare_test_head(self, qname, qtype):
self.prepare_test_get(qname, qtype)
self.curl_opt[pycurl.NOBODY] = True
def do_test(self, qname, qtype=rtype):
message = dns.message.make_query(qname, dns.rdatatype.from_text(qtype), use_edns=True, want_dnssec=True)
size = None
message.id = 0 # DoH requests that
if self.post:
self.curl.setopt(self.curl.URL, self.server)
data = message.to_wire()
self.curl.setopt(pycurl.POSTFIELDS, data)
self.prepare_test_post(qname, qtype)
elif self.head:
self.prepare_test_head(qname, qtype)
else:
dns_req = base64.urlsafe_b64encode(message.to_wire()).decode('UTF8').rstrip('=')
self.curl.setopt(self.curl.URL, self.server + ("?dns=%s" % dns_req))
self.prepare_test_get(qname, qtype)
buffer = io.BytesIO()
self.curl.setopt(self.curl.WRITEDATA, buffer)
self.curl.perform()
rcode = self.curl.getinfo(pycurl.RESPONSE_CODE)
self.curl_opt[pycurl.WRITEDATA] = buffer
self.request.create_handle(self.curl_opt)
self.request.perform()
size = None
rcode = self.request.handle.getinfo(pycurl.RESPONSE_CODE)
ok = True
if rcode == 200:
ctype = self.curl.getinfo(pycurl.CONTENT_TYPE)
ctype = self.request.handle.getinfo(pycurl.CONTENT_TYPE)
if ctype != "application/dns-message":
response = "Content type of the response (\"%s\") invalid" % ctype
ok = False
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment