- grafana part deleted
- dynamic metric's labels added
- global refactoring
This commit is contained in:
DarthSidious007
2025-05-20 20:15:26 +03:00
parent 8897d47d07
commit 35747c0bef
3 changed files with 249 additions and 275 deletions

View File

@@ -1,6 +1,6 @@
.PHONY: all ci docker_build docker_retag docker_login docker_push
VERSION := 3.1.0
VERSION := 3.2.0
PROJECT_NAME ?= amneziavpn/amneziawg-exporter
DOCKER_BUILDKIT ?= 1
DOCKER_REGISTRY ?= docker.io

View File

@@ -37,9 +37,7 @@ The following environment variables can be used to configure amneziawg-exporter.
| AWG_EXPORTER_METRICS_FILE | /tmp/prometheus/awg.prom | Path to the metrics file for Node exporter textfile collector. |
| AWG_EXPORTER_OPS_MODE | http | Operation mode for the exporter (`http`, `metricsfile` or `oneshot`). |
| AWG_EXPORTER_AWG_SHOW_EXEC | "awg show all dump" | Command to run the `awg show` command. |
| AWG_GRAFANA_WRITE_URL | | URL for sending metrics to Grafana Cloud (for `grafana_cloud` mode). |
| AWG_GRAFANA_WRITE_TOKEN | | Authorization token for Grafana Cloud (for `grafana_cloud` mode). |
| AWG_GRAFANA_ADDITIONAL_LABELS | | Additional labels to add when sending metrics to Grafana Cloud. |
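| AWG_EXPORTER_EXTRA_LABEL_* | | Additional labels to add to each metric: the part of the variable name after the `AWG_EXPORTER_EXTRA_LABEL_` prefix, lowercased, becomes the label name and the variable's value becomes the label value. |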
| AWG_EXPORTER_REDIS_HOST | localhost | Redis server host to store peers data |
| AWG_EXPORTER_REDIS_PORT | 6379 | Redis server port to store peers data |
| AWG_EXPORTER_REDIS_DB | 0 | Redis server db number to store peers data |

View File

@@ -2,6 +2,7 @@
import logging
import sys
import os
import time
import subprocess
import signal
@@ -9,85 +10,110 @@ import argparse
import redis
from decouple import Config, RepositoryEnv, RepositoryEmpty
from datetime import datetime, timedelta
from prometheus_client import start_http_server, CollectorRegistry, Gauge, write_to_textfile, generate_latest
from prometheus_client.parser import text_string_to_metric_families
import requests
from prometheus_client import start_http_server, CollectorRegistry, Gauge, write_to_textfile
from dataclasses import dataclass, asdict
class MyLogger:
    """Custom logger that outputs messages below ERROR to stdout and ERROR and above to stderr.

    Attributes:
        logger (logging.Logger): The configured logger instance.
    """

    LOG_FORMAT = '%(asctime)s %(name)s %(levelname)s: %(message)s'

    def __init__(self, name: str, level=logging.INFO):
        """
        Initialize the logger.

        Handlers are attached only once per logger name, so creating several
        MyLogger objects with the same name does not duplicate output.

        Args:
            name (str): Name of the logger.
            level (int): Logging level (default is logging.INFO).
        """
        self.logger = logging.getLogger(name)
        self.logger.setLevel(level)

        # Look at this logger's own handlers only: hasHandlers() also reports
        # handlers attached to ancestors (e.g. the root logger), which would
        # leave this logger without its stdout/stderr handlers.
        if self.logger.handlers:
            return

        formatter = logging.Formatter(self.LOG_FORMAT)

        stdout_handler = logging.StreamHandler(sys.stdout)
        stdout_handler.setLevel(logging.INFO)
        # Upper bound for stdout: without it ERROR records are written to both streams.
        stdout_handler.addFilter(lambda record: record.levelno < logging.ERROR)
        stdout_handler.setFormatter(formatter)

        stderr_handler = logging.StreamHandler(sys.stderr)
        stderr_handler.setLevel(logging.ERROR)
        stderr_handler.setFormatter(formatter)

        self.logger.addHandler(stdout_handler)
        self.logger.addHandler(stderr_handler)
class ReConfig(Config):
    """Extended Config class to support variable discovery by name prefix."""

    def find(self, regex):
        """
        Find configuration variables whose names start with a specific prefix.

        Both the process environment and the entries of the loaded env file
        (when the repository exposes them) are searched, so prefixed variables
        defined only in the file passed via --envfile are discovered as well.

        Args:
            regex (str): Literal name prefix. Despite the parameter name it is
                matched with str.startswith, not as a regular expression.

        Returns:
            dict: Matching variables, name -> value.
        """
        # decouple's RepositoryEnv keeps the parsed file entries in `data`;
        # RepositoryEmpty has no such attribute, hence the defensive getattr.
        repository = getattr(self, 'repository', None)
        file_values = getattr(repository, 'data', None) or {}
        found = {k: v for k, v in file_values.items() if k.startswith(regex)}
        # The process environment overrides the env file, the same precedence
        # decouple applies for ordinary lookups.
        found.update({k: v for k, v in os.environ.items() if k.startswith(regex)})
        return found
class Decouwrapper:
    """Thin facade over a ReConfig instance: plain lookups plus prefix-based discovery."""

    def __init__(self, envfile: str = None):
        """
        Build the underlying configuration object.

        Args:
            envfile (str): Path to the .env file. When empty or None an empty
                repository is used instead of a file-backed one.
        """
        if envfile:
            source = RepositoryEnv(envfile)
        else:
            source = RepositoryEmpty()
        self.__config = ReConfig(source)

    def discovery(self, regex):
        """
        Return every variable whose name starts with the given prefix.

        The prefix is cut off and the remainder of the name is lowercased.

        Args:
            regex (str): Prefix string to search.

        Returns:
            dict: Discovered key-value pairs with the prefix removed.
        """
        matches = self.__config.find(regex)
        offset = len(regex)
        stripped = {}
        for name, value in matches.items():
            stripped[name[offset:].lower()] = value
        return stripped

    def get(self, key, default=None):
        """
        Look up a single configuration value.

        Args:
            key (str): Configuration key.
            default: Value handed to the underlying config as the fallback.

        Returns:
            Whatever the underlying config returns for the key.
        """
        return self.__config.get(key, default)
@dataclass
class ExporterConfig:
    """Typed container for the settings the exporter is started with."""

    # --- collection loop / output ---
    scrape_interval: int  # seconds slept between two metric updates
    http_port: int        # port handed to the Prometheus HTTP server
    addr: str             # listen address handed to the Prometheus HTTP server
    metrics_file: str     # target path for the textfile output
    ops_mode: str         # operation mode selector

    # --- data source ---
    awg_executable: str   # command line that is split on whitespace and executed

    # --- Redis persistence ---
    redis_host: str
    redis_port: int
    redis_db: int

    # --- labelling ---
    extra_labels: dict    # label name -> label value, attached to every metric
class PersistenceWrapper:
"""
A wrapper for interacting with Redis to maintain active user statistics.
Attributes:
connection (redis.Redis): The Redis connection object.
mau (int): Monthly active users, calculated based on recent activity.
dau (int): Daily active users, calculated based on recent activity.
online (int): Users currently online (last 5 minutes).
"""
"""Handles Redis-based persistence for tracking peer activity over time."""
FIVE_MINUTES = timedelta(minutes=5)
ONE_DAY = timedelta(days=1)
@@ -95,282 +121,232 @@ class PersistenceWrapper:
def __init__(self, host: str, port: int, db: int):
    """
    Initialize the Redis connection, verify it and reset the metrics counters.

    Args:
        host (str): Redis host.
        port (int): Redis port.
        db (int): Redis database number.

    Raises:
        redis.ConnectionError: If the server does not answer the initial PING.
    """
    self.log = MyLogger(self.__class__.__name__).logger
    self.connection = redis.Redis(host=host, port=port, db=db, decode_responses=True)
    try:
        self.connection.ping()
    except redis.ConnectionError:
        self.log.error("Redis connection failed.")
        raise
    # Report success only after the server has actually answered the PING.
    self.log.info('Redis storage initialized')
    self.dau = self.mau = self.mau_abs = self.online = 0
    self.current_month = ''
def update_peer(self, peer: str, handshake_time: int):
    """
    Store the latest handshake timestamp of a peer under the peer's key in Redis.

    A Redis failure is logged and otherwise ignored.

    Args:
        peer (str): Peer identifier, used as the Redis key.
        handshake_time (int): Timestamp of the latest handshake, used as the value.
    """
    storage = self.connection
    try:
        storage.set(peer, handshake_time)
    except redis.RedisError as e:
        self.log.error(f"Error updating peer {peer}: {e}")
def recalculate(self):
    """
    Recalculate activity metrics (DAU, MAU, online peers) based on stored timestamps.

    Every key in the Redis database is treated as a peer whose value is the
    timestamp of its latest handshake. A peer is counted as
      * online  - handshake within self.FIVE_MINUTES,
      * dau     - handshake within self.ONE_DAY,
      * mau     - handshake within self.ONE_MONTH,
      * mau_abs - handshake since the first day of the current calendar month.

    On a Redis error the previous counter values are kept. Values that cannot
    be parsed as a number are skipped instead of aborting the recalculation.
    """
    now = datetime.now()
    thresholds = {
        'mau_abs': datetime(now.year, now.month, 1).timestamp(),
        'mau': (now - self.ONE_MONTH).timestamp(),
        'dau': (now - self.ONE_DAY).timestamp(),
        'online': (now - self.FIVE_MINUTES).timestamp(),
    }
    counts = dict.fromkeys(thresholds, 0)

    try:
        peers = self.connection.keys()
        # One MGET round-trip instead of one GET per key; MGET requires at
        # least one key, so an empty database is handled separately.
        values = self.connection.mget(peers) if peers else []
    except redis.RedisError as e:
        self.log.error(f"Error during recalculation: {e}")
        return

    for peer, raw in zip(peers, values):
        if not raw:
            continue
        try:
            ts = float(raw)
        except (TypeError, ValueError):
            # Foreign or corrupted entry in the database: ignore it.
            self.log.warning(f"Skipping peer {peer}: invalid handshake timestamp {raw!r}")
            continue
        for name, threshold in thresholds.items():
            if ts >= threshold:
                counts[name] += 1

    self.mau_abs = counts['mau_abs']
    self.mau = counts['mau']
    self.dau = counts['dau']
    self.online = counts['online']
    self.current_month = f"{now:%Y-%m}"
def __getitem__(self, key):
"""
Allow dictionary-style access to internal metric attributes.
Args:
key (str): One of 'dau', 'mau', 'mau_abs', 'online'.
Returns:
int: Corresponding metric value.
"""
if key in ['dau', 'mau', 'mau_abs', 'online']:
return getattr(self, key)
raise KeyError(f"Invalid key: {key}")
class AwgShowWrapper:
    """Runs the AWG command and turns its textual output into peer records."""

    @staticmethod
    def parse(text_block: str) -> list:
        """
        Extract peer records from the command output.

        The first line of the output is skipped. Of the remaining lines only
        those with at least six whitespace-separated fields are used; the
        fifth field becomes 'peer' and the sixth 'latest_handshake'.

        Args:
            text_block (str): Output from AWG binary.

        Returns:
            list: One dict per accepted line, with keys 'peer' and 'latest_handshake'.
        """
        rows = (line.split() for line in text_block.strip().splitlines()[1:])
        return [
            {'peer': fields[4], 'latest_handshake': fields[5]}
            for fields in rows
            if len(fields) >= 6
        ]

    @staticmethod
    def run_bin(command: list) -> str:
        """
        Run a command and hand back its standard output.

        Any failure is logged and reported to the caller as an empty string.

        Args:
            command (list): Command to run as a list.

        Returns:
            str: Stripped standard output, or '' when the command could not be
                run or exited with a non-zero status.
        """
        log = MyLogger('AwgShowWrapper').logger
        try:
            completed = subprocess.run(command, capture_output=True, text=True, check=True)
        except subprocess.CalledProcessError as e:
            log.error(f"Subprocess failed: {e.stderr.strip()}")
            return ''
        except FileNotFoundError:
            log.error("AWG binary not found.")
            return ''
        except Exception as e:
            log.error(f"Unexpected error: {e}")
            return ''
        return completed.stdout.strip()
class Exporter:
    """Prometheus exporter that collects and exposes metrics based on AWG output.

    Attributes:
        config: Exporter configuration object.
        log: Logger of this class.
        registry: Prometheus registry holding the gauges below.
        storage: Redis-backed store of peer handshake timestamps.
        awg_show_command (list): Command executed to obtain the peer dump.
        has_labels (bool): Whether any extra labels are configured.
        metrics (dict): Gauges keyed by 'online', 'dau', 'mau', 'mau_abs', 'status'.
    """

    SUPPORTED_OPS_MODES = ('http', 'metricsfile', 'oneshot')

    def __init__(self, config: "ExporterConfig"):
        """
        Initialize the exporter with configuration and set up the metrics.

        Args:
            config (ExporterConfig): Exporter configuration object.
        """
        self.config = config
        self.log = MyLogger(self.__class__.__name__).logger
        self.registry = CollectorRegistry()
        self.storage = PersistenceWrapper(config.redis_host, config.redis_port, config.redis_db)
        self.awg_show_command = config.awg_executable.split()

        labels = list(config.extra_labels.keys())
        self.has_labels = bool(labels)
        self.metrics = {
            'online': Gauge('awg_current_online', 'Online users', labels, registry=self.registry),
            'dau': Gauge('awg_dau', 'Daily Active Users', labels, registry=self.registry),
            'mau': Gauge('awg_mau', 'Monthly Active Users', labels, registry=self.registry),
            'mau_abs': Gauge('awg_mau_abs', 'Absolute Monthly Active Users', ['month'] + labels, registry=self.registry),
            'status': Gauge('awg_status', 'Exporter status', labels, registry=self.registry)
        }
        self.log.info("Exporter initialized")

    def set_metric(self, name, value=None):
        """
        Set one gauge, attaching the configured extra labels when there are any.

        Args:
            name (str): Key into self.metrics.
            value: Value to set. When None, 'status' is set to 1 and every
                other metric takes its value from the storage counters.
        """
        if value is None:
            value = 1 if name == 'status' else self.storage[name]

        label_values = dict(self.config.extra_labels) if self.has_labels else {}
        if name == 'mau_abs':
            # NOTE(review): series of previous months are never removed from the
            # registry, so they keep being exported until restart - confirm intended.
            label_values['month'] = self.storage.current_month

        gauge = self.metrics[name]
        # A gauge created without label names must be set directly: calling
        # .labels() on it with no label values is rejected by prometheus_client.
        target = gauge.labels(**label_values) if label_values else gauge
        target.set(value)

    def update_metrics(self):
        """
        Fetch the latest peer data, update Redis, and export metrics.

        When the command yields no peers, only 'status' and 'online' are set
        (both to 0) and the remaining gauges keep their previous values.
        """
        output = AwgShowWrapper.run_bin(self.awg_show_command)
        peers = AwgShowWrapper.parse(output)
        if not peers:
            # Go through set_metric so the no-extra-labels case does not call
            # .labels() on an unlabelled gauge.
            self.set_metric('status', 0)
            self.set_metric('online', 0)
            return

        for peer in peers:
            # A handshake value of '0' is not recorded.
            if peer.get('latest_handshake') != '0':
                self.storage.update_peer(peer['peer'], peer['latest_handshake'])

        self.storage.recalculate()
        for metric in self.metrics:
            self.set_metric(metric)

    def run(self):
        """
        Start the exporter based on the configured operational mode.

        Supports 'http', 'metricsfile', and 'oneshot'.

        Raises:
            ValueError: If the configured operation mode is not supported.
        """
        mode = self.config.ops_mode
        if mode not in self.SUPPORTED_OPS_MODES:
            # Fail loudly: otherwise the loop would run without exposing metrics anywhere.
            raise ValueError(
                f"Unsupported ops mode {mode!r}; expected one of {', '.join(self.SUPPORTED_OPS_MODES)}"
            )

        self.log.info("Exporter running")
        signal.signal(signal.SIGTERM, lambda s, f: sys.exit(0))
        signal.signal(signal.SIGINT, lambda s, f: sys.exit(0))

        if mode == 'http':
            start_http_server(self.config.http_port, addr=self.config.addr, registry=self.registry)

        while True:
            self.update_metrics()
            if mode in ('metricsfile', 'oneshot'):
                write_to_textfile(self.config.metrics_file, self.registry)
            if mode == 'oneshot':
                break
            time.sleep(self.config.scrape_interval)
def main():
    """
    Entry point for the exporter script.

    Parses the --envfile argument, builds the ExporterConfig from the
    AWG_EXPORTER_* settings, logs the effective configuration and runs the
    exporter.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--envfile', type=str, help='Path to env file')
    args = parser.parse_args()

    raw = Decouwrapper(args.envfile)
    config = ExporterConfig(
        # Values read from the environment or env file are converted explicitly.
        scrape_interval=int(raw.get('AWG_EXPORTER_SCRAPE_INTERVAL', 60)),
        http_port=int(raw.get('AWG_EXPORTER_HTTP_PORT', 9351)),
        addr=raw.get('AWG_EXPORTER_LISTEN_ADDR', '0.0.0.0'),
        metrics_file=raw.get('AWG_EXPORTER_METRICS_FILE', '/tmp/prometheus/awg.prom'),
        ops_mode=raw.get('AWG_EXPORTER_OPS_MODE', 'http'),
        awg_executable=raw.get('AWG_EXPORTER_AWG_SHOW_EXEC', 'awg show all dump'),
        redis_host=raw.get('AWG_EXPORTER_REDIS_HOST', 'localhost'),
        redis_port=int(raw.get('AWG_EXPORTER_REDIS_PORT', 6379)),
        redis_db=int(raw.get('AWG_EXPORTER_REDIS_DB', 0)),
        extra_labels=raw.discovery('AWG_EXPORTER_EXTRA_LABEL_')
    )

    logger = MyLogger("Main").logger
    logger.info("Starting Exporter")
    logger.info('Exporter config:')
    # The metrics file is written in both 'metricsfile' and 'oneshot' modes,
    # so it is only hidden from the log when neither is selected.
    writes_metrics_file = config.ops_mode in ('metricsfile', 'oneshot')
    for key, value in asdict(config).items():
        if key == 'metrics_file' and not writes_metrics_file:
            continue
        logger.info(f"--> {key}: {value}")

    exporter = Exporter(config)
    exporter.run()
# Script entry point: all start-up work (argument parsing, configuration,
# logging and running the exporter) lives in main().
if __name__ == '__main__':
    main()