mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-09 11:54:31 +03:00
Cluster mode is enabled when the -storageNode command-line flag is passed to VictoriaLogs. In this mode VictoriaLogs spreads the ingested logs among the storage nodes specified in the -storageNode flag. It also queries these storage nodes during `select` queries. Cluster mode allows building a multi-level cluster setup, where a top-level select node can query multiple lower-level clusters and get a global querying view. See https://docs.victoriametrics.com/victorialogs/cluster/ Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5077 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7950 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8223
97 lines
2.7 KiB
Go
97 lines
2.7 KiB
Go
package internalinsert
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/metrics"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage/netinsert"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
|
|
)
|
|
|
|
var (
|
|
disableInsert = flag.Bool("internalinsert.disable", false, "Whether to disable /internal/insert HTTP endpoint")
|
|
maxRequestSize = flagutil.NewBytes("internalinsert.maxRequestSize", 64*1024*1024, "The maximum size in bytes of a single request, which can be accepted at /internal/insert HTTP endpoint")
|
|
)
|
|
|
|
// RequestHandler processes /internal/insert requests.
|
|
func RequestHandler(w http.ResponseWriter, r *http.Request) {
|
|
if *disableInsert {
|
|
httpserver.Errorf(w, r, "requests to /internal/insert are disabled with -internalinsert.disable command-line flag")
|
|
return
|
|
}
|
|
|
|
startTime := time.Now()
|
|
if r.Method != "POST" {
|
|
w.WriteHeader(http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
version := r.FormValue("version")
|
|
if version != netinsert.ProtocolVersion {
|
|
httpserver.Errorf(w, r, "unsupported protocol version=%q; want %q", version, netinsert.ProtocolVersion)
|
|
return
|
|
}
|
|
|
|
requestsTotal.Inc()
|
|
|
|
cp, err := insertutil.GetCommonParams(r)
|
|
if err != nil {
|
|
httpserver.Errorf(w, r, "%s", err)
|
|
return
|
|
}
|
|
if err := vlstorage.CanWriteData(); err != nil {
|
|
httpserver.Errorf(w, r, "%s", err)
|
|
return
|
|
}
|
|
|
|
encoding := r.Header.Get("Content-Encoding")
|
|
err = protoparserutil.ReadUncompressedData(r.Body, encoding, maxRequestSize, func(data []byte) error {
|
|
lmp := cp.NewLogMessageProcessor("internalinsert", false)
|
|
irp := lmp.(insertutil.InsertRowProcessor)
|
|
err := parseData(irp, data)
|
|
lmp.MustClose()
|
|
return err
|
|
})
|
|
if err != nil {
|
|
errorsTotal.Inc()
|
|
httpserver.Errorf(w, r, "cannot parse internal insert request: %s", err)
|
|
return
|
|
}
|
|
|
|
requestDuration.UpdateDuration(startTime)
|
|
}
|
|
|
|
func parseData(irp insertutil.InsertRowProcessor, data []byte) error {
|
|
r := logstorage.GetInsertRow()
|
|
src := data
|
|
i := 0
|
|
for len(src) > 0 {
|
|
tail, err := r.UnmarshalInplace(src)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot parse row #%d: %s", i, err)
|
|
}
|
|
src = tail
|
|
i++
|
|
|
|
irp.AddInsertRow(r)
|
|
}
|
|
logstorage.PutInsertRow(r)
|
|
|
|
return nil
|
|
}
|
|
|
|
var (
|
|
requestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/internal/insert"}`)
|
|
errorsTotal = metrics.NewCounter(`vl_http_errors_total{path="/internal/insert"}`)
|
|
|
|
requestDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/internal/insert"}`)
|
|
)
|