Commit 4d0876a9 authored by Alexandre's avatar Alexandre
Browse files

Prepare code for independant curl handle

parent 247f007c
......@@ -173,6 +173,25 @@ def validate_hostname(hostname, cert):
return True
return False
def check_ip_address(addr, dot=dot):
repraddress = addr
(is_addr, family) = is_valid_ip_address(addr)
if not is_addr and not dot:
raise CustomException("%s is not IPv4 and not IPv6" % addr)
if forceIPv4 and family == 6:
raise CustomException("You cannot force IPv4 with a litteral IPv6 address (%s)" % addr)
elif forceIPv6 and family == 4:
raise CustomException("You cannot force IPv6 with a litteral IPv4 address (%s)" % addr)
if forceIPv4 or family == 4:
family = socket.AF_INET
repraddress = addr
elif forceIPv6 or family == 6:
family = socket.AF_INET6
repraddress = f'[{addr}]'
else:
family = 0
return (family, repraddress)
def timeout_connection(signum, frame):
raise TimeoutConnectionError('Connection timeout')
......@@ -262,23 +281,6 @@ class Connection:
def __str__(self):
return self.server
def check_ip_address(self, addr):
(is_addr, self.family) = is_valid_ip_address(addr)
if not is_addr and not self.dot:
raise CustomException("%s is not IPv4 and not IPv6" % addr)
if forceIPv4 and self.family == 6:
raise CustomException("You cannot force IPv4 with a litteral IPv6 address (%s)" % addr)
elif forceIPv6 and self.family == 4:
raise CustomException("You cannot force IPv6 with a litteral IPv4 address (%s)" % addr)
if forceIPv4 or self.family == 4:
self.family = socket.AF_INET
self.repraddress = addr
elif forceIPv6 or self.family == 6:
self.family = socket.AF_INET6
self.repraddress = f'[{addr}]'
else:
self.family = 0
def do_test(self, qname, qtype=rtype, synchronous=True):
# Routine doing one actual test. Returns a Request object
pass
......@@ -290,7 +292,7 @@ class ConnectionDoT(Connection):
Connection.__init__(self, server, servername=servername, connect=connect,
forceIPv4=forceIPv4, forceIPv6=forceIPv6, dot=True,
verbose=verbose, insecure=insecure)
self.check_ip_address(self.server)
self.family, self.repraddress = check_ip_address(self.server, dot=True)
addrinfo_list = socket.getaddrinfo(self.server, 853, self.family)
addrinfo_set = { (addrinfo[4], addrinfo[0]) for addrinfo in addrinfo_list }
signal.signal(signal.SIGALRM, timeout_connection)
......@@ -416,7 +418,7 @@ class ConnectionDoH(Connection):
if forceIPv6:
self.curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V6)
if self.connect is not None:
self.check_ip_address(self.connect)
self.family, self.repraddress = check_ip_address(self.connect, dot=False)
self.curl.setopt(pycurl.CONNECT_TO, [f'::{self.repraddress}:443',])
self.curl.setopt(pycurl.HTTPHEADER, ["Content-type: application/dns-message"])
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment