基于 BPF 的 Python Raw Socket 实现

Linux 上通过基于 BPF 的 Python Raw Socket 实现模拟 tcpdump 的抓包,在内核态通过 BPF 字节码过滤,在用户态对包解析处理。

实现

import socket
import struct
import ctypes

# 1. Socket-option constants (numeric values used on Linux).
SOL_SOCKET = 1
SO_ATTACH_FILTER = 26

# 2. Classic BPF program.
# Produced with `tcpdump -dd tcp port 8000 | tr '{}' '()'`; only frames that
# match this rule are delivered to user space.
# Each entry is (code, jt, jf, k). The trailing comment is the equivalent
# `tcpdump -d` disassembly; jump targets are absolute instruction indices and
# byte offsets assume an Ethernet header in front of the IP header.
bpf_instructions = [
    (0x28, 0, 0, 0x0c),       # 00: ldh  [12]            EtherType
    (0x15, 0, 6, 0x86dd),     # 01: jeq  #0x86dd         IPv6? else -> 08
    (0x30, 0, 0, 0x14),       # 02: ldb  [20]            IPv6 next header
    (0x15, 0, 15, 0x06),      # 03: jeq  #6              TCP? else -> 19
    (0x28, 0, 0, 0x36),       # 04: ldh  [54]            IPv6 TCP source port
    (0x15, 12, 0, 0x1f40),    # 05: jeq  #8000           match -> 18
    (0x28, 0, 0, 0x38),       # 06: ldh  [56]            IPv6 TCP destination port
    (0x15, 10, 11, 0x1f40),   # 07: jeq  #8000           match -> 18, else -> 19
    (0x15, 0, 10, 0x0800),    # 08: jeq  #0x0800         IPv4? else -> 19
    (0x30, 0, 0, 0x17),       # 09: ldb  [23]            IPv4 protocol
    (0x15, 0, 8, 0x06),       # 10: jeq  #6              TCP? else -> 19
    (0x28, 0, 0, 0x14),       # 11: ldh  [20]            IPv4 flags + fragment offset
    (0x45, 6, 0, 0x1fff),     # 12: jset #0x1fff         non-first fragment -> 19
    (0xb1, 0, 0, 0x0e),       # 13: ldxb 4*([14]&0xf)    X = IPv4 header length
    (0x48, 0, 0, 0x0e),       # 14: ldh  [x + 14]        TCP source port
    (0x15, 2, 0, 0x1f40),     # 15: jeq  #8000           match -> 18
    (0x48, 0, 0, 0x10),       # 16: ldh  [x + 16]        TCP destination port
    (0x15, 0, 1, 0x1f40),     # 17: jeq  #8000           match -> 18, else -> 19
    (0x06, 0, 0, 0x00040000), # 18: ret  #262144         accept
    (0x06, 0, 0, 0x00000000), # 19: ret  #0              drop
]


def apply_bpf_filter(sock, ins_list):
    """Pack a classic BPF program and attach it to *sock* with setsockopt.

    Args:
        sock: Socket object exposing ``setsockopt`` (the caller in this file
            passes an ``AF_PACKET`` raw socket).
        ins_list: Iterable of ``(code, jt, jf, k)`` integer tuples, one per
            BPF instruction, in the order printed by ``tcpdump -dd``.

    Raises:
        ValueError: If the program is empty or longer than the kernel's
            classic-BPF limit of 4096 instructions.
        struct.error: If an instruction field does not fit its C type.
        OSError: If the kernel rejects the ``SO_ATTACH_FILTER`` request.
    """
    # Materialise once so any iterable (not only a sized sequence) is accepted.
    instructions = list(ins_list)
    num_ins = len(instructions)

    # The kernel refuses empty programs and anything above BPF_MAXINSNS (4096);
    # fail early with a readable message instead of an opaque EINVAL.
    max_ins = 4096
    if not 0 < num_ins <= max_ins:
        raise ValueError(
            f"BPF program must contain between 1 and {max_ins} instructions, "
            f"got {num_ins}"
        )

    # struct sock_filter is 8 bytes:
    # unsigned short code, unsigned char jt, unsigned char jf, unsigned int k.
    sock_filter = struct.Struct("HBBI")
    packed_instructions = b"".join(
        sock_filter.pack(code, jt, jf, k) for code, jt, jf, k in instructions
    )

    # The kernel reads the program through a pointer, so the bytes must live at
    # a stable address. `bpf_buffer` stays referenced until after setsockopt().
    bpf_buffer = ctypes.create_string_buffer(packed_instructions)
    buffer_addr = ctypes.addressof(bpf_buffer)

    # struct sock_fprog: unsigned short len, then a pointer to the instructions.
    # Native-alignment "HP" inserts the padding the C compiler would add.
    sock_fprog = struct.pack("HP", num_ins, buffer_addr)

    # Hand the program to the kernel.
    sock.setsockopt(SOL_SOCKET, SO_ATTACH_FILTER, sock_fprog)


class Sniffer:
    """Link-layer packet sniffer whose socket carries the module's BPF filter.

    The constructor opens an ``AF_PACKET`` raw socket, optionally binds it to
    one interface, and attaches ``bpf_instructions`` so that only matching
    frames reach user space.
    """

    def __init__(self, interface=None):
        """Open the raw socket and attach the BPF filter.

        Args:
            interface: Name of the network interface to bind to (for example
                ``'eth0'``). When falsy the socket is left unbound.

        Raises:
            PermissionError: If the process may not open raw sockets.
            OSError: If binding or attaching the filter fails.
        """
        # ETH_P_ALL (0x0003) receives every Ethernet frame. The protocol must
        # be given in network byte order, hence htons().
        self.sock = socket.socket(
            socket.AF_PACKET, socket.SOCK_RAW, socket.htons(0x0003)
        )
        try:
            if interface:
                self.sock.bind((interface, 0))
            apply_bpf_filter(self.sock, bpf_instructions)
        except BaseException:
            # Do not leak the raw socket when setup fails half-way.
            self.sock.close()
            raise

    def start(self):
        """Print every captured frame until interrupted, then close the socket.

        For each frame the receiving interface, the frame length and a hex dump
        of the first 40 bytes are printed. Ctrl-C stops the loop.
        """
        # The attached filter matches TCP port 8000, so the message says so.
        print("开始捕获 TCP 8000 端口的数据包...")
        try:
            while True:
                packet, addr = self.sock.recvfrom(65535)
                # For AF_PACKET sockets addr[0] is the interface name.
                print(f"捕获到报文!来自接口: {addr[0]},长度: {len(packet)} 字节")
                print(packet[:40].hex())
                print("-" * 50)
        except KeyboardInterrupt:
            print("\n捕获中止。")
        finally:
            self.sock.close()


def main():
    """Create a Sniffer on all interfaces and run its capture loop.

    Prints a hint and returns early when the process lacks the privileges
    needed to open a raw socket.
    """
    # The Sniffer constructor is what attaches the filter, so announce the
    # injection before constructing it rather than after the fact.
    print("正在向内核注入 BPF 过滤器...")
    try:
        s = Sniffer(interface=None)  # optionally bind to one NIC, e.g. 'eth0'
    except PermissionError:
        print("错误:此程序必须以 root 权限 (sudo) 运行!")
        return

    s.start()


if __name__ == "__main__":
    main()