lib/httpserver: use github.com/klauspost/compress/gzhttp for compressing http responses

This allows removing gzip-related code from lib/httpserver.
This commit is contained in:
Aliaksandr Valialkin
2023-02-27 10:16:58 -08:00
parent 27c9446520
commit 1a6f2f07fd
29 changed files with 15004 additions and 8452 deletions

View File

@@ -1,7 +1,6 @@
package httpserver
import (
"bufio"
"context"
"crypto/tls"
"errors"
@@ -26,7 +25,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/metrics"
"github.com/klauspost/compress/gzip"
"github.com/klauspost/compress/gzhttp"
"github.com/valyala/fastrand"
)
@@ -74,9 +73,7 @@ type RequestHandler func(w http.ResponseWriter, r *http.Request) bool
// Serve starts an http server on the given addr with the given optional rh.
//
// By default all the responses are transparently compressed, since Google
// charges a lot for the egress traffic. The compression may be disabled
// by calling DisableResponseCompression before writing the first byte to w.
// By default all the responses are transparently compressed, since egress traffic is usually expensive.
//
// The compression is also disabled if -http.disableResponseCompression flag is set.
//
@@ -195,17 +192,23 @@ func Stop(addr string) error {
}
func gzipHandler(s *server, rh RequestHandler) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w = maybeGzipResponseWriter(w, r)
h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handlerWrapper(s, w, r, rh)
if zrw, ok := w.(*gzipResponseWriter); ok {
if err := zrw.Close(); err != nil && !netutil.IsTrivialNetworkError(err) {
logger.Warnf("gzipResponseWriter.Close: %s", err)
}
}
})
if *disableResponseCompression {
return h
}
return gzipHandlerWrapper(h)
}
var gzipHandlerWrapper = func() func(http.Handler) http.HandlerFunc {
hw, err := gzhttp.NewWrapper(gzhttp.CompressionLevel(1))
if err != nil {
panic(fmt.Errorf("BUG: cannot initialize gzip http wrapper: %s", err))
}
return hw
}()
var metricsHandlerDuration = metrics.NewHistogram(`vm_http_request_duration_seconds{path="/metrics"}`)
var connTimeoutClosedConns = metrics.NewCounter(`vm_http_conn_timeout_closed_conns_total`)
@@ -326,7 +329,6 @@ func handlerWrapper(s *server, w http.ResponseWriter, r *http.Request, rh Reques
if !CheckAuthFlag(w, r, *pprofAuthKey, "pprofAuthKey") {
return
}
DisableResponseCompression(w)
pprofHandler(r.URL.Path[len("/debug/pprof/"):], w, r)
return
}
@@ -374,179 +376,12 @@ func CheckBasicAuth(w http.ResponseWriter, r *http.Request) bool {
return false
}
func maybeGzipResponseWriter(w http.ResponseWriter, r *http.Request) http.ResponseWriter {
if *disableResponseCompression {
return w
}
if r.Header.Get("Connection") == "Upgrade" {
return w
}
ae := r.Header.Get("Accept-Encoding")
if ae == "" {
return w
}
ae = strings.ToLower(ae)
n := strings.Index(ae, "gzip")
if n < 0 {
// Do not apply gzip encoding to the response.
return w
}
// Apply gzip encoding to the response.
zw := getGzipWriter(w)
bw := getBufioWriter(zw)
zrw := &gzipResponseWriter{
rw: w,
zw: zw,
bw: bw,
}
return zrw
}
// DisableResponseCompression disables response compression on w.
//
// The function must be called before the first w.Write* call.
func DisableResponseCompression(w http.ResponseWriter) {
zrw, ok := w.(*gzipResponseWriter)
if !ok {
return
}
if zrw.firstWriteDone {
logger.Panicf("BUG: DisableResponseCompression must be called before sending the response")
}
zrw.disableCompression = true
}
// EnableCORS enables https://developer.mozilla.org/en-US/docs/Web/HTTP/CORS
// on the response.
func EnableCORS(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
}
func getGzipWriter(w io.Writer) *gzip.Writer {
v := gzipWriterPool.Get()
if v == nil {
zw, err := gzip.NewWriterLevel(w, 1)
if err != nil {
logger.Panicf("BUG: cannot create gzip writer: %s", err)
}
return zw
}
zw := v.(*gzip.Writer)
zw.Reset(w)
return zw
}
func putGzipWriter(zw *gzip.Writer) {
gzipWriterPool.Put(zw)
}
var gzipWriterPool sync.Pool
type gzipResponseWriter struct {
rw http.ResponseWriter
zw *gzip.Writer
bw *bufio.Writer
statusCode int
firstWriteDone bool
disableCompression bool
}
// Implements http.ResponseWriter.Header method.
func (zrw *gzipResponseWriter) Header() http.Header {
return zrw.rw.Header()
}
// Implements http.ResponseWriter.Write method.
func (zrw *gzipResponseWriter) Write(p []byte) (int, error) {
if !zrw.firstWriteDone {
h := zrw.Header()
if zrw.statusCode == http.StatusNoContent {
zrw.disableCompression = true
}
if h.Get("Content-Encoding") != "" {
zrw.disableCompression = true
}
if !zrw.disableCompression {
h.Set("Content-Encoding", "gzip")
h.Del("Content-Length")
if h.Get("Content-Type") == "" {
// Disable auto-detection of content-type, since it
// is incorrectly detected after the compression.
h.Set("Content-Type", "text/html; charset=utf-8")
}
}
zrw.writeHeader()
zrw.firstWriteDone = true
}
if zrw.disableCompression {
return zrw.rw.Write(p)
}
return zrw.bw.Write(p)
}
// Implements http.ResponseWriter.WriteHeader method.
func (zrw *gzipResponseWriter) WriteHeader(statusCode int) {
zrw.statusCode = statusCode
}
func (zrw *gzipResponseWriter) writeHeader() {
if zrw.statusCode == 0 {
zrw.statusCode = http.StatusOK
}
zrw.rw.WriteHeader(zrw.statusCode)
}
// Implements http.Flusher
func (zrw *gzipResponseWriter) Flush() {
if !zrw.firstWriteDone {
_, _ = zrw.Write(nil)
}
if !zrw.disableCompression {
if err := zrw.bw.Flush(); err != nil && !netutil.IsTrivialNetworkError(err) {
logger.Warnf("gzipResponseWriter.Flush (buffer): %s", err)
}
if err := zrw.zw.Flush(); err != nil && !netutil.IsTrivialNetworkError(err) {
logger.Warnf("gzipResponseWriter.Flush (gzip): %s", err)
}
}
if fw, ok := zrw.rw.(http.Flusher); ok {
fw.Flush()
}
}
func (zrw *gzipResponseWriter) Close() error {
if !zrw.firstWriteDone {
_, _ = zrw.Write(nil)
}
zrw.Flush()
var err error
if !zrw.disableCompression {
err = zrw.zw.Close()
}
putGzipWriter(zrw.zw)
zrw.zw = nil
putBufioWriter(zrw.bw)
zrw.bw = nil
return err
}
func getBufioWriter(w io.Writer) *bufio.Writer {
v := bufioWriterPool.Get()
if v == nil {
return bufio.NewWriterSize(w, 16*1024)
}
bw := v.(*bufio.Writer)
bw.Reset(w)
return bw
}
func putBufioWriter(bw *bufio.Writer) {
bufioWriterPool.Put(bw)
}
var bufioWriterPool sync.Pool
func pprofHandler(profileName string, w http.ResponseWriter, r *http.Request) {
// This switch has been stolen from init func at https://golang.org/src/net/http/pprof/pprof.go
switch profileName {