Commit e75c5912 authored by Alexandre's avatar Alexandre
Browse files

perf: script to stress load a DoH resolver

parent a69be136
#!/usr/bin/env python3
import pycurl
import dns.message
import io
import sys
import base64
import getopt
import urllib.parse
import socket
import time
import hashlib
import threading
# Flags controlling how responses are checked.
parse_dnspython = False  # parse every response body with dnspython
check_rcode = False  # cross-check the parsed rcode against the raw body

# Values that can be changed from the command line
dot = False  # DoH by default
verbose = False
debug = False
insecure = True  # do not verify the server certificate
dnssec = False
edns = True
no_ecs = True  # ask the resolver not to forward ECS (RFC 7871)
sni = True
rtype = 'AAAA'  # default query type
vhostname = None
key = None  # SPKI
ifile = None  # input file (the original assigned this twice; once is enough)
forceIPv4 = False
forceIPv6 = False
connectTo = None
multistreams = True
total_time = 10  # seconds the whole test runs
n_conn = 5  # number of concurrent connections
max_handles = 200  # maximum handles in a multi_handle
concurrent_streams = 100  # maximum concurrent streams

# Global counters, accumulated by output_data() from per-connection stats.
g_recv = 0  # total number of finished transfers (including errors)
g_err = 0  # total number of failing transfers (including servfail)
g_servfail = 0  # HTTP 200 + DNS SERVFAIL
g_handlefail = 0  # failing handles
g_recv_array = []  # per-second recv values, used for the max/avg report
def read_rcode(data):
    """Extract the RCODE from a raw DNS message (low 4 bits of byte 3)."""
    flags_low_byte = data[3]
    return flags_low_byte & 0x0F
def is_valid_url(url):
    """Loosely check that *url* is an HTTPS URL with a non-empty host.

    A very poor validation: many errors (for instance whitespace, or
    IPv6 address literals without brackets) are ignored.
    """
    try:
        parts = urllib.parse.urlparse(url)
    except ValueError:
        return False
    if parts.scheme != "https":
        return False
    return parts.netloc != ""
class Request:
    """A DNS query wrapped for sending to the resolver under test."""

    def __init__(self, qname, qtype=rtype, use_edns=edns, want_dnssec=dnssec):
        # Disable ECS (RFC 7871, section 7.1.2) when no_ecs is set.
        options = [dns.edns.ECSOption(address='', srclen=0)] if no_ecs else None
        self.message = dns.message.make_query(
            qname, dns.rdatatype.from_text(qtype),
            use_edns=use_edns, want_dnssec=want_dnssec, options=options)
        self.message.flags |= dns.flags.AD  # Ask for validation
        self.ok = True
        self.i = 0  # request's number on the connection (default to the first)

    def to_wire(self):
        """Serialize the query; the wire form is kept in self.data."""
        self.data = self.message.to_wire()
class RequestDoH(Request):
    """A DNS query to be sent over DoH (RFC 8484)."""

    def __init__(self, qname, qtype=rtype, use_edns=edns, want_dnssec=dnssec):
        # Bug fix: forward the caller's qtype; the original passed the
        # module-level default `rtype`, silently ignoring the parameter.
        Request.__init__(self, qname, qtype=qtype, use_edns=use_edns,
                         want_dnssec=want_dnssec)
        self.message.id = 0  # DoH: fixed ID helps HTTP-level caching
        self.post = False
        self.head = False

    def parse_response(self, debug=False):
        """Parse self.response (raw wire bytes) with dnspython.

        Returns (ok, rcode); ok is False and rcode is -1 when the data
        is not proper DNS. Be careful here: do not store the parsed
        response in the request because we reuse requests for new handles.
        """
        dns_rcode = -1
        try:
            response = dns.message.from_wire(self.response)
        except dns.message.TrailingJunk:
            # Not DNS. Should not happen for a content type
            # application/dns-message but who knows?
            response = "ERROR Not proper DNS data, trailing junk"
            if debug:
                # Bug fix: show the raw data; the original appended the
                # error message to itself.
                response += " \"%s\"" % self.response
            ok = False
        except dns.name.BadLabelType:  # Not DNS.
            response = "ERROR Not proper DNS data (wrong path in the URL?)"
            if debug:
                # Bug fix: same here — truncate the raw data, not the message.
                response += " \"%s\"" % self.response[:100]
            ok = False
        else:
            ok = True
            dns_rcode = response.rcode()
        return (ok, dns_rcode)
class Connection:
    """Base class for a connection to a DoH or DoT server.

    NOTE(review): is_valid_hostname, error and CustomException are not
    defined in this file; they must come from elsewhere, otherwise the
    error paths raise NameError — confirm against the full project.
    """

    def __init__(self, server, servername=None, connect=None, forceIPv4=False,
                 forceIPv6=False, dot=dot, verbose=verbose, debug=debug,
                 insecure=insecure):
        if dot and not is_valid_hostname(server):
            error("DoT requires a host name or IP address, not \"%s\"" % server)
        if not dot and not is_valid_url(server):
            error("DoH requires a valid HTTPS URL, not \"%s\"" % server)
        if forceIPv4 and forceIPv6:
            raise CustomException("Force IPv4 *or* IPv6 but not both")
        # Bug fix: the original assigned self.dot twice; once is enough.
        self.dot = dot
        self.server = server
        self.servername = servername
        # Name to check in the certificate: explicit servername wins.
        if self.servername is not None:
            self.check = self.servername
        else:
            self.check = self.server
        self.verbose = verbose
        self.debug = debug
        self.insecure = insecure
        self.forceIPv4 = forceIPv4
        self.forceIPv6 = forceIPv6
        self.connect_to = connect

    def __str__(self):
        return self.server
def create_handle(connection):
    # Build and configure a pycurl easy handle for `connection`.
    # The two helpers below are attached to the handle as plain function
    # attributes (not bound methods), so callers pass the handle
    # explicitly: handle.prepare(handle, connection, request).
    def reset_opt_default(handle):
        # Reset the options that prepare() may have toggled on a reused handle.
        opts = {
            pycurl.NOBODY: False,
            pycurl.POST: False,
            pycurl.POSTFIELDS: '',
            pycurl.URL: ''
        }
        for opt, value in opts.items():
            handle.setopt(opt, value)
    def prepare(handle, connection, request):
        # Configure the handle for one DoH request (POST or GET form).
        if not connection.multistreams:
            handle.reset_opt_default(handle)
        if request.post:
            handle.setopt(pycurl.POST, True)
            handle.setopt(pycurl.POSTFIELDS, request.data)
            handle.setopt(pycurl.URL, connection.server)
        else:
            handle.setopt(pycurl.HTTPGET, True) # automatically sets CURLOPT_NOBODY to 0
            if request.head:
                handle.setopt(pycurl.NOBODY, True)
            # GET form: query goes in ?dns= as base64url without padding.
            dns_req = base64.urlsafe_b64encode(request.data).decode('UTF8').rstrip('=')
            handle.setopt(pycurl.URL, connection.server + ("?dns=%s" % dns_req))
        handle.setopt(pycurl.HTTPHEADER,
                      [f"X-Homer: {request.i}", "Accept: application/dns-message", "Content-type: application/dns-message"])
        # Fresh buffer per request; the response body accumulates here.
        handle.buffer = io.BytesIO()
        handle.setopt(pycurl.WRITEDATA, handle.buffer)
        handle.request = request
    handle = pycurl.Curl()
    # Does not work if pycurl was not compiled with nghttp2 (recent Debian
    # packages are OK) https://github.com/pycurl/pycurl/issues/477
    handle.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2)
    if connection.debug:
        handle.setopt(pycurl.VERBOSE, True)
    if connection.insecure:
        handle.setopt(pycurl.SSL_VERIFYPEER, False)
        handle.setopt(pycurl.SSL_VERIFYHOST, False)
    if connection.forceIPv4:
        handle.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)
    if connection.forceIPv6:
        handle.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V6)
    # NOTE(review): the base Connection stores this value as .connect_to;
    # only ConnectionDoH sets .connect — confirm before using this function
    # with a plain Connection. check_ip_address is not defined in this file.
    if connection.connect is not None:
        family, repraddress = check_ip_address(connection.connect, dot=False)
        handle.setopt(pycurl.CONNECT_TO, [f'::{repraddress}:443',])
    handle.setopt(pycurl.HTTPHEADER,
                  ["Accept: application/dns-message", "Content-type: application/dns-message"])
    handle.reset_opt_default = reset_opt_default
    handle.prepare = prepare
    return handle
class ConnectionDoH(Connection):
    """A DoH connection driving a pycurl multi handle (HTTP/2 streams).

    Reads the module-level `request_list` to pick queries, and updates
    self.info_thread counters (read by the main thread via output_data).
    """

    def __init__(self, server, servername=None, connect=None, forceIPv4=False,
                 forceIPv6=False, concurrent_streams=concurrent_streams,
                 multistreams=True, verbose=verbose, debug=debug,
                 insecure=insecure):
        Connection.__init__(self, server, servername=servername, connect=connect,
                            forceIPv4=forceIPv4, forceIPv6=forceIPv6, dot=False,
                            verbose=verbose, debug=debug, insecure=insecure)
        self.url = server
        self.connect = connect
        self.multistreams = multistreams
        self.concurrent_streams = concurrent_streams
        if self.multistreams:
            self.multi = self.create_multi(concurrent_streams)
            self.all_handles = []
            self.endless = False
            self.openconn = True
            self.open_connection_multi()

    def create_multi(self, streams):
        """Create the multi handle, capped to one connection and `streams` streams."""
        multi = pycurl.CurlMulti()
        multi.setopt(pycurl.M_MAX_HOST_CONNECTIONS, 1)
        multi.setopt(pycurl.M_MAX_CONCURRENT_STREAMS, streams)
        return multi

    def open_connection_multi(self):
        """Perform one transfer just to establish the HTTP/2 connection."""
        self.add_handles(request_list, 1)
        self.perform_multi()
        self.all_handles = []  # reset handles

    def end(self):
        """Release all curl resources for this connection."""
        if not self.multistreams:
            self.curl_handle.close()
        else:
            self.remove_handles()
            self.multi.close()

    def remove_handles(self):
        # Bug fix: the original iterated over the undefined global
        # `all_handles` (NameError) and closed each handle before removing
        # it from the multi; handles must be removed first, then closed.
        for h in self.all_handles:
            self.multi.remove_handle(h)
            h.close()

    def add_handles(self, request_list, n):
        """Add n easy handles, taking requests round-robin from request_list."""
        start = request_list['start'] % request_list['length']
        for i in range(n):
            request = request_list['list'][(start + i) % request_list['length']]
            handle = create_handle(self)
            self.all_handles.append(handle)
            handle.prepare(handle, self, request)
            self.multi.add_handle(handle)
        request_list['start'] += n

    def perform_multi(self):
        """Drive the multi handle; in endless mode, replace finished transfers."""
        while 1:
            ret, num_handles = self.multi.perform()
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break
        while num_handles:
            ret = self.multi.select(1.0)
            if ret == -1:
                continue
            while 1:
                ret, num_handles = self.multi.perform()
                if not self.openconn:
                    done = self.read_info_multi()
                    if self.endless:
                        # Top up with as many handles as just finished,
                        # keeping the offered load constant.
                        self.add_handles(request_list, done)
                if ret != pycurl.E_CALL_MULTI_PERFORM:
                    break
        if not self.openconn:
            self.read_info_multi()

    def read_info_multi(self):
        """Drain multi.info_read(), update counters, return #finished transfers."""
        n, handle_pass, handle_fail = self.multi.info_read()
        done = len(handle_pass) + len(handle_fail)
        for handle in handle_pass:
            if handle.getinfo(pycurl.RESPONSE_CODE) == 200:
                body = handle.buffer.getvalue()
                if parse_dnspython or check_rcode:
                    handle.request.response = body
                    ok, handle.dns_rcode = handle.request.parse_response()
                    if not ok:
                        self.info_thread['err'] += 1
                    else:
                        if handle.dns_rcode == dns.rcode.SERVFAIL:
                            self.info_thread['err'] += 1
                            self.info_thread['servfail'] += 1
                        if check_rcode:
                            if handle.dns_rcode != read_rcode(body):
                                print(f"FAIL: parsed rcode ({handle.dns_rcode})", end="")
                                print(f"is different from body rcode ({read_rcode(body)}).")
                else:
                    # Directly extract the rcode from the raw body; parsing
                    # with dnspython would take too much time.
                    rcode = body[3] & 15
                    if rcode == 2:  # SERVFAIL
                        self.info_thread['err'] += 1
                        self.info_thread['servfail'] += 1
            else:
                # HTTP status other than 200 counts as an error.
                self.info_thread['err'] += 1
        self.info_thread['err'] += len(handle_fail)
        self.info_thread['handlefail'] += len(handle_fail)
        self.info_thread['recv'] += done
        return done
def get_next_domain(line):
    """Parse one input-file line into (qname, qtype).

    A line is either "name" (qtype defaults to NS) or "name type".
    """
    qname, qtype = 'framagit.org', 'NS'
    if line[:-1] == "":
        # NOTE(review): `error` and `tests` are not defined in this file.
        error("Not enough data in %s for the %i tests" % (ifile, tests))
    if ' ' not in line:
        qname = line[:-1]  # strip the trailing newline
        qtype = 'NS'
    else:
        qname, qtype = line.split()
    return qname, qtype
def create_connection(max_handles, streams):
    """Open a DoH connection and pre-load it with max_handles transfers.

    Uses the module-level url/connectTo/request_list; attaches the
    per-connection bookkeeping used by the stress loop.
    """
    try:
        conn = ConnectionDoH(url, servername=None, connect=connectTo,
                             verbose=verbose, debug=debug,
                             forceIPv4=forceIPv4, forceIPv6=forceIPv6,
                             concurrent_streams=streams, multistreams=True,
                             insecure=True)
    except TimeoutError:
        error("timeout")
    except ConnectionRefusedError:
        error("Connection to server refused")
    except ValueError:
        error(f'"{url}" not a name or an IP address')
    except socket.gaierror:
        error(f'Could not resolve "{url}"')
    except CustomException as e:
        error(e)
    conn.max_handles = max_handles
    # Counters written by the connection's thread only ...
    conn.info_thread = dict.fromkeys(('recv', 'err', 'servfail', 'handlefail'), 0)
    # ... and the main thread's last snapshot of them (main thread only).
    conn.info_main = dict.fromkeys(('recv', 'err', 'servfail', 'handlefail'), 0)
    conn.endless = True
    conn.openconn = False
    conn.add_handles(request_list, max_handles)
    return conn
def usage():
print(f"Usage: {sys.argv[0]} ", end='')
print("[-c CONN] [-s STREAMS] [-h HANDLE] [-l TIME] -f FILE url")
print()
print(" -h print this message and exits")
print(" -c CONN number of open connections (default 5)")
print(" -s STREAMS number of concurrent streams (default 100)")
print(" -m HANDLE maximum number of handles in a curl multi")
print(" -l TIME number of seconds the test should run (default 10)")
print(" -f FILE input file with lines made of a query name and a query type")
print(" url remote server URL")
def output_data(endless=True):
    """Collect counters from all connections and print statistics.

    With endless=True (called once per second by the main loop), prints
    the last-second rates; with endless=False, prints the final summary.
    Reads the globals `connections` and `t0`; updates the g_* totals and
    g_recv_array. Main thread only.
    """
    global g_recv, g_err, g_servfail, g_handlefail
    tot_recv = tot_err = tot_servfail = tot_handlefail = 0
    if endless:
        print(f"{time.time() - t0:.3f}s ", end='')
    for conn in connections:
        # Delta since the previous call: info_thread is written by the
        # connection's thread, info_main holds our last snapshot.
        recv = conn.info_thread['recv'] - conn.info_main['recv']
        err = conn.info_thread['err'] - conn.info_main['err']
        servfail = conn.info_thread['servfail'] - conn.info_main['servfail']
        handlefail = conn.info_thread['handlefail'] - conn.info_main['handlefail']
        tot_recv += recv
        tot_err += err
        tot_servfail += servfail
        tot_handlefail += handlefail
        conn.info_main['recv'] = conn.info_thread['recv']
        conn.info_main['err'] = conn.info_thread['err']
        conn.info_main['servfail'] = conn.info_thread['servfail']
        conn.info_main['handlefail'] = conn.info_thread['handlefail']
    g_recv += tot_recv
    g_err += tot_err
    g_servfail += tot_servfail
    g_handlefail += tot_handlefail
    g_recv_array.append(tot_recv)
    if endless:
        print(f"rps: {tot_recv}", end=" , ")
        print(f"err: {tot_err / tot_recv * 100 if tot_recv else 0:.2f}%", end=" ")
        print(f"({tot_servfail / tot_err * 100 if tot_err else 0:.2f}%", end=" , ")
        print(f"{tot_handlefail / tot_err * 100 if tot_err else 0:.2f}%)", end=" , ")
        print(f"total_recv: {g_recv}")
    else:
        print(f"total recv : {g_recv}")
        # Bug fix: the HANDLEFAIL percentage divides by g_err, so the guard
        # must be on g_err (the original guarded on g_handlefail, which is
        # inconsistent with the SERVFAIL term and risks ZeroDivisionError).
        print(f"total err : {g_err} {g_err / g_recv * 100 if g_recv else 0:.2f}% (SERVFAIL: {g_servfail / g_err * 100 if g_err else 0:.2f}%, HANDLEFAIL: {g_handlefail / g_err * 100 if g_err else 0:.2f}%)")
        print(f"max rps : {max(g_recv_array)}")
        print(f"avg rps : {sum(g_recv_array) // len(g_recv_array)}")
# Main program
connections = list()  # one ConnectionDoH per -c
threads = list()  # one thread per connection, each running perform_multi()
try:
    optlist, args = getopt.getopt (sys.argv[1:], "hc:l:f:m:s:",
                                   ["help"])
    for option, value in optlist:
        if option == "--help" or option == "-h":
            usage()
            sys.exit(0)
        elif option == "-c":
            n_conn = int(value)
        elif option == "-l":
            try:
                total_time = int(value)
            except Exception:
                print("The time should be an integer")
                sys.exit(1)
        elif option == "-m":
            max_handles = int(value)
        elif option == "-s":
            concurrent_streams = int(value)
        elif option == "-f":
            ifile = value
except getopt.error as reason:
    # NOTE(review): `reason` is caught but never shown to the user.
    usage()
    sys.exit(1)
if ifile is None:
    print("Input file is missing (use -f FILE)")
    sys.exit(1)
if len(args) < 1:
    print("Missing URL")
    sys.exit(1)
url = args[0]
# generate a list of requests used to generate all transfers
# start is the number of created request
# to get the index of the next request to create : start % length
request_list = { 'length': 0, 'start': 0, 'list': [] }
if ifile is None:
    # NOTE(review): unreachable — ifile was already checked above.
    print("need a input file")
    sys.exit(1)
with open(ifile) as f:
    i = 0
    for line in f:
        # Pre-build and pre-serialize every request once, so the hot loop
        # only reuses them.
        qname, rtype = get_next_domain(line)
        request = RequestDoH(qname, rtype, use_edns=False, want_dnssec=False)
        request.i = i
        request.head = False
        request.post = False
        request.to_wire()
        request_list['list'].append(request)
        i += 1
request_list['length'] = i
f.closed  # NOTE(review): no-op attribute read; the `with` already closed f
print(f"\
Running for {total_time} seconds with {n_conn} connections, \
{concurrent_streams} concurrent streams per connection, \
up to {max_handles} handles per multi_handle.")
for i in range(n_conn):
    conn = create_connection(max_handles, concurrent_streams)
    conn.index = i
    connections.append(conn)
    # create a thread for each connection
    t = threading.Thread(target=ConnectionDoH.perform_multi, args=(conn, ), daemon=True)
    threads.append(t)
t0 = time.time()
for t in threads:
    t.start()
# Report once per second while the load runs.
for i in range(total_time):
    time.sleep(1)
    output_data()
# Stop topping up handles; threads drain their remaining transfers.
for i in range(len(connections)):
    connections[i].endless = False
    #threads[i].join()
print()
print("waiting 3 seconds for finishing transfers...")
time.sleep(3)
output_data(endless=False)
sys.exit(0)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment