Compare commits

...

1 Commits

Author SHA1 Message Date
Andrii Chubatiuk
5e3be175cf lib/promscrape/discovery/kubernetes: support scraping in protobuf format, fixes #8512 2025-06-23 11:17:34 +03:00
16 changed files with 3211 additions and 916 deletions

View File

@@ -1,7 +1,10 @@
package kubernetes
import (
"bufio"
"bytes"
"context"
"encoding/binary"
"encoding/json"
"errors"
"flag"
@@ -17,14 +20,17 @@ import (
"sync/atomic"
"time"
"github.com/VictoriaMetrics/easyproto"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
)
@@ -37,12 +43,133 @@ var (
" This may reduce amount of concurrent connections to API server when watching for a big number of Kubernetes objects.")
)
const (
contentTypeJSON = "application/json"
contentTypeProtobuf = "application/vnd.kubernetes.protobuf"
acceptHeader = "application/vnd.kubernetes.protobuf, application/json"
)
var protoPrefix = []byte{0x6b, 0x38, 0x73, 0x00}
var dataBufPool bytesutil.ByteBufferPool
func newEventUnmarshaller(r io.Reader, contentType string) func(*WatchEvent) error {
switch contentType {
case contentTypeProtobuf:
var buf []byte
br := bufio.NewReader(r)
return func(we *WatchEvent) error {
buf = slicesutil.SetLength(buf, 4)
if _, err := io.ReadFull(br, buf); err != nil {
return fmt.Errorf("failed to read size: %w", err)
}
length := int(binary.BigEndian.Uint32(buf))
buf = slicesutil.SetLength(buf, length)
if _, err := io.ReadFull(br, buf); err != nil {
return err
}
return we.unmarshalProtobuf(buf)
}
default:
d := json.NewDecoder(r)
return func(we *WatchEvent) error {
return d.Decode(we)
}
}
}
// Unknown is object, which is returned by API, when encoding is protobuf
type Unknown struct {
Raw []byte
}
// unmarshalProtobuf unmarshals Unknown according to spec
//
// See https://github.com/kubernetes/apimachinery/blob/master/pkg/runtime/generated.proto
func (r *Unknown) unmarshalProtobuf(src []byte) (err error) {
// message Unknown {
// optional bytes raw = 2;
// }
idx := len(protoPrefix)
if !bytes.Equal(protoPrefix, src[:idx]) {
return fmt.Errorf("protobuf message has invalid prefix")
}
src = src[idx:]
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in Unknown: %w", err)
}
switch fc.FieldNum {
case 2:
raw, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Raw")
}
r.Raw = raw
}
}
return nil
}
func (r *Unknown) marshalProtobuf(mm *easyproto.MessageMarshaler) {
mm.AppendBytes(2, r.Raw)
}
// WatchEvent is a watch event returned from API server endpoints if `watch=1` query arg is set.
//
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
type WatchEvent struct {
Type string
Object json.RawMessage
Object []byte
}
// unmarshalProtobuf unmarshals WatchEvent according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *WatchEvent) unmarshalProtobuf(src []byte) (err error) {
// message WatchEvent {
// optional string type = 1;
// optional RawExtension object = 2;
// }
var fc easyproto.FieldContext
var rawFc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in WatchEvent: %w", err)
}
switch fc.FieldNum {
case 1:
eventType, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Type")
}
r.Type = strings.Clone(eventType)
case 2:
raw, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read RawExtension")
}
_, err := rawFc.NextField(raw)
if err != nil {
return fmt.Errorf("cannot next field in RawExtension")
}
if rawFc.FieldNum == 1 {
object, ok := rawFc.Bytes()
if !ok {
return fmt.Errorf("cannot read Raw")
}
r.Object = object
}
}
}
return nil
}
func (r *WatchEvent) marshalProtobuf(mm *easyproto.MessageMarshaler) {
mm.AppendString(1, r.Type)
mm.AppendMessage(2).AppendBytes(1, r.Object)
}
// object is any Kubernetes object.
@@ -54,10 +181,10 @@ type object interface {
}
// parseObjectFunc must parse object from the given data.
type parseObjectFunc func(data []byte) (object, error)
type parseObjectFunc func(data []byte, contentType string) (object, error)
// parseObjectListFunc must parse objectList from the given r.
type parseObjectListFunc func(r io.Reader) (map[string]object, ListMeta, error)
type parseObjectListFunc func(data []byte, contentType string) (map[string]object, ListMeta, error)
// apiWatcher is used for watching for Kubernetes object changes and caching their latest states.
type apiWatcher struct {
@@ -506,6 +633,7 @@ func (gw *groupWatcher) doRequest(ctx context.Context, requestURL string) (*http
if err := gw.setHeaders(req); err != nil {
return nil, fmt.Errorf("cannot set request headers: %w", err)
}
req.Header.Add("Accept", acceptHeader)
resp, err := gw.client.Do(req)
if err != nil {
return nil, err
@@ -720,7 +848,16 @@ func (uw *urlWatcher) reloadObjects() string {
logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body)
return ""
}
objectsByKey, metadata, err := uw.parseObjectList(resp.Body)
contentType := resp.Header.Get("Content-Type")
bb := dataBufPool.Get()
defer dataBufPool.Put(bb)
_, err = bb.ReadFrom(resp.Body)
if err != nil {
logger.Errorf("cannot read objects from %q: %s", requestURL, err)
return ""
}
objectsByKey, metadata, err := uw.parseObjectList(bb.B, contentType)
_ = resp.Body.Close()
if err != nil {
logger.Errorf("cannot parse objects from %q: %s", requestURL, err)
@@ -836,7 +973,12 @@ func (uw *urlWatcher) watchForUpdates() {
continue
}
backoffDelay = minBackoffDelay
err = uw.readObjectUpdateStream(resp.Body)
contentType := resp.Header.Get("Content-Type")
idx := strings.Index(contentType, ";")
if idx >= 0 {
contentType = contentType[:idx]
}
err = uw.readObjectUpdateStream(resp.Body, contentType)
_ = resp.Body.Close()
if err != nil {
if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
@@ -850,17 +992,21 @@ func (uw *urlWatcher) watchForUpdates() {
}
// readObjectUpdateStream reads Kubernetes watch events from r and updates locally cached objects according to the received events.
func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
func (uw *urlWatcher) readObjectUpdateStream(r io.Reader, contentType string) error {
gw := uw.gw
d := json.NewDecoder(r)
var we WatchEvent
unmarshalEvent := newEventUnmarshaller(r, contentType)
we := &WatchEvent{}
for {
if err := d.Decode(&we); err != nil {
return fmt.Errorf("cannot parse WatchEvent json response: %w", err)
if err := unmarshalEvent(we); err != nil {
return fmt.Errorf("cannot parse WatchEvent %q response: %w", contentType, err)
}
u := &Unknown{}
if err := u.unmarshalProtobuf(we.Object); err != nil {
return fmt.Errorf("cannot parse Unknown wrapper %q response: %w", contentType, err)
}
switch we.Type {
case "ADDED", "MODIFIED":
o, err := uw.parseObject(we.Object)
o, err := uw.parseObject(u.Raw, contentType)
if err != nil {
return fmt.Errorf("cannot parse %s object: %w", we.Type, err)
}
@@ -869,7 +1015,7 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
uw.updateObjectLocked(key, o)
gw.mu.Unlock()
case "DELETED":
o, err := uw.parseObject(we.Object)
o, err := uw.parseObject(u.Raw, contentType)
if err != nil {
return fmt.Errorf("cannot parse %s object: %w", we.Type, err)
}
@@ -879,13 +1025,13 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
gw.mu.Unlock()
case "BOOKMARK":
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
bm, err := parseBookmark(we.Object)
bm, err := parseBookmark(u.Raw, contentType)
if err != nil {
return fmt.Errorf("cannot parse bookmark from %q: %w", we.Object, err)
}
uw.resourceVersion = bm.Metadata.ResourceVersion
case "ERROR":
em, err := parseError(we.Object)
em, err := parseError(u.Raw, contentType)
if err != nil {
return fmt.Errorf("cannot parse error message from %q: %w", we.Object, err)
}
@@ -972,17 +1118,77 @@ type Bookmark struct {
Metadata BookmarkMetadata
}
// unmarshalProtobuf unmarshals Bookmark according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *Bookmark) unmarshalProtobuf(src []byte) (err error) {
// message Bookmark {
// BookmarkMetadata metadata = 1
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in Bookmark: %w", err)
}
switch fc.FieldNum {
case 1:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Metadata")
}
m := &r.Metadata
if err := m.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal Metadata: %w", err)
}
}
}
return nil
}
// BookmarkMetadata is metadata for Bookmark
type BookmarkMetadata struct {
ResourceVersion string
}
func parseBookmark(data []byte) (*Bookmark, error) {
var bm Bookmark
if err := json.Unmarshal(data, &bm); err != nil {
return nil, err
// unmarshalProtobuf unmarshals BookmarkMetadata according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *BookmarkMetadata) unmarshalProtobuf(src []byte) (err error) {
// message BookmarkMetadata {
// string resourceVersion = 1
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in BookmarkMetadata: %w", err)
}
switch fc.FieldNum {
case 1:
resourceVersion, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read ResourceVersion")
}
r.ResourceVersion = strings.Clone(resourceVersion)
}
}
return &bm, nil
return nil
}
func parseBookmark(data []byte, contentType string) (*Bookmark, error) {
bm := &Bookmark{}
switch contentType {
case contentTypeJSON:
if err := json.Unmarshal(data, &bm); err != nil {
return nil, err
}
case contentTypeProtobuf:
if err := bm.unmarshalProtobuf(data[len(protoPrefix):]); err != nil {
return nil, err
}
}
return bm, nil
}
// Error is an error message from Kubernetes Watch API.
@@ -990,12 +1196,47 @@ type Error struct {
Code int
}
func parseError(data []byte) (*Error, error) {
var em Error
if err := json.Unmarshal(data, &em); err != nil {
return nil, err
// unmarshalProtobuf unmarshals Error according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *Error) unmarshalProtobuf(src []byte) (err error) {
// message Error {
// int32 code = 1
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in Error: %w", err)
}
switch fc.FieldNum {
case 1:
code, ok := fc.Int32()
if !ok {
return fmt.Errorf("cannot read Code")
}
r.Code = int(code)
}
}
return &em, nil
return nil
}
func parseError(data []byte, contentType string) (*Error, error) {
em := &Error{}
switch contentType {
case contentTypeJSON:
if err := json.Unmarshal(data, &em); err != nil {
return nil, err
}
case contentTypeProtobuf:
if !bytes.Equal(protoPrefix, data[:len(protoPrefix)]) {
return nil, fmt.Errorf("Error protobuf response has invalid message prefix")
}
if err := em.unmarshalProtobuf(data[len(protoPrefix):]); err != nil {
return nil, err
}
}
return em, nil
}
func getAPIPathsWithNamespaces(role string, namespaces []string, selectors []Selector) []string {

File diff suppressed because it is too large Load Diff

View File

@@ -1,14 +1,20 @@
package kubernetes
import (
"fmt"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/easyproto"
)
// ObjectMeta represents ObjectMeta from k8s API.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#objectmeta-v1-meta
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#objectmeta-v1-meta
type ObjectMeta struct {
Name string
Namespace string
@@ -18,16 +24,173 @@ type ObjectMeta struct {
OwnerReferences []OwnerReference
}
// unmarshalProtobuf unmarshals ObjectMeta according to spec
//
// See https://github.com/kubernetes/apimachinery/blob/master/pkg/apis/meta/v1/generated.proto
func (om *ObjectMeta) unmarshalProtobuf(src []byte) (err error) {
// message ObjectMeta {
// optional string name = 1;
// optional string namespace = 3;
// optional string uid = 5;
// map<string, string> labels = 11;
// map<string, string> annotations = 12;
// repeated OwnerReference ownerReferences = 13;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in ObjectMeta: %w", err)
}
switch fc.FieldNum {
case 1:
name, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Name")
}
om.Name = strings.Clone(name)
case 3:
ns, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Namespace")
}
om.Namespace = strings.Clone(ns)
case 5:
uid, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read UID")
}
om.UID = strings.Clone(uid)
case 11:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Labels data")
}
if om.Labels == nil {
om.Labels = &promutil.Labels{}
}
om.Labels.Labels = slicesutil.SetLength(om.Labels.Labels, len(om.Labels.Labels)+1)
l := &om.Labels.Labels[len(om.Labels.Labels)-1]
if err := unmarshalLabel(l, data); err != nil {
return fmt.Errorf("cannot unmarshal Labels: %w", err)
}
case 12:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Annotations data")
}
if om.Annotations == nil {
om.Annotations = &promutil.Labels{}
}
om.Annotations.Labels = slicesutil.SetLength(om.Annotations.Labels, len(om.Annotations.Labels)+1)
l := &om.Annotations.Labels[len(om.Annotations.Labels)-1]
if err := unmarshalLabel(l, data); err != nil {
return fmt.Errorf("cannot unmarshal Annotations: %w", err)
}
case 13:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read OwnerReference")
}
om.OwnerReferences = slicesutil.SetLength(om.OwnerReferences, len(om.OwnerReferences)+1)
or := &om.OwnerReferences[len(om.OwnerReferences)-1]
if err := or.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("failed to unmarshal OwnerReference: %w", err)
}
}
}
return nil
}
func (om *ObjectMeta) marshalProtobuf(mm *easyproto.MessageMarshaler) {
mm.AppendString(1, om.Name)
mm.AppendString(3, om.Namespace)
mm.AppendString(5, om.UID)
if om.Labels != nil {
for _, l := range om.Labels.Labels {
marshalLabel(&l, mm.AppendMessage(11))
}
}
if om.Annotations != nil {
for _, l := range om.Annotations.Labels {
marshalLabel(&l, mm.AppendMessage(12))
}
}
for _, or := range om.OwnerReferences {
or.marshalProtobuf(mm.AppendMessage(13))
}
}
func (om *ObjectMeta) key() string {
return om.Namespace + "/" + om.Name
}
func marshalLabel(l *prompbmarshal.Label, mm *easyproto.MessageMarshaler) {
mm.AppendString(1, l.Name)
mm.AppendString(2, l.Value)
}
func unmarshalLabel(l *prompbmarshal.Label, src []byte) (err error) {
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in Label: %w", err)
}
switch fc.FieldNum {
case 1:
name, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Name")
}
l.Name = strings.Clone(name)
case 2:
value, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Value")
}
l.Value = strings.Clone(value)
}
}
return nil
}
// ListMeta is a Kubernetes list metadata
// https://v1-17.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#listmeta-v1-meta
//
// https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#listmeta-v1-meta
type ListMeta struct {
ResourceVersion string
}
// unmarshalProtobuf unmarshals ListMeta according to spec
//
// See https://github.com/kubernetes/apimachinery/blob/master/pkg/apis/meta/v1/generated.proto
func (r *ListMeta) unmarshalProtobuf(src []byte) (err error) {
// message ListMeta {
// optional string resourceVersion = 2;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in ListMeta: %w", err)
}
switch fc.FieldNum {
case 2:
resourceVersion, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read ResourceVersion")
}
r.ResourceVersion = strings.Clone(resourceVersion)
}
}
return nil
}
func (r *ListMeta) marshalProtobuf(mm *easyproto.MessageMarshaler) {
mm.AppendString(2, r.ResourceVersion)
}
func (om *ObjectMeta) registerLabelsAndAnnotations(prefix string, m *promutil.Labels) {
bb := bbPool.Get()
b := bb.B
@@ -64,16 +227,90 @@ func appendThreeStrings(dst []byte, a, b, c string) []byte {
// OwnerReference represents OwnerReferense from k8s API.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#ownerreference-v1-meta
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#ownerreference-v1-meta
type OwnerReference struct {
Name string
Controller bool
Kind string
}
// unmarshalProtobuf unmarshals OwnerReference according to spec
//
// See https://github.com/kubernetes/apimachinery/blob/master/pkg/apis/meta/v1/generated.proto
func (r *OwnerReference) unmarshalProtobuf(src []byte) (err error) {
// message OwnerReference {
// optional string kind = 1;
// optional string name = 3;
// optional bool controller = 6;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in ObjectMeta: %w", err)
}
switch fc.FieldNum {
case 1:
kind, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Kind")
}
r.Kind = strings.Clone(kind)
case 3:
name, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Name")
}
r.Name = strings.Clone(name)
case 6:
controller, ok := fc.Bool()
if !ok {
return fmt.Errorf("cannot read Controller")
}
r.Controller = controller
}
}
return nil
}
func (r *OwnerReference) marshalProtobuf(mm *easyproto.MessageMarshaler) {
mm.AppendString(1, r.Kind)
mm.AppendString(3, r.Name)
mm.AppendBool(6, r.Controller)
}
// DaemonEndpoint represents DaemonEndpoint from k8s API.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#daemonendpoint-v1-core
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#daemonendpoint-v1-core
type DaemonEndpoint struct {
Port int
}
// unmarshalProtobuf unmarshals DaemonEndpoint according to spec
//
// See https://github.com/kubernetes/apimachinery/blob/master/pkg/apis/meta/v1/generated.proto
func (r *DaemonEndpoint) unmarshalProtobuf(src []byte) (err error) {
// message DaemonEndpoint {
// optional int32 port = 1;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in DaemonEndpoint: %w", err)
}
switch fc.FieldNum {
case 1:
port, ok := fc.Int32()
if !ok {
return fmt.Errorf("cannot read Port")
}
r.Port = int(port)
}
}
return nil
}
func (r *DaemonEndpoint) marshalProtobuf(mm *easyproto.MessageMarshaler) {
mm.AppendInt32(1, int32(r.Port))
}

View File

@@ -3,66 +3,241 @@ package kubernetes
import (
"encoding/json"
"fmt"
"io"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/easyproto"
)
func (eps *Endpoints) key() string {
return eps.Metadata.key()
}
func parseEndpointsList(r io.Reader) (map[string]object, ListMeta, error) {
var epsl EndpointsList
d := json.NewDecoder(r)
if err := d.Decode(&epsl); err != nil {
return nil, epsl.Metadata, fmt.Errorf("cannot unmarshal EndpointsList: %w", err)
func parseEndpointsList(data []byte, contentType string) (map[string]object, ListMeta, error) {
l := &EndpointsList{}
switch contentType {
case contentTypeJSON:
if err := json.Unmarshal(data, l); err != nil {
return nil, l.Metadata, fmt.Errorf("cannot unmarshal EndpointsList: %w", err)
}
case contentTypeProtobuf:
u := &Unknown{}
if err := u.unmarshalProtobuf(data); err != nil {
return nil, l.Metadata, err
}
if err := l.unmarshalProtobuf(u.Raw); err != nil {
return nil, l.Metadata, fmt.Errorf("cannot unmarshal EndpointsList: %w", err)
}
}
objectsByKey := make(map[string]object)
for _, eps := range epsl.Items {
objectsByKey[eps.key()] = eps
for i := range l.Items {
item := &l.Items[i]
objectsByKey[item.key()] = item
}
return objectsByKey, epsl.Metadata, nil
return objectsByKey, l.Metadata, nil
}
func parseEndpoints(data []byte) (object, error) {
var eps Endpoints
if err := json.Unmarshal(data, &eps); err != nil {
return nil, err
func parseEndpoints(data []byte, contentType string) (object, error) {
eps := &Endpoints{}
switch contentType {
case contentTypeJSON:
if err := json.Unmarshal(data, eps); err != nil {
return nil, err
}
case contentTypeProtobuf:
if err := eps.unmarshalProtobuf(data); err != nil {
return nil, err
}
}
return &eps, nil
return eps, nil
}
// EndpointsList implements k8s endpoints list.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.31/#endpointslist-v1-core
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#endpointslist-v1-core
type EndpointsList struct {
Metadata ListMeta
Items []*Endpoints
Items []Endpoints
}
// unmarshalProtobuf unmarshals EndpointsList according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *EndpointsList) unmarshalProtobuf(src []byte) (err error) {
// message EndpointsList {
// optional ListMeta metadata = 1;
// repeated Endpoint items = 2;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in EndpointsList: %w", err)
}
switch fc.FieldNum {
case 1:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read ListMeta")
}
m := &r.Metadata
if err := m.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal ListMeta: %w", err)
}
case 2:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Items")
}
r.Items = slicesutil.SetLength(r.Items, len(r.Items)+1)
s := &r.Items[len(r.Items)-1]
if err := s.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal Items: %w", err)
}
}
}
return nil
}
func (r *EndpointsList) marshalProtobuf(mm *easyproto.MessageMarshaler) {
r.Metadata.marshalProtobuf(mm.AppendMessage(1))
for _, pod := range r.Items {
pod.marshalProtobuf(mm.AppendMessage(2))
}
}
// Endpoints implements k8s endpoints.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.31/#endpoints-v1-core
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#endpoints-v1-core
type Endpoints struct {
Metadata ObjectMeta
Subsets []EndpointSubset
}
// unmarshalProtobuf unmarshals Endpoints according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (eps *Endpoints) unmarshalProtobuf(src []byte) (err error) {
// message Endpoints {
// optional ObjectMeta metadata = 1;
// repeated EndpointSubset subsets = 2;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in Endpoints: %w", err)
}
switch fc.FieldNum {
case 1:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read ObjectMeta")
}
m := &eps.Metadata
if err := m.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal ObjectMeta: %w", err)
}
case 2:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Subset")
}
eps.Subsets = slicesutil.SetLength(eps.Subsets, len(eps.Subsets)+1)
s := &eps.Subsets[len(eps.Subsets)-1]
if err := s.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal Subset: %w", err)
}
}
}
return nil
}
func (eps *Endpoints) marshalProtobuf(mm *easyproto.MessageMarshaler) {
eps.Metadata.marshalProtobuf(mm.AppendMessage(1))
for _, subset := range eps.Subsets {
subset.marshalProtobuf(mm.AppendMessage(2))
}
}
// EndpointSubset implements k8s endpoint subset.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.31/#endpointsubset-v1-core
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#endpointsubset-v1-core
type EndpointSubset struct {
Addresses []EndpointAddress
NotReadyAddresses []EndpointAddress
Ports []EndpointPort
}
// unmarshalProtobuf unmarshals EndpointSubset according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *EndpointSubset) unmarshalProtobuf(src []byte) (err error) {
// message EndpointSubset {
// repeated EndpointAddress addresses = 1;
// repeated EndpointAddress notReadyAddresses = 2;
// repeated EndpointPort ports = 3;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in EndpointSubset: %w", err)
}
switch fc.FieldNum {
case 1:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Address")
}
r.Addresses = slicesutil.SetLength(r.Addresses, len(r.Addresses)+1)
a := &r.Addresses[len(r.Addresses)-1]
if err := a.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal Address: %w", err)
}
case 2:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read NotReadyAddress")
}
r.NotReadyAddresses = slicesutil.SetLength(r.NotReadyAddresses, len(r.NotReadyAddresses)+1)
a := &r.NotReadyAddresses[len(r.NotReadyAddresses)-1]
if err := a.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal NotReadyAddress: %w", err)
}
case 3:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Port")
}
r.Ports = slicesutil.SetLength(r.Ports, len(r.Ports)+1)
p := &r.Ports[len(r.Ports)-1]
if err := p.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal Port: %w", err)
}
}
}
return nil
}
func (r *EndpointSubset) marshalProtobuf(mm *easyproto.MessageMarshaler) {
for _, addr := range r.Addresses {
addr.marshalProtobuf(mm.AppendMessage(1))
}
for _, addr := range r.NotReadyAddresses {
addr.marshalProtobuf(mm.AppendMessage(2))
}
for _, port := range r.Ports {
port.marshalProtobuf(mm.AppendMessage(3))
}
}
// EndpointAddress implements k8s endpoint address.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.31/#endpointaddress-v1-core
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#endpointaddress-v1-core
type EndpointAddress struct {
Hostname string
IP string
@@ -70,18 +245,119 @@ type EndpointAddress struct {
TargetRef ObjectReference
}
// unmarshalProtobuf unmarshals EndpointAddress according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *EndpointAddress) unmarshalProtobuf(src []byte) (err error) {
// message EndpointAddress {
// optional string ip = 1;
// optional ObjectReference targetRef = 2;
// optional string hostname = 3;
// optional string nodeName = 4;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in EndpointAddress: %w", err)
}
switch fc.FieldNum {
case 1:
ip, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read IP")
}
r.IP = strings.Clone(ip)
case 2:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read TargetRef")
}
ref := &r.TargetRef
if err := ref.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal TargetRef: %w", err)
}
case 3:
hostname, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Hostname")
}
r.Hostname = strings.Clone(hostname)
case 4:
nodeName, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read NodeName")
}
r.NodeName = strings.Clone(nodeName)
}
}
return nil
}
func (r *EndpointAddress) marshalProtobuf(mm *easyproto.MessageMarshaler) {
mm.AppendString(1, r.IP)
r.TargetRef.marshalProtobuf(mm.AppendMessage(2))
mm.AppendString(3, r.Hostname)
mm.AppendString(4, r.NodeName)
}
// ObjectReference implements k8s object reference.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.31/#objectreference-v1-core
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#objectreference-v1-core
type ObjectReference struct {
Kind string
Name string
Namespace string
}
// unmarshalProtobuf unmarshals ObjectReference according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *ObjectReference) unmarshalProtobuf(src []byte) (err error) {
// message ObjectReference {
// optional string kind = 1;
// optional string namespace = 2;
// optional string name = 3;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in ObjectReference: %w", err)
}
switch fc.FieldNum {
case 1:
kind, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Kind")
}
r.Kind = strings.Clone(kind)
case 2:
namespace, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Namespace")
}
r.Namespace = strings.Clone(namespace)
case 3:
name, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Name")
}
r.Name = strings.Clone(name)
}
}
return nil
}
func (r *ObjectReference) marshalProtobuf(mm *easyproto.MessageMarshaler) {
mm.AppendString(1, r.Kind)
mm.AppendString(2, r.Namespace)
mm.AppendString(3, r.Name)
}
// EndpointPort implements k8s endpoint port.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.31/#endpointport-v1-discovery-k8s-io
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#endpointport-v1-discovery-k8s-io
type EndpointPort struct {
AppProtocol string
Name string
@@ -89,6 +365,59 @@ type EndpointPort struct {
Protocol string
}
// unmarshalProtobuf unmarshals EndpointPort accordint to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *EndpointPort) unmarshalProtobuf(src []byte) (err error) {
// message EndpointPort {
// optional string name = 1;
// optional int32 port = 2;
// optional string protocol = 3;
// optional string appProtocol = 4;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in EndpointPort: %w", err)
}
switch fc.FieldNum {
case 1:
name, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Name")
}
r.Name = strings.Clone(name)
case 2:
port, ok := fc.Int32()
if !ok {
return fmt.Errorf("cannot read Port")
}
r.Port = int(port)
case 3:
protocol, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Protocol")
}
r.Protocol = strings.Clone(protocol)
case 4:
appProtocol, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read AppProtocol")
}
r.AppProtocol = strings.Clone(appProtocol)
}
}
return nil
}
func (r *EndpointPort) marshalProtobuf(mm *easyproto.MessageMarshaler) {
mm.AppendString(1, r.Name)
mm.AppendInt32(2, int32(r.Port))
mm.AppendString(3, r.Protocol)
mm.AppendString(4, r.AppProtocol)
}
// getTargetLabels returns labels for each endpoint in eps.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#endpoints

View File

@@ -1,17 +1,15 @@
package kubernetes
import (
"bytes"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
)
func TestParseEndpointsListFailure(t *testing.T) {
f := func(s string) {
f := func(s, contentType string) {
t.Helper()
r := bytes.NewBufferString(s)
objectsByKey, _, err := parseEndpointsList(r)
objectsByKey, _, err := parseEndpointsList([]byte(s), contentType)
if err == nil {
t.Fatalf("expecting non-nil error")
}
@@ -19,10 +17,10 @@ func TestParseEndpointsListFailure(t *testing.T) {
t.Fatalf("unexpected non-empty objectsByKey: %v", objectsByKey)
}
}
f(``)
f(`[1,23]`)
f(`{"items":[{"metadata":1}]}`)
f(`{"items":[{"metadata":{"labels":[1]}}]}`)
f(``, contentTypeJSON)
f(`[1,23]`, contentTypeJSON)
f(`{"items":[{"metadata":1}]}`, contentTypeJSON)
f(`{"items":[{"metadata":{"labels":[1]}}]}`, contentTypeJSON)
}
func TestParseEndpointsListSuccess(t *testing.T) {
@@ -79,8 +77,7 @@ func TestParseEndpointsListSuccess(t *testing.T) {
]
}
`
r := bytes.NewBufferString(data)
objectsByKey, meta, err := parseEndpointsList(r)
objectsByKey, meta, err := parseEndpointsList([]byte(data), contentTypeJSON)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}

View File

@@ -3,36 +3,56 @@ package kubernetes
import (
"encoding/json"
"fmt"
"io"
"strconv"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/easyproto"
)
func (eps *EndpointSlice) key() string {
return eps.Metadata.key()
}
func parseEndpointSliceList(r io.Reader) (map[string]object, ListMeta, error) {
var epsl EndpointSliceList
d := json.NewDecoder(r)
if err := d.Decode(&epsl); err != nil {
return nil, epsl.Metadata, fmt.Errorf("cannot unmarshal EndpointSliceList: %w", err)
func parseEndpointSliceList(data []byte, contentType string) (map[string]object, ListMeta, error) {
l := &EndpointSliceList{}
switch contentType {
case contentTypeJSON:
if err := json.Unmarshal(data, l); err != nil {
return nil, l.Metadata, fmt.Errorf("cannot unmarshal EndpointSliceList: %w", err)
}
case contentTypeProtobuf:
u := &Unknown{}
if err := u.unmarshalProtobuf(data); err != nil {
return nil, l.Metadata, err
}
if err := l.unmarshalProtobuf(u.Raw); err != nil {
return nil, l.Metadata, fmt.Errorf("cannot unmarshal EndpointSliceList: %w", err)
}
}
objectsByKey := make(map[string]object)
for _, eps := range epsl.Items {
objectsByKey[eps.key()] = eps
for i := range l.Items {
item := &l.Items[i]
objectsByKey[item.key()] = item
}
return objectsByKey, epsl.Metadata, nil
return objectsByKey, l.Metadata, nil
}
func parseEndpointSlice(data []byte) (object, error) {
var eps EndpointSlice
if err := json.Unmarshal(data, &eps); err != nil {
return nil, err
func parseEndpointSlice(data []byte, contentType string) (object, error) {
eps := &EndpointSlice{}
switch contentType {
case contentTypeJSON:
if err := json.Unmarshal(data, eps); err != nil {
return nil, err
}
case contentTypeProtobuf:
if err := eps.unmarshalProtobuf(data); err != nil {
return nil, err
}
}
return &eps, nil
return eps, nil
}
// getTargetLabels returns labels for eps.
@@ -179,28 +199,74 @@ func getEndpointSliceLabels(eps *EndpointSlice, addr string, ea Endpoint, epp En
}
if ea.Hostname != "" {
m.Add("__meta_kubernetes_endpointslice_endpoint_hostname", ea.Hostname)
}
if ea.NodeName != "" {
m.Add("__meta_kubernetes_endpointslice_endpoint_node_name", ea.NodeName)
}
for k, v := range ea.Topology {
m.Add(discoveryutil.SanitizeLabelName("__meta_kubernetes_endpointslice_endpoint_topology_"+k), v)
m.Add(discoveryutil.SanitizeLabelName("__meta_kubernetes_endpointslice_endpoint_topology_present_"+k), "true")
if ea.Zone != "" {
m.Add("__meta_kubernetes_endpointslice_endpoint_zone", ea.Zone)
}
return m
}
// EndpointSliceList - implements kubernetes endpoint slice list object, that groups service endpoints slices.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.31/#endpointslicelist-v1-discovery-k8s-io
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#endpointslicelist-v1-discovery-k8s-io
type EndpointSliceList struct {
Metadata ListMeta
Items []*EndpointSlice
Items []EndpointSlice
}
// unmarshalProtobuf unmarshals EndpointSliceList according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *EndpointSliceList) unmarshalProtobuf(src []byte) (err error) {
// message EndpointSliceList {
// optional ListMeta metadata = 1;
// repeated EndpoinSlice items = 2;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in EndpointSliceList: %w", err)
}
switch fc.FieldNum {
case 1:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read ListMeta")
}
m := &r.Metadata
if err := m.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal ListMeta: %w", err)
}
case 2:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Items")
}
r.Items = slicesutil.SetLength(r.Items, len(r.Items)+1)
s := &r.Items[len(r.Items)-1]
if err := s.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal Items: %w", err)
}
}
}
return nil
}
func (r *EndpointSliceList) marshalProtobuf(mm *easyproto.MessageMarshaler) {
r.Metadata.marshalProtobuf(mm.AppendMessage(1))
for _, item := range r.Items {
item.marshalProtobuf(mm.AppendMessage(2))
}
}
// EndpointSlice - implements kubernetes endpoint slice.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.31/#endpointslice-v1-discovery-k8s-io
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#endpointslice-v1-discovery-k8s-io
type EndpointSlice struct {
Metadata ObjectMeta
Endpoints []Endpoint
@@ -208,23 +274,197 @@ type EndpointSlice struct {
Ports []EndpointPort
}
// unmarshalProtobuf unmarshals EndpointSlice according to spec
//
// See https://github.com/kubernetes/api/blob/master/discovery/v1/generated.proto
func (eps *EndpointSlice) unmarshalProtobuf(src []byte) (err error) {
// message EndpointSlice {
// optional ObjectMeta metadata = 1;
// repeated Endpoint endpoints = 2;
// repeated EndpointPort ports = 3;
// optional string addressType = 4;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in EndpointSlice: %w", err)
}
switch fc.FieldNum {
case 1:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Metadata")
}
m := &eps.Metadata
if err = m.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal Metadata: %w", err)
}
case 2:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Endpoint")
}
eps.Endpoints = slicesutil.SetLength(eps.Endpoints, len(eps.Endpoints)+1)
e := &eps.Endpoints[len(eps.Endpoints)-1]
if err := e.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal Endpoint: %w", err)
}
case 3:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read EndpointPort")
}
eps.Ports = slicesutil.SetLength(eps.Ports, len(eps.Ports)+1)
p := &eps.Ports[len(eps.Ports)-1]
if err := p.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal EndpointPort: %w", err)
}
case 4:
addressType, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read addressType")
}
eps.AddressType = strings.Clone(addressType)
}
}
return nil
}
func (eps *EndpointSlice) marshalProtobuf(mm *easyproto.MessageMarshaler) {
eps.Metadata.marshalProtobuf(mm.AppendMessage(1))
for _, ep := range eps.Endpoints {
ep.marshalProtobuf(mm.AppendMessage(2))
}
for _, p := range eps.Ports {
p.marshalProtobuf(mm.AppendMessage(3))
}
mm.AppendString(4, eps.AddressType)
}
// Endpoint implements kubernetes object endpoint for endpoint slice.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.31/#endpoint-v1-discovery-k8s-io
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#endpoint-v1-discovery-k8s-io
type Endpoint struct {
Addresses []string
Conditions EndpointConditions
Hostname string
TargetRef ObjectReference
Topology map[string]string
NodeName string
Zone string
}
// unmarshalProtobuf unmarshals Endpoint according to spec
//
// See https://github.com/kubernetes/api/blob/master/discovery/v1/generated.proto
func (r *Endpoint) unmarshalProtobuf(src []byte) (err error) {
// message Endpoint {
// repeated string addresses = 1;
// optional EndpointConditions conditions = 2;
// optional string hostname = 3;
// optional ObjectReference targetRef = 4;
// optional string nodeName = 6;
// optional string zone = 7;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in Endpoint: %w", err)
}
switch fc.FieldNum {
case 1:
address, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Address")
}
r.Addresses = append(r.Addresses, address)
case 2:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Conditions")
}
c := &r.Conditions
if err := c.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal Conditions: %w", err)
}
case 3:
hostname, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Hostname")
}
r.Hostname = strings.Clone(hostname)
case 4:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read TargetRef")
}
t := &r.TargetRef
if err := t.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal TargetRef: %w", err)
}
case 6:
nodeName, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read NodeName")
}
r.NodeName = strings.Clone(nodeName)
case 7:
zone, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Zone")
}
r.Zone = strings.Clone(zone)
}
}
return nil
}
func (r *Endpoint) marshalProtobuf(mm *easyproto.MessageMarshaler) {
for _, addr := range r.Addresses {
mm.AppendString(1, addr)
}
r.Conditions.marshalProtobuf(mm.AppendMessage(2))
mm.AppendString(3, r.Hostname)
r.TargetRef.marshalProtobuf(mm.AppendMessage(4))
mm.AppendString(6, r.NodeName)
mm.AppendString(7, r.Zone)
}
// EndpointConditions implements kubernetes endpoint condition.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.31/#endpointconditions-v1-discovery-k8s-io
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#endpointconditions-v1-discovery-k8s-io
type EndpointConditions struct {
Ready bool
Serving bool
Terminating bool
}
// unmarshalProtobuf unmarshals EndpointConditions according to spec
//
// See https://github.com/kubernetes/api/blob/master/discovery/v1/generated.proto
func (r *EndpointConditions) unmarshalProtobuf(src []byte) (err error) {
// message EndpointConditions {
// optional bool ready = 1;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in EndpointConditions: %w", err)
}
switch fc.FieldNum {
case 1:
ready, ok := fc.Bool()
if !ok {
return fmt.Errorf("cannot read Ready")
}
r.Ready = ready
}
}
return nil
}
func (r *EndpointConditions) marshalProtobuf(mm *easyproto.MessageMarshaler) {
mm.AppendBool(1, r.Ready)
}

View File

@@ -1,16 +1,14 @@
package kubernetes
import (
"bytes"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
)
func TestParseEndpointSliceListFail(t *testing.T) {
f := func(data string) {
r := bytes.NewBufferString(data)
objectsByKey, _, err := parseEndpointSliceList(r)
f := func(data, contentType string) {
objectsByKey, _, err := parseEndpointSliceList([]byte(data), contentType)
if err == nil {
t.Fatalf("unexpected result, test must fail! data: %s", data)
}
@@ -19,12 +17,12 @@ func TestParseEndpointSliceListFail(t *testing.T) {
}
}
f(``)
f(`{"items": [1,2,3]`)
f(``, contentTypeJSON)
f(`{"items": [1,2,3]`, contentTypeJSON)
f(`{"items": [
{
"metadata": {
"name": "kubernetes"}]}`)
"name": "kubernetes"}]}`, contentTypeJSON)
}
@@ -137,9 +135,7 @@ func TestParseEndpointSliceListSuccess(t *testing.T) {
"uid": "36a545ff-dbba-4192-a5f6-1dbb0c21c73d",
"resourceVersion": "603"
},
"topology": {
"kubernetes.io/hostname": "kind-control-plane"
}
"nodeName": "kind-control-plane"
}
],
"ports": [
@@ -157,8 +153,7 @@ func TestParseEndpointSliceListSuccess(t *testing.T) {
}
]
}`
r := bytes.NewBufferString(data)
objectsByKey, meta, err := parseEndpointSliceList(r)
objectsByKey, meta, err := parseEndpointSliceList([]byte(data), contentTypeJSON)
if err != nil {
t.Fatalf("cannot parse data for EndpointSliceList: %v", err)
}
@@ -178,8 +173,7 @@ func TestParseEndpointSliceListSuccess(t *testing.T) {
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "false",
"__meta_kubernetes_endpointslice_endpoint_topology_kubernetes_io_hostname": "kind-control-plane",
"__meta_kubernetes_endpointslice_endpoint_topology_present_kubernetes_io_hostname": "true",
"__meta_kubernetes_endpointslice_endpoint_node_name": "kind-control-plane",
"__meta_kubernetes_endpointslice_label_endpointslice_kubernetes_io_managed_by": "endpointslice-controller.k8s.io",
"__meta_kubernetes_endpointslice_label_kubernetes_io_service_name": "kube-dns",
"__meta_kubernetes_endpointslice_labelpresent_endpointslice_kubernetes_io_managed_by": "true",
@@ -200,8 +194,7 @@ func TestParseEndpointSliceListSuccess(t *testing.T) {
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "false",
"__meta_kubernetes_endpointslice_endpoint_topology_kubernetes_io_hostname": "kind-control-plane",
"__meta_kubernetes_endpointslice_endpoint_topology_present_kubernetes_io_hostname": "true",
"__meta_kubernetes_endpointslice_endpoint_node_name": "kind-control-plane",
"__meta_kubernetes_endpointslice_label_endpointslice_kubernetes_io_managed_by": "endpointslice-controller.k8s.io",
"__meta_kubernetes_endpointslice_label_kubernetes_io_service_name": "kube-dns",
"__meta_kubernetes_endpointslice_labelpresent_endpointslice_kubernetes_io_managed_by": "true",
@@ -266,9 +259,6 @@ func TestGetEndpointsliceLabels(t *testing.T) {
Namespace: "default",
Name: "test-pod",
},
Topology: map[string]string{
"x": "y",
},
},
},
AddressType: "foobar",
@@ -378,8 +368,6 @@ func TestGetEndpointsliceLabels(t *testing.T) {
"__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "false",
"__meta_kubernetes_endpointslice_endpoint_hostname": "foo.bar",
"__meta_kubernetes_endpointslice_endpoint_node_name": "test-node",
"__meta_kubernetes_endpointslice_endpoint_topology_present_x": "true",
"__meta_kubernetes_endpointslice_endpoint_topology_x": "y",
"__meta_kubernetes_endpointslice_label_kubernetes_io_service_name": "test-svc",
"__meta_kubernetes_endpointslice_labelpresent_kubernetes_io_service_name": "true",
"__meta_kubernetes_endpointslice_name": "test-eps",
@@ -429,8 +417,6 @@ func TestGetEndpointsliceLabels(t *testing.T) {
"__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "false",
"__meta_kubernetes_endpointslice_endpoint_hostname": "foo.bar",
"__meta_kubernetes_endpointslice_endpoint_node_name": "test-node",
"__meta_kubernetes_endpointslice_endpoint_topology_present_x": "true",
"__meta_kubernetes_endpointslice_endpoint_topology_x": "y",
"__meta_kubernetes_endpointslice_label_kubernetes_io_service_name": "test-svc",
"__meta_kubernetes_endpointslice_labelpresent_kubernetes_io_service_name": "true",
"__meta_kubernetes_endpointslice_name": "test-eps",
@@ -507,10 +493,8 @@ func TestGetEndpointsliceLabels(t *testing.T) {
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "false",
"__meta_kubernetes_endpointslice_endpoint_hostname": "foo.bar",
"__meta_kubernetes_endpointslice_endpoint_node_name": "test-node",
"__meta_kubernetes_endpointslice_endpoint_topology_present_x": "true",
"__meta_kubernetes_endpointslice_endpoint_topology_x": "y",
"__meta_kubernetes_endpointslice_endpoint_hostname": "foo.bar",
"__meta_kubernetes_endpointslice_label_kubernetes_io_service_name": "test-svc",
"__meta_kubernetes_endpointslice_labelpresent_kubernetes_io_service_name": "true",
"__meta_kubernetes_endpointslice_name": "test-eps",
@@ -566,8 +550,6 @@ func TestGetEndpointsliceLabels(t *testing.T) {
"__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "false",
"__meta_kubernetes_endpointslice_endpoint_hostname": "foo.bar",
"__meta_kubernetes_endpointslice_endpoint_node_name": "test-node",
"__meta_kubernetes_endpointslice_endpoint_topology_present_x": "true",
"__meta_kubernetes_endpointslice_endpoint_topology_x": "y",
"__meta_kubernetes_endpointslice_label_kubernetes_io_service_name": "test-svc",
"__meta_kubernetes_endpointslice_labelpresent_kubernetes_io_service_name": "true",
"__meta_kubernetes_endpointslice_name": "test-eps",

View File

@@ -3,91 +3,392 @@ package kubernetes
import (
"encoding/json"
"fmt"
"io"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/easyproto"
)
func (ig *Ingress) key() string {
return ig.Metadata.key()
}
func parseIngressList(r io.Reader) (map[string]object, ListMeta, error) {
var igl IngressList
d := json.NewDecoder(r)
if err := d.Decode(&igl); err != nil {
return nil, igl.Metadata, fmt.Errorf("cannot unmarshal IngressList: %w", err)
func parseIngressList(data []byte, contentType string) (map[string]object, ListMeta, error) {
l := &IngressList{}
switch contentType {
case contentTypeJSON:
if err := json.Unmarshal(data, l); err != nil {
return nil, l.Metadata, fmt.Errorf("cannot unmarshal IngressList: %w", err)
}
case contentTypeProtobuf:
u := &Unknown{}
if err := u.unmarshalProtobuf(data); err != nil {
return nil, l.Metadata, err
}
if err := l.unmarshalProtobuf(u.Raw); err != nil {
return nil, l.Metadata, fmt.Errorf("cannot unmarshal IngressList: %w", err)
}
}
objectsByKey := make(map[string]object)
for _, ig := range igl.Items {
objectsByKey[ig.key()] = ig
for i := range l.Items {
item := &l.Items[i]
objectsByKey[item.key()] = item
}
return objectsByKey, igl.Metadata, nil
return objectsByKey, l.Metadata, nil
}
func parseIngress(data []byte) (object, error) {
var ig Ingress
if err := json.Unmarshal(data, &ig); err != nil {
return nil, err
func parseIngress(data []byte, contentType string) (object, error) {
ig := &Ingress{}
switch contentType {
case contentTypeJSON:
if err := json.Unmarshal(data, ig); err != nil {
return nil, err
}
case contentTypeProtobuf:
if err := ig.unmarshalProtobuf(data); err != nil {
return nil, err
}
}
return &ig, nil
return ig, nil
}
// IngressList represents ingress list in k8s.
//
// See https://v1-21.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.21/#ingresslist-v1-networking-k8s-io
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#ingresslist-v1-networking-k8s-io
type IngressList struct {
Metadata ListMeta
Items []*Ingress
Items []Ingress
}
// unmarshalProtobuf unmarshals IngressList according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *IngressList) unmarshalProtobuf(src []byte) (err error) {
// message IngressList {
// optional ListMeta metadata = 1;
// repeated Ingress items = 2;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in IngressList: %w", err)
}
switch fc.FieldNum {
case 1:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read ListMeta")
}
m := &r.Metadata
if err := m.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal ListMeta: %w", err)
}
case 2:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Items")
}
r.Items = slicesutil.SetLength(r.Items, len(r.Items)+1)
i := &r.Items[len(r.Items)-1]
if err := i.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal Items: %w", err)
}
}
}
return nil
}
func (r *IngressList) marshalProtobuf(mm *easyproto.MessageMarshaler) {
r.Metadata.marshalProtobuf(mm.AppendMessage(1))
for _, item := range r.Items {
item.marshalProtobuf(mm.AppendMessage(2))
}
}
// Ingress represents ingress in k8s.
//
// See https://v1-21.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.21/#ingress-v1-networking-k8s-io
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#ingress-v1-networking-k8s-io
type Ingress struct {
Metadata ObjectMeta
Spec IngressSpec
}
func (ig *Ingress) marshalProtobuf(mm *easyproto.MessageMarshaler) {
ig.Metadata.marshalProtobuf(mm.AppendMessage(1))
ig.Spec.marshalProtobuf(mm.AppendMessage(2))
}
// unmarshalProtobuf unmarshals Ingress according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (ig *Ingress) unmarshalProtobuf(src []byte) (err error) {
// message Ingress {
// optional ObjectMeta metadata = 1;
// repeated IngressSpec spec = 2;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in Ingress: %w", err)
}
switch fc.FieldNum {
case 1:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read ObjectMeta")
}
m := &ig.Metadata
if err := m.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal ObjectMeta: %w", err)
}
case 2:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Spec")
}
s := &ig.Spec
if err := s.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal Spec: %w", err)
}
}
}
return nil
}
// IngressSpec represents ingress spec in k8s.
//
// See https://v1-21.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.21/#ingressspec-v1-networking-k8s-io
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#ingressspec-v1-networking-k8s-io
type IngressSpec struct {
TLS []IngressTLS `json:"tls"`
Rules []IngressRule
IngressClassName string
}
// unmarshalProtobuf unmarshals IngressSpec according to spec
//
// See https://github.com/kubernetes/api/blob/master/extensions/v1beta1/generated.proto
func (r *IngressSpec) unmarshalProtobuf(src []byte) (err error) {
// message IngressSpec {
// repeated IngressTLS tls = 2;
// repeated IngressRule rules = 3;
// optional string ingressClassName = 4;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in IngressSpec: %w", err)
}
switch fc.FieldNum {
case 2:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read IngressTLS")
}
r.TLS = slicesutil.SetLength(r.TLS, len(r.TLS)+1)
t := &r.TLS[len(r.TLS)-1]
if err := t.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal IngressTLS: %w", err)
}
case 3:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read IngressRule")
}
r.Rules = slicesutil.SetLength(r.Rules, len(r.Rules)+1)
rule := &r.Rules[len(r.Rules)-1]
if err := rule.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal IngressRule: %w", err)
}
case 4:
ingressClassName, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read IngressClassName")
}
r.IngressClassName = strings.Clone(ingressClassName)
}
}
return nil
}
func (r *IngressSpec) marshalProtobuf(mm *easyproto.MessageMarshaler) {
for _, tls := range r.TLS {
tls.marshalProtobuf(mm.AppendMessage(2))
}
for _, rule := range r.Rules {
rule.marshalProtobuf(mm.AppendMessage(3))
}
mm.AppendString(4, r.IngressClassName)
}
// IngressTLS represents ingress TLS spec in k8s.
//
// See https://v1-21.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.21/#ingresstls-v1-networking-k8s-io
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#ingresstls-v1-networking-k8s-io
type IngressTLS struct {
Hosts []string
}
// unmarshalProtobuf unmarshals IngressTLS according to spec
//
// See https://github.com/kubernetes/api/blob/master/extensions/v1beta1/generated.proto
func (r *IngressTLS) unmarshalProtobuf(src []byte) (err error) {
// message IngressTLS {
// repeated string hosts = 1;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in IngressTLS: %w", err)
}
switch fc.FieldNum {
case 1:
host, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Host")
}
r.Hosts = append(r.Hosts, host)
}
}
return nil
}
func (r *IngressTLS) marshalProtobuf(mm *easyproto.MessageMarshaler) {
for _, host := range r.Hosts {
mm.AppendString(1, host)
}
}
// IngressRule represents ingress rule in k8s.
//
// See https://v1-21.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.21/#ingressrule-v1-networking-k8s-io
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#ingressrule-v1-networking-k8s-io
type IngressRule struct {
Host string
HTTP HTTPIngressRuleValue `json:"http"`
}
// unmarshalProtobuf unmarshals IngressRule according to spec
//
// See https://github.com/kubernetes/api/blob/master/extensions/v1beta1/generated.proto
func (r *IngressRule) unmarshalProtobuf(src []byte) (err error) {
// message IngressRule {
// optional string host = 1;
// optional IngressRuleValue ingressRuleValue = 2;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in IngressRule: %w", err)
}
switch fc.FieldNum {
case 1:
host, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Host")
}
r.Host = strings.Clone(host)
case 2:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read IngressRuleValue")
}
v := &r.HTTP
if err := v.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal IngressRuleValue: %w", err)
}
}
}
return nil
}
func (r *IngressRule) marshalProtobuf(mm *easyproto.MessageMarshaler) {
mm.AppendString(1, r.Host)
r.HTTP.marshalProtobuf(mm.AppendMessage(2))
}
// HTTPIngressRuleValue represents HTTP ingress rule value in k8s.
//
// See https://v1-21.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.21/#httpingressrulevalue-v1-networking-k8s-io
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#httpingressrulevalue-v1-networking-k8s-io
type HTTPIngressRuleValue struct {
Paths []HTTPIngressPath
}
// unmarshalProtobuf unmarshals HTTPIngressRuleValue according to spec
//
// See https://github.com/kubernetes/api/blob/master/extensions/v1beta1/generated.proto
func (r *HTTPIngressRuleValue) unmarshalProtobuf(src []byte) (err error) {
// message HTTPIngressRuleValue {
// repeated HTTPIngressPath paths = 1;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in ObjectMeta: %w", err)
}
switch fc.FieldNum {
case 1:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read HTTPIngressPath")
}
r.Paths = slicesutil.SetLength(r.Paths, len(r.Paths)+1)
p := r.Paths[len(r.Paths)-1]
if err := p.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal HTTPIngressPath: %w", err)
}
}
}
return nil
}
func (r *HTTPIngressRuleValue) marshalProtobuf(mm *easyproto.MessageMarshaler) {
for _, value := range r.Paths {
value.marshalProtobuf(mm.AppendMessage(1))
}
}
// HTTPIngressPath represents HTTP ingress path in k8s.
//
// See https://v1-21.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.21/#httpingresspath-v1-networking-k8s-io
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#httpingresspath-v1-networking-k8s-io
type HTTPIngressPath struct {
Path string
}
// unmarshalProtobuf unmarshals HTTPIngressPath according to spec
//
// See https://github.com/kubernetes/api/blob/master/extensions/v1beta1/generated.proto
func (r *HTTPIngressPath) unmarshalProtobuf(src []byte) (err error) {
// message HTTPIngressPath {
// optional string path = 1;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in ObjectMeta: %w", err)
}
switch fc.FieldNum {
case 1:
path, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Path")
}
r.Path = strings.Clone(path)
}
}
return nil
}
func (r *HTTPIngressPath) marshalProtobuf(mm *easyproto.MessageMarshaler) {
mm.AppendString(1, r.Path)
}
// getTargetLabels returns labels for ig.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ingress

View File

@@ -1,7 +1,6 @@
package kubernetes
import (
"bytes"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
@@ -28,10 +27,9 @@ func TestMatchesHostPattern(t *testing.T) {
}
func TestParseIngressListFailure(t *testing.T) {
f := func(s string) {
f := func(s string, contentType string) {
t.Helper()
r := bytes.NewBufferString(s)
objectsByKey, _, err := parseIngressList(r)
objectsByKey, _, err := parseIngressList([]byte(s), contentType)
if err == nil {
t.Fatalf("expecting non-nil error")
}
@@ -39,10 +37,10 @@ func TestParseIngressListFailure(t *testing.T) {
t.Fatalf("unexpected non-empty IngressList: %v", objectsByKey)
}
}
f(``)
f(`[1,23]`)
f(`{"items":[{"metadata":1}]}`)
f(`{"items":[{"metadata":{"labels":[1]}}]}`)
f(``, contentTypeJSON)
f(`[1,23]`, contentTypeJSON)
f(`{"items":[{"metadata":1}]}`, contentTypeJSON)
f(`{"items":[{"metadata":{"labels":[1]}}]}`, contentTypeJSON)
}
func TestParseIngressListSuccess(t *testing.T) {
@@ -92,8 +90,7 @@ func TestParseIngressListSuccess(t *testing.T) {
}
]
}`
r := bytes.NewBufferString(data)
objectsByKey, meta, err := parseIngressList(r)
objectsByKey, meta, err := parseIngressList([]byte(data), contentTypeJSON)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}

