diff --git a/client/pom.xml b/client/pom.xml index c920473..660ee62 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -38,6 +38,22 @@ commons-codec 1.17.0 + + + io.prometheus + simpleclient + 0.0.16 + + + io.prometheus + simpleclient_hotspot + 0.0.16 + + + io.prometheus + simpleclient_common + 0.0.16 + \ No newline at end of file diff --git a/client/src/main/java/info/frostfs/sdk/FrostFSClient.java b/client/src/main/java/info/frostfs/sdk/FrostFSClient.java index 76d946f..3b4ecb5 100644 --- a/client/src/main/java/info/frostfs/sdk/FrostFSClient.java +++ b/client/src/main/java/info/frostfs/sdk/FrostFSClient.java @@ -18,8 +18,11 @@ import info.frostfs.sdk.jdo.NetworkSettings; import info.frostfs.sdk.jdo.PutObjectParameters; import info.frostfs.sdk.services.*; import info.frostfs.sdk.services.impl.*; +import info.frostfs.sdk.services.impl.interceptor.Configuration; +import info.frostfs.sdk.services.impl.interceptor.MonitoringClientInterceptor; import info.frostfs.sdk.utils.Validator; import io.grpc.Channel; +import io.grpc.ClientInterceptors; import java.util.List; @@ -40,6 +43,9 @@ public class FrostFSClient implements ContainerClient, ObjectClient, NetmapClien ? clientSettings.getChannel() : initGrpcChannel(clientSettings); + MonitoringClientInterceptor monitoringClientInterceptor = MonitoringClientInterceptor + .create(Configuration.allMetrics()); + channel = ClientInterceptors.intercept(channel, monitoringClientInterceptor); ClientEnvironment clientEnvironment = new ClientEnvironment(clientSettings.getKey(), channel, new Version(), this); diff --git a/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/ClientMetrics.java b/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/ClientMetrics.java new file mode 100644 index 0000000..149c145 --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/ClientMetrics.java @@ -0,0 +1,170 @@ +package info.frostfs.sdk.services.impl.interceptor; + +import io.grpc.Metadata; +import io.grpc.Status; +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.Counter; +import io.prometheus.client.Histogram; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import static info.frostfs.sdk.services.impl.interceptor.Labels.*; + +public class ClientMetrics { + private static final List defaultRequestLabels = + Arrays.asList("grpc_type", "grpc_service", "grpc_method"); + + private static final List defaultResponseLabels = + Arrays.asList("grpc_type", "grpc_service", "grpc_method", "code", "grpc_code"); + + private static final Counter.Builder rpcStartedBuilder = + Counter.build() + .namespace("grpc") + .subsystem("client") + .name("started") + .help("Total number of RPCs started on the client."); + + private static final Counter.Builder rpcCompletedBuilder = + Counter.build() + .namespace("grpc") + .subsystem("client") + .name("completed") + .help("Total number of RPCs completed on the client, regardless of success or failure."); + + private static final Histogram.Builder completedLatencySecondsBuilder = + Histogram.build() + .namespace("grpc") + .subsystem("client") + .name("completed_latency_seconds") + .help("Histogram of rpc response latency (in seconds) for completed rpcs."); + + private static final Counter.Builder streamMessagesReceivedBuilder = + Counter.build() + .namespace("grpc") + .subsystem("client") + .name("msg_received") + .help("Total number of stream messages received from the server."); + + private static final Counter.Builder streamMessagesSentBuilder = + Counter.build() + .namespace("grpc") + .subsystem("client") + .name("msg_sent") + .help("Total number of stream messages sent by the client."); + + private final List> labelHeaderKeys; + private final Counter rpcStarted; + private final Counter rpcCompleted; + private final Counter streamMessagesReceived; + private final Counter streamMessagesSent; + private final Optional completedLatencySeconds; + + private final GrpcMethod method; + + private ClientMetrics( + List> labelHeaderKeys, + GrpcMethod method, + Counter rpcStarted, + Counter rpcCompleted, + Counter streamMessagesReceived, + Counter streamMessagesSent, + Optional completedLatencySeconds) { + this.labelHeaderKeys = labelHeaderKeys; + this.method = method; + this.rpcStarted = rpcStarted; + this.rpcCompleted = rpcCompleted; + this.streamMessagesReceived = streamMessagesReceived; + this.streamMessagesSent = streamMessagesSent; + this.completedLatencySeconds = completedLatencySeconds; + } + + public void recordCallStarted(Metadata metadata) { + addLabels(rpcStarted, customLabels(metadata, labelHeaderKeys), method).inc(); + } + + public void recordClientHandled(Status.Code code, Metadata metadata) { + List allLabels = new ArrayList<>(); + allLabels.add(code.toString()); + allLabels.add(code.toString()); + allLabels.addAll(customLabels(metadata, labelHeaderKeys)); + addLabels(rpcCompleted, allLabels, method).inc(); + } + + public void recordStreamMessageSent(Metadata metadata) { + addLabels(streamMessagesSent, customLabels(metadata, labelHeaderKeys), method).inc(); + } + + public void recordStreamMessageReceived(Metadata metadata) { + addLabels(streamMessagesReceived, customLabels(metadata, labelHeaderKeys), method).inc(); + } + + /** + * Only has any effect if monitoring is configured to include latency histograms. Otherwise, this + * does nothing. + */ + public void recordLatency(double latencySec, Metadata metadata) { + if (completedLatencySeconds.isEmpty()) { + return; + } + addLabels(completedLatencySeconds.get(), customLabels(metadata, labelHeaderKeys), method) + .observe(latencySec); + } + + /** Knows how to produce {@link ClientMetrics} instances for individual methods. */ + static class Factory { + private final List> labelHeaderKeys; + private final Counter rpcStarted; + private final Counter rpcCompleted; + private final Counter streamMessagesReceived; + private final Counter streamMessagesSent; + private final Optional completedLatencySeconds; + + Factory(Configuration configuration) { + CollectorRegistry registry = configuration.getCollectorRegistry(); + this.labelHeaderKeys = metadataKeys(configuration.getLabelHeaders()); + this.rpcStarted = + rpcStartedBuilder + .labelNames(asArray(defaultRequestLabels, configuration.getSanitizedLabelHeaders())) + .register(registry); + this.rpcCompleted = + rpcCompletedBuilder + .labelNames(asArray(defaultResponseLabels, configuration.getSanitizedLabelHeaders())) + .register(registry); + this.streamMessagesReceived = + streamMessagesReceivedBuilder + .labelNames(asArray(defaultRequestLabels, configuration.getSanitizedLabelHeaders())) + .register(registry); + this.streamMessagesSent = + streamMessagesSentBuilder + .labelNames(asArray(defaultRequestLabels, configuration.getSanitizedLabelHeaders())) + .register(registry); + + if (configuration.isIncludeLatencyHistograms()) { + this.completedLatencySeconds = + Optional.of( + ClientMetrics.completedLatencySecondsBuilder + .buckets(configuration.getLatencyBuckets()) + .labelNames( + asArray(defaultRequestLabels, configuration.getSanitizedLabelHeaders())) + .register(registry)); + } else { + this.completedLatencySeconds = Optional.empty(); + } + } + + /** Creates a {@link ClientMetrics} for the supplied gRPC method. */ + ClientMetrics createMetricsForMethod(GrpcMethod grpcMethod) { + return new ClientMetrics( + labelHeaderKeys, + grpcMethod, + rpcStarted, + rpcCompleted, + streamMessagesReceived, + streamMessagesSent, + completedLatencySeconds); + } + } +} diff --git a/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/Configuration.java b/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/Configuration.java new file mode 100644 index 0000000..d4adcca --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/Configuration.java @@ -0,0 +1,153 @@ +package info.frostfs.sdk.services.impl.interceptor; + + +import io.prometheus.client.CollectorRegistry; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +public class Configuration { + private static final double[] DEFAULT_LATENCY_BUCKETS = + new double[] {.001, .005, .01, .05, 0.075, .1, .25, .5, 1, 2, 5, 10}; + + private final boolean isIncludeLatencyHistograms; + private final CollectorRegistry collectorRegistry; + private final double[] latencyBuckets; + private final List labelHeaders; + private final boolean isAddCodeLabelToHistograms; + private Configuration( + boolean isIncludeLatencyHistograms, + CollectorRegistry collectorRegistry, + double[] latencyBuckets, + List labelHeaders, + boolean isAddCodeLabelToHistograms) { + this.isIncludeLatencyHistograms = isIncludeLatencyHistograms; + this.collectorRegistry = collectorRegistry; + this.latencyBuckets = latencyBuckets; + this.labelHeaders = labelHeaders; + this.isAddCodeLabelToHistograms = isAddCodeLabelToHistograms; + } + + + /** Returns a {@link Configuration} for recording all cheap metrics about the rpcs. */ + public static Configuration cheapMetricsOnly() { + return new Configuration( + false /* isIncludeLatencyHistograms */, + CollectorRegistry.defaultRegistry, + DEFAULT_LATENCY_BUCKETS, + new ArrayList<>(), + false /* isAddCodeLabelToHistograms */); + } + + /** + * Returns a {@link Configuration} for recording all metrics about the rpcs. This includes metrics + * which might produce a lot of data, such as latency histograms. + */ + public static Configuration allMetrics() { + return new Configuration( + true /* isIncludeLatencyHistograms */, + CollectorRegistry.defaultRegistry, + DEFAULT_LATENCY_BUCKETS, + new ArrayList<>(), + false); + } + + /** + * Returns a copy {@link Configuration} with the difference that Prometheus metrics are recorded + * using the supplied {@link CollectorRegistry}. + */ + public Configuration withCollectorRegistry(CollectorRegistry collectorRegistry) { + return new Configuration( + isIncludeLatencyHistograms, + collectorRegistry, + latencyBuckets, + labelHeaders, + isAddCodeLabelToHistograms); + } + + /** + * Returns a copy {@link Configuration} with the difference that the latency histogram values are + * recorded with the specified set of buckets. + */ + public Configuration withLatencyBuckets(double[] buckets) { + return new Configuration( + isIncludeLatencyHistograms, + collectorRegistry, + buckets, + labelHeaders, + isAddCodeLabelToHistograms); + } + + /** + * Returns a copy {@link Configuration} that recognizes the given list of header names and uses + * their value from each request as prometheus labels. + * + *

Since hyphens is a common character in header names, and since Prometheus does not allow + * hyphens in label names, All hyphens in the list of header names will be converted to + * underscores before being added as metric label names. + * + *

If one of the headers added here is absent in one of the requests, its metric value for that + * request will be an empty string. + * + *

Example: {@code withLabelHeaders(Arrays.asList("User-Agent"))} will make all metrics carry a + * label "User_Agent", with label value filled in from the value of the "User-Agent" header of + * each request. + */ + public Configuration withLabelHeaders(List headers) { + List newHeaders = new ArrayList<>(labelHeaders); + newHeaders.addAll(headers); + return new Configuration( + isIncludeLatencyHistograms, + collectorRegistry, + latencyBuckets, + newHeaders, + isAddCodeLabelToHistograms); + } + + /** + * Returns a copy {@link Configuration} with the difference that status code label will be added + * to latency histogram. If latency histogram itself is disabled, this takes no effect. Warning: + * this will increase the number of histograms by a factor of actually happened codes (up to + * {@link io.grpc.Status.Code} values count), which could lead to additional local memory usage + * and load on prometheus (storage and memory usage, query-time complexity) + */ + public Configuration withCodeLabelInLatencyHistogram() { + return new Configuration( + isIncludeLatencyHistograms, + collectorRegistry, + latencyBuckets, + labelHeaders, + true /* isAddCodeLabelToHistograms */); + } + + /** Returns whether or not latency histograms for calls should be included. */ + public boolean isIncludeLatencyHistograms() { + return isIncludeLatencyHistograms; + } + + /** Returns the {@link CollectorRegistry} used to record stats. */ + public CollectorRegistry getCollectorRegistry() { + return collectorRegistry; + } + + /** Returns the histogram buckets to use for latency metrics. */ + public double[] getLatencyBuckets() { + return latencyBuckets; + } + + /** Returns the configured list of headers to be used as labels. */ + public List getLabelHeaders() { + return labelHeaders; + } + + /** Returns whether or not status code label should be added to latency histogram. */ + public boolean isAddCodeLabelToHistograms() { + return isAddCodeLabelToHistograms; + } + + /** + * Returns the sanitized version of the label headers, after turning all hyphens to underscores. + */ + public List getSanitizedLabelHeaders() { + return labelHeaders.stream().map(h -> h.replaceAll("-", "_")).collect(Collectors.toList()); + } +} diff --git a/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/GrpcMethod.java b/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/GrpcMethod.java new file mode 100644 index 0000000..8f9aa4c --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/GrpcMethod.java @@ -0,0 +1,45 @@ +package info.frostfs.sdk.services.impl.interceptor; + +import io.grpc.MethodDescriptor; +import io.grpc.MethodDescriptor.MethodType; + +public class GrpcMethod { + private final String serviceName; + private final String methodName; + private final MethodType type; + + private GrpcMethod(String serviceName, String methodName, MethodType type) { + this.serviceName = serviceName; + this.methodName = methodName; + this.type = type; + } + + static GrpcMethod of(MethodDescriptor method) { + String serviceName = MethodDescriptor.extractFullServiceName(method.getFullMethodName()); + + // Full method names are of the form: "full.serviceName/MethodName". We extract the last part. + String methodName = method.getFullMethodName().substring(serviceName.length() + 1); + return new GrpcMethod(serviceName, methodName, method.getType()); + } + + + String serviceName() { + return serviceName; + } + + String methodName() { + return methodName; + } + + String type() { + return type.toString(); + } + + boolean streamsRequests() { + return type == MethodType.CLIENT_STREAMING || type == MethodType.BIDI_STREAMING; + } + + boolean streamsResponses() { + return type == MethodType.SERVER_STREAMING || type == MethodType.BIDI_STREAMING; + } +} diff --git a/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/Labels.java b/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/Labels.java new file mode 100644 index 0000000..d026854 --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/Labels.java @@ -0,0 +1,53 @@ +package info.frostfs.sdk.services.impl.interceptor; + +import io.grpc.Metadata; +import io.grpc.Metadata.Key; +import io.prometheus.client.SimpleCollector; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class Labels { + /** Merges two string lists into an array, maintaining order of first list then second list. */ + static String[] asArray(List firstList, List secondList) { + List list = new ArrayList<>(firstList); + list.addAll(secondList); + return list.toArray(new String[0]); + } + + /** Converts a list of strings to a list of grpc metadata keys. */ + static List> metadataKeys(List headerNames) { + List> keys = new ArrayList<>(); + for (String name : headerNames) { + keys.add(Key.of(name, Metadata.ASCII_STRING_MARSHALLER)); + } + return Collections.unmodifiableList(keys); + } + + /** + * Returns the ordered list of custom label values, by looking into metadata for values of + * selected custom headers. + */ + static List customLabels(Metadata metadata, List> labelHeaderKeys) { + List labels = new ArrayList<>(); + for (Key key : labelHeaderKeys) { + if (metadata.containsKey(key)) { + labels.add(metadata.get(key)); + } else { + labels.add(""); + } + } + return Collections.unmodifiableList(labels); + } + + /** Adds standard labels, as well as custom ones, in order, to a given collector. */ + static T addLabels(SimpleCollector collector, List labels, GrpcMethod method) { + List allLabels = new ArrayList<>(); + allLabels.add(method.type()); + allLabels.add(method.serviceName()); + allLabels.add(method.methodName()); + allLabels.addAll(labels); + return collector.labels(allLabels.toArray(new String[0])); + } +} diff --git a/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/MonitoringClientCall.java b/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/MonitoringClientCall.java new file mode 100644 index 0000000..c116ca7 --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/MonitoringClientCall.java @@ -0,0 +1,47 @@ +package info.frostfs.sdk.services.impl.interceptor; + +import io.grpc.ClientCall; +import io.grpc.ForwardingClientCall; +import io.grpc.Metadata; + +import java.time.Clock; + +public class MonitoringClientCall extends ForwardingClientCall.SimpleForwardingClientCall { + private final ClientMetrics clientMetrics; + private final GrpcMethod grpcMethod; + private final Configuration configuration; + private final Clock clock; + private Metadata requestMetadata; + + MonitoringClientCall( + ClientCall delegate, + ClientMetrics clientMetrics, + GrpcMethod grpcMethod, + Configuration configuration, + Clock clock) { + super(delegate); + this.clientMetrics = clientMetrics; + this.grpcMethod = grpcMethod; + this.configuration = configuration; + this.clock = clock; + } + + @Override + public void start(Listener delegate, Metadata metadata) { + this.requestMetadata = metadata; + clientMetrics.recordCallStarted(metadata); + super.start( + new MonitoringClientCallListener<>( + delegate, clientMetrics, grpcMethod, configuration, clock, metadata), + metadata); + } + + @Override + public void sendMessage(R requestMessage) { + if (grpcMethod.streamsRequests()) { + clientMetrics.recordStreamMessageSent( + requestMetadata == null ? new Metadata() : requestMetadata); + } + super.sendMessage(requestMessage); + } +} diff --git a/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/MonitoringClientCallListener.java b/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/MonitoringClientCallListener.java new file mode 100644 index 0000000..e8c9caf --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/MonitoringClientCallListener.java @@ -0,0 +1,61 @@ +package info.frostfs.sdk.services.impl.interceptor; + +import io.grpc.ClientCall; +import io.grpc.ForwardingClientCallListener; +import io.grpc.Metadata; +import io.grpc.Status; + +import java.time.Clock; +import java.time.Instant; + +public class MonitoringClientCallListener extends ForwardingClientCallListener { + private static final long MILLIS_PER_SECOND = 1000L; + + private final ClientCall.Listener delegate; + private final ClientMetrics clientMetrics; + private final GrpcMethod grpcMethod; + private final Configuration configuration; + private final Clock clock; + private final Instant startInstant; + private final Metadata requestMetadata; + + MonitoringClientCallListener( + ClientCall.Listener delegate, + ClientMetrics clientMetrics, + GrpcMethod grpcMethod, + Configuration configuration, + Clock clock, + Metadata requestMetadata) { + this.delegate = delegate; + this.clientMetrics = clientMetrics; + this.grpcMethod = grpcMethod; + this.configuration = configuration; + this.clock = clock; + this.startInstant = clock.instant(); + this.requestMetadata = requestMetadata; + } + + @Override + protected ClientCall.Listener delegate() { + return delegate; + } + + @Override + public void onClose(Status status, Metadata metadata) { + clientMetrics.recordClientHandled(status.getCode(), requestMetadata); + if (configuration.isIncludeLatencyHistograms()) { + double latencySec = + (clock.millis() - startInstant.toEpochMilli()) / (double) MILLIS_PER_SECOND; + clientMetrics.recordLatency(latencySec, requestMetadata); + } + super.onClose(status, metadata); + } + + @Override + public void onMessage(S responseMessage) { + if (grpcMethod.streamsResponses()) { + clientMetrics.recordStreamMessageReceived(requestMetadata); + } + super.onMessage(responseMessage); + } +} diff --git a/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/MonitoringClientInterceptor.java b/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/MonitoringClientInterceptor.java new file mode 100644 index 0000000..ada4761 --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/MonitoringClientInterceptor.java @@ -0,0 +1,35 @@ +package info.frostfs.sdk.services.impl.interceptor; + +import java.time.Clock; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.MethodDescriptor; + + +public class MonitoringClientInterceptor implements ClientInterceptor { + private final Clock clock; + private final Configuration configuration; + private final ClientMetrics.Factory clientMetricsFactory; + + private MonitoringClientInterceptor( + Clock clock, Configuration configuration, ClientMetrics.Factory clientMetricsFactory) { + this.clock = clock; + this.configuration = configuration; + this.clientMetricsFactory = clientMetricsFactory; + } + public static MonitoringClientInterceptor create(Configuration configuration) { + return new MonitoringClientInterceptor( + Clock.systemDefaultZone(), configuration, new ClientMetrics.Factory(configuration)); + } + + @Override + public ClientCall interceptCall( + MethodDescriptor methodDescriptor, CallOptions callOptions, Channel channel) { + GrpcMethod grpcMethod = GrpcMethod.of(methodDescriptor); + ClientMetrics metrics = clientMetricsFactory.createMetricsForMethod(grpcMethod); + return new MonitoringClientCall<>( + channel.newCall(methodDescriptor, callOptions), metrics, grpcMethod, configuration, clock); + } +} diff --git a/cryptography/src/main/java/info/frostfs/sdk/Base58.java b/cryptography/src/main/java/info/frostfs/sdk/Base58.java index 477412b..d302763 100644 --- a/cryptography/src/main/java/info/frostfs/sdk/Base58.java +++ b/cryptography/src/main/java/info/frostfs/sdk/Base58.java @@ -58,8 +58,7 @@ public class Base58 { byte[] checksum = getSha256(getSha256(data)); var buffer = concat(data, Arrays.copyOfRange(checksum, 0, 4)); - var ret = encode(buffer); - return ret; + return encode(buffer); } public static String encode(byte[] input) {