mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-20 10:16:28 +03:00
Compare commits
1 Commits
query-debu
...
feature/me
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0baa091b77 |
@@ -10,8 +10,6 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/clusternative"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadogsketches"
|
||||
@@ -42,11 +40,13 @@ import (
|
||||
opentsdbserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdb"
|
||||
opentsdbhttpserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeserieslimits"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -81,6 +81,7 @@ var (
|
||||
maxLabelsPerTimeseries = flag.Int("maxLabelsPerTimeseries", 40, "The maximum number of labels per time series to be accepted. Series with superfluous labels are ignored. In this case the vm_rows_ignored_total{reason=\"too_many_labels\"} metric at /metrics page is incremented")
|
||||
maxLabelNameLen = flag.Int("maxLabelNameLen", 256, "The maximum length of label name in the accepted time series. Series with longer label name are ignored. In this case the vm_rows_ignored_total{reason=\"too_long_label_name\"} metric at /metrics page is incremented")
|
||||
maxLabelValueLen = flag.Int("maxLabelValueLen", 4*1024, "The maximum length of label values in the accepted time series. Series with longer label value are ignored. In this case the vm_rows_ignored_total{reason=\"too_long_label_value\"} metric at /metrics page is incremented")
|
||||
maxMemoryUsage = flag.Int("insert.circuitBreakMemoryUsage", 90, "Reject insert requests when memory usage exceeds a certain percentage. 0 means no circuit breaking. An integer value from 1-100 represents 1%-100%.")
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -217,6 +218,12 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
// This is not our link.
|
||||
return false
|
||||
}
|
||||
if *maxMemoryUsage >= 1 && *maxMemoryUsage <= 100 {
|
||||
if memory.CurrentPercentage() > *maxMemoryUsage {
|
||||
httpserver.Errorf(w, r, "server overloaded, request rejected by circuit breaker")
|
||||
return true
|
||||
}
|
||||
}
|
||||
at, err := auth.NewTokenPossibleMultitenant(p.AuthToken)
|
||||
if err != nil {
|
||||
httpserver.Errorf(w, r, "auth error: %s", err)
|
||||
|
||||
@@ -1,18 +1,28 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
allowedPercent = flag.Float64("memory.allowedPercent", 60, `Allowed percent of system memory VictoriaMetrics caches may occupy. See also -memory.allowedBytes. Too low a value may increase cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache which will result in higher disk IO usage`)
|
||||
allowedBytes = flagutil.NewBytes("memory.allowedBytes", 0, `Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to a non-zero value. Too low a value may increase the cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache resulting in higher disk IO usage`)
|
||||
allowedPercent = flag.Float64("memory.allowedPercent", 60, `Allowed percent of system memory VictoriaMetrics caches may occupy. See also -memory.allowedBytes. Too low a value may increase cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache which will result in higher disk IO usage`)
|
||||
allowedBytes = flagutil.NewBytes("memory.allowedBytes", 0, `Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to a non-zero value. Too low a value may increase the cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache resulting in higher disk IO usage`)
|
||||
memCheckInterval = flag.Duration("memory.checkInterval", 2*time.Second, "How often to check the memory usage.")
|
||||
)
|
||||
|
||||
var _ = metrics.NewGauge("process_memory_limit_bytes", func() float64 {
|
||||
@@ -20,10 +30,13 @@ var _ = metrics.NewGauge("process_memory_limit_bytes", func() float64 {
|
||||
})
|
||||
|
||||
var (
|
||||
allowedMemory int
|
||||
remainingMemory int
|
||||
memoryLimit int
|
||||
allowedMemory int
|
||||
remainingMemory int
|
||||
memoryLimit int
|
||||
currentMemory atomic.Int64
|
||||
currentMemoryPercentage atomic.Int32
|
||||
)
|
||||
|
||||
var once sync.Once
|
||||
|
||||
func initOnce() {
|
||||
@@ -45,6 +58,35 @@ func initOnce() {
|
||||
remainingMemory = memoryLimit - allowedMemory
|
||||
logger.Infof("limiting caches to %d bytes, leaving %d bytes to the OS according to -memory.allowedBytes=%s", allowedMemory, remainingMemory, allowedBytes.String())
|
||||
}
|
||||
if *memCheckInterval == 0 {
|
||||
return
|
||||
}
|
||||
// enable memory detection if configured
|
||||
currentAvailableBytes, _ := getAvailableMemory()
|
||||
currentUsedBytes := max(0, memoryLimit-currentAvailableBytes)
|
||||
currentMemory.Store(int64(currentUsedBytes))
|
||||
currentMemoryPercentage.Store(int32(currentUsedBytes * 100 / memoryLimit))
|
||||
|
||||
go func() {
|
||||
// Register SIGHUP handler for config reload before loadRelabelConfigs.
|
||||
// This guarantees that the config will be re-read if the signal arrives just after loadRelabelConfig.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
|
||||
sighupCh := procutil.NewSighupChan()
|
||||
t := time.NewTicker(*memCheckInterval)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-sighupCh:
|
||||
return
|
||||
case <-t.C:
|
||||
currentAvailableBytes, _ = getAvailableMemory()
|
||||
currentUsedBytes = max(0, memoryLimit-currentAvailableBytes)
|
||||
currentMemory.Store(int64(currentUsedBytes))
|
||||
currentMemoryPercentage.Store(int32(currentUsedBytes * 100 / memoryLimit))
|
||||
logger.Infof("current: %dMiB, total: %dMiB, percent: %d%%", currentUsedBytes/1024/1024, memoryLimit/1024/1024, currentMemoryPercentage.Load())
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Allowed returns the amount of system memory allowed to use by the app.
|
||||
@@ -62,3 +104,58 @@ func Remaining() int {
|
||||
once.Do(initOnce)
|
||||
return remainingMemory
|
||||
}
|
||||
|
||||
// Current return memory usage in byte. The value is updated every 5 seconds.
|
||||
func Current() int {
|
||||
once.Do(initOnce)
|
||||
return int(currentMemory.Load())
|
||||
}
|
||||
|
||||
// CurrentPercentage return memory usage percentage in [0-100] int. The value is updated every 5 seconds.
|
||||
func CurrentPercentage() int {
|
||||
once.Do(initOnce)
|
||||
return int(currentMemoryPercentage.Load())
|
||||
}
|
||||
|
||||
func sysCurrentMemory() int {
|
||||
am, err := getAvailableMemory()
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
return am
|
||||
}
|
||||
|
||||
// getAvailableMemory parse /proc/meminfo and return MemAvailable in byte.
|
||||
func getAvailableMemory() (int, error) {
|
||||
b, err := os.ReadFile("/proc/meminfo")
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
s := bufio.NewScanner(bytes.NewReader(b))
|
||||
for s.Scan() {
|
||||
fields := strings.Fields(s.Text())
|
||||
if fields[0] != "MemAvailable:" {
|
||||
continue
|
||||
}
|
||||
val, err := strconv.ParseInt(fields[1], 0, 64)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
switch len(fields) {
|
||||
case 2:
|
||||
return int(val), nil
|
||||
case 3:
|
||||
if fields[2] != "kB" {
|
||||
return 0, fmt.Errorf("%w: unsupported unit in optional 3rd field %q", ErrFileParse, fields[2])
|
||||
}
|
||||
return int(1024 * val), nil
|
||||
default:
|
||||
return 0, fmt.Errorf("%w: malformed line %q", ErrFileParse, s.Text())
|
||||
}
|
||||
}
|
||||
return 0, fmt.Errorf("AvailableMemory not found")
|
||||
}
|
||||
|
||||
var (
|
||||
ErrFileParse = errors.New("error parsing file")
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user