[#16] Add GRPC metrics interceptor
All checks were successful
DCO / DCO (pull_request) Successful in 45s

Signed-off-by: Pavel Pogodaev <p.pogodaev@yadro.com>
This commit is contained in:
Pavel Pogodaev 2024-09-27 14:54:09 +03:00
parent 59356180d5
commit f01c92784d
10 changed files with 587 additions and 3 deletions

View file

@ -38,6 +38,21 @@
<artifactId>commons-codec</artifactId> <artifactId>commons-codec</artifactId>
<version>1.17.0</version> <version>1.17.0</version>
</dependency> </dependency>
<!-- Prometheus instrumentation -->
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
<version>0.16.0</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_hotspot</artifactId>
<version>0.16.0</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_common</artifactId>
<version>0.16.0</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

View file

@ -18,8 +18,11 @@ import info.frostfs.sdk.jdo.NetworkSettings;
import info.frostfs.sdk.jdo.PutObjectParameters; import info.frostfs.sdk.jdo.PutObjectParameters;
import info.frostfs.sdk.services.*; import info.frostfs.sdk.services.*;
import info.frostfs.sdk.services.impl.*; import info.frostfs.sdk.services.impl.*;
import info.frostfs.sdk.services.impl.interceptor.Configuration;
import info.frostfs.sdk.services.impl.interceptor.MonitoringClientInterceptor;
import info.frostfs.sdk.utils.Validator; import info.frostfs.sdk.utils.Validator;
import io.grpc.Channel; import io.grpc.Channel;
import io.grpc.ClientInterceptors;
import java.util.List; import java.util.List;
@ -40,6 +43,9 @@ public class FrostFSClient implements ContainerClient, ObjectClient, NetmapClien
? clientSettings.getChannel() ? clientSettings.getChannel()
: initGrpcChannel(clientSettings); : initGrpcChannel(clientSettings);
MonitoringClientInterceptor monitoringClientInterceptor = MonitoringClientInterceptor
.create(Configuration.allMetrics());
channel = ClientInterceptors.intercept(channel, monitoringClientInterceptor);
ClientEnvironment clientEnvironment = ClientEnvironment clientEnvironment =
new ClientEnvironment(clientSettings.getKey(), channel, new Version(), this); new ClientEnvironment(clientSettings.getKey(), channel, new Version(), this);

View file

@ -0,0 +1,170 @@
package info.frostfs.sdk.services.impl.interceptor;
import io.grpc.Metadata;
import io.grpc.Status;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Histogram;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import static info.frostfs.sdk.services.impl.interceptor.Labels.*;
public class ClientMetrics {
private static final List<String> defaultRequestLabels =
Arrays.asList("grpc_type", "grpc_service", "grpc_method");
private static final List<String> defaultResponseLabels =
Arrays.asList("grpc_type", "grpc_service", "grpc_method", "code", "grpc_code");
private static final Counter.Builder rpcStartedBuilder =
Counter.build()
.namespace("grpc")
.subsystem("client")
.name("started")
.help("Total number of RPCs started on the client.");
private static final Counter.Builder rpcCompletedBuilder =
Counter.build()
.namespace("grpc")
.subsystem("client")
.name("completed")
.help("Total number of RPCs completed on the client, regardless of success or failure.");
private static final Histogram.Builder completedLatencySecondsBuilder =
Histogram.build()
.namespace("grpc")
.subsystem("client")
.name("completed_latency_seconds")
.help("Histogram of rpc response latency (in seconds) for completed rpcs.");
private static final Counter.Builder streamMessagesReceivedBuilder =
Counter.build()
.namespace("grpc")
.subsystem("client")
.name("msg_received")
.help("Total number of stream messages received from the server.");
private static final Counter.Builder streamMessagesSentBuilder =
Counter.build()
.namespace("grpc")
.subsystem("client")
.name("msg_sent")
.help("Total number of stream messages sent by the client.");
private final List<Metadata.Key<String>> labelHeaderKeys;
private final Counter rpcStarted;
private final Counter rpcCompleted;
private final Counter streamMessagesReceived;
private final Counter streamMessagesSent;
private final Optional<Histogram> completedLatencySeconds;
private final GrpcMethod method;
private ClientMetrics(
List<Metadata.Key<String>> labelHeaderKeys,
GrpcMethod method,
Counter rpcStarted,
Counter rpcCompleted,
Counter streamMessagesReceived,
Counter streamMessagesSent,
Optional<Histogram> completedLatencySeconds) {
this.labelHeaderKeys = labelHeaderKeys;
this.method = method;
this.rpcStarted = rpcStarted;
this.rpcCompleted = rpcCompleted;
this.streamMessagesReceived = streamMessagesReceived;
this.streamMessagesSent = streamMessagesSent;
this.completedLatencySeconds = completedLatencySeconds;
}
public void recordCallStarted(Metadata metadata) {
addLabels(rpcStarted, customLabels(metadata, labelHeaderKeys), method).inc();
}
public void recordClientHandled(Status.Code code, Metadata metadata) {
List<String> allLabels = new ArrayList<>();
allLabels.add(code.toString());
allLabels.add(code.toString());
allLabels.addAll(customLabels(metadata, labelHeaderKeys));
addLabels(rpcCompleted, allLabels, method).inc();
}
public void recordStreamMessageSent(Metadata metadata) {
addLabels(streamMessagesSent, customLabels(metadata, labelHeaderKeys), method).inc();
}
public void recordStreamMessageReceived(Metadata metadata) {
addLabels(streamMessagesReceived, customLabels(metadata, labelHeaderKeys), method).inc();
}
/**
* Only has any effect if monitoring is configured to include latency histograms. Otherwise, this
* does nothing.
*/
public void recordLatency(double latencySec, Metadata metadata) {
if (completedLatencySeconds.isEmpty()) {
return;
}
addLabels(completedLatencySeconds.get(), customLabels(metadata, labelHeaderKeys), method)
.observe(latencySec);
}
/** Knows how to produce {@link ClientMetrics} instances for individual methods. */
static class Factory {
private final List<Metadata.Key<String>> labelHeaderKeys;
private final Counter rpcStarted;
private final Counter rpcCompleted;
private final Counter streamMessagesReceived;
private final Counter streamMessagesSent;
private final Optional<Histogram> completedLatencySeconds;
Factory(Configuration configuration) {
CollectorRegistry registry = configuration.getCollectorRegistry();
this.labelHeaderKeys = metadataKeys(configuration.getLabelHeaders());
this.rpcStarted =
rpcStartedBuilder
.labelNames(asArray(defaultRequestLabels, configuration.getSanitizedLabelHeaders()))
.register(registry);
this.rpcCompleted =
rpcCompletedBuilder
.labelNames(asArray(defaultResponseLabels, configuration.getSanitizedLabelHeaders()))
.register(registry);
this.streamMessagesReceived =
streamMessagesReceivedBuilder
.labelNames(asArray(defaultRequestLabels, configuration.getSanitizedLabelHeaders()))
.register(registry);
this.streamMessagesSent =
streamMessagesSentBuilder
.labelNames(asArray(defaultRequestLabels, configuration.getSanitizedLabelHeaders()))
.register(registry);
if (configuration.isIncludeLatencyHistograms()) {
this.completedLatencySeconds =
Optional.of(
ClientMetrics.completedLatencySecondsBuilder
.buckets(configuration.getLatencyBuckets())
.labelNames(
asArray(defaultRequestLabels, configuration.getSanitizedLabelHeaders()))
.register(registry));
} else {
this.completedLatencySeconds = Optional.empty();
}
}
/** Creates a {@link ClientMetrics} for the supplied gRPC method. */
ClientMetrics createMetricsForMethod(GrpcMethod grpcMethod) {
return new ClientMetrics(
labelHeaderKeys,
grpcMethod,
rpcStarted,
rpcCompleted,
streamMessagesReceived,
streamMessagesSent,
completedLatencySeconds);
}
}
}

View file

@ -0,0 +1,153 @@
package info.frostfs.sdk.services.impl.interceptor;
import io.prometheus.client.CollectorRegistry;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class Configuration {
private static final double[] DEFAULT_LATENCY_BUCKETS =
new double[] {.001, .005, .01, .05, 0.075, .1, .25, .5, 1, 2, 5, 10};
private final boolean isIncludeLatencyHistograms;
private final CollectorRegistry collectorRegistry;
private final double[] latencyBuckets;
private final List<String> labelHeaders;
private final boolean isAddCodeLabelToHistograms;
private Configuration(
boolean isIncludeLatencyHistograms,
CollectorRegistry collectorRegistry,
double[] latencyBuckets,
List<String> labelHeaders,
boolean isAddCodeLabelToHistograms) {
this.isIncludeLatencyHistograms = isIncludeLatencyHistograms;
this.collectorRegistry = collectorRegistry;
this.latencyBuckets = latencyBuckets;
this.labelHeaders = labelHeaders;
this.isAddCodeLabelToHistograms = isAddCodeLabelToHistograms;
}
/** Returns a {@link Configuration} for recording all cheap metrics about the rpcs. */
public static Configuration cheapMetricsOnly() {
return new Configuration(
false /* isIncludeLatencyHistograms */,
CollectorRegistry.defaultRegistry,
DEFAULT_LATENCY_BUCKETS,
new ArrayList<>(),
false /* isAddCodeLabelToHistograms */);
}
/**
* Returns a {@link Configuration} for recording all metrics about the rpcs. This includes metrics
* which might produce a lot of data, such as latency histograms.
*/
public static Configuration allMetrics() {
return new Configuration(
true /* isIncludeLatencyHistograms */,
CollectorRegistry.defaultRegistry,
DEFAULT_LATENCY_BUCKETS,
new ArrayList<>(),
false);
}
/**
* Returns a copy {@link Configuration} with the difference that Prometheus metrics are recorded
* using the supplied {@link CollectorRegistry}.
*/
public Configuration withCollectorRegistry(CollectorRegistry collectorRegistry) {
return new Configuration(
isIncludeLatencyHistograms,
collectorRegistry,
latencyBuckets,
labelHeaders,
isAddCodeLabelToHistograms);
}
/**
* Returns a copy {@link Configuration} with the difference that the latency histogram values are
* recorded with the specified set of buckets.
*/
public Configuration withLatencyBuckets(double[] buckets) {
return new Configuration(
isIncludeLatencyHistograms,
collectorRegistry,
buckets,
labelHeaders,
isAddCodeLabelToHistograms);
}
/**
* Returns a copy {@link Configuration} that recognizes the given list of header names and uses
* their value from each request as prometheus labels.
*
* <p>Since hyphens is a common character in header names, and since Prometheus does not allow
* hyphens in label names, All hyphens in the list of header names will be converted to
* underscores before being added as metric label names.
*
* <p>If one of the headers added here is absent in one of the requests, its metric value for that
* request will be an empty string.
*
* <p>Example: {@code withLabelHeaders(Arrays.asList("User-Agent"))} will make all metrics carry a
* label "User_Agent", with label value filled in from the value of the "User-Agent" header of
* each request.
*/
public Configuration withLabelHeaders(List<String> headers) {
List<String> newHeaders = new ArrayList<>(labelHeaders);
newHeaders.addAll(headers);
return new Configuration(
isIncludeLatencyHistograms,
collectorRegistry,
latencyBuckets,
newHeaders,
isAddCodeLabelToHistograms);
}
/**
* Returns a copy {@link Configuration} with the difference that status code label will be added
* to latency histogram. If latency histogram itself is disabled, this takes no effect. Warning:
* this will increase the number of histograms by a factor of actually happened codes (up to
* {@link io.grpc.Status.Code} values count), which could lead to additional local memory usage
* and load on prometheus (storage and memory usage, query-time complexity)
*/
public Configuration withCodeLabelInLatencyHistogram() {
return new Configuration(
isIncludeLatencyHistograms,
collectorRegistry,
latencyBuckets,
labelHeaders,
true /* isAddCodeLabelToHistograms */);
}
/** Returns whether or not latency histograms for calls should be included. */
public boolean isIncludeLatencyHistograms() {
return isIncludeLatencyHistograms;
}
/** Returns the {@link CollectorRegistry} used to record stats. */
public CollectorRegistry getCollectorRegistry() {
return collectorRegistry;
}
/** Returns the histogram buckets to use for latency metrics. */
public double[] getLatencyBuckets() {
return latencyBuckets;
}
/** Returns the configured list of headers to be used as labels. */
public List<String> getLabelHeaders() {
return labelHeaders;
}
/** Returns whether or not status code label should be added to latency histogram. */
public boolean isAddCodeLabelToHistograms() {
return isAddCodeLabelToHistograms;
}
/**
* Returns the sanitized version of the label headers, after turning all hyphens to underscores.
*/
public List<String> getSanitizedLabelHeaders() {
return labelHeaders.stream().map(h -> h.replaceAll("-", "_")).collect(Collectors.toList());
}
}

View file

@ -0,0 +1,45 @@
package info.frostfs.sdk.services.impl.interceptor;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
public class GrpcMethod {
private final String serviceName;
private final String methodName;
private final MethodType type;
private GrpcMethod(String serviceName, String methodName, MethodType type) {
this.serviceName = serviceName;
this.methodName = methodName;
this.type = type;
}
static GrpcMethod of(MethodDescriptor<?, ?> method) {
String serviceName = MethodDescriptor.extractFullServiceName(method.getFullMethodName());
// Full method names are of the form: "full.serviceName/MethodName". We extract the last part.
String methodName = method.getFullMethodName().substring(serviceName.length() + 1);
return new GrpcMethod(serviceName, methodName, method.getType());
}
String serviceName() {
return serviceName;
}
String methodName() {
return methodName;
}
String type() {
return type.toString();
}
boolean streamsRequests() {
return type == MethodType.CLIENT_STREAMING || type == MethodType.BIDI_STREAMING;
}
boolean streamsResponses() {
return type == MethodType.SERVER_STREAMING || type == MethodType.BIDI_STREAMING;
}
}

View file

@ -0,0 +1,53 @@
package info.frostfs.sdk.services.impl.interceptor;
import io.grpc.Metadata;
import io.grpc.Metadata.Key;
import io.prometheus.client.SimpleCollector;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class Labels {
/** Merges two string lists into an array, maintaining order of first list then second list. */
static String[] asArray(List<String> firstList, List<String> secondList) {
List<String> list = new ArrayList<>(firstList);
list.addAll(secondList);
return list.toArray(new String[0]);
}
/** Converts a list of strings to a list of grpc metadata keys. */
static List<Key<String>> metadataKeys(List<String> headerNames) {
List<Key<String>> keys = new ArrayList<>();
for (String name : headerNames) {
keys.add(Key.of(name, Metadata.ASCII_STRING_MARSHALLER));
}
return Collections.unmodifiableList(keys);
}
/**
* Returns the ordered list of custom label values, by looking into metadata for values of
* selected custom headers.
*/
static List<String> customLabels(Metadata metadata, List<Key<String>> labelHeaderKeys) {
List<String> labels = new ArrayList<>();
for (Key<String> key : labelHeaderKeys) {
if (metadata.containsKey(key)) {
labels.add(metadata.get(key));
} else {
labels.add("");
}
}
return Collections.unmodifiableList(labels);
}
/** Adds standard labels, as well as custom ones, in order, to a given collector. */
static <T> T addLabels(SimpleCollector<T> collector, List<String> labels, GrpcMethod method) {
List<String> allLabels = new ArrayList<>();
allLabels.add(method.type());
allLabels.add(method.serviceName());
allLabels.add(method.methodName());
allLabels.addAll(labels);
return collector.labels(allLabels.toArray(new String[0]));
}
}

View file

@ -0,0 +1,47 @@
package info.frostfs.sdk.services.impl.interceptor;
import io.grpc.ClientCall;
import io.grpc.ForwardingClientCall;
import io.grpc.Metadata;
import java.time.Clock;
public class MonitoringClientCall<R, S> extends ForwardingClientCall.SimpleForwardingClientCall<R, S> {
private final ClientMetrics clientMetrics;
private final GrpcMethod grpcMethod;
private final Configuration configuration;
private final Clock clock;
private Metadata requestMetadata;
MonitoringClientCall(
ClientCall<R, S> delegate,
ClientMetrics clientMetrics,
GrpcMethod grpcMethod,
Configuration configuration,
Clock clock) {
super(delegate);
this.clientMetrics = clientMetrics;
this.grpcMethod = grpcMethod;
this.configuration = configuration;
this.clock = clock;
}
@Override
public void start(Listener<S> delegate, Metadata metadata) {
this.requestMetadata = metadata;
clientMetrics.recordCallStarted(metadata);
super.start(
new MonitoringClientCallListener<>(
delegate, clientMetrics, grpcMethod, configuration, clock, metadata),
metadata);
}
@Override
public void sendMessage(R requestMessage) {
if (grpcMethod.streamsRequests()) {
clientMetrics.recordStreamMessageSent(
requestMetadata == null ? new Metadata() : requestMetadata);
}
super.sendMessage(requestMessage);
}
}

View file

@ -0,0 +1,61 @@
package info.frostfs.sdk.services.impl.interceptor;
import io.grpc.ClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.Status;
import java.time.Clock;
import java.time.Instant;
public class MonitoringClientCallListener<S> extends ForwardingClientCallListener<S> {
private static final long MILLIS_PER_SECOND = 1000L;
private final ClientCall.Listener<S> delegate;
private final ClientMetrics clientMetrics;
private final GrpcMethod grpcMethod;
private final Configuration configuration;
private final Clock clock;
private final Instant startInstant;
private final Metadata requestMetadata;
MonitoringClientCallListener(
ClientCall.Listener<S> delegate,
ClientMetrics clientMetrics,
GrpcMethod grpcMethod,
Configuration configuration,
Clock clock,
Metadata requestMetadata) {
this.delegate = delegate;
this.clientMetrics = clientMetrics;
this.grpcMethod = grpcMethod;
this.configuration = configuration;
this.clock = clock;
this.startInstant = clock.instant();
this.requestMetadata = requestMetadata;
}
@Override
protected ClientCall.Listener<S> delegate() {
return delegate;
}
@Override
public void onClose(Status status, Metadata metadata) {
clientMetrics.recordClientHandled(status.getCode(), requestMetadata);
if (configuration.isIncludeLatencyHistograms()) {
double latencySec =
(clock.millis() - startInstant.toEpochMilli()) / (double) MILLIS_PER_SECOND;
clientMetrics.recordLatency(latencySec, requestMetadata);
}
super.onClose(status, metadata);
}
@Override
public void onMessage(S responseMessage) {
if (grpcMethod.streamsResponses()) {
clientMetrics.recordStreamMessageReceived(requestMetadata);
}
super.onMessage(responseMessage);
}
}

View file

@ -0,0 +1,35 @@
package info.frostfs.sdk.services.impl.interceptor;
import java.time.Clock;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.MethodDescriptor;
public class MonitoringClientInterceptor implements ClientInterceptor {
private final Clock clock;
private final Configuration configuration;
private final ClientMetrics.Factory clientMetricsFactory;
private MonitoringClientInterceptor(
Clock clock, Configuration configuration, ClientMetrics.Factory clientMetricsFactory) {
this.clock = clock;
this.configuration = configuration;
this.clientMetricsFactory = clientMetricsFactory;
}
public static MonitoringClientInterceptor create(Configuration configuration) {
return new MonitoringClientInterceptor(
Clock.systemDefaultZone(), configuration, new ClientMetrics.Factory(configuration));
}
@Override
public <R, S> ClientCall<R, S> interceptCall(
MethodDescriptor<R, S> methodDescriptor, CallOptions callOptions, Channel channel) {
GrpcMethod grpcMethod = GrpcMethod.of(methodDescriptor);
ClientMetrics metrics = clientMetricsFactory.createMetricsForMethod(grpcMethod);
return new MonitoringClientCall<>(
channel.newCall(methodDescriptor, callOptions), metrics, grpcMethod, configuration, clock);
}
}

View file

@ -58,8 +58,7 @@ public class Base58 {
byte[] checksum = getSha256(getSha256(data)); byte[] checksum = getSha256(getSha256(data));
var buffer = concat(data, Arrays.copyOfRange(checksum, 0, 4)); var buffer = concat(data, Arrays.copyOfRange(checksum, 0, 4));
var ret = encode(buffer); return encode(buffer);
return ret;
} }
public static String encode(byte[] input) { public static String encode(byte[] input) {