Files
gns3-gui/gns3/packet_capture.py
2025-12-14 18:19:47 +08:00

285 lines
11 KiB
Python

# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import sys
import shlex
import subprocess
from .qt import QtWidgets
from .local_config import LocalConfig
from .settings import PACKET_CAPTURE_SETTINGS
from .dialogs.capture_dialog import CaptureDialog
from .topology import Topology
import logging
log = logging.getLogger(__name__)
class PacketCapture:
"""
This class manage packet capture, it's a singleton
"""
def __init__(self):
self._tail_process = {}
self._capture_reader_process = {}
# Auto start the capture program for this link
self._autostart = {}
Topology.instance().project_changed_signal.connect(self.killAllCapture)
def killAllCapture(self):
"""
Kill all running captures (for example when change project)
"""
for process in list(self._tail_process.values()):
try:
process.kill()
except OSError:
pass
self._tail_process = {}
self._capture_reader_process = {}
def topology(self):
from .topology import Topology
return Topology.instance()
def parent(self):
from .main_window import MainWindow
return MainWindow.instance()
def settings(self):
return LocalConfig.instance().loadSectionSettings("PacketCapture", PACKET_CAPTURE_SETTINGS)
def startCapture(self, link):
"""
Start the packet capture reader on this port
:param vm: Instance of the virtual machine
:param port: Instance of port where capture should be executed
:param file_path: capture file path on server
"""
link.updated_link_signal.connect(self._updatedLinkSlot)
if link.capturing():
QtWidgets.QMessageBox.critical(self.parent(), "Packet capture", "A capture is already running")
return
if link.sourcePort().linkType() == "Serial":
ethernet_link = False
else:
ethernet_link = True
dialog = CaptureDialog(self.parent(), link.capture_file_name(), self.settings()["command_auto_start"], ethernet_link)
if dialog.exec():
self._autostart[link] = dialog.commandAutoStart()
link.startCapture(dialog.dataLink(), dialog.fileName() + ".pcap")
def _updatedLinkSlot(self, link_id):
link = self.topology().getLink(link_id)
if link:
if link.capturing():
if self._autostart.get(link) and link not in self._tail_process:
log.debug("Starting packet capture reader for link {}".format(link.link_id()))
self.startPacketCaptureReader(link)
else:
self.stopPacketCaptureReader(link)
def stopCapture(self, link):
"""
Stop the packet capture reader on this link
:param vm: Instance of the virtual machine
:param link: Instance of link where capture should be stopped
"""
link.stopCapture()
def startPacketCaptureReader(self, link):
"""
Starts the packet capture reader.
"""
self._startPacketCommand(link, self.settings()["packet_capture_reader_command"])
def stopPacketCaptureReader(self, link):
"""
Stop the packet capture reader
"""
if link in self._tail_process:
log.debug("Stopping packet capture reader for link {}".format(link.link_id()))
try:
self._tail_process[link].kill()
except (PermissionError, OSError):
pass
del self._tail_process[link]
def startPacketCaptureAnalyzer(self, link):
"""
Starts the packet capture analyzer
"""
if link.capture_file_path() is None:
QtWidgets.QMessageBox.critical(self.parent(), "Packet Capture Analyzer", "A packet capture is not running")
return
capture_file_path = link.capture_file_path()
if not os.path.isfile(capture_file_path):
QtWidgets.QMessageBox.critical(self.parent(), "Packet Capture Analyzer", "No packet capture file found at '{}'".format(capture_file_path))
return
command = self.settings()["packet_capture_analyzer_command"]
# PCAP capture file path
command = command.replace("%c", '"' + capture_file_path + '"')
command = command.replace("{pcap_file}", '"' + capture_file_path + '"')
# Add project name
command = command.replace("%P", link.project().name())
command = command.replace("{project}", link.project().name().replace('"', '\\"'))
# Add link description
description = "{}[{}]->{}[{}]".format(link.sourceNode().name(),
link.sourcePort().name(),
link.destinationNode().name(),
link.destinationPort().name())
command = command.replace("%d", description)
command = command.replace("{link_description}", description)
if not sys.platform.startswith("win"):
command = shlex.split(command)
if len(command) == 0:
QtWidgets.QMessageBox.critical(self.parent(), "Packet Capture Analyzer", "No packet capture analyzer program configured")
return
try:
subprocess.Popen(command, env=os.environ)
except OSError as e:
QtWidgets.QMessageBox.critical(self.parent(), "Packet Capture Analyzer", "Can't start packet capture analyzer program {}".format(str(e)))
return
def packetAnalyzerAvailable(self):
command = self.settings()["packet_capture_analyzer_command"]
return command is not None and len(command) > 0
def _startPacketCommand(self, link, command):
"""
Start a command for analyzing the packets
"""
if link.capture_file_path() is None:
QtWidgets.QMessageBox.critical(self.parent(), "Packet capture", "A packet capture is not running")
return
capture_file_path = link.capture_file_path()
if not os.path.isfile(capture_file_path):
try:
open(capture_file_path, 'w+').close()
except OSError as e:
QtWidgets.QMessageBox.critical(self.parent(), "Packet capture", "Can't create packet capture file {}: {}".format(capture_file_path, str(e)))
return
if link in self._tail_process:
try:
self._tail_process[link].kill()
except (PermissionError, OSError):
# Sometimes we have condition on windows where the process is in the process to quit
pass
del self._tail_process[link]
if link in self._capture_reader_process:
try:
self._capture_reader_process[link].kill()
except (PermissionError, OSError):
pass
del self._capture_reader_process[link]
# PCAP capture file path
command = command.replace("%c", '"' + capture_file_path + '"')
command = command.replace("{pcap_file}", '"' + capture_file_path + '"')
# Add project name
command = command.replace("%P", link.project().name())
command = command.replace("{project}", link.project().name().replace('"', '\\"'))
# Add link description
description = "{} {} to {} {}".format(link.sourceNode().name(),
link.sourcePort().name(),
link.destinationNode().name(),
link.destinationPort().name())
command = command.replace("%d", description)
command = command.replace("{link_description}", description)
if "|" in command:
# live traffic capture (using tail)
command1, command2 = command.split("|", 1)
info = None
if sys.platform.startswith("win"):
# hide tail window on Windows
info = subprocess.STARTUPINFO()
info.dwFlags |= subprocess.STARTF_USESHOWWINDOW
info.wShowWindow = subprocess.SW_HIDE
if hasattr(sys, "frozen"):
tail_path = os.path.dirname(os.path.abspath(sys.executable)) # for Popen to find tail.exe
else:
# We suppose a developer will have tail the standard GNS3 location
tail_path = "C:\\Program Files\\GNS3"
command1 = command1.replace("tail.exe", os.path.join(tail_path, "tail.exe"))
command1 = command1.strip()
command2 = command2.strip()
else:
try:
command1 = shlex.split(command1)
command2 = shlex.split(command2)
except ValueError as e:
log.error("Invalid packet capture command {}: {}".format(command, str(e)))
return
try:
self._tail_process[link] = subprocess.Popen(command1, startupinfo=info, stdout=subprocess.PIPE)
self._capture_reader_process[link] = subprocess.Popen(
command2,
stdin=self._tail_process[link].stdout,
stdout=subprocess.PIPE)
self._tail_process[link].stdout.close()
except OSError as e:
QtWidgets.QMessageBox.critical(self.parent(), "Packet capture", "Can't start packet capture program {}".format(str(e)))
return
else:
# normal traffic capture
if not sys.platform.startswith("win"):
command = shlex.split(command)
if len(command) == 0:
QtWidgets.QMessageBox.critical(self.parent(), "Packet capture", "No packet capture program configured")
return
try:
self._capture_reader_process[link] = subprocess.Popen(command)
except OSError as e:
QtWidgets.QMessageBox.critical(self.parent(), "Packet capture", "Can't start packet capture program {}".format(str(e)))
return
@staticmethod
def instance():
"""
Singleton to return only on instance of Servers.
:returns: instance of Servers
"""
if not hasattr(PacketCapture, "_instance"):
PacketCapture._instance = PacketCapture()
return PacketCapture._instance