Commit f4edcb0c authored by Alexandre's avatar Alexandre
Browse files

[DoT] Refactor with a connect function

parent 85a93bcd
......@@ -344,54 +344,58 @@ class ConnectionDoT(Connection):
signal.signal(signal.SIGALRM, timeout_connection)
connection_success = False
for addrinfo in addrinfo_set:
signal.alarm(TIMEOUT_CONN)
self.hasher = hashlib.sha256()
self.sock = socket.socket(addrinfo[1], socket.SOCK_STREAM)
self.addr = addrinfo[0]
if self.verbose:
print("Connecting to %s ..." % str(self.addr))
# With typical DoT servers, we *must* use TLS 1.2 (otherwise,
# do_handshake fails with "OpenSSL.SSL.SysCallError: (-1, 'Unexpected
# EOF')" Typical HTTP servers are more lax.
self.context = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_2_METHOD)
if self.insecure:
self.context.set_verify(OpenSSL.SSL.VERIFY_NONE, lambda *x: True)
else:
self.context.set_default_verify_paths()
self.context.set_verify_depth(4) # Seems ignored
self.context.set_verify(OpenSSL.SSL.VERIFY_PEER | OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT | \
OpenSSL.SSL.VERIFY_CLIENT_ONCE,
lambda conn, cert, errno, depth, preverify_ok: preverify_ok)
self.session = OpenSSL.SSL.Connection(self.context, self.sock)
self.session.set_tlsext_host_name(canonicalize(self.check).encode()) # Server Name Indication (SNI)
try:
self.session.connect((self.addr))
# TODO We may here have exceptions such as OpenSSL.SSL.ZeroReturnError
self.session.do_handshake()
except TimeoutConnectionError:
continue
self.cert = self.session.get_peer_certificate()
# RFC 7858, section 4.2 and appendix A
self.publickey = self.cert.get_pubkey()
if verbose:
print("Certificate #%x for \"%s\", delivered by \"%s\"" % \
(self.cert.get_serial_number(),
self.cert.get_subject().commonName,
self.cert.get_issuer().commonName))
self.hasher.update(OpenSSL.crypto.dump_publickey(OpenSSL.crypto.FILETYPE_ASN1,
self.publickey))
self.digest = self.hasher.digest()
print("Public key is pin-sha256=\"%s\"" % \
base64.standard_b64encode(self.digest).decode())
if not insecure:
valid = validate_hostname(self.check, self.cert)
if not valid:
error("Certificate error: \"%s\" is not in the certificate" % (self.check))
connection_success = True
break
if self.connect(addrinfo[0], addrinfo[1]):
connection_success = True
break
if not connection_success:
error(f'Could not connect to "{server}"')
def connect(self, addr, sock_family):
signal.alarm(TIMEOUT_CONN)
self.addr = addr
self.sock = socket.socket(sock_family, socket.SOCK_STREAM)
if self.verbose:
print("Connecting to %s ..." % str(self.addr))
# With typical DoT servers, we *must* use TLS 1.2 (otherwise,
# do_handshake fails with "OpenSSL.SSL.SysCallError: (-1, 'Unexpected
# EOF')" Typical HTTP servers are more lax.
self.context = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_2_METHOD)
if self.insecure:
self.context.set_verify(OpenSSL.SSL.VERIFY_NONE, lambda *x: True)
else:
self.context.set_default_verify_paths()
self.context.set_verify_depth(4) # Seems ignored
self.context.set_verify(OpenSSL.SSL.VERIFY_PEER | OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT | \
OpenSSL.SSL.VERIFY_CLIENT_ONCE,
lambda conn, cert, errno, depth, preverify_ok: preverify_ok)
self.session = OpenSSL.SSL.Connection(self.context, self.sock)
self.session.set_tlsext_host_name(canonicalize(self.check).encode()) # Server Name Indication (SNI)
try:
self.session.connect((self.addr))
# TODO We may here have exceptions such as OpenSSL.SSL.ZeroReturnError
self.session.do_handshake()
except TimeoutConnectionError:
return False
self.cert = self.session.get_peer_certificate()
# RFC 7858, section 4.2 and appendix A
self.publickey = self.cert.get_pubkey()
if verbose:
print("Certificate #%x for \"%s\", delivered by \"%s\"" % \
(self.cert.get_serial_number(),
self.cert.get_subject().commonName,
self.cert.get_issuer().commonName))
self.hasher.update(OpenSSL.crypto.dump_publickey(OpenSSL.crypto.FILETYPE_ASN1,
self.publickey))
self.digest = self.hasher.digest()
print("Public key is pin-sha256=\"%s\"" % \
base64.standard_b64encode(self.digest).decode())
if not insecure:
valid = validate_hostname(self.check, self.cert)
if not valid:
error("Certificate error: \"%s\" is not in the certificate" % (self.check))
signal.alarm(0)
return True
def end(self):
self.session.shutdown()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment