Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
DNS testing tools
Remoh
Commits
bbffbc9d
Commit
bbffbc9d
authored
Mar 05, 2020
by
Alexandre
Browse files
Separate connection class into DoT & DoH classes
parent
d3a25ecf
Changes
1
Hide whitespace changes
Inline
Side-by-side
homer.py
View file @
bbffbc9d
...
...
@@ -180,118 +180,132 @@ class Connection:
else
:
self
.
check
=
self
.
server
self
.
dot
=
dot
if
not
self
.
dot
:
self
.
post
=
post
self
.
head
=
head
if
self
.
dot
:
self
.
hasher
=
hashlib
.
sha256
()
self
.
verbose
=
verbose
self
.
insecure
=
insecure
if
self
.
dot
:
if
forceIPv4
and
forceIPv6
:
raise
Exception
(
"Force IPv4 *or* IPv6 but not both"
)
(
is_addr
,
family
)
=
is_valid_ip_address
(
self
.
server
)
if
forceIPv4
:
if
is_addr
and
family
==
6
:
raise
Exception
(
"You cannot force IPv4 with a litteral IPv6 address (%s)"
%
self
.
server
)
family
=
socket
.
AF_INET
elif
forceIPv6
:
if
is_addr
and
family
==
4
:
raise
Exception
(
"You cannot force IPv6 with a litteral IPv4 address (%s)"
%
self
.
server
)
family
=
socket
.
AF_INET6
else
:
family
=
0
addrinfo
=
socket
.
getaddrinfo
(
server
,
853
,
family
)
# May be loop over the results of getaddrinfo, to test all
# the IP addresses? See #13.
self
.
sock
=
socket
.
socket
(
addrinfo
[
0
][
0
],
socket
.
SOCK_STREAM
)
self
.
addr
=
addrinfo
[
0
][
4
]
if
self
.
verbose
:
print
(
"Connecting to %s ..."
%
str
(
self
.
addr
))
# With typical DoT servers, we *must* use TLS 1.2 (otherwise,
# do_handshake fails with "OpenSSL.SSL.SysCallError: (-1, 'Unexpected
# EOF')" Typical HTTP servers are more lax.
self
.
context
=
OpenSSL
.
SSL
.
Context
(
OpenSSL
.
SSL
.
TLSv1_2_METHOD
)
if
self
.
insecure
:
self
.
context
.
set_verify
(
OpenSSL
.
SSL
.
VERIFY_NONE
,
lambda
*
x
:
True
)
else
:
self
.
context
.
set_default_verify_paths
()
self
.
context
.
set_verify_depth
(
4
)
# Seems ignored
self
.
context
.
set_verify
(
OpenSSL
.
SSL
.
VERIFY_PEER
|
OpenSSL
.
SSL
.
VERIFY_FAIL_IF_NO_PEER_CERT
|
\
OpenSSL
.
SSL
.
VERIFY_CLIENT_ONCE
,
lambda
conn
,
cert
,
errno
,
depth
,
preverify_ok
:
preverify_ok
)
self
.
session
=
OpenSSL
.
SSL
.
Connection
(
self
.
context
,
self
.
sock
)
self
.
session
.
set_tlsext_host_name
(
canonicalize
(
check
).
encode
())
# Server Name Indication (SNI)
self
.
session
.
connect
((
self
.
addr
))
# TODO We may here have exceptions such as OpenSSL.SSL.ZeroReturnError
self
.
session
.
do_handshake
()
self
.
cert
=
self
.
session
.
get_peer_certificate
()
# RFC 7858, section 4.2 and appendix A
self
.
publickey
=
self
.
cert
.
get_pubkey
()
if
verbose
:
print
(
"Certificate #%x for
\"
%s
\"
, delivered by
\"
%s
\"
"
%
\
(
self
.
cert
.
get_serial_number
(),
self
.
cert
.
get_subject
().
commonName
,
self
.
cert
.
get_issuer
().
commonName
))
self
.
hasher
.
update
(
OpenSSL
.
crypto
.
dump_publickey
(
OpenSSL
.
crypto
.
FILETYPE_ASN1
,
self
.
publickey
))
self
.
digest
=
self
.
hasher
.
digest
()
print
(
"Public key is pin-sha256=
\"
%s
\"
"
%
\
base64
.
standard_b64encode
(
self
.
digest
).
decode
())
if
not
insecure
:
valid
=
validate_hostname
(
check
,
self
.
cert
)
if
not
valid
:
error
(
"Certificate error:
\"
%s
\"
is not in the certificate"
%
(
check
))
else
:
# DoH
self
.
curl
=
pycurl
.
Curl
()
self
.
url
=
server
# Does not work if pycurl was not compiled with nghttp2 (recent Debian
# packages are OK) https://github.com/pycurl/pycurl/issues/477
self
.
curl
.
setopt
(
pycurl
.
HTTP_VERSION
,
pycurl
.
CURL_HTTP_VERSION_2
)
self
.
curl
.
setopt
(
pycurl
.
HTTPHEADER
,
[
"Content-type: application/dns-message"
])
if
self
.
verbose
:
self
.
curl
.
setopt
(
self
.
curl
.
VERBOSE
,
True
)
if
self
.
insecure
:
self
.
curl
.
setopt
(
pycurl
.
SSL_VERIFYPEER
,
False
)
self
.
curl
.
setopt
(
pycurl
.
SSL_VERIFYHOST
,
False
)
if
self
.
head
:
self
.
curl
.
setopt
(
pycurl
.
NOBODY
,
True
)
if
self
.
post
:
self
.
curl
.
setopt
(
pycurl
.
POST
,
True
)
if
forceIPv4
and
forceIPv6
:
raise
Exception
(
"Force IPv4 *or* IPv6 but not both"
)
if
forceIPv4
:
self
.
curl
.
setopt
(
pycurl
.
IPRESOLVE
,
pycurl
.
IPRESOLVE_V4
)
if
forceIPv6
:
self
.
curl
.
setopt
(
pycurl
.
IPRESOLVE
,
pycurl
.
IPRESOLVE_V6
)
if
connect
is
not
None
:
try
:
binaddress
=
netaddr
.
IPAddress
(
connect
)
if
binaddress
.
version
==
4
:
if
forceIPv6
:
raise
Exception
(
"You cannot force the use of IPv6 with a litteral IPv4 address (%s)"
%
connect
)
repraddress
=
connect
elif
binaddress
.
version
==
6
:
if
forceIPv4
:
raise
Exception
(
"You cannot force the use of IPv6 with a litteral IPv4 address (%s)"
%
connect
)
repraddress
=
"[%s]"
%
connect
else
:
raise
Exception
(
"%s is not IPv4 and not IPv6"
%
connect
)
except
netaddr
.
core
.
AddrFormatError
:
repraddress
=
connect
try
:
self
.
curl
.
setopt
(
pycurl
.
CONNECT_TO
,
[
"::%s:"
%
repraddress
,])
except
AttributeError
:
pass
# Probably an old version of libcurl, without CONNECT_TO. It appeared with 7.49.0
def
__str__
(
self
):
return
self
.
server
class
ConnectionDoT
(
Connection
):
def
__init__
(
self
,
server
,
servername
=
None
,
connect
=
None
,
forceIPv4
=
False
,
forceIPv6
=
False
,
dot
=
False
,
verbose
=
verbose
,
insecure
=
insecure
,
post
=
post
,
head
=
head
):
Connection
.
__init__
(
self
,
server
,
servername
=
servername
,
connect
=
connect
,
forceIPv4
=
forceIPv4
,
forceIPv6
=
forceIPv6
,
dot
=
dot
,
verbose
=
verbose
,
insecure
=
insecure
,
post
=
post
,
head
=
head
)
if
forceIPv4
and
forceIPv6
:
raise
Exception
(
"Force IPv4 *or* IPv6 but not both"
)
(
is_addr
,
family
)
=
is_valid_ip_address
(
self
.
server
)
if
forceIPv4
:
if
is_addr
and
family
==
6
:
raise
Exception
(
"You cannot force IPv4 with a litteral IPv6 address (%s)"
%
self
.
server
)
family
=
socket
.
AF_INET
elif
forceIPv6
:
if
is_addr
and
family
==
4
:
raise
Exception
(
"You cannot force IPv6 with a litteral IPv4 address (%s)"
%
self
.
server
)
family
=
socket
.
AF_INET6
else
:
family
=
0
self
.
hasher
=
hashlib
.
sha256
()
addrinfo
=
socket
.
getaddrinfo
(
server
,
853
,
family
)
# May be loop over the results of getaddrinfo, to test all
# the IP addresses? See #13.
self
.
sock
=
socket
.
socket
(
addrinfo
[
0
][
0
],
socket
.
SOCK_STREAM
)
self
.
addr
=
addrinfo
[
0
][
4
]
if
self
.
verbose
:
print
(
"Connecting to %s ..."
%
str
(
self
.
addr
))
# With typical DoT servers, we *must* use TLS 1.2 (otherwise,
# do_handshake fails with "OpenSSL.SSL.SysCallError: (-1, 'Unexpected
# EOF')" Typical HTTP servers are more lax.
self
.
context
=
OpenSSL
.
SSL
.
Context
(
OpenSSL
.
SSL
.
TLSv1_2_METHOD
)
if
self
.
insecure
:
self
.
context
.
set_verify
(
OpenSSL
.
SSL
.
VERIFY_NONE
,
lambda
*
x
:
True
)
else
:
self
.
context
.
set_default_verify_paths
()
self
.
context
.
set_verify_depth
(
4
)
# Seems ignored
self
.
context
.
set_verify
(
OpenSSL
.
SSL
.
VERIFY_PEER
|
OpenSSL
.
SSL
.
VERIFY_FAIL_IF_NO_PEER_CERT
|
\
OpenSSL
.
SSL
.
VERIFY_CLIENT_ONCE
,
lambda
conn
,
cert
,
errno
,
depth
,
preverify_ok
:
preverify_ok
)
self
.
session
=
OpenSSL
.
SSL
.
Connection
(
self
.
context
,
self
.
sock
)
self
.
session
.
set_tlsext_host_name
(
canonicalize
(
self
.
check
).
encode
())
# Server Name Indication (SNI)
self
.
session
.
connect
((
self
.
addr
))
# TODO We may here have exceptions such as OpenSSL.SSL.ZeroReturnError
self
.
session
.
do_handshake
()
self
.
cert
=
self
.
session
.
get_peer_certificate
()
# RFC 7858, section 4.2 and appendix A
self
.
publickey
=
self
.
cert
.
get_pubkey
()
if
verbose
:
print
(
"Certificate #%x for
\"
%s
\"
, delivered by
\"
%s
\"
"
%
\
(
self
.
cert
.
get_serial_number
(),
self
.
cert
.
get_subject
().
commonName
,
self
.
cert
.
get_issuer
().
commonName
))
self
.
hasher
.
update
(
OpenSSL
.
crypto
.
dump_publickey
(
OpenSSL
.
crypto
.
FILETYPE_ASN1
,
self
.
publickey
))
self
.
digest
=
self
.
hasher
.
digest
()
print
(
"Public key is pin-sha256=
\"
%s
\"
"
%
\
base64
.
standard_b64encode
(
self
.
digest
).
decode
())
if
not
insecure
:
valid
=
validate_hostname
(
self
.
check
,
self
.
cert
)
if
not
valid
:
error
(
"Certificate error:
\"
%s
\"
is not in the certificate"
%
(
self
.
check
))
def
end
(
self
):
if
self
.
dot
:
self
.
session
.
shutdown
()
self
.
session
.
close
()
else
:
# DoH
self
.
curl
.
close
()
self
.
session
.
shutdown
()
self
.
session
.
close
()
class
ConnectionDoH
(
Connection
):
def
__init__
(
self
,
server
,
servername
=
None
,
connect
=
None
,
forceIPv4
=
False
,
forceIPv6
=
False
,
dot
=
False
,
verbose
=
verbose
,
insecure
=
insecure
,
post
=
post
,
head
=
head
):
Connection
.
__init__
(
self
,
server
,
servername
=
servername
,
connect
=
connect
,
forceIPv4
=
forceIPv4
,
forceIPv6
=
forceIPv6
,
dot
=
dot
,
verbose
=
verbose
,
insecure
=
insecure
,
post
=
post
,
head
=
head
)
self
.
post
=
post
self
.
head
=
head
self
.
curl
=
pycurl
.
Curl
()
self
.
url
=
server
# Does not work if pycurl was not compiled with nghttp2 (recent Debian
# packages are OK) https://github.com/pycurl/pycurl/issues/477
self
.
curl
.
setopt
(
pycurl
.
HTTP_VERSION
,
pycurl
.
CURL_HTTP_VERSION_2
)
self
.
curl
.
setopt
(
pycurl
.
HTTPHEADER
,
[
"Content-type: application/dns-message"
])
if
self
.
verbose
:
self
.
curl
.
setopt
(
self
.
curl
.
VERBOSE
,
True
)
if
self
.
insecure
:
self
.
curl
.
setopt
(
pycurl
.
SSL_VERIFYPEER
,
False
)
self
.
curl
.
setopt
(
pycurl
.
SSL_VERIFYHOST
,
False
)
if
self
.
head
:
self
.
curl
.
setopt
(
pycurl
.
NOBODY
,
True
)
if
self
.
post
:
self
.
curl
.
setopt
(
pycurl
.
POST
,
True
)
if
forceIPv4
and
forceIPv6
:
raise
Exception
(
"Force IPv4 *or* IPv6 but not both"
)
if
forceIPv4
:
self
.
curl
.
setopt
(
pycurl
.
IPRESOLVE
,
pycurl
.
IPRESOLVE_V4
)
if
forceIPv6
:
self
.
curl
.
setopt
(
pycurl
.
IPRESOLVE
,
pycurl
.
IPRESOLVE_V6
)
if
connect
is
not
None
:
try
:
binaddress
=
netaddr
.
IPAddress
(
connect
)
if
binaddress
.
version
==
4
:
if
forceIPv6
:
raise
Exception
(
"You cannot force the use of IPv6 with a litteral IPv4 address (%s)"
%
connect
)
repraddress
=
connect
elif
binaddress
.
version
==
6
:
if
forceIPv4
:
raise
Exception
(
"You cannot force the use of IPv6 with a litteral IPv4 address (%s)"
%
connect
)
repraddress
=
"[%s]"
%
connect
else
:
raise
Exception
(
"%s is not IPv4 and not IPv6"
%
connect
)
except
netaddr
.
core
.
AddrFormatError
:
repraddress
=
connect
try
:
self
.
curl
.
setopt
(
pycurl
.
CONNECT_TO
,
[
"::%s:"
%
repraddress
,])
except
AttributeError
:
pass
# Probably an old version of libcurl, without CONNECT_TO. It appeared with 7.49.0
def
end
(
self
):
self
.
curl
.
close
()
# Routine doing one actual test. Returns a tuple, first member is a
# result (boolean indicating success for DoT, HTTP status code for
# DoH), second member is a DNS message (or a string if there is an
...
...
@@ -489,9 +503,14 @@ try:
extracheck
=
vhostname
else
:
extracheck
=
None
conn
=
Connection
(
url
,
dot
=
dot
,
servername
=
extracheck
,
connect
=
connectTo
,
verbose
=
verbose
,
forceIPv4
=
forceIPv4
,
forceIPv6
=
forceIPv6
,
insecure
=
insecure
,
post
=
post
,
head
=
head
)
if
dot
:
conn
=
ConnectionDoT
(
url
,
dot
=
dot
,
servername
=
extracheck
,
connect
=
connectTo
,
verbose
=
verbose
,
forceIPv4
=
forceIPv4
,
forceIPv6
=
forceIPv6
,
insecure
=
insecure
,
post
=
post
,
head
=
head
)
else
:
conn
=
ConnectionDoH
(
url
,
dot
=
dot
,
servername
=
extracheck
,
connect
=
connectTo
,
verbose
=
verbose
,
forceIPv4
=
forceIPv4
,
forceIPv6
=
forceIPv6
,
insecure
=
insecure
,
post
=
post
,
head
=
head
)
except
TimeoutError
:
error
(
"timeout"
)
except
ConnectionRefusedError
:
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment