mirror of
https://github.com/GNS3/gns3-gui.git
synced 2026-05-17 00:46:01 +03:00
257 lines
11 KiB
Python
257 lines
11 KiB
Python
#!/usr/bin/env python
|
|
#
|
|
# Copyright (C) 2015 GNS3 Technologies Inc.
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import sys
|
|
import tarfile
|
|
import os
|
|
import shutil
|
|
import json
|
|
import re
|
|
|
|
|
|
from gns3.utils import parse_version
|
|
from gns3 import version
|
|
from gns3.qt import QtNetwork, QtCore, QtWidgets, QtGui, qslot
|
|
from gns3.local_config import LocalConfig
|
|
|
|
|
|
import logging
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class UpdateManager(QtCore.QObject):
|
|
|
|
"""
|
|
Manage application updates
|
|
"""
|
|
|
|
def __init__(self):
|
|
|
|
super().__init__()
|
|
|
|
if sys.platform.startswith("win"):
|
|
root = os.path.join(os.path.expandvars("%APPDATA%"), "GNS3")
|
|
else:
|
|
root = os.path.dirname(sys.executable)
|
|
self._update_directory = os.path.join(root, 'updates')
|
|
self._package_directory = os.path.join(root, 'site-packages')
|
|
self._network_manager = None
|
|
|
|
def isDevVersion(self):
|
|
"""
|
|
:returns: Boolean. True if it's a dev build. False it's a release build
|
|
"""
|
|
if version.__version_info__[3] != 0:
|
|
return True
|
|
return False
|
|
|
|
def _get(self, url, finished_slot, user_attribute=None):
|
|
"""
|
|
HTTP get
|
|
|
|
:param url: Url to download
|
|
:param user_attribute: Param to pass to the finished slot
|
|
:returns: QNetworkReply
|
|
"""
|
|
if self._network_manager is None:
|
|
self._network_manager = QtNetwork.QNetworkAccessManager()
|
|
request = QtNetwork.QNetworkRequest(QtCore.QUrl(url))
|
|
request.setRawHeader(b'User-Agent', b'GNS3 Check For Update')
|
|
request.setAttribute(QtNetwork.QNetworkRequest.Attribute.User, user_attribute)
|
|
reply = self._network_manager.get(request)
|
|
reply.finished.connect(finished_slot)
|
|
log.debug('Download %s', url)
|
|
return reply
|
|
|
|
def checkForUpdate(self, parent, silent):
|
|
"""
|
|
Check for update. Start by asking PyPi for minor upgrade
|
|
and next GNS3 for major upgrade.
|
|
|
|
:param parent: Parent Windows
|
|
:param silent: Display or not the notifications
|
|
"""
|
|
self._silent = silent
|
|
self._parent = parent
|
|
|
|
if not hasattr(sys, "frozen") and LocalConfig.instance().experimental():
|
|
url = 'https://pypi.org/pypi/gns3-gui/json'
|
|
self._get(url, self._pypiReplySlot)
|
|
else:
|
|
self._get('http://update.gns3.net', self._gns3UpdateReplySlot)
|
|
|
|
@qslot
|
|
def _gns3UpdateReplySlot(self):
|
|
network_reply = self.sender()
|
|
if network_reply is None:
|
|
return
|
|
if network_reply.error() != QtNetwork.QNetworkReply.NetworkError.NoError:
|
|
if not self._silent:
|
|
QtWidgets.QMessageBox.critical(self._parent, "Check For Update", "Cannot check for update: {}".format(network_reply.errorString()))
|
|
return
|
|
try:
|
|
latest_release = bytes(network_reply.readAll()).decode("utf-8").rstrip()
|
|
except UnicodeDecodeError:
|
|
log.debug("Invalid answer from the update server")
|
|
return
|
|
if re.match(r"^[a-z0-9\.]+$", latest_release) is None:
|
|
log.debug("Invalid answer from the update server")
|
|
return
|
|
if parse_version(version.__version__) < parse_version(latest_release):
|
|
reply = QtWidgets.QMessageBox.question(self._parent,
|
|
"Check For Update",
|
|
"Newer GNS3 version {} is available, do you want to visit our website to download it?".format(latest_release),
|
|
QtWidgets.QMessageBox.StandardButton.Yes,
|
|
QtWidgets.QMessageBox.StandardButton.No)
|
|
if reply == QtWidgets.QMessageBox.StandardButton.Yes:
|
|
QtGui.QDesktopServices.openUrl(QtCore.QUrl("http://www.gns3.com/software"))
|
|
elif not self._silent:
|
|
QtWidgets.QMessageBox.information(self._parent, "Check For Update", "GNS3 is up-to-date!")
|
|
|
|
def _pypiReplySlot(self):
|
|
network_reply = self.sender()
|
|
if network_reply.error() != QtNetwork.QNetworkReply.NetworkError.NoError:
|
|
if not self._silent:
|
|
QtWidgets.QMessageBox.critical(self._parent, "Check For Update", "Cannot check for update: {}".format(network_reply.errorString()))
|
|
return
|
|
try:
|
|
body = bytes(network_reply.readAll()).decode("utf-8")
|
|
body = json.loads(body)
|
|
except (UnicodeEncodeError, ValueError) as e:
|
|
log.warning("Invalid answer from the PyPi server: {}".format(e))
|
|
QtWidgets.QMessageBox.critical(self._parent, "Check For Update", "Invalid answer from PyPi server")
|
|
return
|
|
|
|
last_version = self._getLastMinorVersionFromPyPiReply(body)
|
|
if parse_version(last_version) > parse_version(version.__version__):
|
|
reply = QtWidgets.QMessageBox.question(self._parent,
|
|
"Check For Update",
|
|
"Newer GNS3 version {} is available, do you want to to download it in background and install it at next application launch?".format(last_version),
|
|
QtWidgets.QMessageBox.StandardButton.Yes,
|
|
QtWidgets.QMessageBox.StandardButton.No)
|
|
if reply == QtWidgets.QMessageBox.StandardButton.Yes:
|
|
try:
|
|
self.downloadUpdates(last_version)
|
|
except OSError as e:
|
|
QtWidgets.QMessageBox.critical(self._parent, "Check For Update", "Cannot download update: {}".format(e))
|
|
else:
|
|
self._get('http://update.gns3.net', self._gns3UpdateReplySlot)
|
|
|
|
def _getLastMinorVersionFromPyPiReply(self, body):
|
|
"""
|
|
Return the most recent minor version for this release
|
|
from a PyPi answer.
|
|
|
|
If no valid version is found it's return the current.
|
|
"""
|
|
|
|
current_version = parse_version(version.__version__)
|
|
for release in sorted(body['releases'].keys(), reverse=True):
|
|
release_version = parse_version(release)
|
|
if release_version[-1:][0] == "final":
|
|
if self.isDevVersion():
|
|
continue
|
|
else:
|
|
if not self.isDevVersion():
|
|
continue
|
|
if release_version > current_version and release_version[:2] == current_version[:2]:
|
|
return release
|
|
return version.__version__
|
|
|
|
def downloadUpdates(self, version):
|
|
"""
|
|
Download updates from PyPi to disk
|
|
|
|
:param version: The version to download
|
|
"""
|
|
log.debug('Download updates to %s', self._package_directory)
|
|
os.makedirs(self._update_directory, exist_ok=True)
|
|
self._filesToDownload = 2
|
|
url = 'https://pypi.python.org/packages/source/g/gns3-server/gns3-server-{}.tar.gz'.format(version)
|
|
self._get(url, self._fileDownloadedSlot, user_attribute=os.path.join(self._update_directory, 'gns3-server.tar.gz'))
|
|
url = 'https://pypi.python.org/packages/source/g/gns3-gui/gns3-gui-{}.tar.gz'.format(version)
|
|
self._get(url, self._fileDownloadedSlot, user_attribute=os.path.join(self._update_directory, 'gns3-gui.tar.gz'))
|
|
|
|
def _fileDownloadedSlot(self):
|
|
network_reply = self.sender()
|
|
file_path = network_reply.request().attribute(QtNetwork.QNetworkRequest.Attribute.User)
|
|
if network_reply.error() == QtNetwork.QNetworkReply.NetworkError.NoError:
|
|
log.debug('File downloaded %s', file_path)
|
|
with open(file_path, 'wb+') as f:
|
|
f.write(network_reply.readAll())
|
|
self._filesToDownload -= 1
|
|
if self._filesToDownload == 0:
|
|
reply = QtWidgets.QMessageBox.question(self._parent,
|
|
"Check For Update",
|
|
"GNS3 upgrade downloaded do you want to quit the application?",
|
|
QtWidgets.QMessageBox.StandardButton.Yes,
|
|
QtWidgets.QMessageBox.StandardButton.No)
|
|
if reply == QtWidgets.QMessageBox.StandardButton.Yes:
|
|
QtWidgets.QApplication.instance().closeAllWindows()
|
|
else:
|
|
log.debug('Error when downloading %s', file_path)
|
|
network_reply.deleteLater()
|
|
|
|
def installDownloadedUpdates(self):
|
|
"""
|
|
If update have been downloaded and
|
|
ready for install we process the install
|
|
|
|
:returns: Boolean True if an update is installed
|
|
"""
|
|
|
|
if os.path.exists(self._update_directory):
|
|
os.makedirs(self._package_directory, exist_ok=True)
|
|
|
|
gui_tgz = os.path.join(self._update_directory, 'gns3-gui.tar.gz')
|
|
self._extractTgz(gui_tgz)
|
|
server_tgz = os.path.join(self._update_directory, 'gns3-server.tar.gz')
|
|
self._extractTgz(server_tgz)
|
|
shutil.rmtree(self._update_directory, ignore_errors=True)
|
|
return True
|
|
return False
|
|
|
|
def _extractTgz(self, tgz):
|
|
if os.path.exists(tgz):
|
|
log.info('Extract update %s', tgz)
|
|
with tarfile.open(tgz, 'r:gz') as tar:
|
|
# Tar add a folder with the name of archive in first position
|
|
# we need to drop it
|
|
members = tar.getmembers()[1:]
|
|
for member in members:
|
|
# Path separator is always / even on windows
|
|
member.name = member.name.split("/", 1)[1]
|
|
def is_within_directory(directory, target):
|
|
|
|
abs_directory = os.path.abspath(directory)
|
|
abs_target = os.path.abspath(target)
|
|
|
|
prefix = os.path.commonprefix([abs_directory, abs_target])
|
|
|
|
return prefix == abs_directory
|
|
|
|
def safe_extract(tar, path=".", members=None, *, numeric_owner=False):
|
|
|
|
for member in tar.getmembers():
|
|
member_path = os.path.join(path, member.name)
|
|
if not is_within_directory(path, member_path):
|
|
raise Exception("Attempted Path Traversal in Tar File")
|
|
|
|
tar.extractall(path, members, numeric_owner=numeric_owner)
|
|
|
|
safe_extract(tar, path=self._package_directory, members=members)
|