Commit 2441c466 authored by Alexandre's avatar Alexandre
Browse files

Remove duplicate exceptions when forcing IPv4 and IPv6

parent a5c2e4fd
......@@ -173,6 +173,8 @@ class Connection:
error("DoT requires a host name or IP address, not \"%s\"" % server)
if not dot and not is_valid_url(url):
error("DoH requires a valid HTTPS URL, not \"%s\"" % server)
if forceIPv4 and forceIPv6:
raise Exception("Force IPv4 *or* IPv6 but not both")
self.server = server
self.servername = servername
if self.servername is not None:
......@@ -186,6 +188,23 @@ class Connection:
def __str__(self):
return self.server
def check_ip_address(self, addr):
(is_addr, self.family) = is_valid_ip_address(addr)
if not is_addr and not self.dot:
raise Exception("%s is not IPv4 and not IPv6" % addr)
if forceIPv4 and self.family == 6:
raise Exception("You cannot force IPv4 with a litteral IPv6 address (%s)" % addr)
elif forceIPv6 and self.family == 4:
raise Exception("You cannot force IPv6 with a litteral IPv4 address (%s)" % addr)
if forceIPv4 or self.family == 4:
self.family = socket.AF_INET
self.repraddress = addr
elif forceIPv6 or self.family == 6:
self.family = socket.AF_INET6
self.repraddress = f'[{addr}]'
else:
self.family = 0
class ConnectionDoT(Connection):
def __init__(self, server, servername=None, connect=None, forceIPv4=False, forceIPv6=False,
......@@ -193,21 +212,9 @@ class ConnectionDoT(Connection):
Connection.__init__(self, server, servername=servername, connect=connect,
forceIPv4=forceIPv4, forceIPv6=forceIPv6, dot=dot,
verbose=verbose, insecure=insecure, post=post, head=head)
if forceIPv4 and forceIPv6:
raise Exception("Force IPv4 *or* IPv6 but not both")
(is_addr, family) = is_valid_ip_address(self.server)
if forceIPv4:
if is_addr and family == 6:
raise Exception("You cannot force IPv4 with a litteral IPv6 address (%s)" % self.server)
family = socket.AF_INET
elif forceIPv6:
if is_addr and family == 4:
raise Exception("You cannot force IPv6 with a litteral IPv4 address (%s)" % self.server)
family = socket.AF_INET6
else:
family = 0
self.check_ip_address(self.server)
self.hasher = hashlib.sha256()
addrinfo = socket.getaddrinfo(server, 853, family)
addrinfo = socket.getaddrinfo(server, 853, self.family)
# May be loop over the results of getaddrinfo, to test all
# the IP addresses? See #13.
self.sock = socket.socket(addrinfo[0][0], socket.SOCK_STREAM)
......@@ -277,29 +284,14 @@ class ConnectionDoH(Connection):
self.curl.setopt(pycurl.NOBODY, True)
if self.post:
self.curl.setopt(pycurl.POST, True)
if forceIPv4 and forceIPv6:
raise Exception("Force IPv4 *or* IPv6 but not both")
if forceIPv4:
self.curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)
if forceIPv6:
self.curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V6)
if connect is not None:
self.check_ip_address(connect)
try:
binaddress = netaddr.IPAddress(connect)
if binaddress.version == 4:
if forceIPv6:
raise Exception("You cannot force the use of IPv6 with a litteral IPv4 address (%s)" % connect)
repraddress = connect
elif binaddress.version == 6:
if forceIPv4:
raise Exception("You cannot force the use of IPv6 with a litteral IPv4 address (%s)" % connect)
repraddress = "[%s]" % connect
else:
raise Exception("%s is not IPv4 and not IPv6" % connect)
except netaddr.core.AddrFormatError:
repraddress = connect
try:
self.curl.setopt(pycurl.CONNECT_TO, ["::%s:" % repraddress,])
self.curl.setopt(pycurl.CONNECT_TO, ["::%s:" % self.repraddress,])
except AttributeError:
pass # Probably an old version of libcurl, without CONNECT_TO. It appeared with 7.49.0
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment