Commit cf1d18f4 authored by alex's avatar alex
Browse files

Merge branch 'pipelining' into 'master'

[DoT] Pipelining

See merge request bortzmeyer/homer!18
parents 63d193fa 2be57b81
......@@ -59,6 +59,9 @@ Possible options, besides `--dot`:
* --key KEYINBASE64: authenticates a DoT resolver with its public
key. Example: `homer.py --key "62lKu9HsDVbyiPenApnc4sfmSYTHOVfFgL3pyB+cBL4=" --dot 145.100.185.15 IN NS`
* --check: Run a set of tests (see below)
* --pipelining: on DoT, send several requests even before getting the
reply to the first one (may increase performance when you have
several requests)
* --multistreams: (DoH) Uses HTTP/2 streams (requires the --file option)
* --file INPUT_FILE: provide an input file with a list of domain names to query
(read the first line only, use --repeat N to read up to N lines of the file)
......
......@@ -53,6 +53,8 @@ delay = None
forceIPv4 = False
forceIPv6 = False
connectTo = None
pipelining = False
max_in_flight = 20
multistreams = False
sync = False
display_results = True
......@@ -83,6 +85,9 @@ DOH_HEAD = 2
mandatory_levels = {"legal": 30, "necessary": 20, "nicetohave": 10}
TIMEOUT_CONN = 2
TIMEOUT_READ = 1
SLEEP_TIMEOUT = 0.5
MAX_DURATION = 10
def error(msg=None, exit=True):
if msg is None:
......@@ -269,11 +274,9 @@ def timeout_connection(signum, frame):
class TimeoutConnectionError(Exception):
    """Raised when a connection attempt takes too long.

    NOTE(review): presumably raised from the timeout_connection() SIGALRM
    handler armed with signal.alarm(TIMEOUT_CONN) before connect() — the
    handler body is not visible here, confirm.
    """
    pass
class CustomException(Exception):
    """Application-level error for expected failures.

    Caught together with OpenSSL.SSL.Error by the main request loop,
    which then aborts the current connection instead of crashing.
    """
    pass
class Request:
def __init__(self, qname, qtype=rtype, use_edns=edns, want_dnssec=dnssec):
if no_ecs:
......@@ -298,6 +301,8 @@ class Request:
class RequestDoT(Request):
def check_response(self, debug=False):
if self.response is None:
raise Exception("No reply received")
ok = self.ok
if not self.rcode:
self.ok = False
......@@ -310,6 +315,11 @@ class RequestDoT(Request):
return False
return self.ok
def store_response(self, rcode, response, size):
    """Record the outcome of a DoT exchange on this request.

    Parameters:
      rcode: True when a reply was actually received, False otherwise
             (e.g. receive_data() returned (False, None, None) on a
             read timeout in pipelining mode).
      response: the parsed DNS message, or None when rcode is False.
      size: the size in bytes of the DNS reply, or None.
    """
    # Bug fix: the original ignored the rcode argument and always set
    # self.rcode = True, so a failed read (response is None) was still
    # flagged as answered. Store the caller-provided status instead.
    self.rcode = rcode
    self.response = response
    self.response_size = size
class RequestDoH(Request):
def __init__(self, qname, qtype=rtype, use_edns=edns, want_dnssec=dnssec):
......@@ -395,7 +405,7 @@ class Connection:
class ConnectionDoT(Connection):
def __init__(self, server, servername=None, connect=None, forceIPv4=False, forceIPv6=False,
verbose=verbose, debug=debug, insecure=insecure):
pipelining=pipelining, verbose=verbose, debug=debug, insecure=insecure):
Connection.__init__(self, server, servername=servername, connect=connect,
forceIPv4=forceIPv4, forceIPv6=forceIPv6, dot=True,
verbose=verbose, debug=debug, insecure=insecure)
......@@ -422,7 +432,16 @@ class ConnectionDoT(Connection):
error(f'Could not connect to "{server}"')
else:
print(f'Could not connect to "{server}" on {connect}')
self.pipelining = pipelining
if pipelining:
self.all_requests = [] # Currently, we load everything in memory
# since we want to keep everything,
# anyway. May be in the future, if we don't
# want to keep individual results, we'll use
# an iterator to fill a smaller table.
# all_requests is indexed by its rank in the input file.
self.pending = {} # pending is indexed by the query ID, and its
# maximum size is max_in_flight.
def connect(self, addr, sock_family):
signal.alarm(TIMEOUT_CONN)
......@@ -485,6 +504,8 @@ class ConnectionDoT(Connection):
if key_string != key:
error("Key error: expected \"%s\", got \"%s\"" % (key, key_string))
signal.alarm(0)
if pipelining:
self.sock.settimeout(TIMEOUT_READ)
return True
def end(self):
......@@ -497,23 +518,69 @@ class ConnectionDoT(Connection):
length = len(data)
self.session.send(length.to_bytes(2, byteorder='big') + data)
def receive_data(self, request, dump=False):
buf = self.session.recv(2)
request.response_size = int.from_bytes(buf, byteorder='big')
buf = self.session.recv(request.response_size)
def receive_data(self, dump=False):
try:
buf = self.session.recv(2)
except OpenSSL.SSL.WantReadError:
return (False, None, None)
size = int.from_bytes(buf, byteorder='big')
buf = self.session.recv(size)
if dump:
dump_data(buf, 'data recv')
request.response = dns.message.from_wire(buf)
request.rcode = True
response = dns.message.from_wire(buf)
return (True, response, size)
def send_and_receive(self, request, dump=False):
self.send_data(request.data, dump=dump)
self.receive_data(request, dump=dump)
rcode, response, size = self.receive_data(dump=dump)
request.store_response(rcode, response, size)
def do_test(self, request, synchronous=True):
self.send_and_receive(request)
request.check_response(self.debug)
self.send_data(request.data)
if synchronous:
rcode, response, size = self.receive_data()
request.store_response(rcode, response, size)
request.check_response(self.debug)
def pipelining_add_request(self, request):
    """Queue a request for pipelined sending; its reply slot starts empty."""
    self.all_requests.append({'request': request, 'response': None}) # No answer yet
