Building a DNS server from scratch: DNS Message Question & Answer

Wed, Dec 20, 2023 8-minute read

Building upon the foundations laid in our previous post on creating a DNS server, we now progress to the next critical component: handling the DNS message’s Question and Answer sections. This blog post will explore how to construct these two sections, which are vital for the server’s ability to interpret queries and provide appropriate responses. We’ll delve into the intricacies of encoding domain names into the DNS query format and unpacking them from raw byte data, using Python’s powerful structuring capabilities.

DNS Question section

Understanding the DNS Question section

The DNS Question section, as specified in RFC 1035 Section 4.1.2, contains the query name (QNAME), query type (QTYPE), and query class (QCLASS).

      0  1  2  3  4  5  6  7  8  9  0  1  2  3  4  5
    +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
    |                                               |
    /                     QNAME                     /
    /                                               /
    +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
    |                     QTYPE                     |
    +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
    |                     QCLASS                    |
    +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+

The QNAME field is of variable length and represents the domain name being queried, broken down into labels and encoded in a specific format: each label is preceded by its length (represented by one byte). The entire QNAME ends with a zero-length octet (null byte).

QTYPE specifies the type of query (e.g., A, MX, etc.), and QCLASS denotes the class of the query, typically the Internet (IN). Both of these fields have a length of 2 bytes.

Impelemtning the DNS Question

As we’ll see in the next section, the DNS Answer needs to encode the domain name the same way like the DNS Qeustion, which is why we outsource it to a standalone helper fucntion encode_str_to_bytes:

def encode_str_to_bytes(data: str) -> bytes:
    parts = data.split(".")
    result = b""
    for part in parts:
        length = len(part)
        result += length.to_bytes(1, byteorder="big") + part.encode()
    result += b"\x00"
    return result

class DNSQuestion:
    def __init__(self, domain: str, qtype: str = 1, qclass: str = 1) -> None:
        self.qname = self.encode(domain)
        self.qtype = qtype
        self.qclass = qclass

    def encode(self, domain: str) -> bytes:
        return encode_str_to_bytes(domain)

    def to_bytes(self) -> bytes:
        return self.qname + struct.pack("!HH", self.qtype, self.qclass)

Most of the work is done by our encoder: it takes a domain name, splits it into its constituent labels, and encodes each label with its length in bytes, followed by the label itself. The entire sequence is then terminated with a null byte, adhering to the DNS encoding standard described in RFC1035.

The DNSQuestion class encapsulates this logic, storing the domain name, query type, and query class, and providing a method to_bytes to combine these elements into the correct byte format for DNS queries (see the previous post here if you’re not familiar with the struct module).

DNS Answer section

## Understanding the DNS Answer section

      0  1  2  3  4  5  6  7  8  9  0  1  2  3  4  5
    +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
    |                                               |
    /                                               /
    /                      NAME                     /
    |                                               |
    +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
    |                      TYPE                     |
    +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
    |                     CLASS                     |
    +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
    |                      TTL                      |
    |                                               |
    +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
    |                   RDLENGTH                    |
    +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--|
    /                     RDATA                     /
    /                                               /
    +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+

The DNS Answer is a little bit more complex then the question since it contains the resolved IP address of the domain the DNS Question asks for encoded in bytes (RDATA).

The domain name is repeated from the Question section and is followed by the type and class, which are typically the same as in the Question.

The TTL field dictates the duration for which the answer can be cached.

The RDLENGTH field specifies the length of the RDATA, which is crucial as it varies depending on the type of record. For an ‘A’ record, the RDATA will be the IP address of the queried domain, encoded in a 4-byte format for IPv4 addresses.

Impelemtning the DNS Answer

Here’s an example how DNS answer implementation might look like:

class DNSAnswer:
    def __init__(
        self,
        name: str,
        ip: str,
        atype: int = 1,
        aclass: int = 1,
        ttl: int = 60,
        rdlength: int = 4,
    ) -> None:
        self.name = self.encode(name)
        self.type = (atype).to_bytes(2, byteorder="big")
        self.aclass = (aclass).to_bytes(2, byteorder="big")
        self.ttl = (ttl).to_bytes(4, "big")
        self.length = (rdlength).to_bytes(2, "big")
        self.rdata = self.ipv4_to_bytes(ip)

    def encode(self, data: str) -> bytes:
        return encode_str_to_bytes(data)

    def ipv4_to_bytes(self, ip: str) -> bytes:
        res = b""
        for part in ip.split("."):
            res += int(part).to_bytes(1, "big")

        return res

    def to_bytes(self) -> bytes:
        return self.name + self.type + self.aclass + self.ttl + self.length + self.rdata

It encodes the domain name using the previously defined encode_str_to_bytes helper function.

The name, type, aclass, ttl, and rdlength fields are encoded into byte format according to RFC1035.

The ipv4_to_bytes method converts the IP address into its byte representation.

The final to_bytes method concatenates all these fields, forming a complete DNS Answer section in the correct byte format for transmission.

Putting it together

With the code from our previous posts here and here, our DNS server can now listen to a request and parse the header from that request. Since we haven’t written a parser for the DNS question yet, we’ll just assume an IPv4 A record and hardcode it in a first step like so:

import socket
import struct


class DNSHeader:
    def __init__(
        self,
        hid: int = 1234,
        qr: int = 1,
        opcode: int = 0,
        aa: int = 0,
        tc: int = 0,
        rd: int = 0,
        ra: int = 0,
        z: int = 0,
        rcode: int = 0,
        qdcount: int = 1,
        ancount: int = 1,
        nscount: int = 0,
        arcount: int = 0,
    ):
        self.id = hid
        self.qr = qr
        self.opcode = opcode
        self.aa = aa
        self.tc = tc 
        self.rd = rd
        self.ra = ra 
        self.z = z 
        self.rcode = rcode
        self.ancount = ancount
        self.qdcount = qdcount
        self.nscount = nscount
        self.arcount = arcount

    @staticmethod
    def from_bytes(message: bytes) -> "DNSHeader":
        # start & end indices in bytes
        start, end = (0, 6 * 2)
        header = message[start:end]
        hid, flags, qdcount, ancount, nscount, arcount = struct.unpack(
            "!HHHHHH", header
        )
        qr = (flags >> 15) & 0x1
        opcode = (flags >> 11) & 0xF
        aa = (flags >> 10) & 0x1
        tc = (flags >> 9) & 0x1
        rd = (flags >> 8) & 0x1
        ra = (flags >> 7) & 0x1
        z = (flags >> 4) & 0x7
        rcode = flags & 0xF
        return DNSHeader(
            hid,
            qr,
            opcode,
            aa,
            tc,
            rd,
            ra,
            z,
            rcode,
            qdcount,
            ancount,
            nscount,
            arcount,
        )

    def to_bytes(self) -> bytes:
        flags = (
            (self.qr << 15)
            | (self.opcode << 11)
            | (self.aa << 10)
            | (self.tc << 9)
            | (self.rd << 8)
            | (self.ra << 7)
            | (self.z << 4)
            | (self.rcode)
        )
        return struct.pack(
            "!HHHHHH",
            self.id,
            flags,
            self.qdcount,
            self.ancount,
            self.nscount,
            self.arcount,
        )


def encode_str_to_bytes(data: str) -> bytes:
    parts = data.split(".")
    result = b""
    for part in parts:
        length = len(part)
        result += length.to_bytes(1, byteorder="big") + part.encode()
    result += b"\x00"
    return result


class DNSQuestion:
    def __init__(self, domain: str, qtype: str = 1, qclass: str = 1) -> None:
        self.qname = self.encode(domain)
        self.qtype = qtype
        self.qclass = qclass

    def encode(self, domain: str) -> bytes:
        return encode_str_to_bytes(domain)

    def to_bytes(self) -> bytes:
        return self.qname + struct.pack("!HH", self.qtype, self.qclass)


class DNSAnswer:
    def __init__(
        self,
        name: str,
        ip: str,
        atype: int = 1,
        aclass: int = 1,
        ttl: int = 60,
        rdlength: int = 4,
    ) -> None:
        self.name = self.encode(name)
        self.type = (atype).to_bytes(2, byteorder="big")
        self.aclass = (aclass).to_bytes(2, byteorder="big")
        self.ttl = (ttl).to_bytes(4, "big")
        self.length = (rdlength).to_bytes(2, "big")
        self.rdata = self.ipv4_to_bytes(ip)

    def encode(self, data: str) -> bytes:
        return encode_str_to_bytes(data)

    def ipv4_to_bytes(self, ip: str) -> bytes:
        res = b""
        for part in ip.split("."):
            res += int(part).to_bytes(1, "big")

        return res

    def to_bytes(self) -> bytes:
        return self.name + self.type + self.aclass + self.ttl + self.length + self.rdata


def main():
    print("Starting UDP server...")

    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.bind(("127.0.0.1", 2053))
        while True:
            try:
                data, addr = s.recvfrom(512)
                print(f"Received data from {addr}: {data}")
                header = DNSHeader.from_bytes(data)
                # ovwerrite received flags for our reply
                header.qr, header.ancount, header.arcount, header.nscount = 1, 1, 0, 0
                domain = "example.com"
                q = DNSQuestion(domain)
                a = DNSAnswer(domain, "8.8.8.8")
                s.sendto(header.to_bytes() + q.to_bytes() + a.to_bytes(), addr)
            except Exception as e:
                print(f"Error receiving data: {e}")
                break


if __name__ == "__main__":
    main()

If we now start our server

$ python3 dns.py
Starting UDP server...

and run the following dig command in another terminal we should see the following:

$ dig @127.0.0.1 -p 2053 example.com

; <<>> DiG 9.10.6 <<>> @127.0.0.1 -p 2053 example.com
; (1 server found)
;; global options: +cmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 31351
;; flags: qr; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 0

;; QUESTION SECTION:
;example.com.                   IN      A

;; ANSWER SECTION:
example.com.            60      IN      A       8.8.8.8

;; Query time: 0 msec
;; SERVER: 127.0.0.1#2053(127.0.0.1)
;; WHEN: Tue Dec 19 15:11:49 CET 2023
;; MSG SIZE  rcvd: 56

Note, that dig’s ID mismatch warning of our first version is gone now because we parsed the ID from the request and used it to construct our response DNS message.

Next steps

We now have a DNS server that can listen to a request and parse the header from it. In a next post we’ll extend our DNSQuestion class to parse the question section of the request so that we can get rid of the hard-coded assumptions made above.