ratanon.com - developer

packet sniffer

วันนี้ลองเขียน tools ตัวนึงที่เอาไว้แก้ปัญหาเชิงลึกสำหรับหาเคส issue ยากๆ เช่น ลูกค้าบอกข้อมูลติดที่เรานะ ฮ่ะ.. อะไรนะ เป็นที่ผมจริงหรอ 555 คำถามในหัวเกิดขึ้นทันที ปกติจะใช้โปรแกรมมาดักช่วย รอบนี้เขียนดักมันซะเอง มาดูกันว่าทำไงบ้าง

.

รวบรัดไปดู code

  • ใช้งานคู่กับ npcap-1.79.exe
  • โหลดมาติดตั้งลงเครื่องก่อนเลย
  • pip lib ให้เรียบร้อย
  • code ด้านล่าง แก้ ip เครื่องที่จะทำ packet sniffer ด้วย


import os
import socket
import requests
from datetime import datetime
from scapy.all import sniff, conf, hexdump, Raw
from PySide6.QtCore import QThread, Signal, QTimer
from PySide6.QtWidgets import QApplication, QMainWindow, QTextEdit

# ฟังก์ชันตรวจสอบประเทศจาก IP โดยใช้ ipinfo.io
def get_ip_info(ip):
    if ip.startswith("192.") or ip.startswith("10.") or ip.startswith("172."):
        return "[Local]"
    try:
        response = requests.get(f"http://ipinfo.io/{ip}/json")
        response.raise_for_status()
        country = response.json().get("country", "Unknown")
        return f"[{country}]"
    except requests.RequestException:
        return "[Unknown]"

# ฟังก์ชันแปลงข้อมูล payload ให้เป็นรูปแบบ hexdump
def format_hexdump(data):
    hex_str = hexdump(data, dump=True)
    lines = hex_str.split('\n')
    formatted_lines = [f"{i*16:04X}  {line}" for i, line in enumerate(lines)]
    return '\n'.join(formatted_lines)

class PacketSnifferThread(QThread):
    packet_received = Signal(str)

    def run(self):
        conf.L3socket = conf.L3socket  # ใช้ Layer 3 socket แทน Layer 2
        sniff(prn=self.process_packet, store=False)

    def process_packet(self, packet):
        if packet.haslayer('IP') and packet.haslayer('TCP'):
            src_ip = packet['IP'].src
            dst_ip = packet['IP'].dst
            if src_ip == "34.117.186.192" or dst_ip == "34.117.186.192":
                return  # Skip packets from or to ipinfo.io to avoid looping
            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            src_port = packet['TCP'].sport
            dst_port = packet['TCP'].dport
            seq_num = packet['TCP'].seq
            ack_num = packet['TCP'].ack
            flags = packet['TCP'].flags
            ttl = packet['IP'].ttl
            proto = packet['IP'].proto

            # ดึงข้อมูล payload
            raw_payload = packet[Raw].load if packet.haslayer(Raw) else b'No Payload'
            formatted_payload = format_hexdump(raw_payload)

            src_info = get_ip_info(src_ip)
            dst_info = get_ip_info(dst_ip)
            packet_info = (
                f"{timestamp} Source IP {src_info} {src_ip}, Destination IP {dst_info} {dst_ip}\n"
                f"Source Port: {src_port}, Destination Port: {dst_port}\n"
                f"Sequence Number: {seq_num}, Acknowledgment Number: {ack_num}\n"
                f"Flags: {flags}, TTL: {ttl}, Protocol: {proto}\n"
                f"Payload:\n{formatted_payload}\n"
                "------------------------------------------------------"
            )
            self.packet_received.emit(packet_info)

class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        self.setWindowTitle("Packet Sniffer")
        self.setGeometry(100, 100, 600, 400)

        self.text_area = QTextEdit(self)
        self.text_area.setStyleSheet("background-color: black; color: green;")
        self.setStyleSheet("background-color: black;")

        self.setCentralWidget(self.text_area)

        self.sniffer_thread = PacketSnifferThread()
        self.sniffer_thread.packet_received.connect(self.update_packet_info)
        self.sniffer_thread.start()

        self.current_date = datetime.now().strftime('%Y%m%d')
        self.log_file = open(f"{self.current_date}-log.txt", "a")

        self.timer = QTimer(self)
        self.timer.timeout.connect(self.check_date)
        self.timer.start(60000)  # Check every minute

    def update_packet_info(self, packet_info):
        self.text_area.append(packet_info)
        self.log_file.write(packet_info + '\n')
        self.log_file.flush()

    def check_date(self):
        new_date = datetime.now().strftime('%Y%m%d')
        if new_date != self.current_date:
            self.log_file.close()
            self.current_date = new_date
            self.log_file = open(f"{self.current_date}-sniffer.txt", "a")

    def closeEvent(self, event):
        self.sniffer_thread.terminate()
        self.log_file.close()
        super().closeEvent(event)

if __name__ == "__main__":
    app = QApplication([])
    window = MainWindow()
    window.show()
    app.exec()


รันก็จะได้ตามรูป เพิ่มเติมคือเขียน log ไว้ดูด้วย..

หากันตาแตก 555 ไว้รอบหน้าจะพามาเขียนระบบกรอง ถอดรหัสแบบละเอียดไว้ จะได้หาปัญหากันง่ายๆ

0
16