Skip to content

搞英语 → 看世界

翻译英文优质信息和名人推特

Menu
  • 首页
  • 作者列表
  • 独立博客
  • 专业媒体
  • 名人推特
  • 邮件列表
  • 关于本站
Menu

Python 中的 DNS 服务器

Posted on 2024-12-10

简介

在这篇文章中,我尝试逐行解释如何在 Python 中实现大部分 DNS 协议,以创建我自己可以信任的本地 DNS 服务器(是的,我就是那个偏执狂)。这只是我对这个项目的笔记,不要期待任何东西。

进口

import socket import struct import random import threading import time from collections import OrderedDict import urllib.request import os import hashlib
  • socket :处理网络,例如发送和接收 DNS 查询。
  • struct :帮助打包/解包二进制数据以构建/解析 DNS 数据包。
  • random :为 DNS 查询生成随机事务 ID。
  • threading :管理多个操作(例如,阻止列表更新)而不阻塞主服务器。
  • time :处理基于时间的操作(例如,缓存过期)。
  • OrderedDict :维护 DNS 缓存中的顺序,同时驱逐旧条目。
  • urllib.request :从互联网下载阻止列表文件。
  • os :处理文件和目录操作(例如,块列表缓存)。
  • hashlib :为缓存文件名生成唯一的哈希值。

配置常量

DNS_SERVER = '9.9.9.9' BLOCKLIST_URLS = [ 'https://raw.githubusercontent.com/StevenBlack/hosts/master/hosts', 'https://raw.githubusercontent.com/hagezi/dns-blocklists/main/hosts/pro-compressed.txt', ] BLOCKLIST_CACHE_DIR = 'blocklist_cache' BLOCKLIST_CACHE_TTL = 24 * 60 * 60 # 24 hours
  • DNS_SERVER :默认上游 DNS 服务器,用于解析非阻塞查询。
  • BLOCKLIST_URLS :包含域阻止列表的 URL 列表。
  • BLOCKLIST_CACHE_DIR :缓存下载的阻止列表的目录。
  • BLOCKLIST_CACHE_TTL :块列表缓存的生存时间 (TTL),设置为 24 小时。

块BlocklistCache类

此类管理阻止列表及其缓存。

初始化

class BlocklistCache: def __init__(self): self.blocked_domains = set() self.last_update = 0 self.lock = threading.Lock() self.cache_dir = BLOCKLIST_CACHE_DIR os.makedirs(self.cache_dir, exist_ok=True)
  • 初始化:
    • blocked_domains :被阻止的域集。
    • last_update :上次更新的时间戳。
    • lock :防止更新期间的竞争条件。
    • cache_dir :缓存阻止列表的目录(如果丢失则创建)。

辅助函数

def _get_cache_path(self, url): url_hash = hashlib.md5(url.encode()).hexdigest() return os.path.join(self.cache_dir, f"blocklist_{url_hash}.txt")
  • 使用 MD5 哈希值为阻止列表 URL 生成唯一的缓存文件路径。
 def _is_cache_valid(self, cache_path): if not os.path.exists(cache_path): return False cache_age = time.time() - os.path.getmtime(cache_path) return cache_age < BLOCKLIST_CACHE_TTL
  • 检查缓存文件是否存在并且在其 TTL 内。

下载并解析阻止列表

def _download_and_cache_blocklist(self, url): cache_path = self._get_cache_path(url) try: if self._is_cache_valid(cache_path): with open(cache_path, 'r') as f: return f.readlines() response = urllib.request.urlopen(url) content = response.read().decode('utf-8').splitlines() with open(cache_path, 'w') as f: f.write(' '.join(content)) return content except Exception as e: print(f"Error downloading blocklist {url}: {e}") if os.path.exists(cache_path): with open(cache_path, 'r') as f: return f.readlines() return []
  • 下载并缓存阻止列表数据:
    • 如果缓存有效,则从缓存中读取。
    • 否则,它会下载阻止列表并将其写入缓存。
 def update_blocklists(self): with self.lock: new_blocked_domains = set() for url in BLOCKLIST_URLS: content = self._download_and_cache_blocklist(url) for line in content: line = line.strip() if line and not line.startswith('#'): parts = line.split() if len(parts) >= 2 and parts[0] in {'0.0.0.0', '127.0.0.1'}: new_blocked_domains.add(parts[1].lower()) self.blocked_domains = new_blocked_domains self.last_update = time.time()
  • 从所有来源下载阻止列表并更新blocked_domains集。
 def is_blocked(self, domain): current_time = time.time() if current_time - self.last_update > BLOCKLIST_CACHE_TTL: threading.Thread(target=self.update_blocklists).start() return domain.lower() in self.blocked_domains
  • 检查域是否被阻止。如果阻止列表已过时,则会异步更新。

DNSCache类

管理 DNS 响应的内存缓存。

初始化

class DNSCache: def __init__(self, max_size=200 * 1024 * 1024): self.cache = OrderedDict() self.lock = threading.Lock() self.current_size = 0 self.max_size = max_size
  • 初始化:
    • cache :用于存储 DNS 响应的有序字典。
    • max_size :允许的最大缓存大小 (200 MB)。

缓存管理

def set(self, key, value, ttl): with self.lock: expiration = time.time() + ttl entry_size = self._calculate_entry_size(key, value) while self.current_size + entry_size > self.max_size: self._evict_oldest() self.cache[key] = (value, expiration) self.cache.move_to_end(key) self.current_size += entry_size
  • 添加对缓存的响应,如果需要则驱逐最旧的条目。
 def get(self, key): with self.lock: now = time.time() if key in self.cache: value, expiration = self.cache[key] if expiration > now: self.cache.move_to_end(key) return value del self.cache[key] self.current_size -= self._calculate_entry_size(key, value) return None
  • 如果响应尚未过期,则检索响应。

DNS查询/响应功能

这些函数构造、解析和转发 DNS 数据包:

  • build_query(domain, query_type) :构建 DNS 查询数据包。
  • parse_query(data) :解析传入的查询。
  • parse_response(data) :解析 DNS 响应。
  • query_upstream(domain, query_type) :将查询转发到上游服务器。

build_query(domain, query_type)

目的:

该函数构造一个 DNS 查询数据包。 DNS 查询被发送到服务器以将域名解析为 IP 地址(或其他记录类型)。

参数:

  • domain :要查询的域名(例如example.com )。
  • query_type :DNS 查询的类型(例如, 1表示 A 记录, 28表示 AAAA 记录)。

过程:

  1. 交易编号:

    • 使用random.randint(0, 65535)生成随机 16 位整数。
    • 这有助于将响应与请求相匹配。
  2. 标志:

    • 使用固定值0x0100 ,表示:
      • 标准查询。
      • 需要递归(如果需要,请询问上游服务器)。
  3. 标题结构:

    • 使用struct.pack(">HHHHHH")打包六个 16 位字段:
      • 交易ID。
      • 旗帜。
      • QDCOUNT:1(查询中的一个问题)。
      • ANCOUNT、NSCOUNT、ARCOUNT:0(尚无答案、权威或其他记录)。
  4. 问题部分:

    • 该域被分为标签(例如, example.com → example和com )。
    • 每个标签都以其长度为前缀并以字节为单位进行编码。
    • 添加空字节 ( � 以表示域名的结尾。
  5. 查询类型和类别:

    • 附加两个字段:
      • 查询类型(例如, 1表示 A 记录)。
      • 查询类( 1表示 Internet)。
  6. 返回:

    • 将标头和问题部分组合成完整的 DNS 查询数据包。
    • 返回数据包和交易ID以供参考。

parse_query(data)

目的:

解析收到的DNS查询报文,提取域名和查询类型。

参数:

  • data :包含 DNS 查询数据包的二进制字符串。

过程:

  1. 跳过标题:

    • 前 12 个字节是 DNS 标头。此后域开始。
  2. 域名解析:

    • 遍历域名标签:
      • 读取长度字节。
      • 提取相应的字节数作为标签。
      • 当遇到零长度标签 ( 0 ) 时停止,指示域的结尾。
  3. 提取查询类型和类:

    • 读取域名后接下来的 4 个字节:
      • 查询类型(2 个字节)。
      • 查询类(2 个字节)。
  4. 返回:

    • 返回域名(例如example.com )和查询类型(例如 A 记录为1 )。
    • 如果解析失败,它会打印错误并返回None, None 。

parse_response(data)

目的:

解析 DNS 响应数据包以提取详细信息,例如答案、权限和附加记录。

参数:

  • data :包含 DNS 响应数据包的二进制字符串。

过程:

  1. 辅助功能:

    • parse_name(data, offset) :
      • 使用标签和指针解析数据包中的域名。
    • parse_soa(data, offset) :
      • 提取 SOA(授权开始)记录详细信息。
  2. 标头解析:

    • 提取计数:
      • 问题( QDCOUNT )。
      • 答案( ANCOUNT )。
      • 权限记录 ( NSCOUNT )。
      • 附加记录 ( ARCOUNT )。
  3. 问题部分:

    • 跳过问题部分(域名、查询类型和类别)。
  4. 资源记录:

    • 使用辅助函数解析答案、权威和其他部分:
      • A 记录:IPv4 地址。
      • AAAA 记录:IPv6 地址。
      • CNAME :规范名称。
      • SOA :权威服务器详细信息。
      • MX :邮件交换服务器。
      • TXT :文本记录。
      • NS :名称服务器。
  5. 返回:

    • 返回包含解析部分( answers 、 authority 、 additional )的字典。

query_upstream(domain, query_type)

目的:

将 DNS 查询转发到上游 DNS 服务器进行解析。

参数:

  • domain :要解析的域名。
  • query_type :DNS 查询的类型(例如,A、AAAA)。

过程:

  1. 构建查询:

    • 调用build_query构造指定域和类型的DNS查询报文。
  2. 创建套接字:

    • 使用socket.socket(socket.AF_INET, socket.SOCK_DGRAM)创建 UDP 套接字。
    • 设置 5 秒超时以避免无限期挂起。
  3. 发送查询:

    • 将查询数据包发送到上游 DNS 服务器(端口53默认为9.9.9.9 )。
  4. 收到回复:

    • 等待来自服务器的响应数据包(最多 512 字节)。
  5. 关闭套接字:

    • 关闭套接字以释放资源。
  6. 返回:

    • 返回原始响应数据包以供进一步处理。

服务器逻辑

def handle_client(server_socket): try: data, client_addr = server_socket.recvfrom(512) domain, query_type = parse_query(data) if blocklist_cache.is_blocked(domain): response = get_blocked_response(client_transaction_id) else: cached_response = dns_cache.get(f"{domain}:{query_type}") response = cached_response or query_upstream(domain, query_type) server_socket.sendto(response, client_addr) except Exception as e: print(f"Error handling client: {e}")
  • 处理传入的查询:
    • 阻止阻止列表中的域。
    • 从缓存或上游 DNS 检索响应。
 def start_server(): print("Initializing blocklist cache...") blocklist_cache.update_blocklists() server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) server_socket.bind(('0.0.0.0', 853)) print("DNS server running on port 853...") while True: handle_client(server_socket)
  • 在端口 853 上启动 DNS 服务器并初始化阻止列表缓存。

在此处阅读有关上游 9.9.9.9 的更多信息。

在此处下载完整的程序。

原文: https://xer0x.in/dns-server-in-python/

本站文章系自动翻译,站长会周期检查,如果有不当内容,请点此留言,非常感谢。
  • Abhinav
  • Abigail Pain
  • Adam Fortuna
  • Alberto Gallego
  • Alex Wlchan
  • Answer.AI
  • Arne Bahlo
  • Ben Carlson
  • Ben Kuhn
  • Bert Hubert
  • Bits about Money
  • Brian Krebs
  • ByteByteGo
  • Chip Huyen
  • Chips and Cheese
  • Christopher Butler
  • Cool Infographics
  • Dan Sinker
  • David Walsh
  • Dmitry Dolzhenko
  • Elad Gil
  • Ellie Huxtable
  • Ethan Marcotte
  • Exponential View
  • FAIL Blog
  • Founder Weekly
  • Geoffrey Huntley
  • Geoffrey Litt
  • Greg Mankiw
  • Henrique Dias
  • Hypercritical
  • IEEE Spectrum
  • Investment Talk
  • Jaz
  • Jeff Geerling
  • Jonas Hietala
  • Josh Comeau
  • Lenny Rachitsky
  • Lou Plummer
  • Luke Wroblewski
  • Matt Stoller
  • Mert Bulan
  • Mostly metrics
  • News Letter
  • NextDraft
  • Non_Interactive
  • Not Boring
  • One Useful Thing
  • Phil Eaton
  • Product Market Fit
  • Readwise
  • ReedyBear
  • Robert Heaton
  • Ruben Schade
  • Sage Economics
  • Sam Altman
  • Sam Rose
  • selfh.st
  • Shtetl-Optimized
  • Simon schreibt
  • Slashdot
  • Small Good Things
  • Taylor Troesh
  • Telegram Blog
  • The Macro Compass
  • The Pomp Letter
  • thesephist
  • Thinking Deep & Wide
  • Tim Kellogg
  • 英文媒体
  • 英文推特
  • 英文独立博客
©2025 搞英语 → 看世界 | Design: Newspaperly WordPress Theme