Commit cff5ba60 authored by Alexandre's avatar Alexandre
Browse files

[DoH] Use a unique curl handle bound to the connection

parent 5b2bdb53
......@@ -189,39 +189,11 @@ class RequestDoH:
self.data = self.message.to_wire()
self.head = False
def create_handle(self, curl_opt):
self.handle = pycurl.Curl()
for opt, value in curl_opt.items():
if opt == pycurl.NOBODY:
self.head = True
try:
self.handle.setopt(opt, value)
except AttributeError:
pass # Probably an old version of libcurl, without CONNECT_TO. It appeared with 7.49.0
def close_handle(self):
self.handle.close()
def prepare_query(self, curl_opt):
self.buffer = io.BytesIO()
curl_opt[pycurl.WRITEDATA] = self.buffer
self.create_handle(curl_opt)
def perform_query(self):
self.handle.perform()
def receive_query(self):
self.body = self.buffer.getvalue()
self.body_size = len(self.body)
self.buffer.close()
def check_response(self):
ok = True
self.rcode = self.handle.getinfo(pycurl.RESPONSE_CODE)
if self.rcode == 200:
ctype = self.handle.getinfo(pycurl.CONTENT_TYPE)
if ctype != "application/dns-message":
self.response = "Content type of the response (\"%s\") invalid" % ctype
if self.ctype != "application/dns-message":
self.response = "Content type of the response (\"%s\") invalid" % self.ctype
ok = False
else:
if not self.head:
......@@ -412,57 +384,94 @@ class ConnectionDoH(Connection):
self.url = server
self.connect = connect
def reset_conn_opt(self):
http_header = ["Content-type: application/dns-message"]
self.curl_opt = {}
self.curl_opt[pycurl.HTTP_VERSION] = pycurl.CURL_HTTP_VERSION_2 # Does not
# work if pycurl was not compiled with nghttp2 (recent Debian
# packages are OK) https://github.com/pycurl/pycurl/issues/477
self.curl_opt[pycurl.HTTPHEADER] = http_header
def create_handle(self):
self.curl = pycurl.Curl()
# Does not work if pycurl was not compiled with nghttp2 (recent Debian
# packages are OK) https://github.com/pycurl/pycurl/issues/477
self.curl.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2)
if self.verbose:
self.curl_opt[pycurl.VERBOSE] = True
self.curl.setopt(pycurl.VERBOSE, True)
if self.insecure:
self.curl_opt[pycurl.SSL_VERIFYPEER] = False
self.curl_opt[pycurl.SSL_VERIFYHOST] = False
self.curl.setopt(pycurl.SSL_VERIFYPEER, False)
self.curl.setopt(pycurl.SSL_VERIFYHOST, False)
if forceIPv4:
self.curl_opt[pycurl.IPRESOLVE] = pycurl.IPRESOLVE_V4
self.curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)
if forceIPv6:
self.curl_opt[pycurl.IPRESOLVE] = pycurl.IPRESOLVE_V6
self.curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V6)
if self.connect is not None:
self.check_ip_address(self.connect)
self.curl_opt[pycurl.CONNECT_TO] = [f'::{self.repraddress}:443',]
self.curl.setopt(pycurl.CONNECT_TO, [f'::{self.repraddress}:443',])
self.curl.setopt(pycurl.HTTPHEADER, ["Content-type: application/dns-message"])
def end(self):
self.request.close_handle()
self.curl.close()
def set_opt(self, opt, value):
self.curl.setopt(opt, value)
def reset_opt_default(self):
opts = {
pycurl.NOBODY: False,
pycurl.POST: False,
pycurl.POSTFIELDS: '',
pycurl.URL: ''
}
for opt, value in opts.items():
self.set_opt(opt, value)
def prepare(self, request):
try:
self.reset_opt_default()
except AttributeError:
self.create_handle()
if self.post:
self.prepare_post(request)
elif self.head:
self.prepare_head(request)
request.head = True
else:
self.prepare_get(request)
def prepare_test_get(self, qname, qtype):
self.request = RequestDoH(qname, qtype, want_dnssec=dnssec, use_edns=edns)
dns_req = base64.urlsafe_b64encode(self.request.data).decode('UTF8').rstrip('=')
self.curl_opt[pycurl.URL] = self.server + ("?dns=%s" % dns_req)
def prepare_get(self, request):
self.set_opt(pycurl.HTTPGET, True)
dns_req = base64.urlsafe_b64encode(request.data).decode('UTF8').rstrip('=')
self.set_opt(pycurl.URL, self.server + ("?dns=%s" % dns_req))
def prepare_test_post(self, qname, qtype):
self.request = RequestDoH(qname, qtype, want_dnssec=dnssec, use_edns=edns)
self.curl_opt[pycurl.POST] = True
self.curl_opt[pycurl.POSTFIELDS] = self.request.data
self.curl_opt[pycurl.URL] = self.server
def prepare_post(self, request):
self.set_opt(pycurl.POST, True)
self.set_opt(pycurl.POSTFIELDS, request.data)
self.set_opt(pycurl.URL, self.server)
def prepare_test_head(self, qname, qtype):
self.prepare_test_get(qname, qtype)
self.curl_opt[pycurl.NOBODY] = True
def prepare_head(self, request):
self.prepare_get(request)
self.set_opt(pycurl.NOBODY, True)
def perform(self):
self.buffer = io.BytesIO()
self.set_opt(pycurl.WRITEDATA, self.buffer)
self.curl.perform()
def receive(self, request):
body = self.buffer.getvalue()
body_size = len(body)
http_code = self.curl.getinfo(pycurl.RESPONSE_CODE)
content_type = self.curl.getinfo(pycurl.CONTENT_TYPE)
request.body = body
request.body_size = body_size
request.rcode = http_code
request.ctype = content_type
self.buffer.close()
def send_and_receive(self, request):
self.prepare(request)
self.perform()
self.receive(request)
def do_test(self, qname, qtype=rtype):
self.reset_conn_opt()
if self.post:
self.prepare_test_post(qname, qtype)
elif self.head:
self.prepare_test_head(qname, qtype)
else:
self.prepare_test_get(qname, qtype)
self.request.prepare_query(self.curl_opt)
self.request.perform_query()
self.request.receive_query()
self.request.check_response()
return (self.request.rcode, self.request.response, self.request.body_size)
request = RequestDoH(qname, qtype, want_dnssec=dnssec, use_edns=edns)
self.send_and_receive(request)
request.check_response()
return (request.rcode, request.response, request.body_size)
def get_next_domain(input_file):
......@@ -630,39 +639,26 @@ except ConnectionRefusedError:
error("Connection to server refused")
if ifile is not None:
input = open(ifile)
if check:
if dot:
tests = 2
else:
test_list = []
test_list.append({ 'name': 'GET', 'get': True, 'post': False, 'head': False })
test_list.append({ 'name': 'POST', 'get': False, 'post': True, 'head': False })
test_list.append({ 'name': 'HEAD', 'get': False, 'post': False, 'head': True })
tests = len(test_list)
for i in range (0, tests):
if tests > 1 and (not check or verbose):
if check:
test_name = test_list[i]['name'] if check else i
print(f"\nTest {test_name}")
if ifile is not None:
name, rtype = get_next_domain(input)
if check:
if not dot:
conn.get = test_list[i]['get']
conn.post = test_list[i]['post']
conn.head = test_list[i]['head']
try:
(rcode, msg, size) = conn.do_test(name, rtype)
except OpenSSL.SSL.Error as e:
ok = False
print(e)
break
if not conn.print_result(rcode, msg, size):
ok = False
if tests > 1 and i == 0:
start2 = time.time()
if delay is not None:
time.sleep(delay)
if not check:
for i in range (0, tests):
if tests > 1:
print("\nTest %i" % i)
if ifile is not None:
name, rtype = get_next_domain(input)
try:
(rcode, msg, size) = conn.do_test(name, rtype)
except OpenSSL.SSL.Error as e:
ok = False
error(e)
break
if not conn.print_result(rcode, msg, size):
ok = False
if tests > 1 and i == 0:
start2 = time.time()
if delay is not None:
time.sleep(delay)
else:
pass # TODO --check option running several tests
stop = time.time()
if tests > 1:
extra = ", %.2f ms/request if we ignore the first one" % ((stop-start2)*1000/(tests-1))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment