# request.py

import sys

try:
    # http://www.dnspython.org/
    import dns.edns
    import dns.flags
    import dns.message
    import dns.name
    import dns.rdatatype
except ImportError as e:
    print("Error: missing module")
    print(e)
    sys.exit(1)

import homer

class Request:
    """A DNS query plus the bookkeeping needed to judge the reply.

    Attributes assigned by this class:
      message -- the dnspython query message built in __init__
      success -- True by default, set to False as soon as an error is
                 encountered
      i       -- the request's number on the connection
      data    -- wire-format bytes, set by to_wire() or trunc_data()

    has_expected_str() reads ``self.response``, which is not assigned
    anywhere in this class.
    """

    def __init__(self, qname, qtype='AAAA', use_edns=True, want_dnssec=False, no_ecs=True):
        """Build the query message.

        qname       -- name to query
        qtype       -- record type as text, converted with
                       dns.rdatatype.from_text()
        use_edns    -- passed through to dns.message.make_query()
        want_dnssec -- passed through to dns.message.make_query()
        no_ecs      -- when true, attach an ECS option with an empty
                       address and a source prefix length of 0
        """
        if no_ecs:
            # Disable ECS (RFC 7871, section 7.1.2)
            opt = dns.edns.ECSOption(address='', srclen=0)
            options = [opt]
        else:
            options = None
        self.message = dns.message.make_query(qname, dns.rdatatype.from_text(qtype),
                                              use_edns=use_edns, want_dnssec=want_dnssec, options=options)
        self.message.flags |= dns.flags.AD # Ask for validation
        self.success = True # True by default, set to False as soon as an error is encountered
        self.i = 0 # request's number on the connection (default to the first)

    def trunc_data(self):
        """Store only the first half of the wire-format message in self.data.

        The cut point is round(len / 2), so for odd lengths it follows
        Python's round-half-to-even rule.
        """
        self.data = self.message.to_wire()
        half = round(len(self.data) / 2)
        self.data = self.data[:half]

    def to_wire(self):
        """Store the complete wire-format message in self.data."""
        self.data = self.message.to_wire()

    def has_expected_str(self, string):
        """Tell whether `string` appears in the textual form of the response.

        Returns True when `string` is None (nothing is expected), and
        False when no ``response`` attribute has been set yet.
        """
        try:
            return string is None or string in str(self.response)
        except AttributeError:
            return False


class RequestDOT(Request):
    """Request sent over DNS-over-TLS.

    check_response() reads ``self.response`` and ``self.rcode``; within
    this class both are assigned by store_response().
    """

    # raising custom exception for each unexpected response might be a good idea
    def check_response(self, debug=False):
        """Validate the stored reply and return the request's success flag.

        Returns False (and clears self.success) when self.rcode is falsy
        or when the reply's ID differs from the query's ID; in the latter
        case self.response is replaced by an explanatory string, extended
        with both IDs when `debug` is true. Otherwise returns
        self.success unchanged.

        Raises homer.RequestDOTException when self.response is None.
        """
        if self.response is None:
            self.success = False
            raise homer.RequestDOTException("No reply received")
        if not self.rcode:
            self.success = False
            return False
        # Read the ID before self.response is replaced by an error string:
        # a str has no .id attribute.
        response_id = self.response.id
        if response_id != self.message.id:
            self.response = "The ID in the answer does not match the one in the query"
            if debug:
                self.response += f' (query id: {self.message.id}) (response id: {response_id})'
            self.success = False
            return False
        return self.success

    def store_response(self, rcode, data, size):
        """Parse `data` as a wire-format DNS message and record it.

        Sets self.response to the parsed message and self.response_size
        to `size`.
        """
        # NOTE(review): the `rcode` argument is ignored and self.rcode is
        # always set to True - confirm against the caller that this is
        # intended.
        self.rcode = True
        self.response = dns.message.from_wire(data)
        self.response_size = size


class RequestDOH(Request):
    """Request sent over DNS-over-HTTPS.

    check_response() reads ``self.rcode``, ``self.ctype``,
    ``self.response`` and ``self.response_size``; none of them is
    assigned in this class (presumably the connection code sets them
    after the HTTP exchange - verify against the caller).
    """

    def __init__(self, qname, qtype='AAAA', use_edns=True, want_dnssec=False, no_ecs=True):
        """Build the query as in Request, then apply the DoH specifics."""
        super().__init__(qname, qtype=qtype, use_edns=use_edns, want_dnssec=want_dnssec, no_ecs=no_ecs)
        self.message.id = 0 # DoH requests that
        self.post = False # TODO pass as argument
        self.head = False # pass as argument

    # raising custom exception for each unexpected response might be a good idea
    def check_response(self, debug=False):
        """Validate the HTTP reply and return the request's success flag.

        On HTTP status 200 with content type "application/dns-message":
          * for a non-HEAD request, the body is parsed as a DNS message
            and stored in self.response; if parsing fails, an error
            string is stored instead;
          * for a HEAD request, the body must be empty.
        Any other status or content type is a failure. On failure
        self.success is set to False and, except for a non-200 status
        with a non-empty body, self.response is replaced by an
        explanatory string. When `debug` is true the parsing and HEAD
        error strings also quote the offending body.
        """
        if self.rcode != 200:
            self.success = False
            if self.response_size == 0:
                self.response = "[No details]"
            # Otherwise self.response is kept as received.
            return self.success

        if self.ctype != "application/dns-message":
            self.response = "Content type of the response (\"%s\") invalid" % self.ctype
            self.success = False
        elif self.head:
            if self.response_size == 0:
                self.response = "HEAD successful"
            else:
                data = self.response
                self.response = "ERROR Body length is not null"
                if debug:
                    self.response += "\"%s\"" % data[:100]
                self.success = False
        else:
            # Keep the raw body: it is what the debug messages must quote,
            # since nothing has been parsed when from_wire() raises.
            raw = self.response
            try:
                parsed = dns.message.from_wire(raw)
            except dns.message.TrailingJunk: # Not DNS. Should
                # not happen for a content type
                # application/dns-message but who knows?
                self.response = "ERROR Not proper DNS data, trailing junk"
                if debug:
                    self.response += " \"%s\"" % raw
                self.success = False
            except dns.name.BadLabelType: # Not DNS.
                self.response = "ERROR Not proper DNS data (wrong path in the URL?)"
                if debug:
                    self.response += " \"%s\"" % raw[:100]
                self.success = False
            else:
                self.response = parsed
        return self.success

def create_request(qname, qtype='AAAA', use_edns=True, want_dnssec=False, no_ecs=True, dot=False, trunc=False):
    """Create a request object and fill in its wire-format data.

    A RequestDOT is built when `dot` is true, a RequestDOH otherwise;
    the query parameters are forwarded positionally. When `trunc` is
    true the request carries only the first half of the wire data
    (trunc_data()), otherwise the complete message (to_wire()).
    Returns the request.
    """
    request_class = RequestDOT if dot else RequestDOH
    request = request_class(qname, qtype, use_edns, want_dnssec, no_ecs)
    serialize = request.trunc_data if trunc else request.to_wire
    serialize()
    return request

def create_requests_list(dot=False, **req_args):
    """Return the list of test requests to run against a server.

    For DoT each entry is (label, request, mandatory level); for DoH
    each entry is (label, request, HTTP method constant, mandatory
    level). `req_args` is forwarded to create_request() for every entry.
    """
    if dot:
        # RFC 7858, section 3.3, SHOULD accept several requests on one
        # connection, hence the second test being only "necessary".
        # TODO we miss the tests of pipelining and out-of-order.
        return [
            ('Test 1', create_request(dot=dot, **req_args),
             homer.mandatory_levels["legal"]),
            ('Test 2', create_request(dot=dot, **req_args),
             homer.mandatory_levels["necessary"]),
        ]
    # GET and POST: RFC 8484, section 4.1. The HEAD method is not
    # mentioned in RFC 8484 (see section 4.1), so just "nice to have".
    return [
        ('Test GET', create_request(**req_args), homer.DOH_GET,
         homer.mandatory_levels["legal"]),
        ('Test POST', create_request(**req_args), homer.DOH_POST,
         homer.mandatory_levels["legal"]),
        ('Test HEAD', create_request(**req_args), homer.DOH_HEAD,
         homer.mandatory_levels["nicetohave"]),
    ]