Compare commits

...

10 Commits

4 changed files with 157 additions and 8 deletions

View File

@@ -8,8 +8,6 @@ import (
"strings"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadogsketches"
@@ -36,6 +34,7 @@ import (
influxserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/influx"
opentsdbserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdb"
opentsdbhttpserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
@@ -43,6 +42,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeserieslimits"
"github.com/VictoriaMetrics/metrics"
)
var (
@@ -70,6 +70,7 @@ var (
maxLabelsPerTimeseries = flag.Int("maxLabelsPerTimeseries", 40, "The maximum number of labels per time series to be accepted. Series with superfluous labels are ignored. In this case the vm_rows_ignored_total{reason=\"too_many_labels\"} metric at /metrics page is incremented")
maxLabelNameLen = flag.Int("maxLabelNameLen", 256, "The maximum length of label name in the accepted time series. Series with longer label name are ignored. In this case the vm_rows_ignored_total{reason=\"too_long_label_name\"} metric at /metrics page is incremented")
maxLabelValueLen = flag.Int("maxLabelValueLen", 4*1024, "The maximum length of label values in the accepted time series. Series with longer label value are ignored. In this case the vm_rows_ignored_total{reason=\"too_long_label_value\"} metric at /metrics page is incremented")
maxMemoryUsage = flag.Int("insert.circuitBreakMemoryUsage", 90, "Reject insert requests when memory usage exceeds a certain percentage. 0 means no circuit breaking. An integer value from 1-100 represents 1%-100%.")
)
var (
@@ -131,6 +132,13 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
startTime := time.Now()
defer requestDuration.UpdateDuration(startTime)
if *maxMemoryUsage >= 1 && *maxMemoryUsage <= 100 {
if memory.CurrentPercentage() > *maxMemoryUsage {
httpserver.Errorf(w, r, "server overloaded, request rejected by circuit breaker")
return true
}
}
path := strings.Replace(r.URL.Path, "//", "/", -1)
if strings.HasPrefix(path, "/static") {
staticServer.ServeHTTP(w, r)

View File

@@ -48,6 +48,19 @@ func GetMemoryLimit() int64 {
return n
}
// GetMemoryUsage returns cgroup memory usage
func GetMemoryUsage() int64 {
n, err := getMemStat("memory.usage_in_bytes")
if err == nil {
return n
}
n, err = getMemStatV2("memory.current")
if err != nil {
return 0
}
return n
}
func getMemStatV2(statName string) (int64, error) {
// See https: //www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html#memory-interface-files
return getStatGeneric(statName, "/sys/fs/cgroup", "/proc/self/cgroup", "")
@@ -79,3 +92,23 @@ func getHierarchicalMemoryLimit(sysfsPrefix, cgroupPath string) (int64, error) {
}
return strconv.ParseInt(memStat, 10, 64)
}
func GetHierarchicalMemoryUsage() int64 {
n, err := getHierarchicalMemoryUsage("/sys/fs/cgroup/memory", "/proc/self/cgroup")
if err != nil {
return 0
}
return n
}
func getHierarchicalMemoryUsage(sysfsPrefix, cgroupPath string) (int64, error) {
data, err := getFileContents("memory.stat", sysfsPrefix, cgroupPath, "memory")
if err != nil {
return 0, err
}
memStat, err := grepFirstMatch(data, "hierarchical_memory_limit", 1, " ")
if err != nil {
return 0, err
}
return strconv.ParseInt(memStat, 10, 64)
}

View File

@@ -4,15 +4,19 @@ import (
"flag"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/metrics"
)
var (
allowedPercent = flag.Float64("memory.allowedPercent", 60, `Allowed percent of system memory VictoriaMetrics caches may occupy. See also -memory.allowedBytes. Too low a value may increase cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache which will result in higher disk IO usage`)
allowedBytes = flagutil.NewBytes("memory.allowedBytes", 0, `Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to a non-zero value. Too low a value may increase the cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache resulting in higher disk IO usage`)
allowedPercent = flag.Float64("memory.allowedPercent", 60, `Allowed percent of system memory VictoriaMetrics caches may occupy. See also -memory.allowedBytes. Too low a value may increase cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache which will result in higher disk IO usage`)
allowedBytes = flagutil.NewBytes("memory.allowedBytes", 0, `Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to a non-zero value. Too low a value may increase the cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache resulting in higher disk IO usage`)
memCheckInterval = flag.Duration("memory.checkInterval", 5*time.Second, "How often to check the memory usage.")
)
var _ = metrics.NewGauge("process_memory_limit_bytes", func() float64 {
@@ -20,11 +24,15 @@ var _ = metrics.NewGauge("process_memory_limit_bytes", func() float64 {
})
var (
allowedMemory int
remainingMemory int
memoryLimit int
allowedMemory int
remainingMemory int
memoryLimit int
currentMemory atomic.Int64
currentMemoryPercentage atomic.Int32
)
var (
once sync.Once
)
var once sync.Once
func initOnce() {
if !flag.Parsed() {
@@ -45,6 +53,36 @@ func initOnce() {
remainingMemory = memoryLimit - allowedMemory
logger.Infof("limiting caches to %d bytes, leaving %d bytes to the OS according to -memory.allowedBytes=%s", allowedMemory, remainingMemory, allowedBytes.String())
}
if *memCheckInterval == 0 {
return
}
// enable memory detection if configured
currentAvailableBytes, _ := getAvailableMemory()
currentUsedBytes := max(0, memoryLimit-currentAvailableBytes)
currentMemory.Store(int64(currentUsedBytes))
currentMemoryPercentage.Store(int32(currentUsedBytes * 100 / memoryLimit))
go func() {
// Register SIGHUP handler for config reload before loadRelabelConfigs.
// This guarantees that the config will be re-read if the signal arrives just after loadRelabelConfig.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
sighupCh := procutil.NewSighupChan()
t := time.NewTicker(*memCheckInterval)
defer t.Stop()
for {
select {
case <-sighupCh:
return
case <-t.C:
currentAvailableBytes, _ = getAvailableMemory()
currentUsedBytes = max(0, memoryLimit-currentAvailableBytes)
currentMemory.Store(int64(currentUsedBytes))
currentMemoryPercentage.Store(int32(currentUsedBytes * 100 / memoryLimit))
logger.Infof("current: %dMiB, total: %dMiB, percent: %d%%", currentUsedBytes/1024/1024, memoryLimit/1024/1024, currentMemoryPercentage.Load())
}
}
}()
}
// Allowed returns the amount of system memory allowed to use by the app.
@@ -62,3 +100,15 @@ func Remaining() int {
once.Do(initOnce)
return remainingMemory
}
// Current return memory usage in byte. The value is updated every 5 seconds.
func Current() int {
once.Do(initOnce)
return int(currentMemory.Load())
}
// CurrentPercentage return memory usage percentage in [0-100] int. The value is updated every 5 seconds.
func CurrentPercentage() int {
once.Do(initOnce)
return int(currentMemoryPercentage.Load())
}

View File

@@ -1,6 +1,13 @@
package memory
import (
"bufio"
"bytes"
"errors"
"fmt"
"os"
"strconv"
"strings"
"syscall"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
@@ -29,3 +36,54 @@ func sysTotalMemory() int {
}
return int(mem)
}
func sysCurrentMemory() int {
am, err := getAvailableMemory()
if err != nil {
return 0
}
return am
//mem := cgroup.GetMemoryUsage()
//if mem <= 0 || int64(int(mem)) != mem || int(mem) > usedMem {
// mem = cgroup.GetHierarchicalMemoryUsage()
// if mem <= 0 || int64(int(mem)) != mem || int(mem) > usedMem {
// return usedMem
// }
//}
//return int(mem)
}
// getAvailableMemory parse /proc/meminfo and return MemAvailable in byte.
func getAvailableMemory() (int, error) {
b, err := os.ReadFile("/proc/meminfo")
if err != nil {
return 0, err
}
s := bufio.NewScanner(bytes.NewReader(b))
for s.Scan() {
fields := strings.Fields(s.Text())
if fields[0] != "MemAvailable:" {
continue
}
val, err := strconv.ParseInt(fields[1], 0, 64)
if err != nil {
return 0, err
}
switch len(fields) {
case 2:
return int(val), nil
case 3:
if fields[2] != "kB" {
return 0, fmt.Errorf("%w: unsupported unit in optional 3rd field %q", ErrFileParse, fields[2])
}
return int(1024 * val), nil
default:
return 0, fmt.Errorf("%w: malformed line %q", ErrFileParse, s.Text())
}
}
return 0, fmt.Errorf("AvailableMemory not found")
}
var (
ErrFileParse = errors.New("error parsing file")
)