View File

@@ -3,10 +3,12 @@ package kubernetes
import (
"encoding/json"
"fmt"
"io"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/easyproto"
)
// getNodesLabels returns labels for k8s nodes obtained from the given cfg
@@ -14,74 +16,336 @@ func (n *Node) key() string {
return n.Metadata.key()
}
func parseNodeList(r io.Reader) (map[string]object, ListMeta, error) {
var nl NodeList
d := json.NewDecoder(r)
if err := d.Decode(&nl); err != nil {
return nil, nl.Metadata, fmt.Errorf("cannot unmarshal NodeList: %w", err)
func parseNodeList(data []byte, contentType string) (map[string]object, ListMeta, error) {
l := &NodeList{}
switch contentType {
case contentTypeJSON:
if err := json.Unmarshal(data, l); err != nil {
return nil, l.Metadata, fmt.Errorf("cannot unmarshal NodeList: %w", err)
}
case contentTypeProtobuf:
u := &Unknown{}
if err := u.unmarshalProtobuf(data); err != nil {
return nil, l.Metadata, err
}
if err := l.unmarshalProtobuf(u.Raw); err != nil {
return nil, l.Metadata, fmt.Errorf("cannot unmarshal NodeList: %w", err)
}
}
objectsByKey := make(map[string]object)
for _, n := range nl.Items {
objectsByKey[n.key()] = n
for i := range l.Items {
item := &l.Items[i]
objectsByKey[item.key()] = item
}
return objectsByKey, nl.Metadata, nil
return objectsByKey, l.Metadata, nil
}
func parseNode(data []byte) (object, error) {
var n Node
if err := json.Unmarshal(data, &n); err != nil {
return nil, err
func parseNode(data []byte, contentType string) (object, error) {
n := &Node{}
switch contentType {
case contentTypeJSON:
if err := json.Unmarshal(data, n); err != nil {
return nil, err
}
case contentTypeProtobuf:
if err := n.unmarshalProtobuf(data); err != nil {
return nil, err
}
}
return &n, nil
return n, nil
}
// NodeList represents NodeList from k8s API.
//
// See https://kubernetes.io/docs/reference/kubernetes-api/cluster-resources/node-v1/#NodeList
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#nodelist-v1-core
type NodeList struct {
Metadata ListMeta
Items []*Node
Items []Node
}
// unmarshalProtobuf unmarshals NodeList according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *NodeList) unmarshalProtobuf(src []byte) (err error) {
// message NodeList {
// optional ListMeta metadata = 1;
// repeated Node items = 2;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in NodeList: %w", err)
}
switch fc.FieldNum {
case 1:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read ListMeta")
}
m := &r.Metadata
if err := m.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal ListMeta: %w", err)
}
case 2:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Items")
}
r.Items = slicesutil.SetLength(r.Items, len(r.Items)+1)
s := &r.Items[len(r.Items)-1]
if err := s.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal Items: %w", err)
}
}
}
return nil
}
func (r *NodeList) marshalProtobuf(mm *easyproto.MessageMarshaler) {
r.Metadata.marshalProtobuf(mm.AppendMessage(1))
for _, item := range r.Items {
item.marshalProtobuf(mm.AppendMessage(2))
}
}
// Node represents Node from k8s API.
//
// See https://kubernetes.io/docs/reference/kubernetes-api/cluster-resources/node-v1/
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#node-v1-core
type Node struct {
Metadata ObjectMeta
Status NodeStatus
Spec NodeSpec
}
// unmarshalProtobuf unmarshals Node according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (n *Node) unmarshalProtobuf(src []byte) (err error) {
// message Node {
// optional ObjectMeta metadata = 1;
// repeated NodeSpec spec = 2;
// optional NodeStatus status = 3;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in Node: %w", err)
}
switch fc.FieldNum {
case 1:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read ObjectMeta")
}
m := &n.Metadata
if err := m.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal ObjectMeta: %w", err)
}
case 2:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Spec")
}
s := &n.Spec
if err := s.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal Spec: %w", err)
}
case 3:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Status")
}
s := &n.Status
if err := s.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal Status: %w", err)
}
}
}
return nil
}
func (n *Node) marshalProtobuf(mm *easyproto.MessageMarshaler) {
n.Metadata.marshalProtobuf(mm.AppendMessage(1))
n.Spec.marshalProtobuf(mm.AppendMessage(2))
n.Status.marshalProtobuf(mm.AppendMessage(3))
}
// NodeStatus represents NodeStatus from k8s API.
//
// See https://kubernetes.io/docs/reference/kubernetes-api/cluster-resources/node-v1/#NodeStatus
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#nodestatus-v1-core
type NodeStatus struct {
Addresses []NodeAddress
DaemonEndpoints NodeDaemonEndpoints
}
// unmarshalProtobuf unmarshals NodeStatus according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *NodeStatus) unmarshalProtobuf(src []byte) (err error) {
// message NodeStatus {
// repeated NodeAddress addresses = 5;
// optional NodeDaemonEndpoints daemonEndpoints = 6;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in NodeStatus: %w", err)
}
switch fc.FieldNum {
case 5:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Address")
}
r.Addresses = slicesutil.SetLength(r.Addresses, len(r.Addresses)+1)
a := &r.Addresses[len(r.Addresses)-1]
if err := a.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal Address: %w", err)
}
case 6:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read DaemonEndpoints")
}
e := &r.DaemonEndpoints
if err := e.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal DaemonEndpoint: %w", err)
}
}
}
return nil
}
func (r *NodeStatus) marshalProtobuf(mm *easyproto.MessageMarshaler) {
for _, addr := range r.Addresses {
addr.marshalProtobuf(mm.AppendMessage(5))
}
r.DaemonEndpoints.marshalProtobuf(mm.AppendMessage(6))
}
// NodeSpec represents NodeSpec from k8s API.
//
// See https://kubernetes.io/docs/reference/kubernetes-api/cluster-resources/node-v1/#NodeSpec
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#nodespec-v1-core
type NodeSpec struct {
ProviderID string
}
// unmarshalProtobuf unmarshals NodeSpec according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *NodeSpec) unmarshalProtobuf(src []byte) (err error) {
// message NodeSpec {
// optional string providerID = 3;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in NodeSpec: %w", err)
}
switch fc.FieldNum {
case 1:
providerID, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read ProviderID")
}
r.ProviderID = strings.Clone(providerID)
}
}
return nil
}
func (r *NodeSpec) marshalProtobuf(mm *easyproto.MessageMarshaler) {
mm.AppendString(3, r.ProviderID)
}
// NodeAddress represents NodeAddress from k8s API.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#nodeaddress-v1-core
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#nodeaddress-v1-core
type NodeAddress struct {
Type string
Address string
}
// unmarshalProtobuf unmarshals NodeAddress according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *NodeAddress) unmarshalProtobuf(src []byte) (err error) {
// message NodeAddress {
// optional string type = 1;
// optional string address = 2;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in NodeAddress: %w", err)
}
switch fc.FieldNum {
case 1:
addrType, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Type")
}
r.Type = strings.Clone(addrType)
case 2:
addr, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Address")
}
r.Address = strings.Clone(addr)
}
}
return nil
}
func (r *NodeAddress) marshalProtobuf(mm *easyproto.MessageMarshaler) {
mm.AppendString(1, r.Type)
mm.AppendString(2, r.Address)
}
// NodeDaemonEndpoints represents NodeDaemonEndpoints from k8s API.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#nodedaemonendpoints-v1-core
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#nodedaemonendpoints-v1-core
type NodeDaemonEndpoints struct {
KubeletEndpoint DaemonEndpoint
}
// unmarshalProtobuf unmarshals NodeDaemonEndpoints according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *NodeDaemonEndpoints) unmarshalProtobuf(src []byte) (err error) {
// message NodeDaemonEndpoints {
// optional DaemonEndpoint kubeletEndpoint = 1;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in NodeDaemonEndpoint: %w", err)
}
switch fc.FieldNum {
case 1:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read KubeletEndpoint")
}
e := &r.KubeletEndpoint
if err := e.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal KubeletEndpoint: %w", err)
}
}
}
return nil
}
func (r *NodeDaemonEndpoints) marshalProtobuf(mm *easyproto.MessageMarshaler) {
r.KubeletEndpoint.marshalProtobuf(mm.AppendMessage(1))
}
// getTargetLabels returns labels for the given n.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#node

View File

@@ -1,7 +1,6 @@
package kubernetes
import (
"bytes"
"reflect"
"sort"
"strconv"
@@ -11,10 +10,9 @@ import (
)
func TestParseNodeListFailure(t *testing.T) {
f := func(s string) {
f := func(s string, contentType string) {
t.Helper()
r := bytes.NewBufferString(s)
objectsByKey, _, err := parseNodeList(r)
objectsByKey, _, err := parseNodeList([]byte(s), contentType)
if err == nil {
t.Fatalf("expecting non-nil error")
}
@@ -22,10 +20,10 @@ func TestParseNodeListFailure(t *testing.T) {
t.Fatalf("unexpected non-empty objectsByKey: %v", objectsByKey)
}
}
f(``)
f(`[1,23]`)
f(`{"items":[{"metadata":1}]}`)
f(`{"items":[{"metadata":{"labels":[1]}}]}`)
f(``, contentTypeJSON)
f(`[1,23]`, contentTypeJSON)
f(`{"items":[{"metadata":1}]}`, contentTypeJSON)
f(`{"items":[{"metadata":{"labels":[1]}}]}`, contentTypeJSON)
}
func TestParseNodeListSuccess(t *testing.T) {
@@ -231,8 +229,7 @@ func TestParseNodeListSuccess(t *testing.T) {
]
}
`
r := bytes.NewBufferString(data)
objectsByKey, meta, err := parseNodeList(r)
objectsByKey, meta, err := parseNodeList([]byte(data), contentTypeJSON)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}

View File

@@ -3,68 +3,244 @@ package kubernetes
import (
"encoding/json"
"fmt"
"io"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/easyproto"
)
func (p *Pod) key() string {
return p.Metadata.key()
}
func parsePodList(r io.Reader) (map[string]object, ListMeta, error) {
var pl PodList
d := json.NewDecoder(r)
if err := d.Decode(&pl); err != nil {
return nil, pl.Metadata, fmt.Errorf("cannot unmarshal PodList: %w", err)
func parsePodList(data []byte, contentType string) (map[string]object, ListMeta, error) {
l := &PodList{}
switch contentType {
case contentTypeJSON:
if err := json.Unmarshal(data, l); err != nil {
return nil, l.Metadata, fmt.Errorf("cannot unmarshal PodList: %w", err)
}
case contentTypeProtobuf:
u := &Unknown{}
if err := u.unmarshalProtobuf(data); err != nil {
return nil, l.Metadata, err
}
if err := l.unmarshalProtobuf(u.Raw); err != nil {
return nil, l.Metadata, fmt.Errorf("cannot unmarshal PodList: %w", err)
}
}
objectsByKey := make(map[string]object)
for _, p := range pl.Items {
objectsByKey[p.key()] = p
for i := range l.Items {
item := &l.Items[i]
objectsByKey[item.key()] = item
}
return objectsByKey, pl.Metadata, nil
return objectsByKey, l.Metadata, nil
}
func parsePod(data []byte) (object, error) {
var p Pod
if err := json.Unmarshal(data, &p); err != nil {
return nil, err
func parsePod(data []byte, contentType string) (object, error) {
p := &Pod{}
switch contentType {
case contentTypeJSON:
if err := json.Unmarshal(data, p); err != nil {
return nil, err
}
case contentTypeProtobuf:
if err := p.unmarshalProtobuf(data); err != nil {
return nil, err
}
}
return &p, nil
return p, nil
}
// PodList implements k8s pod list.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.31/#podlist-v1-core
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#podlist-v1-core
type PodList struct {
Metadata ListMeta
Items []*Pod
Items []Pod
}
// unmarshalProtobuf unmarshals PodList according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *PodList) unmarshalProtobuf(src []byte) (err error) {
// message PodList {
// optional ListMeta metadata = 1;
// repeated Pod items = 2;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in PodList: %w", err)
}
switch fc.FieldNum {
case 1:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read ListMeta")
}
m := &r.Metadata
if err := m.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal ListMeta: %w", err)
}
case 2:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Items")
}
r.Items = slicesutil.SetLength(r.Items, len(r.Items)+1)
s := &r.Items[len(r.Items)-1]
if err := s.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal Items: %w", err)
}
}
}
return nil
}
func (r *PodList) marshalProtobuf(mm *easyproto.MessageMarshaler) {
r.Metadata.marshalProtobuf(mm.AppendMessage(1))
for _, item := range r.Items {
item.marshalProtobuf(mm.AppendMessage(2))
}
}
// Pod implements k8s pod.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.31/#pod-v1-core
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#pod-v1-core
type Pod struct {
Metadata ObjectMeta
Spec PodSpec
Status PodStatus
}
// unmarshalProtobuf unmarshals Pod according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (p *Pod) unmarshalProtobuf(src []byte) (err error) {
// message Pod {
// optional ObjectMeta metadata = 1;
// optional PodSpec spec = 2;
// optional PodStatus status = 3;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in Pod: %w", err)
}
switch fc.FieldNum {
case 1:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read ObjectMeta")
}
m := &p.Metadata
if err := m.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal ObjectMeta: %w", err)
}
case 2:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Spec")
}
s := &p.Spec
if err := s.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal Spec: %w", err)
}
case 3:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Status")
}
s := &p.Status
if err := s.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal Status: %w", err)
}
}
}
return nil
}
func (p *Pod) marshalProtobuf(mm *easyproto.MessageMarshaler) {
p.Metadata.marshalProtobuf(mm.AppendMessage(1))
p.Spec.marshalProtobuf(mm.AppendMessage(2))
p.Status.marshalProtobuf(mm.AppendMessage(3))
}
// PodSpec implements k8s pod spec.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.31/#podspec-v1-core
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#podspec-v1-core
type PodSpec struct {
NodeName string
Containers []Container
InitContainers []Container
}
// unmarshalProtobuf unmarshals PodSpec according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *PodSpec) unmarshalProtobuf(src []byte) (err error) {
// message PodSpec {
// repeated Container containers = 2;
// optional string nodeName = 10;
// repeated Container initContainers = 20;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in PodSpec: %w", err)
}
switch fc.FieldNum {
case 2:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Container")
}
r.Containers = slicesutil.SetLength(r.Containers, len(r.Containers)+1)
c := &r.Containers[len(r.Containers)-1]
if err := c.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal Container: %w", err)
}
case 10:
nodeName, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read NodeName")
}
r.NodeName = strings.Clone(nodeName)
case 20:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read InitContainer")
}
r.InitContainers = slicesutil.SetLength(r.InitContainers, len(r.InitContainers)+1)
c := &r.InitContainers[len(r.InitContainers)-1]
if err := c.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal InitContainer: %w", err)
}
}
}
return nil
}
func (r *PodSpec) marshalProtobuf(mm *easyproto.MessageMarshaler) {
for _, container := range r.Containers {
container.marshalProtobuf(mm.AppendMessage(2))
}
mm.AppendString(10, r.NodeName)
for _, container := range r.InitContainers {
container.marshalProtobuf(mm.AppendMessage(20))
}
}
// Container implements k8s container.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.31/#container-v1-core
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#container-v1-core
type Container struct {
Name string
Image string
@@ -72,6 +248,65 @@ type Container struct {
RestartPolicy string
}
// unmarshalProtobuf unmarshals Container according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *Container) unmarshalProtobuf(src []byte) (err error) {
// message Container {
// optional string name = 1;
// optional string image = 2;
// repeated ContainerPort ports = 6;
// optional string restartPolicy = 24;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in Container: %w", err)
}
switch fc.FieldNum {
case 1:
name, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Name")
}
r.Name = strings.Clone(name)
case 2:
image, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Image")
}
r.Image = strings.Clone(image)
case 6:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read ContainerPort")
}
r.Ports = slicesutil.SetLength(r.Ports, len(r.Ports)+1)
p := &r.Ports[len(r.Ports)-1]
if err := p.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal ContainerPort: %w", err)
}
case 24:
restartPolicy, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read RestartPolicy")
}
r.RestartPolicy = strings.Clone(restartPolicy)
}
}
return nil
}
func (r *Container) marshalProtobuf(mm *easyproto.MessageMarshaler) {
mm.AppendString(1, r.Name)
mm.AppendString(2, r.Image)
for _, p := range r.Ports {
p.marshalProtobuf(mm.AppendMessage(6))
}
mm.AppendString(24, r.RestartPolicy)
}
// ContainerPort implements k8s container port.
type ContainerPort struct {
Name string
@@ -79,9 +314,54 @@ type ContainerPort struct {
Protocol string
}
// unmarshalProtobuf unmarshals ContainerPort according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *ContainerPort) unmarshalProtobuf(src []byte) (err error) {
// message ContainerPort {
// optional string name = 1;
// optional int32 containerPort = 3;
// optional string protocol = 4;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in ContainerPort: %w", err)
}
switch fc.FieldNum {
case 1:
name, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Name")
}
r.Name = strings.Clone(name)
case 3:
containerPort, ok := fc.Int32()
if !ok {
return fmt.Errorf("cannot read ContainerPort")
}
r.ContainerPort = int(containerPort)
case 4:
protocol, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Protocol")
}
r.Protocol = strings.Clone(protocol)
}
}
return nil
}
func (r *ContainerPort) marshalProtobuf(mm *easyproto.MessageMarshaler) {
mm.AppendString(1, r.Name)
mm.AppendInt32(3, int32(r.ContainerPort))
mm.AppendString(4, r.Protocol)
}
// PodStatus implements k8s pod status.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.31/#podstatus-v1-core
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#podstatus-v1-core
type PodStatus struct {
Phase string
PodIP string
@@ -91,37 +371,273 @@ type PodStatus struct {
InitContainerStatuses []ContainerStatus
}
// unmarshalProtobuf unmarshals PodStatus according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *PodStatus) unmarshalProtobuf(src []byte) (err error) {
// message PodStatus {
// optional string phase = 1;
// repeated PodCondition conditions = 2;
// optional string hostIP = 5;
// optional string podIP = 6;
// repeated ContainerStatus containerStatuses = 8;
// repeated ContainerStatus initContainerStatuses = 10;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in PodState: %w", err)
}
switch fc.FieldNum {
case 1:
phase, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Phase")
}
r.Phase = strings.Clone(phase)
case 2:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read PodCondition")
}
r.Conditions = slicesutil.SetLength(r.Conditions, len(r.Conditions)+1)
c := &r.Conditions[len(r.Conditions)-1]
if err := c.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal PodCondition: %w", err)
}
case 5:
hostIP, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read HostIP")
}
r.HostIP = strings.Clone(hostIP)
case 6:
podIP, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read PodIP")
}
r.PodIP = strings.Clone(podIP)
case 8:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read ContainerStatus")
}
r.ContainerStatuses = slicesutil.SetLength(r.ContainerStatuses, len(r.ContainerStatuses)+1)
s := &r.ContainerStatuses[len(r.ContainerStatuses)-1]
if err := s.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal ContainerStatus: %w", err)
}
case 10:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read InitContainerStatus")
}
r.InitContainerStatuses = slicesutil.SetLength(r.InitContainerStatuses, len(r.InitContainerStatuses)+1)
s := &r.InitContainerStatuses[len(r.InitContainerStatuses)-1]
if err := s.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal InitContainerStatus: %w", err)
}
}
}
return nil
}
func (r *PodStatus) marshalProtobuf(mm *easyproto.MessageMarshaler) {
mm.AppendString(1, r.Phase)
for _, condition := range r.Conditions {
condition.marshalProtobuf(mm.AppendMessage(2))
}
mm.AppendString(5, r.HostIP)
mm.AppendString(6, r.PodIP)
for _, status := range r.ContainerStatuses {
status.marshalProtobuf(mm.AppendMessage(8))
}
for _, status := range r.InitContainerStatuses {
status.marshalProtobuf(mm.AppendMessage(10))
}
}
// PodCondition implements k8s pod condition.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.31/#podcondition-v1-core
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#podcondition-v1-core
type PodCondition struct {
Type string
Status string
}
// unmarshalProtobuf unmarshals PodCondition according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *PodCondition) unmarshalProtobuf(src []byte) (err error) {
// message PodCondition {
// optional string type = 1;
// optional string status = 2;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in PodCondition: %w", err)
}
switch fc.FieldNum {
case 1:
conditionType, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Type")
}
r.Type = strings.Clone(conditionType)
case 2:
status, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Status")
}
r.Status = strings.Clone(status)
}
}
return nil
}
func (r *PodCondition) marshalProtobuf(mm *easyproto.MessageMarshaler) {
mm.AppendString(1, r.Type)
mm.AppendString(2, r.Status)
}
// ContainerStatus implements k8s container status.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#containerstatus-v1-core
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#containerstatus-v1-core
type ContainerStatus struct {
Name string
ContainerID string
State ContainerState
}
// unmarshalProtobuf unmarshals ContainerStatus according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *ContainerStatus) unmarshalProtobuf(src []byte) (err error) {
// message ContainerStatus {
// optional string name = 1;
// optional ContainerState state = 2;
// optional string containerID = 8;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in ContainerStatus: %w", err)
}
switch fc.FieldNum {
case 1:
name, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Name")
}
r.Name = strings.Clone(name)
case 2:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read ContainerState")
}
t := &r.State
if err := t.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal ContainerState: %w", err)
}
case 8:
containerID, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read ContainerID")
}
r.ContainerID = strings.Clone(containerID)
}
}
return nil
}
func (r *ContainerStatus) marshalProtobuf(mm *easyproto.MessageMarshaler) {
mm.AppendString(1, r.Name)
r.State.marshalProtobuf(mm.AppendMessage(2))
mm.AppendString(8, r.ContainerID)
}
// ContainerState implements k8s container state.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#containerstatus-v1-core
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#containerstatus-v1-core
type ContainerState struct {
Terminated *ContainerStateTerminated
}
// unmarshalProtobuf unmarshals ContainerState according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *ContainerState) unmarshalProtobuf(src []byte) (err error) {
// message ContainerState {
// optional ContainerStateTerminated terminated = 3;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in ContainerState: %w", err)
}
switch fc.FieldNum {
case 3:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Terminated")
}
t := &ContainerStateTerminated{}
if err := t.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal Terminated: %w", err)
}
r.Terminated = t
}
}
return nil
}
func (r *ContainerState) marshalProtobuf(mm *easyproto.MessageMarshaler) {
if r.Terminated != nil {
r.Terminated.marshalProtobuf(mm.AppendMessage(3))
}
}
// ContainerStateTerminated implements k8s terminated container state.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#containerstatus-v1-core
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#containerstatus-v1-core
type ContainerStateTerminated struct {
ExitCode int
}
// unmarshalProtobuf unmarshals ContainerStateTerminated according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *ContainerStateTerminated) unmarshalProtobuf(src []byte) (err error) {
// message ContainerStateTerminated {
// optional int32 exitCode = 1;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in ContainerStateTerminated: %w", err)
}
switch fc.FieldNum {
case 1:
exitCode, ok := fc.Int32()
if !ok {
return fmt.Errorf("cannot read ExitCode")
}
r.ExitCode = int(exitCode)
}
}
return nil
}
func (r *ContainerStateTerminated) marshalProtobuf(mm *easyproto.MessageMarshaler) {
mm.AppendInt32(1, int32(r.ExitCode))
}
func getContainerID(p *Pod, containerName string, isInit bool) string {
cs := p.getContainerStatus(containerName, isInit)
if cs == nil {

View File

@@ -1,17 +1,15 @@
package kubernetes
import (
"bytes"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
)
func TestParsePodListFailure(t *testing.T) {
f := func(s string) {
f := func(s, contentType string) {
t.Helper()
r := bytes.NewBufferString(s)
objectsByKey, _, err := parsePodList(r)
objectsByKey, _, err := parsePodList([]byte(s), contentType)
if err == nil {
t.Fatalf("expecting non-nil error")
}
@@ -19,10 +17,10 @@ func TestParsePodListFailure(t *testing.T) {
t.Fatalf("unexpected non-empty objectsByKey: %v", objectsByKey)
}
}
f(``)
f(`[1,23]`)
f(`{"items":[{"metadata":1}]}`)
f(`{"items":[{"metadata":{"labels":[1]}}]}`)
f(``, contentTypeJSON)
f(`[1,23]`, contentTypeJSON)
f(`{"items":[{"metadata":1}]}`, contentTypeJSON)
f(`{"items":[{"metadata":{"labels":[1]}}]}`, contentTypeJSON)
}
const testPodsList = `
@@ -691,10 +689,9 @@ const testPodsListIPv6AddressNoPorts = `
`
func TestParsePodListSuccess(t *testing.T) {
f := func(response string, expectedLabels []*promutil.Labels) {
f := func(response, contentType string, expectedLabels []*promutil.Labels) {
t.Helper()
r := bytes.NewBufferString(response)
objectsByKey, meta, err := parsePodList(r)
objectsByKey, meta, err := parsePodList([]byte(response), contentType)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
@@ -708,7 +705,7 @@ func TestParsePodListSuccess(t *testing.T) {
}
}
f(testPodsList, []*promutil.Labels{
f(testPodsList, contentTypeJSON, []*promutil.Labels{
promutil.NewLabelsFromMap(map[string]string{
"__address__": "172.17.0.2:1234",
@@ -751,7 +748,7 @@ func TestParsePodListSuccess(t *testing.T) {
}),
})
f(testPodsListIPv6Address, []*promutil.Labels{
f(testPodsListIPv6Address, contentTypeJSON, []*promutil.Labels{
promutil.NewLabelsFromMap(map[string]string{
"__address__": "[fd01:10:100:0:e38f:6f4b:9c53:9e4e]:1234",
@@ -793,7 +790,7 @@ func TestParsePodListSuccess(t *testing.T) {
"__meta_kubernetes_pod_annotationpresent_kubernetes_io_config_source": "true",
}),
})
f(testPodsListIPv6AddressNoPorts, []*promutil.Labels{
f(testPodsListIPv6AddressNoPorts, contentTypeJSON, []*promutil.Labels{
promutil.NewLabelsFromMap(map[string]string{
"__address__": "[fd01:10:100:0:e38f:6f4b:9c53:9e4e]",

View File

@@ -1,14 +1,12 @@
package kubernetes
import (
"bytes"
"fmt"
"testing"
)
func BenchmarkPodGetTargetLabels(b *testing.B) {
r := bytes.NewBufferString(testPodsList)
objectsByKey, _, err := parsePodList(r)
objectsByKey, _, err := parsePodList([]byte(testPodsList), contentTypeJSON)
if err != nil {
panic(fmt.Errorf("BUG: unexpected error: %w", err))
}

View File

@@ -3,57 +3,166 @@ package kubernetes
import (
"encoding/json"
"fmt"
"io"
"strconv"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/easyproto"
)
func (s *Service) key() string {
return s.Metadata.key()
}
func parseServiceList(r io.Reader) (map[string]object, ListMeta, error) {
var sl ServiceList
d := json.NewDecoder(r)
if err := d.Decode(&sl); err != nil {
return nil, sl.Metadata, fmt.Errorf("cannot unmarshal ServiceList: %w", err)
func parseServiceList(data []byte, contentType string) (map[string]object, ListMeta, error) {
l := &ServiceList{}
switch contentType {
case contentTypeJSON:
if err := json.Unmarshal(data, l); err != nil {
return nil, l.Metadata, fmt.Errorf("cannot unmarshal ServiceList: %w", err)
}
case contentTypeProtobuf:
u := &Unknown{}
if err := u.unmarshalProtobuf(data); err != nil {
return nil, l.Metadata, err
}
if err := l.unmarshalProtobuf(u.Raw); err != nil {
return nil, l.Metadata, fmt.Errorf("cannot unmarshal ServiceList: %w", err)
}
}
objectsByKey := make(map[string]object)
for _, s := range sl.Items {
objectsByKey[s.key()] = s
for i := range l.Items {
item := &l.Items[i]
objectsByKey[item.key()] = item
}
return objectsByKey, sl.Metadata, nil
return objectsByKey, l.Metadata, nil
}
func parseService(data []byte) (object, error) {
var s Service
if err := json.Unmarshal(data, &s); err != nil {
return nil, err
func parseService(data []byte, contentType string) (object, error) {
s := &Service{}
switch contentType {
case contentTypeJSON:
if err := json.Unmarshal(data, s); err != nil {
return nil, err
}
case contentTypeProtobuf:
if err := s.unmarshalProtobuf(data); err != nil {
return nil, err
}
}
return &s, nil
return s, nil
}
// ServiceList is k8s service list.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#servicelist-v1-core
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#servicelist-v1-core
type ServiceList struct {
Metadata ListMeta
Items []*Service
Items []Service
}
// unmarshalProtobuf unmarshals ServiceList according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *ServiceList) unmarshalProtobuf(src []byte) (err error) {
// message ServiceList {
// optional ListMeta metadata = 1;
// repeated Service items = 2;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in ServiceList: %w", err)
}
switch fc.FieldNum {
case 1:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read ListMeta")
}
m := &r.Metadata
if err := m.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal ListMeta: %w", err)
}
case 2:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Items")
}
r.Items = slicesutil.SetLength(r.Items, len(r.Items)+1)
s := &r.Items[len(r.Items)-1]
if err := s.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal Items: %w", err)
}
}
}
return nil
}
func (r *ServiceList) marshalProtobuf(mm *easyproto.MessageMarshaler) {
r.Metadata.marshalProtobuf(mm.AppendMessage(1))
for _, item := range r.Items {
item.marshalProtobuf(mm.AppendMessage(2))
}
}
// Service is k8s service.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#service-v1-core
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#service-v1-core
type Service struct {
Metadata ObjectMeta
Spec ServiceSpec
}
// unmarshalProtobuf unmarshals Service according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (s *Service) unmarshalProtobuf(src []byte) (err error) {
// message Service {
// optional ObjectMeta metadata = 1;
// repeated ServiceSpec spec = 2;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in Service: %w", err)
}
switch fc.FieldNum {
case 1:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read ObjectMeta")
}
m := &s.Metadata
if err := m.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal ObjectMeta: %w", err)
}
case 2:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Spec")
}
spec := &s.Spec
if err := spec.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal Spec: %w", err)
}
}
}
return nil
}
func (s *Service) marshalProtobuf(mm *easyproto.MessageMarshaler) {
s.Metadata.marshalProtobuf(mm.AppendMessage(1))
s.Spec.marshalProtobuf(mm.AppendMessage(2))
}
// ServiceSpec is k8s service spec.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#servicespec-v1-core
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#servicespec-v1-core
type ServiceSpec struct {
ClusterIP string
ExternalName string
@@ -61,15 +170,119 @@ type ServiceSpec struct {
Ports []ServicePort
}
// unmarshalProtobuf unmarshals ServiceSpec according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *ServiceSpec) unmarshalProtobuf(src []byte) (err error) {
// message ServiceSpec {
// repeated ServicePort ports = 1;
// optional string clusterIP = 3;
// optional string type = 4;
// optional string externalName = 10;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in Service: %w", err)
}
switch fc.FieldNum {
case 1:
data, ok := fc.MessageData()
if !ok {
return fmt.Errorf("cannot read Port")
}
r.Ports = slicesutil.SetLength(r.Ports, len(r.Ports)+1)
p := &r.Ports[len(r.Ports)-1]
if err := p.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal Port: %w", err)
}
case 3:
clusterIP, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read ClusterIP")
}
r.ClusterIP = strings.Clone(clusterIP)
case 4:
serviceType, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Type")
}
r.Type = strings.Clone(serviceType)
case 10:
externalName, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read ExternalName")
}
r.ExternalName = strings.Clone(externalName)
}
}
return nil
}
func (r *ServiceSpec) marshalProtobuf(mm *easyproto.MessageMarshaler) {
for _, p := range r.Ports {
p.marshalProtobuf(mm.AppendMessage(1))
}
mm.AppendString(3, r.ClusterIP)
mm.AppendString(4, r.Type)
mm.AppendString(10, r.ExternalName)
}
// ServicePort is k8s service port.
//
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#serviceport-v1-core
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#serviceport-v1-core
type ServicePort struct {
Name string
Protocol string
Port int
}
// unmarshalProtobuf unmarshals ServicePort according to spec
//
// See https://github.com/kubernetes/api/blob/master/core/v1/generated.proto
func (r *ServicePort) unmarshalProtobuf(src []byte) (err error) {
// message ServicePort {
// optional string name = 1;
// optional string protocol = 2;
// optional int32 port = 3;
// }
var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
return fmt.Errorf("cannot read next field in Service: %w", err)
}
switch fc.FieldNum {
case 1:
name, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Name")
}
r.Name = strings.Clone(name)
case 2:
protocol, ok := fc.String()
if !ok {
return fmt.Errorf("cannot read Protocol")
}
r.Protocol = strings.Clone(protocol)
case 3:
port, ok := fc.Int32()
if !ok {
return fmt.Errorf("cannot read Port")
}
r.Port = int(port)
}
}
return nil
}
func (r *ServicePort) marshalProtobuf(mm *easyproto.MessageMarshaler) {
mm.AppendString(1, r.Name)
mm.AppendString(2, r.Protocol)
mm.AppendInt32(3, int32(r.Port))
}
// getTargetLabels returns labels for each port of the given s.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#service

View File

@@ -1,17 +1,15 @@
package kubernetes
import (
"bytes"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
)
func TestParseServiceListFailure(t *testing.T) {
f := func(s string) {
f := func(s, contentType string) {
t.Helper()
r := bytes.NewBufferString(s)
objectsByKey, _, err := parseServiceList(r)
objectsByKey, _, err := parseServiceList([]byte(s), contentType)
if err == nil {
t.Fatalf("expecting non-nil error")
}
@@ -19,10 +17,10 @@ func TestParseServiceListFailure(t *testing.T) {
t.Fatalf("unexpected non-empty objectsByKey: %v", objectsByKey)
}
}
f(``)
f(`[1,23]`)
f(`{"items":[{"metadata":1}]}`)
f(`{"items":[{"metadata":{"labels":[1]}}]}`)
f(``, contentTypeJSON)
f(`[1,23]`, contentTypeJSON)
f(`{"items":[{"metadata":1}]}`, contentTypeJSON)
f(`{"items":[{"metadata":{"labels":[1]}}]}`, contentTypeJSON)
}
func TestParseServiceListSuccess(t *testing.T) {
@@ -89,8 +87,7 @@ func TestParseServiceListSuccess(t *testing.T) {
]
}
`
r := bytes.NewBufferString(data)
objectsByKey, meta, err := parseServiceList(r)
objectsByKey, meta, err := parseServiceList([]byte(data), contentTypeJSON)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}