[#16] Add GRPC metrics interceptor #17

Merged
pogpp merged 1 commit from pogpp/frostfs-sdk-java:feature/16_grpc_interceptors into master 2024-10-02 08:44:12 +00:00
10 changed files with 587 additions and 3 deletions

View file

@ -38,6 +38,21 @@
<artifactId>commons-codec</artifactId> <artifactId>commons-codec</artifactId>
<version>1.17.0</version> <version>1.17.0</version>
</dependency> </dependency>
<!-- Prometheus instrumentation -->
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
<version>0.16.0</version>
orikik marked this conversation as resolved Outdated

Why was this particular version of the library chosen? There are also more recent ones.

Why was this particular version of the library chosen? There are also more recent ones.
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_hotspot</artifactId>
<version>0.16.0</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_common</artifactId>
<version>0.16.0</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

View file

@ -18,8 +18,11 @@ import info.frostfs.sdk.jdo.NetworkSettings;
import info.frostfs.sdk.jdo.PutObjectParameters; import info.frostfs.sdk.jdo.PutObjectParameters;
import info.frostfs.sdk.services.*; import info.frostfs.sdk.services.*;
import info.frostfs.sdk.services.impl.*; import info.frostfs.sdk.services.impl.*;
import info.frostfs.sdk.services.impl.interceptor.Configuration;
import info.frostfs.sdk.services.impl.interceptor.MonitoringClientInterceptor;
import info.frostfs.sdk.utils.Validator; import info.frostfs.sdk.utils.Validator;
import io.grpc.Channel; import io.grpc.Channel;
import io.grpc.ClientInterceptors;
import java.util.List; import java.util.List;
@ -40,6 +43,9 @@ public class FrostFSClient implements ContainerClient, ObjectClient, NetmapClien
? clientSettings.getChannel() ? clientSettings.getChannel()
: initGrpcChannel(clientSettings); : initGrpcChannel(clientSettings);
MonitoringClientInterceptor monitoringClientInterceptor = MonitoringClientInterceptor
.create(Configuration.allMetrics());
channel = ClientInterceptors.intercept(channel, monitoringClientInterceptor);
ClientEnvironment clientEnvironment = ClientEnvironment clientEnvironment =
new ClientEnvironment(clientSettings.getKey(), channel, new Version(), this); new ClientEnvironment(clientSettings.getKey(), channel, new Version(), this);

View file

@ -0,0 +1,170 @@
package info.frostfs.sdk.services.impl.interceptor;
import io.grpc.Metadata;
import io.grpc.Status;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Histogram;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import static info.frostfs.sdk.services.impl.interceptor.Labels.*;
/**
 * Prometheus metrics recorded by the client interceptor for one gRPC method.
 *
 * <p>Instances are produced by {@link Factory}; every instance created by the same factory shares
 * the same underlying collectors and differs only in the {@link GrpcMethod} whose type, service
 * and method name are used as the leading label values.
 */
public class ClientMetrics {
  private static final String NAMESPACE = "grpc";
  private static final String SUBSYSTEM = "client";

  private static final List<String> defaultRequestLabels =
      Arrays.asList("grpc_type", "grpc_service", "grpc_method");
  private static final List<String> defaultResponseLabels =
      Arrays.asList("grpc_type", "grpc_service", "grpc_method", "code", "grpc_code");

  private final List<Metadata.Key<String>> labelHeaderKeys;
  private final Counter rpcStarted;
  private final Counter rpcCompleted;
  private final Counter streamMessagesReceived;
  private final Counter streamMessagesSent;
  private final Optional<Histogram> completedLatencySeconds;
  private final GrpcMethod method;

  private ClientMetrics(
      List<Metadata.Key<String>> labelHeaderKeys,
      GrpcMethod method,
      Counter rpcStarted,
      Counter rpcCompleted,
      Counter streamMessagesReceived,
      Counter streamMessagesSent,
      Optional<Histogram> completedLatencySeconds) {
    this.labelHeaderKeys = labelHeaderKeys;
    this.method = method;
    this.rpcStarted = rpcStarted;
    this.rpcCompleted = rpcCompleted;
    this.streamMessagesReceived = streamMessagesReceived;
    this.streamMessagesSent = streamMessagesSent;
    this.completedLatencySeconds = completedLatencySeconds;
  }

  /**
   * Creates a counter builder pre-filled with the client namespace and subsystem.
   *
   * <p>A fresh builder is returned on every call so that label names configured by one
   * {@link Factory} are never written into state shared with another one.
   */
  private static Counter.Builder clientCounter(String name, String help) {
    return Counter.build().namespace(NAMESPACE).subsystem(SUBSYSTEM).name(name).help(help);
  }

  /** Creates a fresh builder for the completed-call latency histogram. */
  private static Histogram.Builder clientLatencyHistogram() {
    return Histogram.build()
        .namespace(NAMESPACE)
        .subsystem(SUBSYSTEM)
        .name("completed_latency_seconds")
        .help("Histogram of rpc response latency (in seconds) for completed rpcs.");
  }

  /**
   * Increments the "started" counter for this method.
   *
   * @param metadata request headers from which the custom label values are read
   */
  public void recordCallStarted(Metadata metadata) {
    addLabels(rpcStarted, customLabels(metadata, labelHeaderKeys), method).inc();
  }

  /**
   * Increments the "completed" counter for this method.
   *
   * @param code status code of the finished call
   * @param metadata request headers from which the custom label values are read
   */
  public void recordClientHandled(Status.Code code, Metadata metadata) {
    List<String> allLabels = new ArrayList<>();
    // The same status code fills both the "code" and the "grpc_code" label.
    allLabels.add(code.toString());
    allLabels.add(code.toString());
    allLabels.addAll(customLabels(metadata, labelHeaderKeys));
    addLabels(rpcCompleted, allLabels, method).inc();
  }

  /**
   * Increments the "msg_sent" counter for this method.
   *
   * @param metadata request headers from which the custom label values are read
   */
  public void recordStreamMessageSent(Metadata metadata) {
    addLabels(streamMessagesSent, customLabels(metadata, labelHeaderKeys), method).inc();
  }

  /**
   * Increments the "msg_received" counter for this method.
   *
   * @param metadata request headers from which the custom label values are read
   */
  public void recordStreamMessageReceived(Metadata metadata) {
    addLabels(streamMessagesReceived, customLabels(metadata, labelHeaderKeys), method).inc();
  }

  /**
   * Observes a call latency in the latency histogram.
   *
   * <p>Only has any effect if monitoring is configured to include latency histograms. Otherwise,
   * this does nothing.
   *
   * @param latencySec latency of the completed call, in seconds
   * @param metadata request headers from which the custom label values are read
   */
  public void recordLatency(double latencySec, Metadata metadata) {
    completedLatencySeconds.ifPresent(
        histogram ->
            addLabels(histogram, customLabels(metadata, labelHeaderKeys), method)
                .observe(latencySec));
  }

  /**
   * Knows how to produce {@link ClientMetrics} instances for individual methods.
   *
   * <p>The constructor registers all collectors in the configured registry.
   * NOTE(review): a Prometheus registry is expected to reject a second collector with the same
   * name, so creating more than one factory per registry will presumably fail — confirm that
   * callers create a single interceptor per registry.
   */
  static class Factory {
    private final List<Metadata.Key<String>> labelHeaderKeys;
    private final Counter rpcStarted;
    private final Counter rpcCompleted;
    private final Counter streamMessagesReceived;
    private final Counter streamMessagesSent;
    private final Optional<Histogram> completedLatencySeconds;

    Factory(Configuration configuration) {
      CollectorRegistry registry = configuration.getCollectorRegistry();
      List<String> headerLabels = configuration.getSanitizedLabelHeaders();
      this.labelHeaderKeys = metadataKeys(configuration.getLabelHeaders());

      this.rpcStarted =
          clientCounter("started", "Total number of RPCs started on the client.")
              .labelNames(asArray(defaultRequestLabels, headerLabels))
              .register(registry);
      this.rpcCompleted =
          clientCounter(
                  "completed",
                  "Total number of RPCs completed on the client, regardless of success or failure.")
              .labelNames(asArray(defaultResponseLabels, headerLabels))
              .register(registry);
      this.streamMessagesReceived =
          clientCounter(
                  "msg_received", "Total number of stream messages received from the server.")
              .labelNames(asArray(defaultRequestLabels, headerLabels))
              .register(registry);
      this.streamMessagesSent =
          clientCounter("msg_sent", "Total number of stream messages sent by the client.")
              .labelNames(asArray(defaultRequestLabels, headerLabels))
              .register(registry);

      if (configuration.isIncludeLatencyHistograms()) {
        this.completedLatencySeconds =
            Optional.of(
                clientLatencyHistogram()
                    .buckets(configuration.getLatencyBuckets())
                    .labelNames(asArray(defaultRequestLabels, headerLabels))
                    .register(registry));
      } else {
        this.completedLatencySeconds = Optional.empty();
      }
    }

    /** Creates a {@link ClientMetrics} for the supplied gRPC method. */
    ClientMetrics createMetricsForMethod(GrpcMethod grpcMethod) {
      return new ClientMetrics(
          labelHeaderKeys,
          grpcMethod,
          rpcStarted,
          rpcCompleted,
          streamMessagesReceived,
          streamMessagesSent,
          completedLatencySeconds);
    }
  }
}

View file

@ -0,0 +1,153 @@
package info.frostfs.sdk.services.impl.interceptor;
import io.prometheus.client.CollectorRegistry;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Immutable settings for the gRPC client monitoring interceptor.
 *
 * <p>Start from {@link #cheapMetricsOnly()} or {@link #allMetrics()} and derive variations with
 * the {@code with...} methods; each of them returns a new instance and leaves the receiver
 * untouched. Arrays and lists are copied on the way in and on the way out, so neither the caller's
 * collections nor the values returned by the getters are shared with this object.
 */
public class Configuration {
  private static final double[] DEFAULT_LATENCY_BUCKETS =
      new double[] {.001, .005, .01, .05, 0.075, .1, .25, .5, 1, 2, 5, 10};

  private final boolean isIncludeLatencyHistograms;
  private final CollectorRegistry collectorRegistry;
  private final double[] latencyBuckets;
  private final List<String> labelHeaders;
  private final boolean isAddCodeLabelToHistograms;

  private Configuration(
      boolean isIncludeLatencyHistograms,
      CollectorRegistry collectorRegistry,
      double[] latencyBuckets,
      List<String> labelHeaders,
      boolean isAddCodeLabelToHistograms) {
    this.isIncludeLatencyHistograms = isIncludeLatencyHistograms;
    this.collectorRegistry = collectorRegistry;
    // Defensive copies: later changes to the caller's array or list must not leak in here.
    this.latencyBuckets = copyOf(latencyBuckets);
    this.labelHeaders = new ArrayList<>(labelHeaders);
    this.isAddCodeLabelToHistograms = isAddCodeLabelToHistograms;
  }

  /** Returns a copy of the given array, or {@code null} when the array itself is {@code null}. */
  private static double[] copyOf(double[] values) {
    return values == null ? null : values.clone();
  }

  /** Returns a {@link Configuration} for recording all cheap metrics about the rpcs. */
  public static Configuration cheapMetricsOnly() {
    return new Configuration(
        false /* isIncludeLatencyHistograms */,
        CollectorRegistry.defaultRegistry,
        DEFAULT_LATENCY_BUCKETS,
        new ArrayList<>(),
        false /* isAddCodeLabelToHistograms */);
  }

  /**
   * Returns a {@link Configuration} for recording all metrics about the rpcs. This includes
   * metrics which might produce a lot of data, such as latency histograms.
   */
  public static Configuration allMetrics() {
    return new Configuration(
        true /* isIncludeLatencyHistograms */,
        CollectorRegistry.defaultRegistry,
        DEFAULT_LATENCY_BUCKETS,
        new ArrayList<>(),
        false /* isAddCodeLabelToHistograms */);
  }

  /**
   * Returns a copy of this {@link Configuration} with the difference that Prometheus metrics are
   * recorded using the supplied {@link CollectorRegistry}.
   */
  public Configuration withCollectorRegistry(CollectorRegistry collectorRegistry) {
    return new Configuration(
        isIncludeLatencyHistograms,
        collectorRegistry,
        latencyBuckets,
        labelHeaders,
        isAddCodeLabelToHistograms);
  }

  /**
   * Returns a copy of this {@link Configuration} with the difference that the latency histogram
   * values are recorded with the specified set of buckets.
   */
  public Configuration withLatencyBuckets(double[] buckets) {
    return new Configuration(
        isIncludeLatencyHistograms,
        collectorRegistry,
        buckets,
        labelHeaders,
        isAddCodeLabelToHistograms);
  }

  /**
   * Returns a copy of this {@link Configuration} that additionally recognizes the given list of
   * header names and uses their value from each request as Prometheus labels.
   *
   * <p>Since the hyphen is a common character in header names, and since Prometheus does not
   * allow hyphens in label names, all hyphens in the list of header names are converted to
   * underscores before being added as metric label names.
   *
   * <p>If one of the headers added here is absent in one of the requests, its metric value for
   * that request will be an empty string.
   *
   * <p>Example: {@code withLabelHeaders(Arrays.asList("User-Agent"))} will make all metrics carry
   * a label "User_Agent", with label value filled in from the value of the "User-Agent" header of
   * each request.
   */
  public Configuration withLabelHeaders(List<String> headers) {
    List<String> newHeaders = new ArrayList<>(labelHeaders);
    newHeaders.addAll(headers);
    return new Configuration(
        isIncludeLatencyHistograms,
        collectorRegistry,
        latencyBuckets,
        newHeaders,
        isAddCodeLabelToHistograms);
  }

  /**
   * Returns a copy of this {@link Configuration} with the difference that the status code label
   * will be added to the latency histogram. If the latency histogram itself is disabled, this has
   * no effect. Warning: this will increase the number of histograms by a factor of the codes that
   * actually occur (up to the number of {@link io.grpc.Status.Code} values), which could lead to
   * additional local memory usage and load on Prometheus (storage and memory usage, query-time
   * complexity).
   */
  public Configuration withCodeLabelInLatencyHistogram() {
    return new Configuration(
        isIncludeLatencyHistograms,
        collectorRegistry,
        latencyBuckets,
        labelHeaders,
        true /* isAddCodeLabelToHistograms */);
  }

  /** Returns whether or not latency histograms for calls should be included. */
  public boolean isIncludeLatencyHistograms() {
    return isIncludeLatencyHistograms;
  }

  /** Returns the {@link CollectorRegistry} used to record stats. */
  public CollectorRegistry getCollectorRegistry() {
    return collectorRegistry;
  }

  /**
   * Returns the histogram buckets to use for latency metrics.
   *
   * <p>The returned array is a copy; modifying it does not affect this configuration.
   */
  public double[] getLatencyBuckets() {
    return copyOf(latencyBuckets);
  }

  /**
   * Returns the configured list of headers to be used as labels.
   *
   * <p>The returned list is a copy; modifying it does not affect this configuration.
   */
  public List<String> getLabelHeaders() {
    return new ArrayList<>(labelHeaders);
  }

  /** Returns whether or not status code label should be added to latency histogram. */
  public boolean isAddCodeLabelToHistograms() {
    return isAddCodeLabelToHistograms;
  }

  /**
   * Returns the sanitized version of the label headers, after turning all hyphens to underscores.
   */
  public List<String> getSanitizedLabelHeaders() {
    // Plain character replacement; no regular expression is needed for a single literal char.
    return labelHeaders.stream().map(h -> h.replace('-', '_')).collect(Collectors.toList());
  }
}

View file

@ -0,0 +1,45 @@
package info.frostfs.sdk.services.impl.interceptor;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
/**
 * Service name, method name and call type of one gRPC method, as extracted from its
 * {@link MethodDescriptor}.
 */
public class GrpcMethod {
  private final String serviceName;
  private final String methodName;
  private final MethodType type;

  private GrpcMethod(String serviceName, String methodName, MethodType type) {
    this.serviceName = serviceName;
    this.methodName = methodName;
    this.type = type;
  }

  /** Builds a {@link GrpcMethod} from the descriptor's full method name and type. */
  static GrpcMethod of(MethodDescriptor<?, ?> method) {
    String fullName = method.getFullMethodName();
    String service = MethodDescriptor.extractFullServiceName(fullName);
    // A full method name looks like "full.serviceName/MethodName"; skip the service part and
    // the separating slash to keep only the trailing method name.
    int methodStart = service.length() + 1;
    return new GrpcMethod(service, fullName.substring(methodStart), method.getType());
  }

  /** Returns the fully qualified service name. */
  String serviceName() {
    return serviceName;
  }

  /** Returns the bare method name, without the service prefix. */
  String methodName() {
    return methodName;
  }

  /** Returns the string form of the method type. */
  String type() {
    return type.toString();
  }

  /** Returns {@code true} for client-streaming and bidirectional-streaming methods. */
  boolean streamsRequests() {
    return type == MethodType.BIDI_STREAMING || type == MethodType.CLIENT_STREAMING;
  }

  /** Returns {@code true} for server-streaming and bidirectional-streaming methods. */
  boolean streamsResponses() {
    return type == MethodType.BIDI_STREAMING || type == MethodType.SERVER_STREAMING;
  }
}

View file

@ -0,0 +1,53 @@
package info.frostfs.sdk.services.impl.interceptor;
import io.grpc.Metadata;
import io.grpc.Metadata.Key;
import io.prometheus.client.SimpleCollector;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/** Static helpers for assembling Prometheus label names and label values. */
public class Labels {
  /** Concatenates two string lists into one array: all of the first list, then the second. */
  static String[] asArray(List<String> firstList, List<String> secondList) {
    String[] merged = new String[firstList.size() + secondList.size()];
    int next = 0;
    for (String item : firstList) {
      merged[next++] = item;
    }
    for (String item : secondList) {
      merged[next++] = item;
    }
    return merged;
  }

  /** Turns header names into ASCII string metadata keys, preserving their order. */
  static List<Key<String>> metadataKeys(List<String> headerNames) {
    List<Key<String>> result = new ArrayList<>(headerNames.size());
    for (String headerName : headerNames) {
      Key<String> key = Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER);
      result.add(key);
    }
    return Collections.unmodifiableList(result);
  }

  /**
   * Looks up each of the given keys in the metadata and returns the values in key order. A key
   * that is not present in the metadata contributes an empty string.
   */
  static List<String> customLabels(Metadata metadata, List<Key<String>> labelHeaderKeys) {
    List<String> values = new ArrayList<>(labelHeaderKeys.size());
    for (Key<String> headerKey : labelHeaderKeys) {
      values.add(metadata.containsKey(headerKey) ? metadata.get(headerKey) : "");
    }
    return Collections.unmodifiableList(values);
  }

  /**
   * Selects the child of a collector for the method's type, service name and method name,
   * followed by the supplied extra label values, in that order.
   */
  static <T> T addLabels(SimpleCollector<T> collector, List<String> labels, GrpcMethod method) {
    String[] values = new String[3 + labels.size()];
    values[0] = method.type();
    values[1] = method.serviceName();
    values[2] = method.methodName();
    int next = 3;
    for (String label : labels) {
      values[next++] = label;
    }
    return collector.labels(values);
  }
}

View file

@ -0,0 +1,47 @@
package info.frostfs.sdk.services.impl.interceptor;
import io.grpc.ClientCall;
import io.grpc.ForwardingClientCall;
import io.grpc.Metadata;
import java.time.Clock;
/**
 * Client call wrapper that records "call started" and "stream message sent" metrics before
 * forwarding to the wrapped call.
 */
public class MonitoringClientCall<R, S> extends ForwardingClientCall.SimpleForwardingClientCall<R, S> {
  private final ClientMetrics clientMetrics;
  private final GrpcMethod grpcMethod;
  private final Configuration configuration;
  private final Clock clock;
  // Headers passed to start(); stays null until start() has been called.
  private Metadata requestMetadata;

  MonitoringClientCall(
      ClientCall<R, S> delegate,
      ClientMetrics clientMetrics,
      GrpcMethod grpcMethod,
      Configuration configuration,
      Clock clock) {
    super(delegate);
    this.clientMetrics = clientMetrics;
    this.grpcMethod = grpcMethod;
    this.configuration = configuration;
    this.clock = clock;
  }

  /**
   * Remembers the request headers, counts the call as started and starts the wrapped call with a
   * {@link MonitoringClientCallListener} placed in front of the supplied listener.
   */
  @Override
  public void start(Listener<S> delegate, Metadata metadata) {
    this.requestMetadata = metadata;
    clientMetrics.recordCallStarted(metadata);

    Listener<S> monitoringListener =
        new MonitoringClientCallListener<>(
            delegate, clientMetrics, grpcMethod, configuration, clock, metadata);
    super.start(monitoringListener, metadata);
  }

  /** Counts the outgoing message when the method streams requests, then forwards it. */
  @Override
  public void sendMessage(R requestMessage) {
    if (grpcMethod.streamsRequests()) {
      // Fall back to empty headers when no headers have been captured yet.
      Metadata labelSource = requestMetadata != null ? requestMetadata : new Metadata();
      clientMetrics.recordStreamMessageSent(labelSource);
    }
    super.sendMessage(requestMessage);
  }
}

View file

@ -0,0 +1,61 @@
package info.frostfs.sdk.services.impl.interceptor;
import io.grpc.ClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.Status;
import java.time.Clock;
import java.time.Instant;
/**
 * Call listener that records completion, latency and received-message metrics before forwarding
 * each event to the wrapped listener.
 *
 * <p>The start time is captured from the supplied {@link Clock} when the listener is constructed.
 */
public class MonitoringClientCallListener<S> extends ForwardingClientCallListener<S> {
  private static final double NANOS_PER_SECOND = 1_000_000_000.0;

  private final ClientCall.Listener<S> delegate;
  private final ClientMetrics clientMetrics;
  private final GrpcMethod grpcMethod;
  private final Configuration configuration;
  private final Clock clock;
  private final Instant startInstant;
  private final Metadata requestMetadata;

  MonitoringClientCallListener(
      ClientCall.Listener<S> delegate,
      ClientMetrics clientMetrics,
      GrpcMethod grpcMethod,
      Configuration configuration,
      Clock clock,
      Metadata requestMetadata) {
    this.delegate = delegate;
    this.clientMetrics = clientMetrics;
    this.grpcMethod = grpcMethod;
    this.configuration = configuration;
    this.clock = clock;
    this.startInstant = clock.instant();
    this.requestMetadata = requestMetadata;
  }

  @Override
  protected ClientCall.Listener<S> delegate() {
    return delegate;
  }

  /**
   * Records the completed call and, when latency histograms are enabled, its latency, then
   * forwards the close event.
   *
   * @param status final status of the call; its code is used as a label value
   * @param metadata trailers received with the status; forwarded unchanged
   */
  @Override
  public void onClose(Status status, Metadata metadata) {
    clientMetrics.recordClientHandled(status.getCode(), requestMetadata);
    if (configuration.isIncludeLatencyHistograms()) {
      clientMetrics.recordLatency(elapsedSeconds(), requestMetadata);
    }
    super.onClose(status, metadata);
  }

  /** Counts the incoming message when the method streams responses, then forwards it. */
  @Override
  public void onMessage(S responseMessage) {
    if (grpcMethod.streamsResponses()) {
      clientMetrics.recordStreamMessageReceived(requestMetadata);
    }
    super.onMessage(responseMessage);
  }

  /**
   * Returns the time elapsed since this listener was created, in seconds.
   *
   * <p>The value is derived from the second and nanosecond fields of both instants rather than
   * from epoch milliseconds, so whatever sub-millisecond resolution the clock provides is kept
   * instead of being truncated on both ends.
   */
  private double elapsedSeconds() {
    Instant now = clock.instant();
    long wholeSeconds = now.getEpochSecond() - startInstant.getEpochSecond();
    // May be negative when the nano-of-second wrapped; the sum with wholeSeconds is still right.
    long nanoDifference = (long) now.getNano() - startInstant.getNano();
    return wholeSeconds + nanoDifference / NANOS_PER_SECOND;
  }
}

View file

@ -0,0 +1,35 @@
package info.frostfs.sdk.services.impl.interceptor;
import java.time.Clock;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.MethodDescriptor;
/**
 * Client interceptor that wraps every outgoing call in a {@link MonitoringClientCall} so that
 * metrics are recorded for it.
 */
public class MonitoringClientInterceptor implements ClientInterceptor {
  private final Clock clock;
  private final Configuration configuration;
  private final ClientMetrics.Factory clientMetricsFactory;

  private MonitoringClientInterceptor(
      Clock clock, Configuration configuration, ClientMetrics.Factory clientMetricsFactory) {
    this.clock = clock;
    this.configuration = configuration;
    this.clientMetricsFactory = clientMetricsFactory;
  }

  /**
   * Creates an interceptor for the given configuration, using the system clock and a new
   * {@link ClientMetrics.Factory} built from that configuration.
   */
  public static MonitoringClientInterceptor create(Configuration configuration) {
    ClientMetrics.Factory metricsFactory = new ClientMetrics.Factory(configuration);
    return new MonitoringClientInterceptor(
        Clock.systemDefaultZone(), configuration, metricsFactory);
  }

  /**
   * Creates the call on the next channel and wraps it together with the metrics for the invoked
   * method.
   */
  @Override
  public <R, S> ClientCall<R, S> interceptCall(
      MethodDescriptor<R, S> methodDescriptor, CallOptions callOptions, Channel channel) {
    GrpcMethod invokedMethod = GrpcMethod.of(methodDescriptor);
    ClientMetrics methodMetrics = clientMetricsFactory.createMetricsForMethod(invokedMethod);
    ClientCall<R, S> wrappedCall = channel.newCall(methodDescriptor, callOptions);
    return new MonitoringClientCall<>(
        wrappedCall, methodMetrics, invokedMethod, configuration, clock);
  }
}

View file

@ -58,8 +58,7 @@ public class Base58 {
byte[] checksum = getSha256(getSha256(data)); byte[] checksum = getSha256(getSha256(data));
var buffer = concat(data, Arrays.copyOfRange(checksum, 0, 4)); var buffer = concat(data, Arrays.copyOfRange(checksum, 0, 4));
var ret = encode(buffer); return encode(buffer);
return ret;
} }
public static String encode(byte[] input) { public static String encode(byte[] input) {