Commit e16103ce authored by Alexandre's avatar Alexandre
Browse files

Prepare code for --check option

parent ce9473e6
......@@ -186,10 +186,13 @@ class RequestDoH:
self.message.id = 0 # DoH requests that
self.message.flags |= dns.flags.AD # Ask for validation
self.data = self.message.to_wire()
self.head = False
def create_handle(self, curl_opt):
self.handle = pycurl.Curl()
for opt, value in curl_opt.items():
if opt == pycurl.NOBODY:
self.head = True
try:
self.handle.setopt(opt, value)
except AttributeError:
......@@ -220,7 +223,7 @@ class RequestDoH:
self.response = "Content type of the response (\"%s\") invalid" % ctype
ok = False
else:
if not head:
if not self.head:
try:
self.response = dns.message.from_wire(self.body)
except dns.message.TrailingJunk: # Not DNS. Should
......@@ -243,6 +246,7 @@ class RequestDoH:
self.response = "[No details]"
else:
self.response = self.body
return ok
class Connection:
......@@ -404,6 +408,9 @@ class ConnectionDoH(Connection):
self.post = post
self.head = head
self.url = server
self.connect = connect
def reset_conn_opt(self):
http_header = ["Content-type: application/dns-message"]
self.curl_opt = {}
self.curl_opt[pycurl.HTTP_VERSION] = pycurl.CURL_HTTP_VERSION_2 # Does not
......@@ -419,8 +426,8 @@ class ConnectionDoH(Connection):
self.curl_opt[pycurl.IPRESOLVE] = pycurl.IPRESOLVE_V4
if forceIPv6:
self.curl_opt[pycurl.IPRESOLVE] = pycurl.IPRESOLVE_V6
if connect is not None:
self.check_ip_address(connect)
if self.connect is not None:
self.check_ip_address(self.connect)
self.curl_opt[pycurl.CONNECT_TO] = [f'::{self.repraddress}:443',]
def end(self):
......@@ -442,6 +449,7 @@ class ConnectionDoH(Connection):
self.curl_opt[pycurl.NOBODY] = True
def do_test(self, qname, qtype=rtype):
self.reset_conn_opt()
if self.post:
self.prepare_test_post(qname, qtype)
elif self.head:
......@@ -454,6 +462,19 @@ class ConnectionDoH(Connection):
self.request.check_response()
return (self.request.rcode, self.request.response, self.request.body_size)
def get_next_domain(input_file):
name, rtype = 'framagit.org', 'AAAA'
line = input_file.readline()
if line[:-1] == "":
error("Not enough data in %s for the %i tests" % (ifile, tests))
if line.find(' ') == -1:
name = line[:-1]
rtype = 'AAAA'
else:
(name, rtype) = line.split()
return name, rtype
# Main program
me = os.path.basename(sys.argv[0])
monitoring = (me == "check_doh" or me == "check_dot")
......@@ -609,16 +630,10 @@ for i in range (0, tests):
if tests > 1:
print("\nTest %i" % i)
if ifile is not None:
line = input.readline()
if line[:-1] == "":
error("Not enough data in %s for the %i tests" % (ifile, tests))
if line.find(' ') == -1:
name = line[:-1]
rtype = 'AAAA'
else:
(name, rtype) = line.split()
name, rtype = get_next_domain(input)
(rcode, msg, size) = conn.do_test(name, rtype)
ok = conn.print_result(rcode, msg, size)
if not conn.print_result(rcode, msg, size):
ok = False
if tests > 1 and i == 0:
start2 = time.time()
if delay is not None:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment