homer.py 5.35 KB
Newer Older
Stephane Bortzmeyer's avatar
Stephane Bortzmeyer committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env python3

import base64
import getopt
import io
import sys
import time
import urllib.parse

# http://www.dnspython.org/
import dns.exception
import dns.message

# http://pycurl.io/docs/latest
import pycurl

16
# Values that can be changed from the command line
Stephane Bortzmeyer's avatar
Stephane Bortzmeyer committed
17
18
19
20
21
22
# HTTP method selection: GET is used unless one of these is set
post = False      # --POST / -P: send the query in the body of a POST
head = False      # --head / -e: HEAD request, no body is fetched

# Transport options
insecure = False  # --insecure / -k: do not verify the server's certificate
verbose = False   # --verbose / -v: turn on libcurl's verbose mode

# What to ask, and how many times
rtype = 'AAAA'    # DNS query type used when none is given
ifile = None      # --file / -f: input file holding the queries
tests = 1         # --repeat / -r: number of repeated tests
delay = None      # --delay / -d: seconds to sleep after each test
Stephane Bortzmeyer's avatar
Stephane Bortzmeyer committed
25
26
27
28
29
30
31
32
33
34
35
36

def error(msg=None):
    """Report a fatal error on standard error and exit with status 1.

    msg is the text to print; when it is None, "Unknown error" is
    printed instead. This function never returns: it always ends with
    sys.exit(1).
    """
    text = "Unknown error" if msg is None else msg
    print(text, file=sys.stderr)
    sys.exit(1)
    
def usage(msg=None):
    """Print an optional message, then the command-line synopsis, on stderr.

    msg, when true, is printed first (it may be a string or an exception
    such as getopt.error). The synopsis lists every option accepted by
    the option parser and both invocation forms: with a domain name on
    the command line, or with --file. This function does not exit; the
    caller decides the exit status.
    """
    if msg:
        print(msg, file=sys.stderr)
    prog = sys.argv[0]
    print("Usage: %s [options] url domain-name [DNS type]" % prog, file=sys.stderr)
    print("       %s [options] --file=FILE url" % prog, file=sys.stderr)
    print("Options: -h/--help, -v/--verbose, -k/--insecure, -P/--POST, -e/--head, "
          "-r/--repeat N (N > 1), -d/--delay SECONDS (only with --repeat), "
          "-f/--file FILE", file=sys.stderr)

37
38
39
# Main program
# Domain name to query and the DNS message built from it; both are
# filled in later, from the command line or from the input file.
name = message = None
Stephane Bortzmeyer's avatar
Stephane Bortzmeyer committed
40
# Parse the command-line options. Positional arguments are left in args
# and checked afterwards. Invalid option syntax is reported through
# usage(); invalid option values through error(). Both exit with status 1.
try:
    optlist, args = getopt.getopt(sys.argv[1:], "hvPker:f:d:",
                                  ["help", "verbose", "head", "insecure", "POST",
                                   "repeat=", "file=", "delay="])
    for option, value in optlist:
        if option in ("--help", "-h"):
            usage()
            sys.exit(0)
        elif option in ("--verbose", "-v"):
            verbose = True
        elif option in ("--head", "-e"):
            head = True
        elif option in ("--insecure", "-k"):
            insecure = True
        elif option in ("--POST", "-P"):
            post = True
        elif option in ("--repeat", "-r"):
            # A non-numeric value must give a clean error, not a traceback
            try:
                tests = int(value)
            except ValueError:
                error("--repeat needs an integer value > 1, not \"%s\"" % value)
            if tests <= 1:
                error("--repeat needs a value > 1")
        elif option in ("--delay", "-d"):
            try:
                delay = float(value)
            except ValueError:
                error("--delay needs a numeric value > 0, not \"%s\"" % value)
            if delay <= 0:
                error("--delay needs a value > 0")
        elif option in ("--file", "-f"):
            ifile = value
        else:
            error("Unknown option %s" % option)
except getopt.error as reason:
    usage(reason)
    sys.exit(1)
70
71
# Consistency checks between options
if delay is not None and tests <= 1:
    error("--delay makes no sense if there is no repetition")
if head and post:
    usage("POST or HEAD but not both")
    sys.exit(1)

# Positional arguments: "url domain-name [DNS type]" without --file,
# just "url" when the queries come from the input file.
if ifile is None:
    if len(args) not in (2, 3):
        usage("Wrong number of arguments")
        sys.exit(1)
elif len(args) != 1:
    usage("Wrong number of arguments (if --file is used, do not indicate the domain name)")
    sys.exit(1)

url = args[0]
if ifile is None:
    name = args[1]
    if len(args) == 3:
        # The optional third argument overrides the default query type
        rtype = args[2]
Stephane Bortzmeyer's avatar
Stephane Bortzmeyer committed
86
# A single curl handle is created here and reused for every request
c = pycurl.Curl()
if ifile is None:
    # Only one query, known from the command line: build it once
    message = dns.message.make_query(name, dns.rdatatype.from_text(rtype))
    message.id = 0 # DoH requests that
if head:
    c.setopt(pycurl.NOBODY, True)
if post:
    # POST: the URL is used as is, the DNS message travels in the body
    c.setopt(c.URL, url)
    c.setopt(pycurl.POST, True)
    if ifile is None:
        data = message.to_wire()
        c.setopt(pycurl.POSTFIELDS, data)
else:
    # GET (or HEAD): the DNS message is base64url-encoded, without
    # padding, in the "dns" query parameter. With an input file, the URL
    # is set for each request in the main loop instead.
    if ifile is None:
        dns_req = base64.urlsafe_b64encode(message.to_wire()).decode('UTF8').rstrip('=')
        c.setopt(c.URL, url + ("?dns=%s" % dns_req))
# RFC 8484 (section 4.1) says that a DoH client SHOULD send an Accept
# header announcing the media type it understands. Content-type describes
# the body of a POST; it is still sent with every request, as before.
c.setopt(pycurl.HTTPHEADER, ["Accept: application/dns-message",
                             "Content-type: application/dns-message"])
# libcurl sets HTTP persistence automatically, thus handling the case if tests > 1
if verbose:
    c.setopt(c.VERBOSE, True)
if insecure:
    c.setopt(pycurl.SSL_VERIFYPEER, False)
    c.setopt(pycurl.SSL_VERIFYHOST, False)
# Does not work if pycurl was not compiled with nghttp2 (recent Debian
# packages are OK) https://github.com/pycurl/pycurl/issues/477
c.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2)
ok = True # Becomes False as soon as one request fails
start = time.time()
114
115
# Main loop: send the request `tests` times, print each answer, then
# report the timing. With an input file, a new query is read from the
# file for each test.
if ifile is not None:
    infile = open(ifile)
for i in range(0, tests):
    if tests > 1:
        print("\nTest %i" % i)
    if ifile is not None:
        # A line is "name" or "name type", separated by any whitespace.
        # split() also copes with a last line lacking its newline.
        fields = infile.readline().split()
        if not fields:
            error("Not enough data in %s for the %i tests" % (ifile, tests))
        if len(fields) > 2:
            error("Invalid line in %s: \"%s\"" % (ifile, " ".join(fields)))
        name = fields[0]
        rtype = fields[1] if len(fields) == 2 else 'AAAA'
        message = dns.message.make_query(name, dns.rdatatype.from_text(rtype))
        message.id = 0 # DoH requests that
        if post:
            data = message.to_wire()
            c.setopt(pycurl.POSTFIELDS, data)
        else:
            dns_req = base64.urlsafe_b64encode(message.to_wire()).decode('UTF8').rstrip('=')
            c.setopt(c.URL, url + ("?dns=%s" % dns_req))
    buffer = io.BytesIO()
    c.setopt(c.WRITEDATA, buffer)
    c.perform()
    rcode = c.getinfo(pycurl.RESPONSE_CODE)
    if rcode == 200:
        if not head:
            body = buffer.getvalue()
            try:
                response = dns.message.from_wire(body)
            except dns.exception.DNSException: # Not DNS.
                # TrailingJunk is only one of the ways a body can fail
                # to parse; all the dnspython parsing errors derive
                # from DNSException.
                response = "ERROR Not proper DNS data \"%s\"" % body
                ok = False
            print(response)
        else:
            print("HEAD request successful")
    else:
        body = buffer.getvalue()
        if len(body) == 0:
            body = b"[No details]"
        # The error body is not necessarily valid UTF-8
        print("HTTP error %i: %s" % (rcode, body[0:1000].decode(errors="replace")), file=sys.stderr)
        ok = False
    buffer.close()
    if tests > 1 and i == 0:
        # Second starting point, to compute an average which ignores the
        # first request
        start2 = time.time()
    if delay is not None:
        time.sleep(delay)
c.close()
stop = time.time()
if tests > 1:
    extra = ", %.2f ms/request if we ignore the first one" % ((stop-start2)*1000/(tests-1))
else:
    extra = ""
print("\nTotal elapsed time: %.2f seconds (%.2f ms/request %s)" % (stop-start, (stop-start)*1000/tests, extra))
if ifile is not None:
    infile.close()
Stephane Bortzmeyer's avatar
Stephane Bortzmeyer committed
171
172
173
174
175
# Exit status: 0 if every request succeeded, 1 otherwise
sys.exit(0 if ok else 1)