From 35747c0bef2b7c0f789f704d00d9a3311f6f666c Mon Sep 17 00:00:00 2001 From: DarthSidious007 Date: Tue, 20 May 2025 20:15:26 +0300 Subject: [PATCH] v3.2.0 - grafana part deleted - dynamic metric's labels added - global refactoring --- Makefile | 2 +- README.md | 4 +- exporter.py | 518 +++++++++++++++++++++++++--------------------------- 3 files changed, 249 insertions(+), 275 deletions(-) diff --git a/Makefile b/Makefile index 73390d7..097989f 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ .PHONY: all ci docker_build docker_retag docker_login docker_push -VERSION := 3.1.0 +VERSION := 3.2.0 PROJECT_NAME ?= amneziavpn/amneziawg-exporter DOCKER_BUILDKIT ?= 1 DOCKER_REGISTRY ?= docker.io diff --git a/README.md b/README.md index 9717bd7..63426f3 100644 --- a/README.md +++ b/README.md @@ -37,9 +37,7 @@ The following environment variables can be used to configure amneziawg-exporter. | AWG_EXPORTER_METRICS_FILE | /tmp/prometheus/awg.prom | Path to the metrics file for Node exporter textfile collector. | | AWG_EXPORTER_OPS_MODE | http | Operation mode for the exporter (`http`, `metricsfile`, `oneshot` or `grafana_cloud`). | | AWG_EXPORTER_AWG_SHOW_EXEC | "awg show all dump" | Command to run the `awg show` command. | -| AWG_GRAFANA_WRITE_URL | | URL for sending metrics to Grafana Cloud (for `grafana_cloud` mode). | -| AWG_GRAFANA_WRITE_TOKEN | | Authorization token for Grafana Cloud (for `grafana_cloud` mode). | -| AWG_GRAFANA_ADDITIONAL_LABELS | | Additional labels to add when sending metrics to Grafana Cloud. 
| +| AWG_EXPORTER_EXTRA_LABEL_* | | Additional labels to add to each metric (`AWG_EXPORTER_EXTRA_LABEL_(.*)` - the captured key, lowercased, becomes the label name) | | AWG_EXPORTER_REDIS_HOST | localhost | Redis server host to store peers data | | AWG_EXPORTER_REDIS_PORT | 6379 | Redis server port to store peers data | | AWG_EXPORTER_REDIS_DB | 0 | Redis server db number to store peers data | diff --git a/exporter.py b/exporter.py index dbee5b7..44b9dec 100755 --- a/exporter.py +++ b/exporter.py @@ -2,6 +2,7 @@ import logging import sys +import os import time import subprocess import signal @@ -9,85 +10,110 @@ import argparse import redis from decouple import Config, RepositoryEnv, RepositoryEmpty from datetime import datetime, timedelta -from prometheus_client import start_http_server, CollectorRegistry, Gauge, write_to_textfile, generate_latest -from prometheus_client.parser import text_string_to_metric_families -import requests +from prometheus_client import start_http_server, CollectorRegistry, Gauge, write_to_textfile +from dataclasses import dataclass, asdict class MyLogger: - """ - A simple wrapper around Python's logging module to set up loggers with stdout and stderr handlers. + """Custom logger that outputs INFO messages to stdout and ERROR messages to stderr.""" - Parameters: - name (str): The name of the logger. - level (int): The logging level (default is logging.INFO). - """ def __init__(self, name: str, level=logging.INFO): + """ + Initialize the logger. + + Args: + name (str): Name of the logger. + level (int): Logging level (default is logging.INFO). 
+ """ self.logger = logging.getLogger(name) self.logger.setLevel(level) - formatter = logging.Formatter('%(asctime)s %(name)s %(levelname)s: %(message)s') - stdout_handler = logging.StreamHandler(sys.stdout) - stdout_handler.setLevel(logging.INFO) - stdout_handler.setFormatter(formatter) - stderr_handler = logging.StreamHandler(sys.stderr) - stderr_handler.setLevel(logging.ERROR) - stderr_handler.setFormatter(formatter) - self.logger.addHandler(stdout_handler) - self.logger.addHandler(stderr_handler) + if not self.logger.hasHandlers(): + formatter = logging.Formatter('%(asctime)s %(name)s %(levelname)s: %(message)s') + stdout_handler = logging.StreamHandler(sys.stdout) + stdout_handler.setLevel(logging.INFO) + stdout_handler.setFormatter(formatter) + stderr_handler = logging.StreamHandler(sys.stderr) + stderr_handler.setLevel(logging.ERROR) + stderr_handler.setFormatter(formatter) + self.logger.addHandler(stdout_handler) + self.logger.addHandler(stderr_handler) + + +class ReConfig(Config): + """Extended Config class to support environment variable discovery by prefix.""" + + def find(self, regex): + """ + Find environment variables starting with a specific prefix. + + Args: + regex (str): Prefix to filter environment variables. + + Returns: + dict: Filtered environment variables. + """ + return {k: v for k, v in os.environ.items() if k.startswith(regex)} class Decouwrapper: - """ - A wrapper class providing access to configuration options. + """Wrapper for Decouple's Config to support dynamic discovery and loading.""" - This class reads configuration options from a file specified by the `--envfile` argument - or uses an empty repository if the argument is not provided. - - Attributes: - __config (dict): A dictionary storing the configuration options. 
- """ - def __init__(self): - self.__config = {} - self.__read_config() - - def __read_config(self): + def __init__(self, envfile: str = None): """ - Reads configuration options from the file specified by the `--envfile` argument. - - If the `--envfile` argument is not provided, configuration options are fetched from the system environment. - """ - parser = argparse.ArgumentParser(description='AWG exporter options') - parser.add_argument('--envfile', type=str, help='Path to config.env file') - if parser.parse_args().envfile is None: - repository = RepositoryEmpty() - else: - repository = RepositoryEnv(parser.parse_args().envfile) - self.__config = Config(repository) - - def __call__(self, *args, **kwargs): - """ - Provides access to configuration options via the Config object. + Initialize the wrapper with an optional .env file. Args: - *args: Variable length argument list for configuration key. - **kwargs: Arbitrary keyword arguments for default values. + envfile (str): Path to the .env file. + """ + repository = RepositoryEnv(envfile) if envfile else RepositoryEmpty() + self.__config = ReConfig(repository) + + def discovery(self, regex): + """ + Discover environment variables with a given prefix, returning them in lowercase without prefix. + + Args: + regex (str): Prefix string to search. Returns: - str: The configuration value for the given key. + dict: Discovered key-value pairs with prefix removed. """ - return self.__config.get(*args, **kwargs) + discovered = self.__config.find(regex) + prefix_len = len(regex) + return {key[prefix_len:].lower(): value for key, value in discovered.items()} + + def get(self, key, default=None): + """ + Get a configuration value. + + Args: + key (str): Configuration key. + default: Default value if the key is not found. + + Returns: + str or default: Retrieved value or default. 
+ """ + return self.__config.get(key, default) + + +@dataclass +class ExporterConfig: + """Dataclass for holding configuration options for the exporter.""" + + scrape_interval: int + http_port: int + addr: str + metrics_file: str + ops_mode: str + awg_executable: str + redis_host: str + redis_port: int + redis_db: int + extra_labels: dict class PersistenceWrapper: - """ - A wrapper for interacting with Redis to maintain active user statistics. - - Attributes: - connection (redis.Redis): The Redis connection object. - mau (int): Monthly active users, calculated based on recent activity. - dau (int): Daily active users, calculated based on recent activity. - online (int): Users currently online (last 5 minutes). - """ + """Handles Redis-based persistence for tracking peer activity over time.""" FIVE_MINUTES = timedelta(minutes=5) ONE_DAY = timedelta(days=1) @@ -95,282 +121,232 @@ class PersistenceWrapper: def __init__(self, host: str, port: int, db: int): """ - Initializes the Redis connection and user activity counters. + Initialize Redis connection and set up metrics counters. Args: - host (str): Redis server hostname. Defaults to 'localhost'. - port (int): Redis server port. Defaults to 6379. - db (int): Redis database number. Defaults to 0. + host (str): Redis host. + port (int): Redis port. + db (int): Redis database number. 
""" self.log = MyLogger(self.__class__.__name__).logger self.connection = redis.Redis(host=host, port=port, db=db, decode_responses=True) - self.log.info('Redis storage initialized') - self.dau: int = 0 - self.mau: int = 0 - self.mau_abs: int = 0 - self.online: int = 0 - self.current_month: str = '' + try: + self.connection.ping() + except redis.ConnectionError: + self.log.error("Redis connection failed.") + raise + self.dau = self.mau = self.mau_abs = self.online = 0 + self.current_month = '' - def update_peer(self, peer: str, handshake_time: int) -> None: + def update_peer(self, peer: str, handshake_time: int): """ - Updates the last handshake time for a specific peer in Redis. + Update a peer's last handshake time in Redis. Args: - peer (str): Unique identifier for the peer. - handshake_time (int): Timestamp of the handshake event. + peer (str): Peer identifier. + handshake_time (int): Timestamp of the latest handshake. """ try: self.connection.set(peer, handshake_time) except redis.RedisError as e: self.log.error(f"Error updating peer {peer}: {e}") - def recalculate(self) -> None: + def recalculate(self): """ - Recalculates the MAU, DAU, and online user counts by iterating over all peer entries - in Redis and comparing their handshake times to current time thresholds. - - MAU (Monthly Active Users): Users active within the last 30 days. - DAU (Daily Active Users): Users active within the last 24 hours. - Online: Users active within the last 5 minutes. + Recalculate activity metrics (DAU, MAU, online peers) based on stored timestamps. 
""" + now = datetime.now() + five_minutes_ago = (now - self.FIVE_MINUTES).timestamp() + day_ago = (now - self.ONE_DAY).timestamp() + month_ago = (now - self.ONE_MONTH).timestamp() + first_day_of_month = datetime(now.year, now.month, 1).timestamp() + counts = dict(mau_abs=0, mau=0, dau=0, online=0) try: - now = datetime.now() - five_minutes_ago = (now - self.FIVE_MINUTES).timestamp() - day_ago = (now - self.ONE_DAY).timestamp() - month_ago = (now - self.ONE_MONTH).timestamp() - first_day_of_month = datetime(now.year, now.month, 1).timestamp() - mau_abs_count = 0 - mau_count = 0 - dau_count = 0 - online_count = 0 for peer in self.connection.keys(): - handshake_time = self.connection.get(peer) - if handshake_time: - handshake_time = float(handshake_time) - if handshake_time >= first_day_of_month: - mau_abs_count += 1 - if handshake_time >= month_ago: - mau_count += 1 - if handshake_time >= day_ago: - dau_count += 1 - if handshake_time >= five_minutes_ago: - online_count += 1 - self.mau_abs = mau_abs_count - self.mau = mau_count - self.dau = dau_count - self.online = online_count - self.current_month = f"{now.strftime('%Y')}-{now.strftime('%m')}" + ts = self.connection.get(peer) + if ts: + ts = float(ts) + if ts >= first_day_of_month: counts['mau_abs'] += 1 + if ts >= month_ago: counts['mau'] += 1 + if ts >= day_ago: counts['dau'] += 1 + if ts >= five_minutes_ago: counts['online'] += 1 + self.mau_abs, self.mau, self.dau, self.online = counts.values() + self.current_month = f"{now:%Y-%m}" except redis.RedisError as e: - self.log.error(f"Error recalculating active users: {e}") + self.log.error(f"Error during recalculation: {e}") + + + def __getitem__(self, key): + """ + Allow dictionary-style access to internal metric attributes. + + Args: + key (str): One of 'dau', 'mau', 'mau_abs', 'online'. + + Returns: + int: Corresponding metric value. 
+ """ + if key in ['dau', 'mau', 'mau_abs', 'online']: + return getattr(self, key) + raise KeyError(f"Invalid key: {key}") class AwgShowWrapper: - """ - A wrapper class providing utility methods for parsing output from the 'awg show' command. - - This class includes static methods for parsing text blocks into structured data and running 'awg show' commands. - - Attributes: - None - """ + """Handles execution and parsing of the AWG binary output.""" @staticmethod def parse(text_block: str) -> list: """ - Parse a text block containing information about AmneziaWG peers into a list of dictionaries. + Parse AWG output text to extract peer info. Args: - text_block (str): The text block to parse. - """ - lines = text_block.strip().splitlines() - peers = [] - for line in lines[1:]: # exclude 1st line with host data - parts = line.split() - current_peer = {} - if len(parts) >= 6: - current_peer['peer'] = parts[4] - current_peer['latest_handshake'] = parts[5] - peers.append(current_peer) + text_block (str): Output from AWG binary. + Returns: + list: List of peers with handshake info. + """ + lines = text_block.strip().splitlines()[1:] # Skip header + peers = [] + for line in lines: + parts = line.split() + if len(parts) >= 6: + peers.append({'peer': parts[4], 'latest_handshake': parts[5]}) return peers @staticmethod def run_bin(command: list) -> str: """ - Run an 'awg show' command (or its replacement) and return the output. + Execute a shell command and return the output. Args: - command (list): The 'awg show' command to run. + command (list): Command to run as a list. Returns: - str: The output of the 'awg show' command. + str: Standard output from the command. 
""" log = MyLogger('AwgShowWrapper').logger try: - process = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True) - return process.stdout.strip() + result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True) + return result.stdout.strip() except subprocess.CalledProcessError as e: - log.error(f"Error: Subprocess failed with exit code {e.returncode} and stderr: {e.stderr.strip()}") - return '' - except FileNotFoundError as e: - log.error(f"{e}") - log.error('Cannot execute awg binary because of the previous exception. Exporter will not work as expected.') - return '' + log.error(f"Subprocess failed: {e.stderr.strip()}") + except FileNotFoundError: + log.error("AWG binary not found.") except Exception as e: - log.error(f"{e}") - return '' + log.error(f"Unexpected error: {e}") + return '' class Exporter: - """ - A class to handle exporting of metrics to Grafana Cloud or local storage based on configuration settings. + """Prometheus exporter that collects and exposes metrics based on AWG output.""" - Attributes: - config (dict): Configuration parameters for the exporter. - registry (CollectorRegistry): Registry to store Prometheus metrics. - storage (PersistenceWrapper): Redis storage wrapper to handle peer data persistence. - log (Logger): Logger for monitoring and debugging. - current_online_metric (Gauge): Gauge metric for tracking current online users. - dau_metric (Gauge): Gauge metric for daily active users. - mau_metric (Gauge): Gauge metric for monthly active users. - status (Gauge): Gauge metric for exporter status. - """ - - def __init__(self, config: dict) -> None: + def __init__(self, config: ExporterConfig): """ - Initializes the Exporter instance with the given configuration. + Initialize the exporter with configuration and setup metrics. Args: - config (dict): Dictionary containing exporter configuration. + config (ExporterConfig): Exporter configuration object. 
""" self.config = config - self.awg_show_command = self.config['awg_executable'].split(' ') self.log = MyLogger(self.__class__.__name__).logger self.registry = CollectorRegistry() - self.storage = PersistenceWrapper(self.config['redis_host'], self.config['redis_port'], self.config['redis_db']) - self.current_online_metric = Gauge('awg_current_online', 'Current online users', registry=self.registry) - self.dau_metric = Gauge('awg_dau', 'Daily active users', registry=self.registry) - self.mau_metric = Gauge('awg_mau', 'Monthly active users', registry=self.registry) - self.mau_abs_metric = Gauge('awg_mau_abs', 'Monthly active users (Absolute)', ['month'], registry=self.registry) - self.status = Gauge('awg_status', 'Exporter status. 1 - OK, 0 - not OK', registry=self.registry) - self.log.info('AmneziaWG exporter initialized') + self.storage = PersistenceWrapper(config.redis_host, config.redis_port, config.redis_db) + self.awg_show_command = config.awg_executable.split() + labels = list(config.extra_labels.keys()) + self.has_labels = True if len(labels) > 0 else False + self.metrics = { + 'online': Gauge('awg_current_online', 'Online users', labels, registry=self.registry), + 'dau': Gauge('awg_dau', 'Daily Active Users', labels, registry=self.registry), + 'mau': Gauge('awg_mau', 'Monthly Active Users', labels, registry=self.registry), + 'mau_abs': Gauge('awg_mau_abs', 'Absolute Monthly Active Users', ['month'] + labels, registry=self.registry), + 'status': Gauge('awg_status', 'Exporter status', labels, registry=self.registry) + } + self.log.info("Exporter initialized") - def sigterm_handler(self, sig, frame) -> None: - """ - Handles SIGTERM signal for graceful shutdown. 
- """ - self.log.info('SIGTERM received, preparing to shut down...') - sys.exit(0) + def set_metric(self, name): + if name != 'status': + value = self.storage[name] + else: + value = 1 + if self.has_labels: + if name == 'mau_abs': + self.metrics[name].labels(month=self.storage.current_month, **self.config.extra_labels).set(value) + else: + self.metrics[name].labels(**self.config.extra_labels).set(value) + else: + if name == 'mau_abs': + self.metrics[name].labels(month=self.storage.current_month).set(value) + else: + self.metrics[name].set(value) - def sigint_handler(self, sig, frame) -> None: + def update_metrics(self): """ - Handles SIGINT (Ctrl+C) signal for graceful shutdown. + Fetch the latest peer data, update Redis, and export metrics. """ - self.log.info('SIGINT (Ctrl+C) received, preparing to shut down...') - sys.exit(0) - - def update_metrics(self) -> None: - """ - Updates and recalculates metrics for online, daily, and monthly active users. - """ - try: - awg_show_result = AwgShowWrapper.run_bin(self.awg_show_command) - peers = AwgShowWrapper.parse(awg_show_result) - if not peers: - self.status.set(0) - self.current_online_metric.set(0) - return - for peer in peers: - if peer.get('latest_handshake') == "0": - continue + output = AwgShowWrapper.run_bin(self.awg_show_command) + peers = AwgShowWrapper.parse(output) + if not peers: + self.metrics['status'].labels(**self.config.extra_labels).set(0) + self.metrics['online'].labels(**self.config.extra_labels).set(0) + return + for peer in peers: + if peer.get('latest_handshake') != '0': self.storage.update_peer(peer['peer'], peer['latest_handshake']) - self.storage.recalculate() - self.dau_metric.set(self.storage.dau) - self.mau_metric.set(self.storage.mau) - self.mau_abs_metric.labels(self.storage.current_month).set(self.storage.mau_abs) - self.current_online_metric.set(self.storage.online) - self.status.set(1) - except Exception as e: - self.log.error(f"Error updating metrics: {e}") + 
self.storage.recalculate() + for metric in self.metrics.keys(): + self.set_metric(metric) - def send_to_grafana_cloud(self) -> None: + def run(self): """ - Sends collected metrics to Grafana Cloud. + Start the exporter based on the configured operational mode. + Supports 'http', 'metricsfile', and 'oneshot'. """ - metrics = generate_latest(self.registry) - for family in text_string_to_metric_families(metrics.decode('utf-8')): - for sample in family.samples: - name = sample.name - labels = sample.labels - value = sample.value - labels_string = ','.join([f"{key}={value}" for key, value in labels.items()]) - # Dirty hack: We might need to add some labels (usually Prometheus does this for us). - if self.config['grafana_additional_labels'] != '': - labels_string = f"{labels_string},{self.config['grafana_additional_labels']}" - response = requests.post(self.config['grafana_write_url'], - headers={"Authorization": f"Bearer {self.config['grafana_write_token']}", "Content-Type": "text/plain"}, - data=f"{name},{labels_string} value={value}") - if response.status_code != 204: - self.log.info(f"Failed to send metrics to Grafana Cloud: {response.status_code}, {response.text}") - - def validate(self) -> None: - """ - Validates the configuration, ensuring required fields are set for Grafana Cloud mode. - """ - if self.config['ops_mode'] == 'grafana_cloud': - if self.config['grafana_write_url'] == '': - self.log.error('AWG_GRAFANA_WRITE_URL variable must be set!') - sys.exit(1) - if self.config['grafana_write_token'] == '': - self.log.error('AWG_GRAFANA_WRITE_TOKEN variable must be set!') - sys.exit(1) - - def main_loop(self) -> None: - self.log.info('Starting main loop') - signal.signal(signal.SIGTERM, self.sigterm_handler) - signal.signal(signal.SIGINT, self.sigint_handler) - self.validate() - if self.config['ops_mode'] == 'http': - # Start up the server to expose the metrics. 
- start_http_server(port=self.config['http_port'], addr=self.config['addr'], registry=self.registry) + self.log.info("Exporter running") + signal.signal(signal.SIGTERM, lambda s, f: sys.exit(0)) + signal.signal(signal.SIGINT, lambda s, f: sys.exit(0)) + if self.config.ops_mode == 'http': + start_http_server(self.config.http_port, addr=self.config.addr, registry=self.registry) while True: - try: - self.update_metrics() - if self.config['ops_mode'] in ['metricsfile', 'oneshot']: - write_to_textfile(self.config['metrics_file'], self.registry) - if self.config['ops_mode'] == 'oneshot': - self.log.info("Exiting after successful metrics fetch...") - break - if self.config['ops_mode'] == 'grafana_cloud': - self.send_to_grafana_cloud() - time.sleep(self.config['scrape_interval']) - except Exception as e: - self.log.error(f"{str(e)}") - time.sleep(self.config['scrape_interval']) + self.update_metrics() + if self.config.ops_mode in ['metricsfile', 'oneshot']: + write_to_textfile(self.config.metrics_file, self.registry) + if self.config.ops_mode == 'oneshot': + break + time.sleep(self.config.scrape_interval) + + +def main(): + """ + Entry point for the exporter script. Parses arguments, loads config, + initializes and runs the exporter. 
+ """ + parser = argparse.ArgumentParser() + parser.add_argument('--envfile', type=str, help='Path to env file') + args = parser.parse_args() + raw = Decouwrapper(args.envfile) + config = ExporterConfig( + scrape_interval=int(raw.get('AWG_EXPORTER_SCRAPE_INTERVAL', 60)), + http_port=int(raw.get('AWG_EXPORTER_HTTP_PORT', 9351)), + addr=raw.get('AWG_EXPORTER_LISTEN_ADDR', '0.0.0.0'), + metrics_file=raw.get('AWG_EXPORTER_METRICS_FILE', '/tmp/prometheus/awg.prom'), + ops_mode=raw.get('AWG_EXPORTER_OPS_MODE', 'http'), + awg_executable=raw.get('AWG_EXPORTER_AWG_SHOW_EXEC', 'awg show all dump'), + redis_host=raw.get('AWG_EXPORTER_REDIS_HOST', 'localhost'), + redis_port=int(raw.get('AWG_EXPORTER_REDIS_PORT', 6379)), + redis_db=int(raw.get('AWG_EXPORTER_REDIS_DB', 0)), + extra_labels=raw.discovery('AWG_EXPORTER_EXTRA_LABEL_') + ) + + logger = MyLogger("Main").logger + logger.info("Starting Exporter") + logger.info('Exporter config:') + for key, value in asdict(config).items(): + if key == 'metrics_file' and config.ops_mode != 'metricsfile': + continue + logger.info(f"--> {key}: {value}") + exporter = Exporter(config) + exporter.run() if __name__ == '__main__': - log = MyLogger("Main").logger - log.info('Starting AmneziaWG exporter') - config = Decouwrapper() - exporter_config = { - 'scrape_interval': config('AWG_EXPORTER_SCRAPE_INTERVAL', default=60), - 'http_port': config('AWG_EXPORTER_HTTP_PORT', default=9351), - 'addr': config('AWG_EXPORTER_LISTEN_ADDR', default='0.0.0.0'), - 'metrics_file': config('AWG_EXPORTER_METRICS_FILE', default='/tmp/prometheus/awg.prom'), - 'ops_mode': config('AWG_EXPORTER_OPS_MODE', default='http'), - 'grafana_write_url': config('AWG_GRAFANA_WRITE_URL', default=''), - 'grafana_write_token': config('AWG_GRAFANA_WRITE_TOKEN', default=''), - 'grafana_additional_labels': config('AWG_GRAFANA_ADDITIONAL_LABELS', default=''), - 'awg_executable': config('AWG_EXPORTER_AWG_SHOW_EXEC', default='awg show all dump'), - 'redis_host': 
config('AWG_EXPORTER_REDIS_HOST', default='localhost'), - 'redis_port': config('AWG_EXPORTER_REDIS_PORT', default=6379), - 'redis_db': config('AWG_EXPORTER_REDIS_DB', default=0) - } - log.info('Exporter config:') - for key, value in exporter_config.items(): - if key == 'metrics_file' and exporter_config['ops_mode'] != 'metricsfile': - continue - log.info(f"--> {key}: {value}") - exporter = Exporter(exporter_config) - exporter.main_loop() + main()