homer.py 18.5 KB
Newer Older
Stephane Bortzmeyer's avatar
Stephane Bortzmeyer committed
1
2
#!/usr/bin/env python3

3
4
5
6
7
# Homer is a DoH (DNS-over-HTTPS) and DoT (DNS-over-TLS) client. Its
# main purpose is to test DoH and DoT resolvers. Reference site is
# <https://framagit.org/bortzmeyer/homer/> See author, documentation,
# etc, there, or in the README.md included with the distribution.

Stephane Bortzmeyer's avatar
Stephane Bortzmeyer committed
8
9
10
11
12
13
# http://pycurl.io/docs/latest
import pycurl

# http://www.dnspython.org/
import dns.message

14
15
16
# https://github.com/drkjam/netaddr/
import netaddr

17
18
19
20
21
# October 2019: the Python GnuTLS bindings don't work with Python 3. So we use OpenSSL.
# https://www.pyopenssl.org/
# https://pyopenssl.readthedocs.io/
import OpenSSL

Stephane Bortzmeyer's avatar
Stephane Bortzmeyer committed
22
23
24
25
26
27
import io
import sys
import base64
import getopt
import urllib.parse
import time
28
29
import socket
import ctypes
30
import re
31
import os.path
32
33
import hashlib
import base64
Stephane Bortzmeyer's avatar
Stephane Bortzmeyer committed
34

35
# Defaults for the settings that command-line options may override
dot = False  # False means DoH (the default), True means DoT
verbose = False
insecure = False  # When True, the certificate checks are disabled
post = False  # DoH only: send the query with HTTP POST
head = False  # DoH only: send a HTTP HEAD request (no body retrieved)
rtype = "AAAA"  # DNS query type used when none is given
vhostname = None
tests = 1  # Number of repeated tests
ifile = None  # Input file
delay = None  # Pause (in seconds) between two tests, if any
# Settings used only by the monitoring plugin:
host = None
path = None
Stephane Bortzmeyer's avatar
Stephane Bortzmeyer committed
49

50
51
52
# Do not change these
# Loose syntax check for a (canonicalized, hence lowercase) host name or
# IP address literal. The alternatives are wrapped in one group so that
# the ^ and $ anchors apply to each of them; without the group, ^ only
# constrained the first alternative and $ only the last one, so nearly
# any string (including a full URL) was accepted by search().
# Second alternative: IPv4/IPv6 literals, with an optional zone
# identifier (%eth0) for link-local IPv6 addresses.
re_host = re.compile(r'^(?:[0-9a-z][0-9a-z\-.]*|[0-9a-f:.]+(?:%[0-9a-z_.\-]+)?)$')

53
54
55
56
57
58
59
# For the monitoring plugin: process exit statuses, passed to
# sys.exit() when the program runs as check_doh or check_dot.
STATE_OK = 0
STATE_WARNING = 1
STATE_CRITICAL = 2
STATE_UNKNOWN = 3
STATE_DEPENDENT = 4

Stephane Bortzmeyer's avatar
Stephane Bortzmeyer committed
60
61
62
def error(msg=None):
    """Report a fatal error and terminate the program.

    As a monitoring plugin, the message is printed on standard output,
    prefixed with the tested URL, and the exit status is
    STATE_CRITICAL. Otherwise the message goes to standard error and
    the exit status is 1. A missing message becomes "Unknown error".
    """
    text = "Unknown error" if msg is None else msg
    if not monitoring:
        print(text, file=sys.stderr)
        sys.exit(1)
    print("%s: %s" % (url, text))
    sys.exit(STATE_CRITICAL)
Stephane Bortzmeyer's avatar
Stephane Bortzmeyer committed
69
70
71
72
    
def usage(msg=None):
    """Print a short usage reminder on standard error.

    If msg is given (and not empty), it is printed first. The function
    does not terminate the program; the caller decides.
    """
    lines = []
    if msg:
        lines.append(msg)
    lines.append("Usage: %s [--dot] url-or-servername domain-name [DNS type]" % sys.argv[0])
    lines.append("See the README.md for more details.")
    for line in lines:
        print(line, file=sys.stderr)
Stephane Bortzmeyer's avatar
Stephane Bortzmeyer committed
75

76
def is_valid_hostname(name):
    """Check that name looks like a host name or an IP address.

    The name is canonicalized first, then tested against re_host.
    Returns the match object (true) on success, None otherwise.
    """
    return re_host.search(canonicalize(name))
79

80
81
82
83
84
85
86
87
88
def canonicalize(hostname):
    """Return the canonical form of a host name.

    The name is lowercased, converted to its ASCII (IDNA) form and
    stripped of one trailing dot, if any. An empty name is returned
    unchanged (it used to raise IndexError).

    Raises UnicodeError if the name cannot be encoded with IDNA.
    """
    result = hostname.lower()
    # TODO handle properly the case where it fails with UnicodeError
    # (two consecutive dots for instance) to get a custom exception
    result = result.encode('idna').decode()
    # endswith() is safe on an empty string, unlike indexing the last
    # character.
    if result.endswith('.'):
        result = result[:-1]
    return result

89
90
91
92
93
94
95
def is_valid_ip_address(addr):
    """Return True if netaddr accepts addr as an IP address, False otherwise."""
    try:
        netaddr.IPAddress(addr)
    except netaddr.core.AddrFormatError:
        return False
    else:
        return True

96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def is_valid_url(url):
    """Return True if url parses as an HTTPS URL with a network location.

    This is a very poor validation: many errors (for instance
    whitespace, or IPv6 address literals without brackets) are not
    detected.
    """
    try:
        parts = urllib.parse.urlparse(url)
    except ValueError:
        return False
    if parts.scheme != "https":
        return False
    return parts.netloc != ""

def get_certificate_san(x509cert):
    """Return the text form of the certificate's subjectAltName extension.

    All the extensions are examined; if several carry the
    subjectAltName short name, the last one wins. Returns an empty
    string when there is none.
    """
    found = ""
    extensions = (x509cert.get_extension(n)
                  for n in range(x509cert.get_extension_count()))
    for extension in extensions:
        if "subjectAltName" in str(extension.get_short_name()):
            found = str(extension)
    return found

114
115
116
117
118
119
120
121
122
123
124
# Try one possible name. Names must be already canonicalized.
def match_hostname(hostname, possibleMatch):
    """Tell whether hostname matches one name found in a certificate.

    Both names must already be canonicalized. possibleMatch is either
    a plain name, compared for equality, or a wildcard name starting
    with "*.". Following RFC 6125, section 6.4.3, the wildcard stands
    for exactly one non-empty left-most label: "*.example.com" matches
    "foo.example.com" but neither "bar.foo.example.com" nor
    "example.com" itself.

    Returns a boolean.
    """
    if not possibleMatch.startswith("*."):
        return hostname == possibleMatch
    # RFC 6125 says that we MAY accept left-most labels with
    # wildcards included (foo*bar). We don't do it here.
    suffix = possibleMatch[2:]  # Skip the star and the dot
    if suffix == "":
        return False  # "*." alone cannot match anything
    (first, dot, rest) = hostname.partition(".")
    # A one-label name has no dot, so it can never match a wildcard.
    return dot == "." and first != "" and rest == suffix

# Try all the names in the certificate
def validate_hostname(hostname, cert):
    """Tell whether hostname is one of the names carried by cert.

    hostname is a host name or an IP address (not necessarily
    canonicalized); cert is the certificate object handed to
    get_certificate_san(), which must also provide get_subject().
    The Subject Alternative Names are tried first: "DNS:" entries when
    hostname is a name, "IP Address:" entries when it is an address.
    The Common Name of the subject, if any, is tried last.

    Returns True on the first match, False if nothing matches.
    """
    # Complete specification is in RFC 6125. It is long and
    # complicated and I'm not sure we do it perfectly.
    is_addr = is_valid_ip_address(hostname)
    hostname = canonicalize(hostname)
    for alt_name in get_certificate_san(cert).split(", "):
        if alt_name.startswith("DNS:") and not is_addr:
            # Slice the prefix off instead of splitting on it: a value
            # containing "DNS:" again would break a two-value unpacking.
            try:
                base = canonicalize(alt_name[len("DNS:"):])
            except UnicodeError:
                continue # Ignore names that cannot be canonicalized
            if match_hostname(hostname, base):
                return True
        elif alt_name.startswith("IP Address:") and is_addr:
            host_i = netaddr.IPAddress(hostname)
            base = alt_name[len("IP Address:"):]
            if base.endswith("\n"):
                base = base[:-1]
            try:
                base_i = netaddr.IPAddress(base)
            except netaddr.core.AddrFormatError:
                continue # Ignore broken IP addresses in certificates. Are we too liberal?
            if host_i == base_i:
                return True
        else:
            pass # Ignore unknown alternative name types. Maybe
                 # accept URI alternative names for DoH?
    # According to RFC 6125, we MUST NOT try the Common Name before the Subject Alternative Names.
    cn = cert.get_subject().commonName
    if cn is None:
        # Certificates without a Common Name exist; there is nothing
        # left to compare with.
        return False
    try:
        cn = canonicalize(cn)
    except UnicodeError:
        return False
    return match_hostname(hostname, cn)

166
class Connection:
    """A connection to a DoT or DoH server.

    For DoT, the constructor opens a TCP connection to port 853 of the
    server, performs the TLS handshake and (unless insecure is set)
    checks that the certificate is valid for the expected name. For
    DoH, it only prepares a pycurl handle; the HTTP requests are sent
    later.

    Parameters:
    server -- host name or IP address (DoT), or HTTPS URL (DoH)
    servername -- if not None, name used for the SNI and for the
                  certificate check instead of server (DoT only)
    dot -- True for DoT, False for DoH
    verbose -- print details about the connection
    insecure -- do not validate the certificate
    post, head -- DoH only: use the HTTP POST / HEAD method

    Invalid parameters and certificate mismatches are reported through
    error(), which terminates the program.
    """

    def __init__(self, server, servername=None, dot=False, verbose=verbose, insecure=insecure, post=post, head=head):
        if dot and not is_valid_hostname(server):
            error("DoT requires a host name or IP address, not \"%s\"" % server)
        # Validate the parameter we received, not the global variable
        # "url" (which only exists when run as a script).
        if not dot and not is_valid_url(server):
            error("DoH requires a valid HTTPS URL, not \"%s\"" % server)
        self.server = server
        self.servername = servername
        # Name to send in the SNI and to look for in the certificate
        if self.servername is not None:
            check = self.servername
        else:
            check = self.server
        self.dot = dot
        if not self.dot:
            self.post = post
            self.head = head
        if self.dot:
            self.hasher = hashlib.sha256()
        self.verbose = verbose
        self.insecure = insecure
        if self.dot:
            addrinfo = socket.getaddrinfo(server, 853)
            # May be loop over the results of getaddrinfo, to test all the IP addresses? See #13
            self.sock = socket.socket(addrinfo[0][0], socket.SOCK_STREAM)
            if self.verbose:
                print("Connecting to %s ..." % str(addrinfo[0][4]))
            # With typical DoT servers, we *must* use TLS 1.2 (otherwise,
            # do_handshake fails with "OpenSSL.SSL.SysCallError: (-1, 'Unexpected
            # EOF')" Typical HTTP servers are more lax.
            self.context = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_2_METHOD)
            if self.insecure:
                self.context.set_verify(OpenSSL.SSL.VERIFY_NONE, lambda *x: True)
            else:
                self.context.set_default_verify_paths()
                self.context.set_verify_depth(4) # Seems ignored
                self.context.set_verify(OpenSSL.SSL.VERIFY_PEER | OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT | \
                                        OpenSSL.SSL.VERIFY_CLIENT_ONCE,
                                        lambda conn, cert, errno, depth, preverify_ok: preverify_ok)
            self.session = OpenSSL.SSL.Connection(self.context, self.sock)
            self.session.set_tlsext_host_name(check.encode()) # Server Name Indication (SNI)
            self.session.connect((self.server, 853))
            # TODO We may here have exceptions such as OpenSSL.SSL.ZeroReturnError
            self.session.do_handshake()
            self.cert = self.session.get_peer_certificate()
            # RFC 7858, section 4.2 and appendix A
            self.publickey = self.cert.get_pubkey()
            if verbose:
                print("Certificate #%x for \"%s\", delivered by \"%s\"" % \
                      (self.cert.get_serial_number(),
                       self.cert.get_subject().commonName,
                       self.cert.get_issuer().commonName))
                # The pin is only computed when it is going to be displayed
                self.hasher.update(OpenSSL.crypto.dump_publickey(OpenSSL.crypto.FILETYPE_ASN1,
                                                      self.publickey))
                self.digest = self.hasher.digest()
                print("Public key is pin-sha256=\"%s\"" % \
                      base64.standard_b64encode(self.digest).decode())
            if not insecure:
                valid = validate_hostname(check, self.cert)
                if not valid:
                    error("Certificate error: \"%s\" is not in the certificate" % (check))
        else: # DoH
            self.curl = pycurl.Curl()
            self.url = server
            # Does not work if pycurl was not compiled with nghttp2 (recent Debian
            # packages are OK) https://github.com/pycurl/pycurl/issues/477
            self.curl.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2)
            self.curl.setopt(pycurl.HTTPHEADER, ["Content-type: application/dns-message"])
            if self.verbose:
                self.curl.setopt(self.curl.VERBOSE, True)
            if self.insecure:
                self.curl.setopt(pycurl.SSL_VERIFYPEER, False)
                self.curl.setopt(pycurl.SSL_VERIFYHOST, False)
            if self.head:
                self.curl.setopt(pycurl.NOBODY, True)
            if self.post:
                self.curl.setopt(pycurl.POST, True)

    def __str__(self):
        """Return the server (name, address or URL) given at creation."""
        return self.server

    def end(self):
        """Close the connection: TLS session for DoT, curl handle for DoH."""
        if self.dot:
            self.session.shutdown()
            self.session.close()
        else: # DoH
            self.curl.close()
            
# Read exactly count bytes from a TLS session. One recv() may return
# less than requested (a DNS message can span several TLS records), so
# we loop until everything has arrived.
def _recv_exactly(session, count):
    chunks = []
    remaining = count
    while remaining > 0:
        chunk = session.recv(remaining)
        if not chunk:
            raise EOFError("Connection closed with %i bytes still expected" % remaining)
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)

# Routine doing one actual test. Returns a tuple, first member is a
# result (boolean indicating success for DoT, HTTP status code for
# DoH), second member is a DNS message (or a string if there is an
# error), third member is the size of the DNS message (or None if no
# proper response).
def do_test(connection, qname, qtype=rtype):
    message = dns.message.make_query(qname, dns.rdatatype.from_text(qtype))
    size = None
    if connection.dot:
        messagew = message.to_wire()
        length = len(messagew)
        # DNS over a stream: two-byte length prefix, then the message.
        # sendall() ensures that the whole request leaves.
        connection.session.sendall(length.to_bytes(2, byteorder='big') + messagew)
        buf = _recv_exactly(connection.session, 2)
        received = int.from_bytes(buf, byteorder='big')
        buf = _recv_exactly(connection.session, received)
        response = dns.message.from_wire(buf)
        return (True, response, received)
    else: # DoH
        message.id = 0 # DoH requests that
        if connection.post:
            connection.curl.setopt(connection.curl.URL, connection.server)
            data = message.to_wire()
            connection.curl.setopt(pycurl.POSTFIELDS, data)
        else:
            dns_req = base64.urlsafe_b64encode(message.to_wire()).decode('UTF8').rstrip('=')
            connection.curl.setopt(connection.curl.URL, connection.server + ("?dns=%s" % dns_req))
        buffer = io.BytesIO()
        connection.curl.setopt(connection.curl.WRITEDATA, buffer)
        connection.curl.perform()
        rcode = connection.curl.getinfo(pycurl.RESPONSE_CODE)
        # Fetch the body once and release the buffer on every path
        body = buffer.getvalue()
        buffer.close()
        # NOTE: when the status is 200 but the answer is unusable, the
        # status code is still returned as is; the problem is only
        # visible in the string returned as second member.
        if rcode == 200:
            ctype = connection.curl.getinfo(pycurl.CONTENT_TYPE)
            if ctype != "application/dns-message":
                response = "Content type of the response (\"%s\") invalid" % ctype
            else:
                # Use the setting of this connection, not the global
                # variable "head"
                if not connection.head:
                    try:
                        size = len(body)
                        response = dns.message.from_wire(body)
                    except dns.message.TrailingJunk: # Not DNS. Should
                        # not happen for a content type
                        # application/dns-message but who knows?
                        response = "ERROR Not proper DNS data, trailing junk \"%s\"" % body
                    except dns.name.BadLabelType: # Not DNS.
                        response = "ERROR Not proper DNS data (wrong path in the URL?) \"%s\"" % body[:100]
                else:
                    response = "HEAD successful"
        else:
            if len(body) == 0:
                response = "[No details]"
            else:
                response = body
        return (rcode, response, size)
312
    
313
# Main program
314
315
316
317
318
319
me = os.path.basename(sys.argv[0])
# When run under one of these two names, we behave as a monitoring
# plugin: different options, different output, different exit statuses.
monitoring = (me == "check_doh" or me == "check_dot")
if not monitoring:
    name = None
    message = None
    try:
        optlist, args = getopt.getopt (sys.argv[1:], "hvPkeV:r:f:d:t",
                                       ["help", "verbose", "dot", "head", "insecure", "POST", "vhost=", "repeat=", "file=", "delay="])
        for option, value in optlist:
            if option == "--help" or option == "-h":
                usage()
                sys.exit(0)
            elif option == "--dot" or option == "-t":
                dot = True
            elif option == "--verbose" or option == "-v":
                verbose = True
            elif option == "--HEAD" or option == "--head" or option == "-e":
                head = True
            elif option == "--POST" or option == "--post" or option == "-P":
                post = True
            elif option == "--vhost" or option == "-V":
                vhostname = value
            elif option == "--insecure" or option == "-k":
                insecure = True
            elif option == "--repeat" or option == "-r":
                # Report a non-numeric value cleanly instead of
                # crashing with a traceback
                try:
                    tests = int(value)
                except ValueError:
                    error("--repeat needs an integer value, not \"%s\"" % value)
                if tests <= 1:
                    error("--repeat needs a value > 1")
            elif option == "--delay" or option == "-d":
                try:
                    delay = float(value)
                except ValueError:
                    error("--delay needs a numeric value, not \"%s\"" % value)
                if delay <= 0:
                    error("--delay needs a value > 0")
            elif option == "--file" or option == "-f":
                ifile = value
            else:
                error("Unknown option %s" % option)
    except getopt.error as reason:
        usage(reason)
        sys.exit(1)
    if tests <= 1 and delay is not None:
        error("--delay makes no sense if there is no repetition")
    if post and head:
        usage("POST or HEAD but not both")
        sys.exit(1)
    if dot and (post or head):
        usage("POST or HEAD makes non sense for DoT")
        sys.exit(1)
    if ifile is None and (len(args) != 2 and len(args) != 3):
        usage("Wrong number of arguments")
        sys.exit(1)
    if ifile is not None and len(args) != 1:
        usage("Wrong number of arguments (if --file is used, do not indicate the domain name)")
        sys.exit(1)
    # First argument: the server (URL for DoH, name or address for DoT)
    url = args[0]
    if ifile is None:
        name = args[1]
        if len(args) == 3:
            rtype = args[2]
else: # Monitoring plugin
    dot = (me == "check_dot")
    name = None
    try:
        optlist, args = getopt.getopt (sys.argv[1:], "H:n:p:V:t:Pih")
        for option, value in optlist:
            if option == "-H":
                host = value
            elif option == "-V":
                vhostname = value
            elif option == "-n":
                name = value
            elif option == "-t":
                rtype = value
            elif option == "-p":
                path = value
            elif option == "-P":
                post = True
            elif option == "-h":
                head = True
            elif option == "-i":
                insecure = True
            else:
                # Should never occur, it is trapped by getopt
                print("Unknown option %s" % option)
                sys.exit(STATE_UNKNOWN)
    except getopt.error as reason:
        print("Option parsing problem %s" % reason)
        sys.exit(STATE_UNKNOWN)
    if len(args) > 0:
        print("Too many arguments (\"%s\")" % args)
        sys.exit(STATE_UNKNOWN)
    if host is None or name is None:
        print("Host (-H) and name to lookup (-n) are necessary")
        sys.exit(STATE_UNKNOWN)
    if post and head:
        print("POST or HEAD but not both")
        sys.exit(STATE_UNKNOWN)
    if dot and (post or head):
        print("POST or HEAD makes no sense for DoT")
        sys.exit(STATE_UNKNOWN)
    if dot and path:
        print("URL path makes no sense for DoT")
        sys.exit(STATE_UNKNOWN)
    # Build the server designation from the options
    if dot:
        url = host
    else:
        if vhostname is None:
            url = "https://%s/" % host
        else:
            url = "https://%s/" % vhostname # host is ignored in that case
        if path is not None:
            if path.startswith("/"):
                path = path[1:]
            url += path
427
ok = True
start = time.time()
try:
    # For DoT, an explicit virtual host name replaces the server name
    # in the SNI and in the certificate check
    if dot and vhostname is not None:
        extracheck = vhostname
    else:
        extracheck = None
    conn = Connection(url, dot=dot, servername=extracheck, verbose=verbose, insecure=insecure, post=post, head=head)
except TimeoutError:
    error("timeout")
except ConnectionRefusedError:
    error("Connection to server refused")
if ifile is not None:
    ifile_handle = open(ifile) # Named so as not to shadow the input() builtin
for i in range (0, tests):
    if tests > 1:
        print("\nTest %i" % i)
    if ifile is not None:
        # One request per line: a domain name, optionally followed by
        # a DNS type. Splitting on whitespace copes with a last line
        # without end-of-line (slicing the last character off would
        # truncate the name) and with trailing spaces or tabs.
        fields = ifile_handle.readline().split()
        if len(fields) == 0: # End of file or empty line
            error("Not enough data in %s for the %i tests" % (ifile, tests))
        elif len(fields) == 1:
            name = fields[0]
            rtype = 'AAAA'
        elif len(fields) == 2:
            (name, rtype) = fields
        else:
            error("Invalid line in %s: \"%s\"" % (ifile, " ".join(fields)))
    (rcode, msg, size) = do_test(conn, name, rtype)
    if (dot and rcode) or (not dot and rcode == 200):
        if not monitoring:
            print(msg)
        else:
            if size is not None and size > 0:
                print("%s OK - %s" % (url, "No error for %s/%s, %i bytes received" % (name, rtype, size)))
            else:
                print("%s OK - %s" % (url, "No error"))
            sys.exit(STATE_OK)
    else:
        if not monitoring:
            if dot:
                print("Error: %s" % msg, file=sys.stderr)
            else:
                try:
                    msg = msg.decode()
                except (UnicodeDecodeError, AttributeError):
                    pass # Sometimes, msg can be binary, or Latin-1
                print("HTTP error %i: %s" % (rcode, msg), file=sys.stderr)
        else:
            if not dot:
                print("%s HTTP error - %i: %s" % (url, rcode, msg))
            else:
                print("%s Error - %i: %s" % (url, rcode, msg))
            sys.exit(STATE_CRITICAL)
        ok = False
    # The first request pays for the connection setup: remember when
    # it ended, to compute an average without it
    if tests > 1 and i == 0:
        start2 = time.time()
    if delay is not None:
        time.sleep(delay)
stop = time.time()
if tests > 1:
    extra = ", %.2f ms/request if we ignore the first one" % ((stop-start2)*1000/(tests-1))
else:
    extra = ""
if not monitoring:
    print("\nTotal elapsed time: %.2f seconds (%.2f ms/request %s)" % (stop-start, (stop-start)*1000/tests, extra))
if ifile is not None:
    ifile_handle.close()
conn.end()
if ok:
    if not monitoring:
        sys.exit(0)
    else:
        sys.exit(STATE_OK)
else:
    if not monitoring:
        sys.exit(1)
    else:
        sys.exit(STATE_CRITICAL)