Commit ffd0d930 authored by Alexandre's avatar Alexandre

Refactor code and functions

Use level of 30 (legal) for the truncated check in order to avoid
regression with the tests.
parent 7e2011e7
......@@ -161,108 +161,161 @@ def print_result(connection, request, prefix=None, display_err=True):
pass # Sometimes, msg can be binary, or Latin-1
print("HTTP error %i: %s" % (rcode, msg), file=sys.stderr)
def run_check_default(connection):
def print_check_result(test_name, ok, verbose=True):
if verbose:
print(test_name, end=' : ')
if ok:
print('OK')
else:
print('KO')
def check_dot_two_requests(connection, opts):
# not using a DoT connection -> exit the test
if not connection.dot:
return True
r1 = homer.RequestDOT('framagit.org', qtype=opts.rtype, use_edns=opts.edns, want_dnssec=opts.dnssec, no_ecs=opts.no_ecs)
r2 = homer.RequestDOT('afnic.fr', qtype=opts.rtype, use_edns=opts.edns, want_dnssec=opts.dnssec, no_ecs=opts.no_ecs)
requests = []
requests.append(('Test 1', r1, homer.mandatory_levels["legal"]))
# RFC 7858 section 3.3, SHOULD accept several requests on one connection.
requests.append(('Test 2', r2, homer.mandatory_levels["necessary"]))
return do_check(connection, requests, opts)
def check_doh_methods(connection, opts):
# not using a DoH connection -> exit the test
if connection.dot:
return True
r1 = homer.RequestDOH('framagit.org', qtype=opts.rtype, use_edns=opts.edns, want_dnssec=opts.dnssec, no_ecs=opts.no_ecs)
r2 = homer.RequestDOH('afnic.fr', qtype=opts.rtype, use_edns=opts.edns, want_dnssec=opts.dnssec, no_ecs=opts.no_ecs)
r2.post = True
r3 = homer.RequestDOH('www.rfc-editor.org', qtype=opts.rtype, use_edns=opts.edns, want_dnssec=opts.dnssec, no_ecs=opts.no_ecs)
r3.head = True
requests = []
requests.append(('Test GET', r1, homer.mandatory_levels["legal"])) # RFC 8484, section 4.1
requests.append(('Test POST', r2, homer.mandatory_levels["legal"])) # RFC 8484, section 4.1
# HEAD method is not mentioned in RFC 8484 (see section 4.1), so just "nice to have".
requests.append(('Test HEAD', r3, homer.mandatory_levels["nicetohave"]))
return do_check(connection, requests, opts)
def check_doh_header(connection, opts, level=homer.mandatory_levels["nicetohave"],
accept="application/dns-message", content_type="application/dns-message"):
# change the MIME value and see what happens
# based on the RFC only application/dns-message must be supported, any
# other MIME type can be also supported, but nothing is said on that
# not using a DoH connection -> exit the test
if connection.dot:
return True
header = ["Accept: %s" % accept, "Content-type: %s" % content_type]
test_name = "Test Header MIME: %s " % ", ".join(h for h in header)
r1 = homer.RequestDOH('curl.haxx.se', qtype=opts.rtype, use_edns=opts.edns, want_dnssec=opts.dnssec, no_ecs=opts.no_ecs)
r1.post = True
requests = []
requests.append((test_name, r1, level))
handle = connection.curl_handle
handle.setopt(pycurl.HTTPHEADER, header)
ok = do_check(connection, requests, opts)
default_accept = "application/dns-message"
default_ct = "application/dns-message"
default_header = ["Accept: %s" % default_accept, "Content-type: %s" % default_ct]
handle.setopt(pycurl.HTTPHEADER, default_header)
return ok
def do_check(connection, requests, opts):
ok = True
req_args = { 'qname': name, 'qtype': opts.rtype, 'use_edns': opts.edns, 'want_dnssec': opts.dnssec, 'no_ecs': opts.no_ecs}
requests = homer.create_requests_list(dot=connection.dot, **req_args)
for request_pack in requests:
if connection.dot:
test_name, request, level = request_pack
else:
test_name, request, method, level = request_pack
if connection.verbose:
test_name, request, level = request_pack
# the test level is too small, therefore shouldn't be run
if level < opts.mandatory_level:
continue
request.to_wire()
if connection.debug:
print(test_name)
if connection.dot:
bundle = request
else:
if method == homer.DOH_POST:
request.post = True
elif method == homer.DOH_HEAD:
request.head = True
handle = connection.curl_handle
handle.prepare(handle, connection, request)
bundle = handle
try:
connection.send_and_receive(bundle)
except (homer.ConnectionException, homer.DOHException) as e:
# GET and POST are mandatory and therefore if an error is caught
# here, print it and exit
if method == homer.DOH_GET or method == homer.DOH_POST:
error(e)
else:
print(e, file=sys.stderr)
return False
print(e, file=sys.stderr)
ok = False
print_check_result(test_name, ok, verbose=connection.verbose)
continue
if level >= opts.mandatory_level:
ok = request.check_response(connection.debug)
print_result(connection, request, prefix=test_name, display_err=not ok)
if connection.verbose:
print()
print_check_result(test_name, ok, verbose=connection.verbose)
if not ok:
break
return ok
def run_check_mime(connection, accept="application/dns-message", content_type="application/dns-message"):
# change the MIME value and see what happens
# based on the RFC only application/dns-message must be supported, any
# other MIME type can be also supported, but nothing is said on that
if connection.dot:
return True
ok = True
header = [f"Accept: {accept}", f"Content-type: {content_type}"]
if connection.verbose:
test_name = f'Test mime: {", ".join(h for h in header)}'
print(test_name)
req_args = { 'qname': name, 'qtype': opts.rtype, 'use_edns': opts.edns, 'want_dnssec': opts.dnssec, 'no_ecs': opts.no_ecs}
request = homer.create_request(**req_args)
handle = connection.curl_handle
handle.setopt(pycurl.HTTPHEADER, header)
handle.prepare(handle, connection, request)
try:
connection.send_and_receive(handle)
except (homer.ConnectionException, homer.DOHException) as e:
print(e, file=sys.stderr)
return False
ok = request.check_response(connection.debug)
print_result(connection, request, prefix=f"Test Header {', '.join(header)}")
default = "application/dns-message"
default_header = [f"Accept: {default}", f"Content-type: {default}"]
handle.setopt(pycurl.HTTPHEADER, default_header)
if connection.verbose:
print()
return ok
def run_check_trunc(connection):
def check_truncated_query(connection, opts, level=homer.mandatory_levels["nicetohave"]):
# send truncated DNS request to the server and expect a HTTP return code
# either equal to 200 or in the 400 range
# in case the server answers with 200, look for a FORMERR error in the DNS
# response
# the test level is too small, therefore shouldn't be run
if level < opts.mandatory_level:
return True
ok = True
test_name = 'Test truncated data'
if opts.verbose:
if connection.dot:
request = homer.RequestDOT('example.com', qtype=opts.rtype, use_edns=opts.edns, want_dnssec=opts.dnssec, no_ecs=opts.no_ecs)
else:
request = homer.RequestDOH('example.com', qtype=opts.rtype, use_edns=opts.edns, want_dnssec=opts.dnssec, no_ecs=opts.no_ecs)
request.post = True
request.trunc_data()
if connection.debug:
print(test_name)
req_args = { 'qname': name, 'qtype': opts.rtype, 'use_edns': opts.edns, 'want_dnssec': opts.dnssec, 'no_ecs': opts.no_ecs}
if connection.dot:
request = homer.create_request(dot=connection.dot, trunc=True, **req_args)
bundle = request
else:
request = homer.create_request(trunc=True, **req_args)
request.post = True
handle = connection.curl_handle
handle.prepare(handle, connection, request)
bundle = handle
try:
# 8.8.8.8 replies FORMERR but most DoT servers violently shut down the connection (which is legal)
connection.send_and_receive(bundle, dump=connection.debug)
except OpenSSL.SSL.ZeroReturnError: # This is acceptable
return ok
return True
except dns.exception.FormError: # This is also acceptable
# Some DSN resolvers will echo mangled requests with
# the RCODE set to FORMERR
# so response can not be parsed in this case
return ok
return True
except homer.DOHException as e:
print(e, file=sys.stderr)
return False
if request.check_response(connection.debug): # FORMERR is expected
if connection.dot:
ok = request.rcode == dns.rcode.FORMERR
......@@ -277,27 +330,35 @@ def run_check_trunc(connection):
# error code (even so it means the server failed to process the
# input data)
ok = (request.rcode >= 400 and request.rcode < 500)
print_result(connection, request, prefix=test_name, display_err=not ok)
if connection.verbose:
print()
return ok
print_check_result(test_name, ok, verbose=connection.verbose)
def run_check_additionals(connection):
if not run_check_trunc(connection):
return False
# The DoH server is right to reject these (Example: 'HTTP
# error 415: only Content-Type: application/dns-message is
# supported')
run_check_mime(connection, accept="text/html")
run_check_mime(connection, content_type="text/html")
return True
return ok
def run_check(connection):
if not run_check_default(connection):
return False
if opts.check_additional and not run_check_additionals(connection):
ok = True
if connection.dot:
ok = check_dot_two_requests(connection, opts)
else:
ok = check_doh_methods(connection, opts)
if not ok:
return False
return True
# Test that different Header values are not breaking anything
# this can be added in a specific level 'donotbreak'
if not connection.dot:
# The DoH server is right to reject these (Example: 'HTTP
# error 415: only Content-Type: application/dns-message is
# supported')
ok = check_doh_header(connection, opts, level=10, accept="text/html") and ok
ok = check_doh_header(connection, opts, level=10, content_type="text/html") and ok
# test if a truncated query breaks anything
# again iwbn to have a level such as 'donotbreak' for it
ok = check_truncated_query(connection, opts, level=30) and ok
return ok
def resolved_ips(host, port, family, dot=False):
try:
......
......@@ -65,8 +65,8 @@ class RequestDOH(Request):
def __init__(self, qname, qtype='AAAA', use_edns=True, want_dnssec=False, no_ecs=True):
super().__init__(qname, qtype=qtype, use_edns=use_edns, want_dnssec=want_dnssec, no_ecs=no_ecs)
self.message.id = 0 # DoH requests that
self.post = False
self.head = False
self.post = False # TODO pass as argument
self.head = False # pass as argument
# raising custom exception for each unexpected response might be a good idea
def check_response(self, debug=False):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment