Commit 6b627037 authored by Alexandre's avatar Alexandre

Merge branch 'check-option' into 'master'

Improve --check

* refactor
* better output (number of resolved IPs, error after test status)
* new mandatory level : `nocrash`

See merge request bortzmeyer/homer!27
parents 7e2011e7 ebdd15eb
......@@ -100,7 +100,7 @@ N lines of the FILE
(read the first line only, use --repeat N to read up to N lines of the file)
* `--check` : perform a set of predefined tests
* `--mandatory-level LEVEL` : define the LEVEL of test to perform (only
with --check). Available LEVEL : `legal`, `necessary`, `nicetohave`
with --check). Available LEVEL : `legal`, `necessary`, `nicetohave`, `nocrash`
* `--no-display-results` : disable output of DNS response, this can be combined
with `-v` to keep only part of the output
* `-V --vhost <vhost>` : define a specific virtual host
......@@ -200,7 +200,7 @@ the choosen level is lower than the level of the test.
| level | test |
| ----- | ---- |
| legal | two queries on the same connection |
| nocrash¹ | truncated query |
| nocrash | truncated query |
#### List of tests for DoH
......@@ -209,13 +209,9 @@ the choosen level is lower than the level of the test.
| legal | HTTP GET method |
| legal | HTTP POST method |
| nicetohave | HTTP HEAD method |
| nocrash¹ | truncated query |
| nocrash¹ | Accept-header: text/html |
| nocrash¹ | Content-type: text/html |
¹ The `nocrash` level is not defined as such in Homer, it is just to
show that other tests are performed to assess the robustess of the
DoT/DoH server.
| nocrash | truncated query |
| nocrash | Accept-header: text/html |
| nocrash | Content-type: text/html |
### Multistreams
......
......@@ -161,108 +161,160 @@ def print_result(connection, request, prefix=None, display_err=True):
pass # Sometimes, msg can be binary, or Latin-1
print("HTTP error %i: %s" % (rcode, msg), file=sys.stderr)
def run_check_default(connection):
def print_check_result(test_name, ok, verbose=True):
if verbose:
print(test_name, end=' : ')
if ok:
print('OK')
else:
print('KO')
def check_dot_two_requests(connection, opts):
# not using a DoT connection -> exit the test
if not connection.dot:
return True
r1 = homer.RequestDOT('framagit.org', qtype=opts.rtype, use_edns=opts.edns, want_dnssec=opts.dnssec, no_ecs=opts.no_ecs)
r2 = homer.RequestDOT('afnic.fr', qtype=opts.rtype, use_edns=opts.edns, want_dnssec=opts.dnssec, no_ecs=opts.no_ecs)
requests = []
requests.append(('Test 1', r1, homer.mandatory_levels["legal"]))
# RFC 7858 section 3.3, SHOULD accept several requests on one connection.
requests.append(('Test 2', r2, homer.mandatory_levels["necessary"]))
return do_check(connection, requests, opts)
def check_doh_methods(connection, opts):
# not using a DoH connection -> exit the test
if connection.dot:
return True
r1 = homer.RequestDOH('framagit.org', qtype=opts.rtype, use_edns=opts.edns, want_dnssec=opts.dnssec, no_ecs=opts.no_ecs)
r2 = homer.RequestDOH('afnic.fr', qtype=opts.rtype, use_edns=opts.edns, want_dnssec=opts.dnssec, no_ecs=opts.no_ecs)
r2.post = True
r3 = homer.RequestDOH('www.rfc-editor.org', qtype=opts.rtype, use_edns=opts.edns, want_dnssec=opts.dnssec, no_ecs=opts.no_ecs)
r3.head = True
requests = []
requests.append(('Test GET', r1, homer.mandatory_levels["legal"])) # RFC 8484, section 4.1
requests.append(('Test POST', r2, homer.mandatory_levels["legal"])) # RFC 8484, section 4.1
# HEAD method is not mentioned in RFC 8484 (see section 4.1), so just "nice to have".
requests.append(('Test HEAD', r3, homer.mandatory_levels["nicetohave"]))
return do_check(connection, requests, opts)
def check_doh_header(connection, opts, level=homer.mandatory_levels["nicetohave"],
accept="application/dns-message", content_type="application/dns-message"):
# change the MIME value and see what happens
# based on the RFC only application/dns-message must be supported, any
# other MIME type can be also supported, but nothing is said on that
# not using a DoH connection -> exit the test
if connection.dot:
return True
header = ["Accept: %s" % accept, "Content-type: %s" % content_type]
test_name = "Test Header MIME: %s " % ", ".join(h for h in header)
r1 = homer.RequestDOH('curl.haxx.se', qtype=opts.rtype, use_edns=opts.edns, want_dnssec=opts.dnssec, no_ecs=opts.no_ecs)
r1.post = True
requests = []
requests.append((test_name, r1, level))
handle = connection.curl_handle
handle.setopt(pycurl.HTTPHEADER, header)
ok = do_check(connection, requests, opts)
default_accept = "application/dns-message"
default_ct = "application/dns-message"
default_header = ["Accept: %s" % default_accept, "Content-type: %s" % default_ct]
handle.setopt(pycurl.HTTPHEADER, default_header)
return ok
def do_check(connection, requests, opts):
ok = True
req_args = { 'qname': name, 'qtype': opts.rtype, 'use_edns': opts.edns, 'want_dnssec': opts.dnssec, 'no_ecs': opts.no_ecs}
requests = homer.create_requests_list(dot=connection.dot, **req_args)
for request_pack in requests:
if connection.dot:
test_name, request, level = request_pack
else:
test_name, request, method, level = request_pack
if connection.verbose:
test_name, request, level = request_pack
# the test level is too small, therefore shouldn't be run
if level < opts.mandatory_level:
continue
request.to_wire()
if connection.debug:
print(test_name)
if connection.dot:
bundle = request
else:
if method == homer.DOH_POST:
request.post = True
elif method == homer.DOH_HEAD:
request.head = True
handle = connection.curl_handle
handle.prepare(handle, connection, request)
bundle = handle
try:
connection.send_and_receive(bundle)
except (homer.ConnectionException, homer.DOHException) as e:
# GET and POST are mandatory and therefore if an error is caught
# here, print it and exit
if method == homer.DOH_GET or method == homer.DOH_POST:
error(e)
else:
print(e, file=sys.stderr)
return False
ok = False
print_check_result(test_name, ok, verbose=connection.verbose)
print(e, file=sys.stderr)
continue
if level >= opts.mandatory_level:
ok = request.check_response(connection.debug)
print_check_result(test_name, ok, verbose=connection.verbose)
print_result(connection, request, prefix=test_name, display_err=not ok)
if connection.verbose:
print()
if not ok:
break
return ok
def run_check_mime(connection, accept="application/dns-message", content_type="application/dns-message"):
# change the MIME value and see what happens
# based on the RFC only application/dns-message must be supported, any
# other MIME type can be also supported, but nothing is said on that
if connection.dot:
return True
ok = True
header = [f"Accept: {accept}", f"Content-type: {content_type}"]
if connection.verbose:
test_name = f'Test mime: {", ".join(h for h in header)}'
print(test_name)
req_args = { 'qname': name, 'qtype': opts.rtype, 'use_edns': opts.edns, 'want_dnssec': opts.dnssec, 'no_ecs': opts.no_ecs}
request = homer.create_request(**req_args)
handle = connection.curl_handle
handle.setopt(pycurl.HTTPHEADER, header)
handle.prepare(handle, connection, request)
try:
connection.send_and_receive(handle)
except (homer.ConnectionException, homer.DOHException) as e:
print(e, file=sys.stderr)
return False
ok = request.check_response(connection.debug)
print_result(connection, request, prefix=f"Test Header {', '.join(header)}")
default = "application/dns-message"
default_header = [f"Accept: {default}", f"Content-type: {default}"]
handle.setopt(pycurl.HTTPHEADER, default_header)
if connection.verbose:
print()
return ok
def run_check_trunc(connection):
def check_truncated_query(connection, opts, level=homer.mandatory_levels["nicetohave"]):
# send truncated DNS request to the server and expect a HTTP return code
# either equal to 200 or in the 400 range
# in case the server answers with 200, look for a FORMERR error in the DNS
# response
# the test level is too small, therefore shouldn't be run
if level < opts.mandatory_level:
return True
ok = True
test_name = 'Test truncated data'
if opts.verbose:
if connection.dot:
request = homer.RequestDOT('example.com', qtype=opts.rtype, use_edns=opts.edns, want_dnssec=opts.dnssec, no_ecs=opts.no_ecs)
else:
request = homer.RequestDOH('example.com', qtype=opts.rtype, use_edns=opts.edns, want_dnssec=opts.dnssec, no_ecs=opts.no_ecs)
request.post = True
request.trunc_data()
if connection.debug:
print(test_name)
req_args = { 'qname': name, 'qtype': opts.rtype, 'use_edns': opts.edns, 'want_dnssec': opts.dnssec, 'no_ecs': opts.no_ecs}
if connection.dot:
request = homer.create_request(dot=connection.dot, trunc=True, **req_args)
bundle = request
else:
request = homer.create_request(trunc=True, **req_args)
request.post = True
handle = connection.curl_handle
handle.prepare(handle, connection, request)
bundle = handle
try:
# 8.8.8.8 replies FORMERR but most DoT servers violently shut down the connection (which is legal)
connection.send_and_receive(bundle, dump=connection.debug)
except OpenSSL.SSL.ZeroReturnError: # This is acceptable
return ok
return True
except dns.exception.FormError: # This is also acceptable
# Some DSN resolvers will echo mangled requests with
# the RCODE set to FORMERR
# so response can not be parsed in this case
return ok
return True
except homer.DOHException as e:
print(e, file=sys.stderr)
return False
if request.check_response(connection.debug): # FORMERR is expected
if connection.dot:
ok = request.rcode == dns.rcode.FORMERR
......@@ -277,27 +329,33 @@ def run_check_trunc(connection):
# error code (even so it means the server failed to process the
# input data)
ok = (request.rcode >= 400 and request.rcode < 500)
print_check_result(test_name, ok, verbose=connection.verbose)
print_result(connection, request, prefix=test_name, display_err=not ok)
if connection.verbose:
print()
return ok
def run_check_additionals(connection):
if not run_check_trunc(connection):
return False
# The DoH server is right to reject these (Example: 'HTTP
# error 415: only Content-Type: application/dns-message is
# supported')
run_check_mime(connection, accept="text/html")
run_check_mime(connection, content_type="text/html")
return True
return ok
def run_check(connection):
if not run_check_default(connection):
return False
if opts.check_additional and not run_check_additionals(connection):
ok = True
if connection.dot:
ok = check_dot_two_requests(connection, opts)
else:
ok = check_doh_methods(connection, opts)
if not ok and opts.mandatory_level >= homer.mandatory_levels["nicetohave"]:
return False
return True
# Test that different Header values are not breaking anything
if not connection.dot:
# The DoH server is right to reject these (Example: 'HTTP
# error 415: only Content-Type: application/dns-message is
# supported')
ok = check_doh_header(connection, opts, level=homer.mandatory_levels["nocrash"], accept="text/html") and ok
ok = check_doh_header(connection, opts, level=homer.mandatory_levels["nocrash"], content_type="text/html") and ok
# test if a truncated query breaks anything
ok = check_truncated_query(connection, opts, level=homer.mandatory_levels["nocrash"]) and ok
return ok
def resolved_ips(host, port, family, dot=False):
try:
......@@ -555,14 +613,20 @@ else:
family = 0
ip_set = resolved_ips(netloc, port, family, opts.dot)
# print number of IPs found
if opts.verbose and opts.check:
print("Checking \"%s\" ..." % url)
print("%d IP found : %s" % (len(ip_set), ', '.join(ip_set)))
ok = True
i = 1 # ip counter
for ip in ip_set:
if opts.dot and opts.vhostname is not None:
extracheck = opts.vhostname
else:
extracheck = None
if opts.verbose and opts.check and ip:
print("Checking \"%s\" on %s ..." % (url, ip))
print("(%d/%d) checking IP : %s" % (i, len(ip_set), ip))
try:
if opts.dot:
conn = homer.ConnectionDOT(url, servername=extracheck, connect_to=ip,
......@@ -587,6 +651,8 @@ for ip in ip_set:
err = "Could not connect to \"%s\"" % url
if opts.connectTo is not None:
err += " on %s" % opts.connectTo
elif ip is not None:
err += " on %s" % ip
error(err)
except (homer.ConnectionException, homer.DOHException) as e:
error(e)
......@@ -603,6 +669,8 @@ for ip in ip_set:
input.close()
conn.end()
i += 1
if ok:
if opts.check or opts.pipelining:
print('OK')
......
......@@ -40,5 +40,9 @@ DOH_GET = 0
DOH_POST = 1
DOH_HEAD = 2
# Is the test mandatory?
mandatory_levels = {"legal": 30, "necessary": 20, "nicetohave": 10}
# legal : RFC compliant
# necessary : should work
# nicetohave : not mentionned in the RFC but good if implemented
# nocrash : edge tests (undocumented) just to see if the server crash (this would be bad)
mandatory_levels = {"legal": 30, "necessary": 20, "nicetohave": 10, "nocrash": 5}
......@@ -65,8 +65,8 @@ class RequestDOH(Request):
def __init__(self, qname, qtype='AAAA', use_edns=True, want_dnssec=False, no_ecs=True):
super().__init__(qname, qtype=qtype, use_edns=use_edns, want_dnssec=want_dnssec, no_ecs=no_ecs)
self.message.id = 0 # DoH requests that
self.post = False
self.head = False
self.post = False # TODO pass as argument
self.head = False # pass as argument
# raising custom exception for each unexpected response might be a good idea
def check_response(self, debug=False):
......
......@@ -183,6 +183,8 @@ tests:
timeout: 12
args:
- '--check'
- '--mandatory-level'
- 'nocrash'
- 'https://doh.42l.fr/dns-query'
- 'framagit.org'
partstderr: 'Test truncated data: HTTP error 502'
......@@ -223,7 +225,7 @@ tests:
- '--dot'
- 'dot.bortzmeyer.fr'
- 'www.afnic.fr'
partstdout: "on 193.70.85.11 ...\nConnecting to 193.70.85.11 ..."
partstdout: "checking IP : 193.70.85.11\nConnecting to 193.70.85.11 ..."
- exe: './homer.py'
name: '[doh][check] Test that all the resolved IPs are tried, try another IP'
......@@ -236,7 +238,7 @@ tests:
- '--dot'
- 'dot.bortzmeyer.fr'
- 'www.afnic.fr'
partstdout: "on 2001:41d0:302:2200::180 ...\nConnecting to 2001:41d0:302:2200::180 ..."
partstdout: "checking IP : 2001:41d0:302:2200::180\nConnecting to 2001:41d0:302:2200::180 ..."
- exe: './homer.py'
name: '[dot][check] Test all the IPs, force IPv4'
......@@ -251,7 +253,7 @@ tests:
- '--dot'
- 'dns.google'
- 'framagit.org'
partstdout: "on 8.8.8.8 ...\nConnecting to 8.8.8.8 ..."
partstdout: "checking IP : 8.8.8.8\nConnecting to 8.8.8.8 ..."
- exe: './homer.py'
name: '[dot][check] Test all the IPs, force IPv4, check another IP'
......@@ -266,7 +268,7 @@ tests:
- '--dot'
- 'dns.google'
- 'framagit.org'
partstdout: "on 8.8.4.4 ...\nConnecting to 8.8.4.4 ..."
partstdout: "checking IP : 8.8.4.4\nConnecting to 8.8.4.4 ..."
- exe: './homer.py'
name: '[dot][check] Test all the IPs, force IPv6'
......@@ -281,7 +283,7 @@ tests:
- '--dot'
- 'dns.google'
- 'framagit.org'
partstdout: "on 2001:4860:4860::8844 ...\nConnecting to 2001:4860:4860::8844 ..."
partstdout: "checking IP : 2001:4860:4860::8844\nConnecting to 2001:4860:4860::8844 ..."
- exe: './homer.py'
name: '[dot][check] Test all the IPs, force IPv6, check another IP'
......@@ -296,7 +298,7 @@ tests:
- '--dot'
- 'dns.google'
- 'framagit.org'
partstdout: "on 2001:4860:4860::8888 ...\nConnecting to 2001:4860:4860::8888 ..."
partstdout: "checking IP : 2001:4860:4860::8888\nConnecting to 2001:4860:4860::8888 ..."
- exe: './homer.py'
name: '[doh][check] Test all the IPs, force IPv4'
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment