request.py 4.37 KB
Newer Older
Alexandre's avatar
Alexandre committed
1
2
3
4
5
6
7
8
9
10
# request.py

try:
    # http://www.dnspython.org/
    import dns.message
except ImportError as e:
    print("Error: missing module")
    print(e)
    sys.exit(1)

Alexandre's avatar
Alexandre committed
11
import remoh
Alexandre's avatar
Alexandre committed
12
13
14
15
16
17
18
19
20
21
22

class Request:
    def __init__(self, qname, qtype='AAAA', use_edns=True, want_dnssec=False, no_ecs=True):
        if no_ecs:
             opt = dns.edns.ECSOption(address='', srclen=0) # Disable ECS (RFC 7871, section 7.1.2)
             options = [opt]
        else:
            options = None
        self.message = dns.message.make_query(qname, dns.rdatatype.from_text(qtype),
                                              use_edns=use_edns, want_dnssec=want_dnssec, options=options)
        self.message.flags |= dns.flags.AD # Ask for validation
23
        self.success = True # True by default, set to False as soon as an error is encountered
Alexandre's avatar
Alexandre committed
24
25
26
27
28
29
30
31
32
33
        self.i = 0 # request's number on the connection (default to the first)

    def trunc_data(self):
        self.data = self.message.to_wire()
        half = round(len(self.data) / 2)
        self.data = self.data[:half]

    def to_wire(self):
        self.data = self.message.to_wire()

34
35
36
37
38
39
    def has_expected_str(self, string):
        try:
            return string is None or string in str(self.response)
        except AttributeError:
            return False

Alexandre's avatar
Alexandre committed
40
41
42
43
44

class RequestDOT(Request):
    # raising custom exception for each unexpected response might be a good idea
    def check_response(self, debug=False):
        if self.response is None:
45
            self.success = False
Alexandre's avatar
Alexandre committed
46
            raise remoh.RequestDOTException("No reply received")
Alexandre's avatar
Alexandre committed
47
        if not self.rcode:
48
            self.success = False
Alexandre's avatar
Alexandre committed
49
50
51
52
53
            return False
        if self.response.id != self.message.id:
            self.response = "The ID in the answer does not match the one in the query"
            if debug:
                self.response += f'"(query id: {self.message.id}) (response id: {self.response.id})'
54
            self.success = False
Alexandre's avatar
Alexandre committed
55
            return False
56
        return self.success
Alexandre's avatar
Alexandre committed
57
58
59

    def store_response(self, rcode, data, size):
        self.rcode = True
Alexandre's avatar
Alexandre committed
60
        self.response_size = size
Alexandre's avatar
Alexandre committed
61
        self.response = dns.message.from_wire(data)
Alexandre's avatar
Alexandre committed
62
63
64
65


class RequestDOH(Request):
    def __init__(self, qname, qtype='AAAA', use_edns=True, want_dnssec=False, no_ecs=True):
66
        super().__init__(qname, qtype=qtype, use_edns=use_edns, want_dnssec=want_dnssec, no_ecs=no_ecs)
Alexandre's avatar
Alexandre committed
67
        self.message.id = 0 # DoH requests that
Alexandre's avatar
Alexandre committed
68
69
        self.post = False # TODO pass as argument
        self.head = False # pass as argument
Alexandre's avatar
Alexandre committed
70
71
72
73
74
75

    # raising custom exception for each unexpected response might be a good idea
    def check_response(self, debug=False):
        if self.rcode == 200:
            if self.ctype != "application/dns-message":
                self.response = "Content type of the response (\"%s\") invalid" % self.ctype
76
                self.success = False
Alexandre's avatar
Alexandre committed
77
78
79
80
81
82
83
84
85
86
            else:
                if not self.head:
                    try:
                        response = dns.message.from_wire(self.response)
                    except dns.message.TrailingJunk: # Not DNS. Should
                        # not happen for a content type
                        # application/dns-message but who knows?
                        self.response = "ERROR Not proper DNS data, trailing junk"
                        if debug:
                            self.response += " \"%s\"" % response
87
                        self.success = False
Alexandre's avatar
Alexandre committed
88
89
90
91
                    except dns.name.BadLabelType: # Not DNS.
                        self.response = "ERROR Not proper DNS data (wrong path in the URL?)"
                        if debug:
                            self.response += " \"%s\"" % response[:100]
92
                        self.success = False
Alexandre's avatar
Alexandre committed
93
94
95
96
97
98
99
100
101
102
                    else:
                        self.response = response
                else:
                    if self.response_size == 0:
                        self.response = "HEAD successful"
                    else:
                        data = self.response
                        self.response = "ERROR Body length is not null"
                        if debug:
                            self.response += "\"%s\"" % data[:100]
103
                        self.success = False
Alexandre's avatar
Alexandre committed
104
        else:
105
            self.success = False
Alexandre's avatar
Alexandre committed
106
107
108
109
            if self.response_size == 0:
                self.response = "[No details]"
            else:
                self.response = self.response
110
        return self.success