def pipelining_fill_pending(self, index):
    """Send the index-th queued request and register it as in-flight.

    The request is recorded in self.pending under its DNS query ID as a
    (finished, rank, request) tuple, then sent asynchronously via
    do_test(synchronous=False); the reply is collected later by
    read_result(). Out-of-range indexes are silently ignored.
    """
    if index < len(self.all_requests):
        request = self.all_requests[index]['request']
        id = request.message.id
        # TODO check there is no duplicate in IDs
        self.pending[id] = (False, index, request)
        self.do_test(request, synchronous = False)
def pipelining_init_pending(self, max_in_flight):
    """Send the initial window of pipelined requests.

    Sends up to max_in_flight queued requests and returns the number of
    requests sent, which is also the index of the next request the
    caller should send as replies come back.

    Bug fix: the original returned the loop variable itself, so when the
    whole window was filled it returned max_in_flight - 1 and the caller
    re-sent that last request (a duplicate query ID overwriting its
    self.pending entry); it also raised UnboundLocalError when
    max_in_flight was 0.
    """
    sent = min(max_in_flight, len(self.all_requests))
    for rank in range(sent):
        self.pipelining_fill_pending(rank)
    return sent
def read_result(self, connection, requests):
rcode, response, size = self.receive_data() # TODO can raise
# OpenSSL.SSL.ZeroReturnError
# if the
# conenction was
# closed
if not rcode:
if display_results:
print("TIMEOUT")
return None
id = response.id
if id not in requests:
raise Exception("Received response for ID %s which is unexpected" % id)
over, rank, request = requests[id]
self.all_requests[rank]['response'] = (rcode, response, size)
requests[id] = (True, rank, request)
if display_results:
print()
print(response)
# TODO a timeout if some responses are lost?
return id
def create_handle(connection):
def reset_opt_default(handle):
......@@ -928,9 +995,10 @@ if not monitoring:
["help", "verbose", "debug", "dot",
"head", "HEAD", "post", "POST",
"insecure", "vhost=", "multistreams",
"pipelining", "max-in-flight=", "key=",
"dnssec", "noedns", "ecs", "nosni",
"sync", "no-display-results", "time",
"dnssec", "noedns", "ecs", "repeat=", "file=", "delay=",
"key=", "nosni",
"file=", "repeat=", "delay=",
"v4only", "v6only",
"check", "mandatory-level="])
for option, value in optlist:
......@@ -983,12 +1051,23 @@ if not monitoring:
ifile = value
elif option == "--key":
key = value
elif option == "-4" or option == "v4only":
elif option == "-4" or option == "--v4only":
forceIPv4 = True
elif option == "-6" or option == "v6only":
elif option == "-6" or option == "--v6only":
forceIPv6 = True
elif option == "--pipelining":
pipelining = True
elif option == "--max-in-flight":
max_in_flight = int(value)
if max_in_flight <= 0:
error("--max_in_flight but be > 0")
if max_in_flight >= 65536:
error("Because of a limit of the DNS protocol (the size of the query ID) --max_in_flight must be < 65 536")
elif option == "--check":
check = True
display_results = False
elif option == "--no-display-results":
display_results = False
elif option == "--mandatory-level":
mandatory_level = value
else:
......@@ -1000,11 +1079,17 @@ if not monitoring:
error("--delay makes no sense with multistreams")
if tests <= 1 and delay is not None:
error("--delay makes no sense if there is no repetition")
if post and head:
usage("POST or HEAD but not both")
if not dot and pipelining:
usage("Pipelining is only accepted for DoT")
sys.exit(1)
if dot and (post or head):
usage("POST or HEAD makes non sense for DoT")
sys.exit(1)
if post and head:
usage("POST or HEAD but not both")
sys.exit(1)
if pipelining and ifile is None:
usage("Pipelining requires an input file")
sys.exit(1)
if check and multistreams:
usage("--check and --multistreams are not compatible")
......@@ -1156,7 +1241,7 @@ for connectTo in ip_set:
if dot:
conn = ConnectionDoT(url, servername=extracheck, connect=connectTo, verbose=verbose,
debug=debug, forceIPv4=forceIPv4, forceIPv6=forceIPv6,
insecure=insecure)
pipelining=pipelining, insecure=insecure)
else:
conn = ConnectionDoH(url, servername=extracheck, connect=connectTo, verbose=verbose,
debug=debug, forceIPv4=forceIPv4, forceIPv6=forceIPv6,
......@@ -1189,33 +1274,61 @@ for connectTo in ip_set:
if not dot:
request.head = head
request.post = post
try:
conn.do_test(request, synchronous = not multistreams)
except (OpenSSL.SSL.Error, CustomException) as e:
ok = False
error(e)
break
if not multistreams:
if not print_result(conn, request):
if not pipelining:
try:
conn.do_test(request, synchronous = not multistreams)
except (OpenSSL.SSL.Error, CustomException) as e:
ok = False
if tests > 1 and i == 0:
if multistreams: # do the first query alone
# to establish the connection and hence avoid starting
# the transfer of all the other queries
conn.perform_multi()
conn.first_handle = conn.all_handles[0]
start2 = time.time()
if delay is not None and not multistreams:
time.sleep(delay)
error(e)
break
if not multistreams:
if not print_result(conn, request):
ok = False
if tests > 1 and i == 0:
if multistreams: # do the first query alone
# to establish the connection and hence avoid starting
# the transfer of all the other queries
conn.perform_multi()
conn.first_handle = conn.all_handles[0]
start2 = time.time()
if delay is not None:
time.sleep(delay)
else: # We do pipelining
conn.pipelining_add_request(request)
if multistreams:
conn.perform_multi()
if sync:
conn.read_results()
if dot and pipelining:
print("")
done = 0
current = conn.pipelining_init_pending(max_in_flight)
while done < tests:
if time.time() > start + MAX_DURATION: # if we send thousands of requests
# MAX_DURATION will be reached
# need to increase MAX_DURATION based
# on the number of queries
# or to define a relation such as
# f(tests) = MAX_DURATION
print("Elapsed time too long, %i requests never got a reply" % (tests-done))
ok = False
break
id = conn.read_result(conn, conn.pending)
if id is None: # Probably a timeout
time.sleep(SLEEP_TIMEOUT)
continue
done += 1
over, rank, request = conn.pending[id]
if not over:
error("Internal error, request %i should be over" % id)
if current < len(conn.all_requests):
conn.pipelining_fill_pending(current)
current += 1
else:
ok = run_check(conn) and ok # need to run run_check first
stop = time.time()
if tests > 1 and not multistreams:
extra = " , %.2f ms/request if we ignore the first one" % ((stop-start2)*1000/(tests-1))
if tests > 1 and not pipelining and not multistreams:
extra = ", %.2f ms/request if we ignore the first one" % ((stop-start2)*1000/(tests-1))
else:
extra = ""
if not monitoring and (not check or verbose):
......@@ -1233,7 +1346,7 @@ for connectTo in ip_set:
input.close()
conn.end()
if ok:
if check:
if check or pipelining:
print('OK')
sys.exit(0)
else:
......
......@@ -10,6 +10,7 @@ config:
- "check: test related to the compliance option --check"
- "forceIPv4: test using the option -4"
- "forceIPv6: test using the option -6"
- "pipelining: test with --pipelining option"
- "fail: mark test with a resolver currently failing our tests"
- "slow: test that need time to run"
......@@ -528,6 +529,52 @@ tests:
retcode: 1
partstderr: 'Key error'
################################################################################
- exe: './homer.py'
name: '[dot][pipelining] Pipelining only with dot'
markers:
- 'pipelining'
args:
- '--pipelining'
- 'dot.bortzmeyer.fr'
retcode: 1
partstderr: 'Pipelining is only accepted for DoT'
stdout: ''
- exe: './homer.py'
name: '[dot][pipelining] Pipelining only with input file'
markers:
- 'pipelining'
args:
- '--dot'
- '--pipelining'
- 'dot.bortzmeyer.fr'
retcode: 1
partstderr: 'Pipelining requires an input file'
stdout: ''
- exe: './homer.py'
name: '[dot][pipelining] 5 requests from input file'
markers:
- 'dot'
- 'pipelining'
args:
- '--dot'
- '--pipelining'
- '--file'
- 'input_file'
- '--repeat'
- '5'
- 'dot.bortzmeyer.fr'
retcode: 0
stderr: ''
partstdout: "OK\n"
################################################################################
- exe: './homer.py'
name: '[dot][check] Resolver returning a malformed DNS message'
markers:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment