Compare commits

...

1 Commits

Author SHA1 Message Date
Jiekun
0baa091b77 feature: [memory limit] add cluster version 2025-07-04 15:41:28 +08:00
2 changed files with 111 additions and 7 deletions

View File

@@ -10,8 +10,6 @@ import (
"strings"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/clusternative"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadogsketches"
@@ -42,11 +40,13 @@ import (
opentsdbserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdb"
opentsdbhttpserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeserieslimits"
"github.com/VictoriaMetrics/metrics"
)
var (
@@ -81,6 +81,7 @@ var (
maxLabelsPerTimeseries = flag.Int("maxLabelsPerTimeseries", 40, "The maximum number of labels per time series to be accepted. Series with superfluous labels are ignored. In this case the vm_rows_ignored_total{reason=\"too_many_labels\"} metric at /metrics page is incremented")
maxLabelNameLen = flag.Int("maxLabelNameLen", 256, "The maximum length of label name in the accepted time series. Series with longer label name are ignored. In this case the vm_rows_ignored_total{reason=\"too_long_label_name\"} metric at /metrics page is incremented")
maxLabelValueLen = flag.Int("maxLabelValueLen", 4*1024, "The maximum length of label values in the accepted time series. Series with longer label value are ignored. In this case the vm_rows_ignored_total{reason=\"too_long_label_value\"} metric at /metrics page is incremented")
maxMemoryUsage = flag.Int("insert.circuitBreakMemoryUsage", 90, "Reject insert requests when memory usage exceeds a certain percentage. 0 means no circuit breaking. An integer value from 1-100 represents 1%-100%.")
)
var (
@@ -217,6 +218,12 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
// This is not our link.
return false
}
if *maxMemoryUsage >= 1 && *maxMemoryUsage <= 100 {
if memory.CurrentPercentage() > *maxMemoryUsage {
httpserver.Errorf(w, r, "server overloaded, request rejected by circuit breaker")
return true
}
}
at, err := auth.NewTokenPossibleMultitenant(p.AuthToken)
if err != nil {
httpserver.Errorf(w, r, "auth error: %s", err)

View File

@@ -1,18 +1,28 @@
package memory
import (
"bufio"
"bytes"
"errors"
"flag"
"fmt"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/metrics"
)
var (
allowedPercent = flag.Float64("memory.allowedPercent", 60, `Allowed percent of system memory VictoriaMetrics caches may occupy. See also -memory.allowedBytes. Too low a value may increase cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache which will result in higher disk IO usage`)
allowedBytes = flagutil.NewBytes("memory.allowedBytes", 0, `Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to a non-zero value. Too low a value may increase the cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache resulting in higher disk IO usage`)
allowedPercent = flag.Float64("memory.allowedPercent", 60, `Allowed percent of system memory VictoriaMetrics caches may occupy. See also -memory.allowedBytes. Too low a value may increase cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache which will result in higher disk IO usage`)
allowedBytes = flagutil.NewBytes("memory.allowedBytes", 0, `Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to a non-zero value. Too low a value may increase the cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache resulting in higher disk IO usage`)
memCheckInterval = flag.Duration("memory.checkInterval", 2*time.Second, "How often to check the memory usage.")
)
var _ = metrics.NewGauge("process_memory_limit_bytes", func() float64 {
@@ -20,10 +30,13 @@ var _ = metrics.NewGauge("process_memory_limit_bytes", func() float64 {
})
var (
	// Sizing values in bytes, assigned by initOnce.
	// remainingMemory is memoryLimit minus allowedMemory.
	allowedMemory   int
	remainingMemory int
	memoryLimit     int

	// Latest memory usage sample: used bytes and the same value as a percentage of memoryLimit.
	// Both are written by initOnce and its sampling goroutine and read via Current and CurrentPercentage,
	// hence the atomic types.
	currentMemory           atomic.Int64
	currentMemoryPercentage atomic.Int32
)
var once sync.Once
func initOnce() {
@@ -45,6 +58,35 @@ func initOnce() {
remainingMemory = memoryLimit - allowedMemory
logger.Infof("limiting caches to %d bytes, leaving %d bytes to the OS according to -memory.allowedBytes=%s", allowedMemory, remainingMemory, allowedBytes.String())
}
if *memCheckInterval == 0 {
return
}
// enable memory detection if configured
currentAvailableBytes, _ := getAvailableMemory()
currentUsedBytes := max(0, memoryLimit-currentAvailableBytes)
currentMemory.Store(int64(currentUsedBytes))
currentMemoryPercentage.Store(int32(currentUsedBytes * 100 / memoryLimit))
go func() {
// Subscribe to SIGHUP: the loop below returns when the signal arrives, which stops the periodic memory checks.
// NOTE(review): after that the stored usage values are never refreshed again — confirm this is intended.
sighupCh := procutil.NewSighupChan()
t := time.NewTicker(*memCheckInterval)
defer t.Stop()
for {
select {
case <-sighupCh:
return
case <-t.C:
currentAvailableBytes, _ = getAvailableMemory()
currentUsedBytes = max(0, memoryLimit-currentAvailableBytes)
currentMemory.Store(int64(currentUsedBytes))
currentMemoryPercentage.Store(int32(currentUsedBytes * 100 / memoryLimit))
logger.Infof("current: %dMiB, total: %dMiB, percent: %d%%", currentUsedBytes/1024/1024, memoryLimit/1024/1024, currentMemoryPercentage.Load())
}
}
}()
}
// Allowed returns the amount of system memory allowed to use by the app.
@@ -62,3 +104,58 @@ func Remaining() int {
once.Do(initOnce)
return remainingMemory
}
// Current returns the most recently sampled memory usage in bytes.
//
// The sample is computed as the memory limit minus MemAvailable and is refreshed
// every -memory.checkInterval (2 seconds by default). The returned value stays 0
// when -memory.checkInterval is 0, since sampling is disabled in that case.
func Current() int {
	once.Do(initOnce)
	return int(currentMemory.Load())
}
// CurrentPercentage returns the most recently sampled memory usage as an integer
// percentage of the memory limit.
//
// The sample is refreshed every -memory.checkInterval (2 seconds by default).
// The returned value stays 0 when -memory.checkInterval is 0, since sampling is
// disabled in that case.
//
// NOTE(review): the sampler ignores errors from reading available memory and
// treats them as 0 bytes available, so a failed read is reported here as 100.
func CurrentPercentage() int {
	once.Do(initOnce)
	return int(currentMemoryPercentage.Load())
}
// sysCurrentMemory returns the value reported by getAvailableMemory,
// or 0 when that value cannot be obtained.
//
// NOTE(review): despite the name, this is the available memory, not the used memory.
func sysCurrentMemory() int {
	if avail, err := getAvailableMemory(); err == nil {
		return avail
	}
	return 0
}
// getAvailableMemory reads /proc/meminfo and returns the MemAvailable value in bytes.
//
// It returns a zero value and a non-nil error if the file cannot be read
// or if it does not contain a well-formed MemAvailable line.
func getAvailableMemory() (int, error) {
	data, err := os.ReadFile("/proc/meminfo")
	if err != nil {
		return 0, err
	}
	return parseMemAvailable(data)
}

// parseMemAvailable extracts the MemAvailable value in bytes from data in /proc/meminfo format.
//
// The expected line is "MemAvailable: <decimal> kB"; the unit is optional, and a value
// without a unit is returned as is. Errors for malformed lines wrap ErrFileParse.
func parseMemAvailable(data []byte) (int, error) {
	s := bufio.NewScanner(bytes.NewReader(data))
	for s.Scan() {
		line := s.Text()
		fields := strings.Fields(line)
		// Blank lines produce no fields; check the length before indexing to avoid a panic.
		if len(fields) == 0 || fields[0] != "MemAvailable:" {
			continue
		}
		if len(fields) < 2 || len(fields) > 3 {
			return 0, fmt.Errorf("%w: malformed line %q", ErrFileParse, line)
		}
		// The value is a decimal number; base 10 prevents a leading zero from being read as octal.
		val, err := strconv.ParseInt(fields[1], 10, 64)
		if err != nil {
			return 0, fmt.Errorf("parsing MemAvailable value %q: %w", fields[1], err)
		}
		if len(fields) == 2 {
			return int(val), nil
		}
		if fields[2] != "kB" {
			return 0, fmt.Errorf("%w: unsupported unit in optional 3rd field %q", ErrFileParse, fields[2])
		}
		return int(1024 * val), nil
	}
	if err := s.Err(); err != nil {
		return 0, err
	}
	return 0, errors.New("MemAvailable not found in /proc/meminfo")
}

// ErrFileParse is wrapped by errors returned for malformed /proc/meminfo lines.
var ErrFileParse = errors.New("error parsing file")