From e9e94807018514e2e4fd175ece8844f1dfccde43 Mon Sep 17 00:00:00 2001 From: Ori Bruk Date: Thu, 16 Jan 2025 19:15:43 +0300 Subject: [PATCH] [#32] Provide a pool of clients to grpc Signed-off-by: Ori Bruk --- client/pom.xml | 13 +- .../java/info/frostfs/sdk/FrostFSClient.java | 60 +- .../info/frostfs/sdk/constants/PoolConst.java | 14 + .../info/frostfs/sdk/enums/HealthyStatus.java | 23 + .../info/frostfs/sdk/enums/MethodIndex.java | 29 + .../exceptions/ResponseFrostFSException.java | 2 +- .../frostfs/sdk/jdo/ClientEnvironment.java | 8 + .../info/frostfs/sdk/jdo/ClientSettings.java | 6 +- .../frostfs/sdk/jdo/pool/NodeParameters.java | 12 + .../sdk/jdo/pool/PoolInitParameters.java | 44 ++ .../info/frostfs/sdk/pool/ClientStatus.java | 27 + .../frostfs/sdk/pool/ClientStatusMonitor.java | 114 ++++ .../info/frostfs/sdk/pool/ClientWrapper.java | 130 +++++ .../java/info/frostfs/sdk/pool/InnerPool.java | 54 ++ .../info/frostfs/sdk/pool/MethodStatus.java | 31 ++ .../info/frostfs/sdk/pool/NodeStatistic.java | 13 + .../info/frostfs/sdk/pool/NodesParam.java | 19 + .../main/java/info/frostfs/sdk/pool/Pool.java | 520 ++++++++++++++++++ .../frostfs/sdk/pool/RebalanceParameters.java | 15 + .../info/frostfs/sdk/pool/RequestInfo.java | 15 + .../java/info/frostfs/sdk/pool/Sampler.java | 77 +++ .../info/frostfs/sdk/pool/SessionCache.java | 46 ++ .../java/info/frostfs/sdk/pool/Statistic.java | 15 + .../info/frostfs/sdk/pool/StatusSnapshot.java | 13 + .../java/info/frostfs/sdk/pool/WorkList.java | 22 + .../info/frostfs/sdk/pool/WrapperPrm.java | 26 + .../sdk/services/AccountingClient.java | 7 + .../frostfs/sdk/services/CommonClient.java | 5 + .../services/impl/AccountingClientImpl.java | 46 ++ .../sdk/services/impl/ObjectToolsImpl.java | 16 +- .../impl/interceptor/ClientMetrics.java | 12 +- .../services/impl/interceptor/GrpcMethod.java | 12 +- .../sdk/services/impl/interceptor/Labels.java | 1 + .../MonitoringClientInterceptor.java | 10 +- .../info/frostfs/sdk/tools/GrpcClient.java | 15 +- .../frostfs/sdk/utils/FrostFSMessages.java | 20 + .../sdk/services/AccountingClientTest.java | 102 ++++ .../testgenerator/AccountingGenerator.java | 42 ++ cryptography/pom.xml | 4 +- exceptions/pom.xml | 2 +- .../frostfs/sdk/constants/ErrorConst.java | 5 + .../sdk/exceptions/FrostFSException.java | 15 + .../exceptions/ProcessFrostFSException.java | 2 +- .../SessionExpiredFrostFSException.java | 8 + .../SessionNotFoundFrostFSException.java | 8 + .../exceptions/TimeoutFrostFSException.java | 2 +- .../ValidationFrostFSException.java | 2 +- models/pom.xml | 8 +- .../info/frostfs/sdk/constants/AppConst.java | 2 + .../frostfs/sdk/constants/AttributeConst.java | 10 + .../frostfs/sdk/constants/XHeaderConst.java | 7 +- .../frostfs/sdk/dto/container/Container.java | 3 + .../mappers/container/ContainerMapper.java | 30 +- .../container/ContainerMapperTest.java | 23 + pom.xml | 2 +- protos/pom.xml | 2 +- 56 files changed, 1712 insertions(+), 59 deletions(-) create mode 100644 client/src/main/java/info/frostfs/sdk/constants/PoolConst.java create mode 100644 client/src/main/java/info/frostfs/sdk/enums/HealthyStatus.java create mode 100644 client/src/main/java/info/frostfs/sdk/enums/MethodIndex.java create mode 100644 client/src/main/java/info/frostfs/sdk/jdo/pool/NodeParameters.java create mode 100644 client/src/main/java/info/frostfs/sdk/jdo/pool/PoolInitParameters.java create mode 100644 client/src/main/java/info/frostfs/sdk/pool/ClientStatus.java create mode 100644 client/src/main/java/info/frostfs/sdk/pool/ClientStatusMonitor.java create mode 100644 client/src/main/java/info/frostfs/sdk/pool/ClientWrapper.java create mode 100644 client/src/main/java/info/frostfs/sdk/pool/InnerPool.java create mode 100644 client/src/main/java/info/frostfs/sdk/pool/MethodStatus.java create mode 100644 client/src/main/java/info/frostfs/sdk/pool/NodeStatistic.java create mode 100644 client/src/main/java/info/frostfs/sdk/pool/NodesParam.java create mode 100644 client/src/main/java/info/frostfs/sdk/pool/Pool.java create mode 100644 client/src/main/java/info/frostfs/sdk/pool/RebalanceParameters.java create mode 100644 client/src/main/java/info/frostfs/sdk/pool/RequestInfo.java create mode 100644 client/src/main/java/info/frostfs/sdk/pool/Sampler.java create mode 100644 client/src/main/java/info/frostfs/sdk/pool/SessionCache.java create mode 100644 client/src/main/java/info/frostfs/sdk/pool/Statistic.java create mode 100644 client/src/main/java/info/frostfs/sdk/pool/StatusSnapshot.java create mode 100644 client/src/main/java/info/frostfs/sdk/pool/WorkList.java create mode 100644 client/src/main/java/info/frostfs/sdk/pool/WrapperPrm.java create mode 100644 client/src/main/java/info/frostfs/sdk/services/AccountingClient.java create mode 100644 client/src/main/java/info/frostfs/sdk/services/CommonClient.java create mode 100644 client/src/main/java/info/frostfs/sdk/services/impl/AccountingClientImpl.java create mode 100644 client/src/main/java/info/frostfs/sdk/utils/FrostFSMessages.java create mode 100644 client/src/test/java/info/frostfs/sdk/services/AccountingClientTest.java create mode 100644 client/src/test/java/info/frostfs/sdk/testgenerator/AccountingGenerator.java create mode 100644 exceptions/src/main/java/info/frostfs/sdk/exceptions/FrostFSException.java create mode 100644 exceptions/src/main/java/info/frostfs/sdk/exceptions/SessionExpiredFrostFSException.java create mode 100644 exceptions/src/main/java/info/frostfs/sdk/exceptions/SessionNotFoundFrostFSException.java create mode 100644 models/src/main/java/info/frostfs/sdk/constants/AttributeConst.java diff --git a/client/pom.xml b/client/pom.xml index a3f7ff7..e73703d 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -6,7 +6,7 @@ info.frostfs.sdk frostfs-sdk-java - 0.1.0 + 0.2.0 client @@ -21,17 +21,17 @@ info.frostfs.sdk cryptography - 0.1.0 + 0.2.0 info.frostfs.sdk models - 0.1.0 + 0.2.0 info.frostfs.sdk exceptions - 0.1.0 + 0.2.0 commons-codec @@ -54,5 +54,10 @@ simpleclient_common 0.16.0 + + org.slf4j + slf4j-api + 2.0.16 + \ No newline at end of file diff --git a/client/src/main/java/info/frostfs/sdk/FrostFSClient.java b/client/src/main/java/info/frostfs/sdk/FrostFSClient.java index 634c956..cb93dbb 100644 --- a/client/src/main/java/info/frostfs/sdk/FrostFSClient.java +++ b/client/src/main/java/info/frostfs/sdk/FrostFSClient.java @@ -1,5 +1,6 @@ package info.frostfs.sdk; +import frostfs.accounting.Types; import info.frostfs.sdk.dto.chain.Chain; import info.frostfs.sdk.dto.chain.ChainTarget; import info.frostfs.sdk.dto.container.Container; @@ -17,13 +18,16 @@ import info.frostfs.sdk.jdo.ClientEnvironment; import info.frostfs.sdk.jdo.ClientSettings; import info.frostfs.sdk.jdo.NetworkSettings; import info.frostfs.sdk.jdo.PutObjectParameters; -import info.frostfs.sdk.services.*; +import info.frostfs.sdk.pool.SessionCache; +import info.frostfs.sdk.pool.WrapperPrm; +import info.frostfs.sdk.services.CommonClient; import info.frostfs.sdk.services.impl.*; import info.frostfs.sdk.services.impl.interceptor.Configuration; import info.frostfs.sdk.services.impl.interceptor.MonitoringClientInterceptor; import info.frostfs.sdk.utils.Validator; import io.grpc.Channel; import io.grpc.ClientInterceptors; +import io.grpc.ManagedChannel; import java.util.List; @@ -31,26 +35,28 @@ import static info.frostfs.sdk.constants.ErrorConst.VERSION_UNSUPPORTED_TEMPLATE import static info.frostfs.sdk.tools.GrpcClient.initGrpcChannel; import static java.util.Objects.nonNull; -public class FrostFSClient - implements ContainerClient, ObjectClient, ApeManagerClient, NetmapClient, SessionClient, ToolsClient { +public class FrostFSClient implements CommonClient { + private static final MonitoringClientInterceptor MONITORING_CLIENT_INTERCEPTOR = + MonitoringClientInterceptor.create(Configuration.allMetrics()); + private final ContainerClientImpl containerClientImpl; private final ObjectClientImpl objectClientImpl; private final ApeManagerClientImpl apeManagerClient; private final NetmapClientImpl netmapClientImpl; private final SessionClientImpl sessionClientImpl; private final ObjectToolsImpl objectToolsImpl; + private final AccountingClientImpl accountingClient; + private final ManagedChannel channel; public FrostFSClient(ClientSettings clientSettings) { Validator.validate(clientSettings); - Channel channel = nonNull(clientSettings.getChannel()) + this.channel = nonNull(clientSettings.getChannel()) ? clientSettings.getChannel() : initGrpcChannel(clientSettings); - MonitoringClientInterceptor monitoringClientInterceptor = MonitoringClientInterceptor - .create(Configuration.allMetrics()); - channel = ClientInterceptors.intercept(channel, monitoringClientInterceptor); + Channel interceptChannel = ClientInterceptors.intercept(channel, MONITORING_CLIENT_INTERCEPTOR); ClientEnvironment clientEnvironment = - new ClientEnvironment(clientSettings.getKey(), channel, new Version(), this); + new ClientEnvironment(clientSettings.getKey(), interceptChannel, new Version(), this); Validator.validate(clientEnvironment); @@ -60,10 +66,30 @@ public class FrostFSClient this.netmapClientImpl = new NetmapClientImpl(clientEnvironment); this.sessionClientImpl = new SessionClientImpl(clientEnvironment); this.objectToolsImpl = new ObjectToolsImpl(clientEnvironment); - checkFrostFsVersionSupport(clientEnvironment.getVersion()); + this.accountingClient = new AccountingClientImpl(clientEnvironment); + checkFrostFSVersionSupport(clientEnvironment.getVersion()); } - private void checkFrostFsVersionSupport(Version version) { + public FrostFSClient(WrapperPrm prm, SessionCache cache) { + this.channel = initGrpcChannel(prm.getAddress()); + + Channel interceptChannel = ClientInterceptors.intercept(channel, MONITORING_CLIENT_INTERCEPTOR); + ClientEnvironment clientEnvironment = + new ClientEnvironment(prm.getKey(), interceptChannel, new Version(), this); + + Validator.validate(clientEnvironment); + + this.containerClientImpl = new ContainerClientImpl(clientEnvironment); + this.objectClientImpl = new ObjectClientImpl(clientEnvironment); + this.apeManagerClient = new ApeManagerClientImpl(clientEnvironment); + this.netmapClientImpl = new NetmapClientImpl(clientEnvironment); + this.sessionClientImpl = new SessionClientImpl(clientEnvironment); + this.objectToolsImpl = new ObjectToolsImpl(clientEnvironment); + this.accountingClient = new AccountingClientImpl(clientEnvironment); + checkFrostFSVersionSupport(clientEnvironment.getVersion()); + } + + private void checkFrostFSVersionSupport(Version version) { var localNodeInfo = netmapClientImpl.getLocalNodeInfo(); if (!localNodeInfo.getVersion().isSupported(version)) { throw new ProcessFrostFSException( @@ -165,4 +191,18 @@ public class FrostFSClient public ObjectId calculateObjectId(ObjectHeader header) { return objectToolsImpl.calculateObjectId(header); } + + @Override + public Types.Decimal getBalance() { + return accountingClient.getBalance(); + } + + public String dial() { + accountingClient.getBalance(); + return null; + } + + public void close() { + channel.shutdown(); + } } diff --git a/client/src/main/java/info/frostfs/sdk/constants/PoolConst.java b/client/src/main/java/info/frostfs/sdk/constants/PoolConst.java new file mode 100644 index 0000000..30d853f --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/constants/PoolConst.java @@ -0,0 +1,14 @@ +package info.frostfs.sdk.constants; + +public class PoolConst { + public static final int DEFAULT_SESSION_TOKEN_EXPIRATION_DURATION = 100; // in epochs + public static final int DEFAULT_ERROR_THRESHOLD = 100; + public static final int DEFAULT_GRACEFUL_CLOSE_ON_SWITCH_TIMEOUT = 10; // Seconds + public static final int DEFAULT_REBALANCE_INTERVAL = 15; // Seconds + public static final int DEFAULT_HEALTHCHECK_TIMEOUT = 4; // Seconds + public static final int DEFAULT_DIAL_TIMEOUT = 5; // Seconds + public static final int DEFAULT_STREAM_TIMEOUT = 10; // Seconds + + private PoolConst() { + } +} diff --git a/client/src/main/java/info/frostfs/sdk/enums/HealthyStatus.java b/client/src/main/java/info/frostfs/sdk/enums/HealthyStatus.java new file mode 100644 index 0000000..f88a8db --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/enums/HealthyStatus.java @@ -0,0 +1,23 @@ +package info.frostfs.sdk.enums; + +public enum HealthyStatus { + // status HEALTHY is set when connection is ready to be used by the pool. + HEALTHY(1), + + // status UNHEALTHY_ON_REQUEST is set when communication after dialing to the + // endpoint is failed due to immediate or accumulated errors, connection is + // available and pool should close it before re-establishing connection once again. + UNHEALTHY_ON_REQUEST(2), + + // status UNHEALTHY_ON_DIAL is set when dialing to the endpoint is failed, + // so there is no connection to the endpoint, and pool should not close it + // before re-establishing connection once again. + UNHEALTHY_ON_DIAL(3), + ; + + public final int value; + + HealthyStatus(int value) { + this.value = value; + } +} diff --git a/client/src/main/java/info/frostfs/sdk/enums/MethodIndex.java b/client/src/main/java/info/frostfs/sdk/enums/MethodIndex.java new file mode 100644 index 0000000..af8f4a5 --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/enums/MethodIndex.java @@ -0,0 +1,29 @@ +package info.frostfs.sdk.enums; + +public enum MethodIndex { + METHOD_BALANCE_GET("balanceGet"), + METHOD_CONTAINER_PUT("containerPut"), + METHOD_CONTAINER_GET("ContainerGet"), + METHOD_CONTAINER_LIST("ContainerList"), + METHOD_CONTAINER_DELETE("ContainerDelete"), + METHOD_ENDPOINT_INFO("EndpointInfo"), + METHOD_NETWORK_INFO("NetworkInfo"), + METHOD_NETMAP_SNAPSHOT("NetMapSnapshot"), + METHOD_OBJECT_PUT("ObjectPut"), + METHOD_OBJECT_DELETE("ObjectDelete"), + METHOD_OBJECT_GET("ObjectGet"), + METHOD_OBJECT_HEAD("ObjectHead"), + METHOD_OBJECT_RANGE("ObjectRange"), + METHOD_OBJECT_PATCH("ObjectPatch"), + METHOD_SESSION_CREATE("SessionCreate"), + METHOD_APE_MANAGER_ADD_CHAIN("APEManagerAddChain"), + METHOD_APE_MANAGER_REMOVE_CHAIN("APEManagerRemoveChain"), + METHOD_APE_MANAGER_LIST_CHAINS("APEManagerListChains"), + ; + + public final String methodName; + + MethodIndex(String methodName) { + this.methodName = methodName; + } +} diff --git a/client/src/main/java/info/frostfs/sdk/exceptions/ResponseFrostFSException.java b/client/src/main/java/info/frostfs/sdk/exceptions/ResponseFrostFSException.java index 6b216a6..c266395 100644 --- a/client/src/main/java/info/frostfs/sdk/exceptions/ResponseFrostFSException.java +++ b/client/src/main/java/info/frostfs/sdk/exceptions/ResponseFrostFSException.java @@ -4,7 +4,7 @@ import info.frostfs.sdk.dto.response.ResponseStatus; import lombok.Getter; @Getter -public class ResponseFrostFSException extends RuntimeException { +public class ResponseFrostFSException extends FrostFSException { private final ResponseStatus status; public ResponseFrostFSException(ResponseStatus status) { diff --git a/client/src/main/java/info/frostfs/sdk/jdo/ClientEnvironment.java b/client/src/main/java/info/frostfs/sdk/jdo/ClientEnvironment.java index 34eca67..015ba6d 100644 --- a/client/src/main/java/info/frostfs/sdk/jdo/ClientEnvironment.java +++ b/client/src/main/java/info/frostfs/sdk/jdo/ClientEnvironment.java @@ -38,4 +38,12 @@ public class ClientEnvironment { this.channel = channel; this.frostFSClient = frostFSClient; } + + public ClientEnvironment(ECDsa key, Channel channel, Version version, FrostFSClient frostFSClient) { + this.key = key; + this.ownerId = new OwnerId(key.getPublicKeyByte()); + this.version = version; + this.channel = channel; + this.frostFSClient = frostFSClient; + } } diff --git a/client/src/main/java/info/frostfs/sdk/jdo/ClientSettings.java b/client/src/main/java/info/frostfs/sdk/jdo/ClientSettings.java index bca70ce..3f8d2df 100644 --- a/client/src/main/java/info/frostfs/sdk/jdo/ClientSettings.java +++ b/client/src/main/java/info/frostfs/sdk/jdo/ClientSettings.java @@ -2,8 +2,8 @@ package info.frostfs.sdk.jdo; import info.frostfs.sdk.annotations.AtLeastOneIsFilled; import info.frostfs.sdk.annotations.NotNull; -import io.grpc.Channel; import io.grpc.ChannelCredentials; +import io.grpc.ManagedChannel; import lombok.Getter; import lombok.experimental.FieldNameConstants; @@ -17,7 +17,7 @@ public class ClientSettings { private String host; private ChannelCredentials credentials; - private Channel channel; + private ManagedChannel channel; public ClientSettings(String key, String host) { this.key = key; @@ -30,7 +30,7 @@ public class ClientSettings { this.credentials = credentials; } - public ClientSettings(String key, Channel channel) { + public ClientSettings(String key, ManagedChannel channel) { this.key = key; this.channel = channel; } diff --git a/client/src/main/java/info/frostfs/sdk/jdo/pool/NodeParameters.java b/client/src/main/java/info/frostfs/sdk/jdo/pool/NodeParameters.java new file mode 100644 index 0000000..01c9df0 --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/jdo/pool/NodeParameters.java @@ -0,0 +1,12 @@ +package info.frostfs.sdk.jdo.pool; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +@Getter +@AllArgsConstructor +public class NodeParameters { + private final int priority; + private final String address; + private final double weight; +} diff --git a/client/src/main/java/info/frostfs/sdk/jdo/pool/PoolInitParameters.java b/client/src/main/java/info/frostfs/sdk/jdo/pool/PoolInitParameters.java new file mode 100644 index 0000000..71eb670 --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/jdo/pool/PoolInitParameters.java @@ -0,0 +1,44 @@ +package info.frostfs.sdk.jdo.pool; + +import info.frostfs.sdk.jdo.ECDsa; +import info.frostfs.sdk.pool.ClientWrapper; +import io.grpc.ClientInterceptors; +import io.netty.channel.ChannelOption; +import lombok.Getter; +import lombok.Setter; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.function.Function; + +@Getter +@Setter +public class PoolInitParameters { + private ECDsa key; + + private long nodeDialTimeout; + + private long nodeStreamTimeout; + + private long healthCheckTimeout; + + private long clientRebalanceInterval; + + private long sessionExpirationDuration; + + private int errorThreshold; + + private NodeParameters[] nodeParams; + + private ChannelOption[] dialOptions; + + private Function clientBuilder; + + private long gracefulCloseOnSwitchTimeout; + + private Logger logger; + + private Collection interceptors = new ArrayList<>(); +} + diff --git a/client/src/main/java/info/frostfs/sdk/pool/ClientStatus.java b/client/src/main/java/info/frostfs/sdk/pool/ClientStatus.java new file mode 100644 index 0000000..53b04c1 --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/pool/ClientStatus.java @@ -0,0 +1,27 @@ +package info.frostfs.sdk.pool; + +public interface ClientStatus { + // isHealthy checks if the connection can handle requests. + boolean isHealthy(); + + // isDialed checks if the connection was created. + boolean isDialed(); + + // setUnhealthy marks client as unhealthy. + void setUnhealthy(); + + // address return address of endpoint. + String getAddress(); + + // currentErrorRate returns current errors rate. + // After specific threshold connection is considered as unhealthy. + // Pool.startRebalance routine can make this connection healthy again. + int getCurrentErrorRate(); + + // overallErrorRate returns the number of all happened errors. + long getOverallErrorRate(); + + // methodsStatus returns statistic for all used methods. + StatusSnapshot[] getMethodsStatus(); +} + diff --git a/client/src/main/java/info/frostfs/sdk/pool/ClientStatusMonitor.java b/client/src/main/java/info/frostfs/sdk/pool/ClientStatusMonitor.java new file mode 100644 index 0000000..0770482 --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/pool/ClientStatusMonitor.java @@ -0,0 +1,114 @@ +package info.frostfs.sdk.pool; + +import info.frostfs.sdk.enums.HealthyStatus; +import info.frostfs.sdk.enums.MethodIndex; +import info.frostfs.sdk.utils.FrostFSMessages; +import lombok.Getter; +import lombok.Setter; +import org.slf4j.Logger; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +@Getter +@Setter +public class ClientStatusMonitor implements ClientStatus { + private final ReentrantLock lock = new ReentrantLock(); + private final Logger logger; + private final AtomicInteger healthy = new AtomicInteger(); + + private final String address; + private final MethodStatus[] methods; + + private int errorThreshold; + private int currentErrorCount; + private long overallErrorCount; + + public ClientStatusMonitor(Logger logger, String address) { + this.logger = logger; + this.healthy.set(HealthyStatus.HEALTHY.value); + + this.address = address; + this.methods = Arrays.stream(MethodIndex.values()) + .map(t -> new MethodStatus(t.methodName)) + .toArray(MethodStatus[]::new); + } + + @Override + public boolean isHealthy() { + return healthy.get() == HealthyStatus.HEALTHY.value; + } + + @Override + public boolean isDialed() { + return healthy.get() != HealthyStatus.UNHEALTHY_ON_DIAL.value; + } + + public void setHealthy() { + healthy.set(HealthyStatus.HEALTHY.ordinal()); + } + + @Override + public void setUnhealthy() { + healthy.set(HealthyStatus.UNHEALTHY_ON_REQUEST.value); + } + + public void setUnhealthyOnDial() { + healthy.set(HealthyStatus.UNHEALTHY_ON_DIAL.value); + } + + public void incErrorRate() { + boolean thresholdReached; + lock.lock(); + try { + currentErrorCount++; + overallErrorCount++; + + thresholdReached = currentErrorCount >= errorThreshold; + + if (thresholdReached) { + setUnhealthy(); + currentErrorCount = 0; + } + } finally { + lock.unlock(); + } + + if (thresholdReached && logger != null) { + FrostFSMessages.errorThresholdReached(logger, address, errorThreshold); + } + } + + @Override + public int getCurrentErrorRate() { + lock.lock(); + try { + return currentErrorCount; + } finally { + lock.unlock(); + } + } + + @Override + public long getOverallErrorRate() { + lock.lock(); + try { + return overallErrorCount; + } finally { + lock.unlock(); + } + } + + @Override + public StatusSnapshot[] getMethodsStatus() { + StatusSnapshot[] result = new StatusSnapshot[methods.length]; + + for (int i = 0; i < result.length; i++) { + result[i] = methods[i].getSnapshot(); + } + + return result; + } +} + diff --git a/client/src/main/java/info/frostfs/sdk/pool/ClientWrapper.java b/client/src/main/java/info/frostfs/sdk/pool/ClientWrapper.java new file mode 100644 index 0000000..0189cc1 --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/pool/ClientWrapper.java @@ -0,0 +1,130 @@ +package info.frostfs.sdk.pool; + +import info.frostfs.sdk.FrostFSClient; +import info.frostfs.sdk.enums.MethodIndex; +import info.frostfs.sdk.exceptions.ResponseFrostFSException; +import info.frostfs.sdk.exceptions.ValidationFrostFSException; +import info.frostfs.sdk.utils.WaitUtil; +import lombok.AccessLevel; +import lombok.Getter; +import org.apache.commons.lang3.StringUtils; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static info.frostfs.sdk.constants.ErrorConst.POOL_CLIENT_UNHEALTHY; + +@Getter +public class ClientWrapper extends ClientStatusMonitor { + @Getter(value = AccessLevel.NONE) + private final Lock lock = new ReentrantLock(); + private final SessionCache sessionCache; + private final WrapperPrm wrapperPrm; + private FrostFSClient client; + + public ClientWrapper(WrapperPrm wrapperPrm, Pool pool) { + super(wrapperPrm.getLogger(), wrapperPrm.getAddress()); + this.wrapperPrm = wrapperPrm; + setErrorThreshold(wrapperPrm.getErrorThreshold()); + + this.sessionCache = pool.getSessionCache(); + this.client = new FrostFSClient(wrapperPrm, sessionCache); + } + + + public FrostFSClient getClient() { + lock.lock(); + try { + if (isHealthy()) { + return client; + } + return null; + } finally { + lock.unlock(); + } + } + + public void dial() { + FrostFSClient client = getClient(); + if (client == null) { + throw new ValidationFrostFSException(POOL_CLIENT_UNHEALTHY); + } + client.dial(); + } + + public void handleError(Exception exp) { + if (exp instanceof ResponseFrostFSException && ((ResponseFrostFSException) exp).getStatus() != null) { + switch (((ResponseFrostFSException) exp).getStatus().getCode()) { + case INTERNAL: + case WRONG_MAGIC_NUMBER: + case SIGNATURE_VERIFICATION_FAILURE: + case NODE_UNDER_MAINTENANCE: + incErrorRate(); + } + return; + } + + incErrorRate(); + } + + private void scheduleGracefulClose() { + if (client == null) { + return; + } + + WaitUtil.sleep(wrapperPrm.getGracefulCloseOnSwitchTimeout()); + client.close(); + } + + public CompletableFuture restartIfUnhealthy() { + try { + client.getLocalNodeInfo(); + return CompletableFuture.completedFuture(false); + } catch (Exception ignored) { + } + + if (isDialed()) { + scheduleGracefulClose(); + } + + return CompletableFuture.completedFuture(restartClient()); + } + + private boolean restartClient() { + FrostFSClient newClient = null; + try { + newClient = new FrostFSClient(wrapperPrm, sessionCache); + + var error = newClient.dial(); + if (StringUtils.isNotBlank(error)) { + setUnhealthyOnDial(); + newClient.close(); + return true; + } + + lock.lock(); + client = newClient; + lock.unlock(); + } catch (Exception exp) { + if (newClient != null) { + newClient.close(); + } + } + + try { + client.getLocalNodeInfo(); + } catch (Exception exp) { + setUnhealthy(); + return true; + } + + setHealthy(); + return false; + } + + public void incRequests(long elapsed, MethodIndex method) { + var methodStat = getMethods()[method.ordinal()]; + methodStat.incRequests(elapsed); + } +} diff --git a/client/src/main/java/info/frostfs/sdk/pool/InnerPool.java b/client/src/main/java/info/frostfs/sdk/pool/InnerPool.java new file mode 100644 index 0000000..94a8be4 --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/pool/InnerPool.java @@ -0,0 +1,54 @@ +package info.frostfs.sdk.pool; + +import java.util.concurrent.locks.ReentrantLock; + +class InnerPool { + private static final int ATTEMPTS_COUNT = 3; + private final ReentrantLock lock = new ReentrantLock(); + private final ClientWrapper[] clients; + private Sampler sampler; + + InnerPool(Sampler sampler, ClientWrapper[] clients) { + this.sampler = sampler; + this.clients = clients; + } + + Sampler getSampler() { + return sampler; + } + + void setSampler(Sampler sampler) { + this.sampler = sampler; + } + + ClientWrapper[] getClients() { + return clients; + } + + ClientWrapper connection() { + lock.lock(); + try { + if (clients.length == 1) { + ClientWrapper client = clients[0]; + if (client.isHealthy()) { + return client; + } + } else { + int attempts = ATTEMPTS_COUNT * clients.length; + + for (int i = 0; i < attempts; i++) { + int index = sampler.next(); + + if (clients[index].isHealthy()) { + return clients[index]; + } + } + } + + return null; + } finally { + lock.unlock(); + } + } +} + diff --git a/client/src/main/java/info/frostfs/sdk/pool/MethodStatus.java b/client/src/main/java/info/frostfs/sdk/pool/MethodStatus.java new file mode 100644 index 0000000..5309c93 --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/pool/MethodStatus.java @@ -0,0 +1,31 @@ +package info.frostfs.sdk.pool; + +import lombok.AccessLevel; +import lombok.Getter; +import lombok.Setter; + +import java.util.concurrent.locks.ReentrantLock; + +@Setter +@Getter +public class MethodStatus { + @Getter(AccessLevel.NONE) + private final ReentrantLock lock = new ReentrantLock(); + private final String name; + private StatusSnapshot snapshot; + + public MethodStatus(String name) { + this.name = name; + this.snapshot = new StatusSnapshot(); + } + + void incRequests(long elapsed) { + lock.lock(); + try { + snapshot.setAllTime(snapshot.getAllTime() + elapsed); + snapshot.setAllRequests(snapshot.getAllRequests() + 1); + } finally { + lock.unlock(); + } + } +} diff --git a/client/src/main/java/info/frostfs/sdk/pool/NodeStatistic.java b/client/src/main/java/info/frostfs/sdk/pool/NodeStatistic.java new file mode 100644 index 0000000..2ad4cdc --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/pool/NodeStatistic.java @@ -0,0 +1,13 @@ +package info.frostfs.sdk.pool; + +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +public class NodeStatistic { + private String address; + private StatusSnapshot[] methods; + private long overallErrors; + private int currentErrors; +} diff --git a/client/src/main/java/info/frostfs/sdk/pool/NodesParam.java b/client/src/main/java/info/frostfs/sdk/pool/NodesParam.java new file mode 100644 index 0000000..0221852 --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/pool/NodesParam.java @@ -0,0 +1,19 @@ +package info.frostfs.sdk.pool; + +import lombok.Getter; + +import java.util.ArrayList; +import java.util.List; + +@Getter +public class NodesParam { + private final int priority; + private final List address; + private final List weight; + + public NodesParam(int priority) { + this.priority = priority; + this.address = new ArrayList<>(); + this.weight = new ArrayList<>(); + } +} diff --git a/client/src/main/java/info/frostfs/sdk/pool/Pool.java b/client/src/main/java/info/frostfs/sdk/pool/Pool.java new file mode 100644 index 0000000..3306ea5 --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/pool/Pool.java @@ -0,0 +1,520 @@ +package info.frostfs.sdk.pool; + +import frostfs.refs.Types; +import info.frostfs.sdk.dto.chain.Chain; +import info.frostfs.sdk.dto.chain.ChainTarget; +import info.frostfs.sdk.dto.container.Container; +import info.frostfs.sdk.dto.container.ContainerId; +import info.frostfs.sdk.dto.netmap.NetmapSnapshot; +import info.frostfs.sdk.dto.netmap.NodeInfo; +import info.frostfs.sdk.dto.object.*; +import info.frostfs.sdk.dto.session.SessionToken; +import info.frostfs.sdk.exceptions.FrostFSException; +import info.frostfs.sdk.exceptions.SessionExpiredFrostFSException; +import info.frostfs.sdk.exceptions.SessionNotFoundFrostFSException; +import info.frostfs.sdk.exceptions.ValidationFrostFSException; +import info.frostfs.sdk.jdo.ECDsa; +import info.frostfs.sdk.jdo.NetworkSettings; +import info.frostfs.sdk.jdo.PutObjectParameters; +import info.frostfs.sdk.jdo.pool.NodeParameters; +import info.frostfs.sdk.jdo.pool.PoolInitParameters; +import info.frostfs.sdk.services.CommonClient; +import info.frostfs.sdk.utils.FrostFSMessages; +import info.frostfs.sdk.utils.WaitUtil; +import lombok.Getter; +import org.slf4j.Logger; + +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; + +import static info.frostfs.sdk.Helper.getHexString; +import static info.frostfs.sdk.constants.ErrorConst.*; +import static info.frostfs.sdk.constants.PoolConst.*; + +@Getter +public class Pool implements CommonClient { + + private final ReentrantLock lock = new ReentrantLock(); + private final ECDsa key; + private final SessionCache sessionCache; + private final long sessionTokenDuration; + private final RebalanceParameters rebalanceParams; + private final Function clientBuilder; + private final Logger logger; + private InnerPool[] innerPools; + private Types.OwnerID ownerID; + private OwnerId ownerId; + private boolean disposedValue; + private long maxObjectSize; + private ClientStatus clientStatus; + + public Pool(PoolInitParameters options) { + if (options == null || options.getKey() == null) { + throw new ValidationFrostFSException( + String.format( + PARAMS_ARE_MISSING_TEMPLATE, + String.join( + FIELDS_DELIMITER_COMMA, PoolInitParameters.class.getName(), ECDsa.class.getName() + ) + ) + ); + } + + List nodesParams = adjustNodeParams(options.getNodeParams()); + SessionCache cache = new SessionCache(options.getSessionExpirationDuration()); + fillDefaultInitParams(options, this); + + this.key = options.getKey(); + this.sessionCache = cache; + this.logger = options.getLogger(); + this.sessionTokenDuration = options.getSessionExpirationDuration(); + + this.rebalanceParams = new RebalanceParameters( + nodesParams.toArray(new NodesParam[0]), + options.getHealthCheckTimeout(), + options.getClientRebalanceInterval(), + options.getSessionExpirationDuration()); + + this.clientBuilder = options.getClientBuilder(); + } + + private static List adjustNodeParams(NodeParameters[] nodeParams) { + if (nodeParams == null || nodeParams.length == 0) { + throw new ValidationFrostFSException(POOL_PEERS_IS_MISSING); + } + + Map nodesParamsDict = new HashMap<>(nodeParams.length); + for (NodeParameters nodeParam : nodeParams) { + var nodesParam = nodesParamsDict + .computeIfAbsent(nodeParam.getPriority(), k -> new NodesParam(nodeParam.getPriority())); + nodesParam.getAddress().add(nodeParam.getAddress()); + nodesParam.getWeight().add(nodeParam.getWeight()); + } + + List nodesParams = new ArrayList<>(nodesParamsDict.values()); + nodesParams.sort(Comparator.comparingInt(NodesParam::getPriority)); + + for (NodesParam nodes : nodesParams) { + double[] newWeights = adjustWeights(nodes.getWeight().stream().mapToDouble(Double::doubleValue).toArray()); + nodes.getWeight().clear(); + for (double weight : newWeights) { + nodes.getWeight().add(weight); + } + } + + return nodesParams; + } + + private static double[] adjustWeights(double[] weights) { + double[] adjusted = new double[weights.length]; + double sum = Arrays.stream(weights).sum(); + + if (sum > 0) { + for (int i = 0; i < weights.length; i++) { + adjusted[i] = weights[i] / sum; + } + } + + return adjusted; + } + + private static void fillDefaultInitParams(PoolInitParameters parameters, Pool pool) { + if (parameters.getSessionExpirationDuration() == 0) { + parameters.setSessionExpirationDuration(DEFAULT_SESSION_TOKEN_EXPIRATION_DURATION); + } + + if (parameters.getErrorThreshold() == 0) { + parameters.setErrorThreshold(DEFAULT_ERROR_THRESHOLD); + } + + if (parameters.getClientRebalanceInterval() <= 0) { + parameters.setClientRebalanceInterval(DEFAULT_REBALANCE_INTERVAL); + } + + if (parameters.getGracefulCloseOnSwitchTimeout() <= 0) { + parameters.setGracefulCloseOnSwitchTimeout(DEFAULT_GRACEFUL_CLOSE_ON_SWITCH_TIMEOUT); + } + + if (parameters.getHealthCheckTimeout() <= 0) { + parameters.setHealthCheckTimeout(DEFAULT_HEALTHCHECK_TIMEOUT); + } + + if (parameters.getNodeDialTimeout() <= 0) { + parameters.setNodeDialTimeout(DEFAULT_DIAL_TIMEOUT); + } + + if (parameters.getNodeStreamTimeout() <= 0) { + parameters.setNodeStreamTimeout(DEFAULT_STREAM_TIMEOUT); + } + + if (parameters.getSessionExpirationDuration() == 0) { + parameters.setSessionExpirationDuration(DEFAULT_SESSION_TOKEN_EXPIRATION_DURATION); + } + + if (parameters.getClientBuilder() == null) { + parameters.setClientBuilder(address -> { + WrapperPrm wrapperPrm = new WrapperPrm(); + wrapperPrm.setAddress(address); + wrapperPrm.setKey(parameters.getKey()); + wrapperPrm.setLogger(parameters.getLogger()); + wrapperPrm.setDialTimeout(parameters.getNodeDialTimeout()); + wrapperPrm.setStreamTimeout(parameters.getNodeStreamTimeout()); + wrapperPrm.setErrorThreshold(parameters.getErrorThreshold()); + wrapperPrm.setGracefulCloseOnSwitchTimeout(parameters.getGracefulCloseOnSwitchTimeout()); + wrapperPrm.setInterceptors(parameters.getInterceptors()); + return new ClientWrapper(wrapperPrm, pool); + }); + } + } + + private static SessionToken initSessionForDuration(ClientWrapper cw, long duration) { + var client = cw.getClient(); + NetworkSettings networkInfo = client.getNetworkSettings(); + + long epoch = networkInfo.getEpochDuration(); + long exp = (Long.MAX_VALUE - epoch < duration) ? Long.MAX_VALUE : (epoch + duration); + + return client.createSession(exp); + } + + private static String formCacheKey(String address, String key) { + return address + key; + } + + public String dial() { + InnerPool[] inner = new InnerPool[rebalanceParams.getNodesParams().length]; + boolean atLeastOneHealthy = false; + int i = 0; + + for (NodesParam nodeParams : rebalanceParams.getNodesParams()) { + ClientWrapper[] clients = new ClientWrapper[nodeParams.getWeight().size()]; + + for (int j = 0; j < nodeParams.getAddress().size(); j++) { + ClientWrapper client = clients[j] = clientBuilder.apply(nodeParams.getAddress().get(j)); + boolean dialed = false; + + try { + client.dial(); + dialed = true; + + SessionToken token = initSessionForDuration(client, rebalanceParams.getSessionExpirationDuration()); + String cacheKey = formCacheKey( + nodeParams.getAddress().get(j), + getHexString(key.getPublicKeyByte()) + ); + sessionCache.setValue(cacheKey, token); + + atLeastOneHealthy = true; + } catch (ValidationFrostFSException exp) { + break; + } catch (Exception exp) { + if (!dialed) { + client.setUnhealthyOnDial(); + } else { + client.setUnhealthy(); + } + + if (logger != null) { + FrostFSMessages + .sessionCreationError(logger, client.getWrapperPrm().getAddress(), exp.getMessage()); + } + } + } + + Sampler sampler = new Sampler(nodeParams.getWeight().stream().mapToDouble(Double::doubleValue).toArray()); + inner[i] = new InnerPool(sampler, clients); + i++; + } + + if (!atLeastOneHealthy) { + return POOL_NODES_UNHEALTHY; + } + + this.innerPools = inner; + + NetworkSettings networkSettings = getNetworkSettings(); + + this.maxObjectSize = networkSettings.getMaxObjectSize(); + startRebalance(); + + return null; + } + + private ClientWrapper connection() { + for (InnerPool pool : innerPools) { + ClientWrapper client = pool.connection(); + if (client != null) { + return client; + } + } + + throw new FrostFSException(POOL_CLIENTS_UNHEALTHY); + } + + public void close() { + if (innerPools != null) { + for (InnerPool innerPool : innerPools) { + for (ClientWrapper client : innerPool.getClients()) { + if (client.isDialed()) { + client.getClient().close(); + } + } + } + } + } + + public void startRebalance() { + double[][] buffers = new double[rebalanceParams.getNodesParams().length][]; + + for (int i = 0; i < rebalanceParams.getNodesParams().length; i++) { + NodesParam parameters = rebalanceParams.getNodesParams()[i]; + buffers[i] = new double[parameters.getWeight().size()]; + + CompletableFuture.runAsync(() -> { + WaitUtil.sleep(rebalanceParams.getClientRebalanceInterval()); + updateNodesHealth(buffers); + }); + } + } + + private void updateNodesHealth(double[][] buffers) { + CompletableFuture[] tasks = new CompletableFuture[innerPools.length]; + + for (int i = 0; i < innerPools.length; i++) { + double[] bufferWeights = buffers[i]; + int finalI = i; + tasks[i] = CompletableFuture.runAsync(() -> updateInnerNodesHealth(finalI, bufferWeights)); + } + + CompletableFuture.allOf(tasks).join(); + } + + private void updateInnerNodesHealth(int poolIndex, double[] bufferWeights) { + if (poolIndex > innerPools.length - 1) { + return; + } + + InnerPool pool = innerPools[poolIndex]; + RebalanceParameters options = rebalanceParams; + int[] healthyChanged = {0}; + + CompletableFuture[] tasks = new CompletableFuture[pool.getClients().length]; + + for (int j = 0; j < pool.getClients().length; j++) { + ClientWrapper client = pool.getClients()[j]; + AtomicBoolean healthy = new AtomicBoolean(false); + AtomicReference error = new AtomicReference<>(); + AtomicBoolean changed = new AtomicBoolean(false); + + int finalJ = j; + tasks[j] = client.restartIfUnhealthy().handle((unused, throwable) -> { + if (throwable != null) { + error.set(throwable.getMessage()); + bufferWeights[finalJ] = 0; + sessionCache.deleteByPrefix(client.getAddress()); + } else { + changed.set(unused); + healthy.set(true); + bufferWeights[finalJ] = options.getNodesParams()[poolIndex].getWeight().get(finalJ); + } + return null; + }).thenRun(() -> { + if (changed.get()) { + if (error.get() != null && logger != null) { + FrostFSMessages.healthChanged(logger, client.getAddress(), healthy.get(), error.get()); + } + healthyChanged[0] = 1; + } + }); + } + + CompletableFuture.allOf(tasks).thenRun(() -> { + if (healthyChanged[0] == 1) { + double[] probabilities = adjustWeights(bufferWeights); + lock.lock(); + try { + pool.setSampler(new Sampler(probabilities)); + } finally { + lock.unlock(); + } + } + }); + } + + private boolean checkSessionTokenErr(Exception error, String address) { + if (error == null) { + return false; + } + + if (error instanceof SessionNotFoundFrostFSException || error instanceof SessionExpiredFrostFSException) { + sessionCache.deleteByPrefix(address); + return true; + } + + return false; + } + + public Statistic statistic() { + if (innerPools == null) { + throw new ValidationFrostFSException(POOL_NOT_DIALED); + } + + Statistic statistics = new Statistic(); + + for (InnerPool inner : innerPools) { + int valueIndex = 0; + String[] nodes = new String[inner.getClients().length]; + + lock.lock(); + try { + for (ClientWrapper client : inner.getClients()) { + if (client.isHealthy()) { + nodes[valueIndex] = client.getAddress(); + } + + NodeStatistic node = new NodeStatistic(); + node.setAddress(client.getAddress()); + node.setMethods(client.getMethodsStatus()); + node.setOverallErrors(client.getOverallErrorRate()); + node.setCurrentErrors(client.getCurrentErrorRate()); + + statistics.getNodes().add(node); + valueIndex++; + statistics.setOverallErrors(statistics.getOverallErrors() + node.getOverallErrors()); + } + + if (statistics.getCurrentNodes() == null || statistics.getCurrentNodes().length == 0) { + statistics.setCurrentNodes(nodes); + } + } finally { + lock.unlock(); + } + } + + return statistics; + } + + + @Override + public Container getContainer(ContainerId cid) { + ClientWrapper client = connection(); + return client.getClient().getContainer(cid); + } + + @Override + public List listContainers() { + ClientWrapper client = connection(); + return client.getClient().listContainers(); + } + + @Override + public ContainerId createContainer(Container container) { + ClientWrapper client = connection(); + return client.getClient().createContainer(container); + } + + @Override + public void deleteContainer(ContainerId cid) { + ClientWrapper client = connection(); + client.getClient().deleteContainer(cid); + } + + @Override + public ObjectHeader getObjectHead(ContainerId containerId, ObjectId objectId) { + ClientWrapper client = connection(); + return client.getClient().getObjectHead(containerId, objectId); + } + + @Override + public ObjectFrostFS getObject(ContainerId containerId, ObjectId objectId) { + ClientWrapper client = connection(); + return client.getClient().getObject(containerId, objectId); + } + + @Override + public ObjectId putObject(PutObjectParameters parameters) { + ClientWrapper client = connection(); + return client.getClient().putObject(parameters); + } + + @Override + public ObjectId putSingleObject(ObjectFrostFS objectFrostFS) { + ClientWrapper client = connection(); + return client.getClient().putSingleObject(objectFrostFS); + } + + @Override + public void deleteObject(ContainerId containerId, ObjectId objectId) { + ClientWrapper client = connection(); + client.getClient().deleteObject(containerId, objectId); + } + + @Override + public Iterable searchObjects(ContainerId cid, ObjectFilter... filters) { + ClientWrapper client = connection(); + return client.getClient().searchObjects(cid, filters); + } + + @Override + public byte[] addChain(Chain chain, ChainTarget chainTarget) { + ClientWrapper client = connection(); + return client.getClient().addChain(chain, chainTarget); + } + + @Override + public void removeChain(Chain chain, ChainTarget chainTarget) { + ClientWrapper client = connection(); + client.getClient().removeChain(chain, chainTarget); + } + + @Override + public List listChains(ChainTarget chainTarget) { + ClientWrapper client = connection(); + return client.getClient().listChains(chainTarget); + } + + @Override + public NetmapSnapshot getNetmapSnapshot() { + ClientWrapper client = connection(); + return client.getClient().getNetmapSnapshot(); + } + + @Override + public NodeInfo getLocalNodeInfo() { + ClientWrapper client = connection(); + return client.getClient().getLocalNodeInfo(); + } + + @Override + public NetworkSettings getNetworkSettings() { + ClientWrapper client = connection(); + return client.getClient().getNetworkSettings(); + } + + @Override + public SessionToken createSession(long expiration) { + ClientWrapper client = connection(); + return client.getClient().createSession(expiration); + } + + public frostfs.session.Types.SessionToken createSessionInternal(long expiration) { + ClientWrapper client = connection(); + return client.getClient().createSessionInternal(expiration); + } + + @Override + public ObjectId calculateObjectId(ObjectHeader header) { + ClientWrapper client = connection(); + return client.getClient().calculateObjectId(header); + } + + @Override + public frostfs.accounting.Types.Decimal getBalance() { + ClientWrapper client = connection(); + return client.getClient().getBalance(); + } +} diff --git a/client/src/main/java/info/frostfs/sdk/pool/RebalanceParameters.java b/client/src/main/java/info/frostfs/sdk/pool/RebalanceParameters.java new file mode 100644 index 0000000..d7bd790 --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/pool/RebalanceParameters.java @@ -0,0 +1,15 @@ +package info.frostfs.sdk.pool; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +@AllArgsConstructor +public class RebalanceParameters { + private NodesParam[] nodesParams; + private long nodeRequestTimeout; + private long clientRebalanceInterval; + private long sessionExpirationDuration; +} diff --git a/client/src/main/java/info/frostfs/sdk/pool/RequestInfo.java b/client/src/main/java/info/frostfs/sdk/pool/RequestInfo.java new file mode 100644 index 0000000..68d8312 --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/pool/RequestInfo.java @@ -0,0 +1,15 @@ +package info.frostfs.sdk.pool; + +import info.frostfs.sdk.enums.MethodIndex; +import lombok.Getter; +import lombok.Setter; + +import java.time.Duration; + +@Getter +@Setter +public class RequestInfo { + private String address; + private MethodIndex methodIndex; + private Duration elapsed; +} diff --git a/client/src/main/java/info/frostfs/sdk/pool/Sampler.java b/client/src/main/java/info/frostfs/sdk/pool/Sampler.java new file mode 100644 index 0000000..505243a --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/pool/Sampler.java @@ -0,0 +1,77 @@ +package info.frostfs.sdk.pool; + +import java.util.ArrayList; +import java.util.Random; + +class Sampler { + private final Object lock = new Object(); + private final Random random = new Random(); + private final double[] probabilities; + private final int[] alias; + + Sampler(double[] probabilities) { + ArrayList small = new ArrayList<>(); + ArrayList large = new ArrayList<>(); + + int n = probabilities.length; + + this.probabilities = new double[n]; + this.alias = new int[n]; + + // Compute scaled probabilities. + double[] p = new double[n]; + + for (int i = 0; i < n; i++) { + p[i] = probabilities[i] * n; + if (p[i] < 1) { + small.add(i); + } else { + large.add(i); + } + } + + while (!small.isEmpty() && !large.isEmpty()) { + int l = small.remove(small.size() - 1); + int g = large.remove(large.size() - 1); + + this.probabilities[l] = p[l]; + this.alias[l] = g; + + p[g] = p[g] + p[l] - 1; + + if (p[g] < 1) { + small.add(g); + } else { + large.add(g); + } + } + + while (!large.isEmpty()) { + int g = large.remove(large.size() - 1); + this.probabilities[g] = 1; + } + + while (!small.isEmpty()) { + int l = small.remove(small.size() - 1); + probabilities[l] = 1; + } + } + + int next() { + int n = alias.length; + + int i; + double f; + synchronized (lock) { + i = random.nextInt(n); + f = random.nextDouble(); + } + + if (f < probabilities[i]) { + return i; + } + + return alias[i]; + } +} + diff --git a/client/src/main/java/info/frostfs/sdk/pool/SessionCache.java b/client/src/main/java/info/frostfs/sdk/pool/SessionCache.java new file mode 100644 index 0000000..f64865b --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/pool/SessionCache.java @@ -0,0 +1,46 @@ +package info.frostfs.sdk.pool; + +import info.frostfs.sdk.dto.session.SessionToken; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class SessionCache { + private final ConcurrentMap cache = new ConcurrentHashMap<>(); + private final long tokenDuration; + private long currentEpoch; + + public SessionCache(long sessionExpirationDuration) { + this.tokenDuration = sessionExpirationDuration; + } + + public boolean contains(String key) { + return cache.containsKey(key); + } + + public boolean tryGetValue(String key, SessionToken[] value) { + if (key == null) { + value[0] = null; + return false; + } + + SessionToken token = cache.get(key); + if (token != null) { + value[0] = token; + return true; + } + return false; + } + + public void setValue(String key, SessionToken value) { + if (key != null) { + cache.put(key, value); + } + } + + public void deleteByPrefix(String prefix) { + cache.keySet().removeIf(key -> key.startsWith(prefix)); + } +} + + diff --git a/client/src/main/java/info/frostfs/sdk/pool/Statistic.java b/client/src/main/java/info/frostfs/sdk/pool/Statistic.java new file mode 100644 index 0000000..c7bc2ea --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/pool/Statistic.java @@ -0,0 +1,15 @@ +package info.frostfs.sdk.pool; + +import lombok.Getter; +import lombok.Setter; + +import java.util.ArrayList; +import java.util.List; + +@Getter +@Setter +public class Statistic { + private long overallErrors; + private List nodes = new ArrayList<>(); + private String[] currentNodes; +} diff --git a/client/src/main/java/info/frostfs/sdk/pool/StatusSnapshot.java b/client/src/main/java/info/frostfs/sdk/pool/StatusSnapshot.java new file mode 100644 index 0000000..99a797b --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/pool/StatusSnapshot.java @@ -0,0 +1,13 @@ +package info.frostfs.sdk.pool; + +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +public class StatusSnapshot { + private long allTime; + private long allRequests; +} + + diff --git a/client/src/main/java/info/frostfs/sdk/pool/WorkList.java b/client/src/main/java/info/frostfs/sdk/pool/WorkList.java new file mode 100644 index 0000000..7e7a455 --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/pool/WorkList.java @@ -0,0 +1,22 @@ +package info.frostfs.sdk.pool; + +import java.util.ArrayList; +import java.util.List; + +class WorkList { + private final List elements = new ArrayList<>(); + + private int getLength() { + return elements.size(); + } + + private void add(int element) { + elements.add(element); + } + + private int remove() { + int last = elements.get(elements.size() - 1); + elements.remove(elements.size() - 1); + return last; + } +} diff --git a/client/src/main/java/info/frostfs/sdk/pool/WrapperPrm.java b/client/src/main/java/info/frostfs/sdk/pool/WrapperPrm.java new file mode 100644 index 0000000..d03efc4 --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/pool/WrapperPrm.java @@ -0,0 +1,26 @@ +package info.frostfs.sdk.pool; + +import info.frostfs.sdk.jdo.ECDsa; +import io.grpc.ClientInterceptors; +import io.grpc.ManagedChannelBuilder; +import lombok.Getter; +import lombok.Setter; +import org.slf4j.Logger; + +import java.util.Collection; + +@Getter +@Setter +public class WrapperPrm { + private Logger logger; + private String address; + private ECDsa key; + private long dialTimeout; + private long streamTimeout; + private int errorThreshold; + private Runnable responseInfoCallback; + private Runnable poolRequestInfoCallback; + private ManagedChannelBuilder grpcChannelOptions; + private long gracefulCloseOnSwitchTimeout; + private Collection interceptors; +} diff --git a/client/src/main/java/info/frostfs/sdk/services/AccountingClient.java b/client/src/main/java/info/frostfs/sdk/services/AccountingClient.java new file mode 100644 index 0000000..e29e153 --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/services/AccountingClient.java @@ -0,0 +1,7 @@ +package info.frostfs.sdk.services; + +import frostfs.accounting.Types; + +public interface AccountingClient { + Types.Decimal getBalance(); +} diff --git a/client/src/main/java/info/frostfs/sdk/services/CommonClient.java b/client/src/main/java/info/frostfs/sdk/services/CommonClient.java new file mode 100644 index 0000000..e0482fb --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/services/CommonClient.java @@ -0,0 +1,5 @@ +package info.frostfs.sdk.services; + +public interface CommonClient extends + AccountingClient, ApeManagerClient, ContainerClient, NetmapClient, ObjectClient, SessionClient, ToolsClient { +} diff --git a/client/src/main/java/info/frostfs/sdk/services/impl/AccountingClientImpl.java b/client/src/main/java/info/frostfs/sdk/services/impl/AccountingClientImpl.java new file mode 100644 index 0000000..deab5c2 --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/services/impl/AccountingClientImpl.java @@ -0,0 +1,46 @@ +package info.frostfs.sdk.services.impl; + +import frostfs.accounting.AccountingServiceGrpc; +import frostfs.accounting.Service; +import frostfs.accounting.Types; +import info.frostfs.sdk.jdo.ClientEnvironment; +import info.frostfs.sdk.mappers.object.OwnerIdMapper; +import info.frostfs.sdk.services.AccountingClient; +import info.frostfs.sdk.services.ContextAccessor; +import info.frostfs.sdk.tools.RequestConstructor; +import info.frostfs.sdk.tools.RequestSigner; +import info.frostfs.sdk.tools.Verifier; + +import java.util.Map; + +public class AccountingClientImpl extends ContextAccessor implements AccountingClient { + private final AccountingServiceGrpc.AccountingServiceBlockingStub serviceBlockingStub; + + public AccountingClientImpl(ClientEnvironment clientEnvironment) { + super(clientEnvironment); + this.serviceBlockingStub = AccountingServiceGrpc.newBlockingStub(getContext().getChannel()); + } + + @Override + public Types.Decimal getBalance() { + var request = createGetRequest(null); + + var response = serviceBlockingStub.balance(request); + + Verifier.checkResponse(response); + return response.getBody().getBalance(); + } + + private Service.BalanceRequest createGetRequest(Map xHeaders) { + var body = Service.BalanceRequest.Body.newBuilder() + .setOwnerId(OwnerIdMapper.toGrpcMessage(getContext().getOwnerId())) + .build(); + var request = Service.BalanceRequest.newBuilder() + .setBody(body); + + RequestConstructor.addMetaHeader(request, xHeaders); + RequestSigner.sign(request, getContext().getKey()); + + return request.build(); + } +} diff --git a/client/src/main/java/info/frostfs/sdk/services/impl/ObjectToolsImpl.java b/client/src/main/java/info/frostfs/sdk/services/impl/ObjectToolsImpl.java index 9730f34..abe6927 100644 --- a/client/src/main/java/info/frostfs/sdk/services/impl/ObjectToolsImpl.java +++ b/client/src/main/java/info/frostfs/sdk/services/impl/ObjectToolsImpl.java @@ -47,16 +47,16 @@ public class ObjectToolsImpl extends ContextAccessor implements ToolsClient { ); } - public Types.Object createObject(ObjectFrostFS objectFrostFs) { - objectFrostFs.getHeader().setOwnerId(getContext().getOwnerId()); - objectFrostFs.getHeader().setVersion(getContext().getVersion()); - objectFrostFs.getHeader().setPayloadLength(objectFrostFs.getPayload().length); + public Types.Object createObject(ObjectFrostFS objectFrostFS) { + objectFrostFS.getHeader().setOwnerId(getContext().getOwnerId()); + objectFrostFS.getHeader().setVersion(getContext().getVersion()); + objectFrostFS.getHeader().setPayloadLength(objectFrostFS.getPayload().length); - var grpcHeader = ObjectHeaderMapper.toGrpcMessage(objectFrostFs.getHeader()).toBuilder() - .setPayloadHash(sha256Checksum(objectFrostFs.getPayload())) + var grpcHeader = ObjectHeaderMapper.toGrpcMessage(objectFrostFS.getHeader()).toBuilder() + .setPayloadHash(sha256Checksum(objectFrostFS.getPayload())) .build(); - var split = objectFrostFs.getHeader().getSplit(); + var split = objectFrostFS.getHeader().getSplit(); if (nonNull(split)) { grpcHeader = updateSplitValues(grpcHeader, split); } @@ -68,7 +68,7 @@ public class ObjectToolsImpl extends ContextAccessor implements ToolsClient { return Types.Object.newBuilder() .setHeader(grpcHeader) .setObjectId(objectId) - .setPayload(ByteString.copyFrom(objectFrostFs.getPayload())) + .setPayload(ByteString.copyFrom(objectFrostFS.getPayload())) .setSignature(sig) .build(); } diff --git a/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/ClientMetrics.java b/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/ClientMetrics.java index 149c145..656d19f 100644 --- a/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/ClientMetrics.java +++ b/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/ClientMetrics.java @@ -15,10 +15,10 @@ import static info.frostfs.sdk.services.impl.interceptor.Labels.*; public class ClientMetrics { private static final List defaultRequestLabels = - Arrays.asList("grpc_type", "grpc_service", "grpc_method"); + Arrays.asList("grpc_target", "grpc_type", "grpc_service", "grpc_method"); private static final List defaultResponseLabels = - Arrays.asList("grpc_type", "grpc_service", "grpc_method", "code", "grpc_code"); + Arrays.asList("grpc_target", "grpc_type", "grpc_service", "grpc_method", "code", "grpc_code"); private static final Counter.Builder rpcStartedBuilder = Counter.build() @@ -113,7 +113,9 @@ public class ClientMetrics { .observe(latencySec); } - /** Knows how to produce {@link ClientMetrics} instances for individual methods. */ + /** + * Knows how to produce {@link ClientMetrics} instances for individual methods. + */ static class Factory { private final List> labelHeaderKeys; private final Counter rpcStarted; @@ -155,7 +157,9 @@ public class ClientMetrics { } } - /** Creates a {@link ClientMetrics} for the supplied gRPC method. */ + /** + * Creates a {@link ClientMetrics} for the supplied gRPC method. + */ ClientMetrics createMetricsForMethod(GrpcMethod grpcMethod) { return new ClientMetrics( labelHeaderKeys, diff --git a/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/GrpcMethod.java b/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/GrpcMethod.java index 8f9aa4c..41e4ef1 100644 --- a/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/GrpcMethod.java +++ b/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/GrpcMethod.java @@ -1,27 +1,33 @@ package info.frostfs.sdk.services.impl.interceptor; +import io.grpc.Channel; import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; public class GrpcMethod { + private final String targetName; private final String serviceName; private final String methodName; private final MethodType type; - private GrpcMethod(String serviceName, String methodName, MethodType type) { + private GrpcMethod(String targetName, String serviceName, String methodName, MethodType type) { + this.targetName = targetName; this.serviceName = serviceName; this.methodName = methodName; this.type = type; } - static GrpcMethod of(MethodDescriptor method) { + static GrpcMethod of(MethodDescriptor method, Channel channel) { String serviceName = MethodDescriptor.extractFullServiceName(method.getFullMethodName()); // Full method names are of the form: "full.serviceName/MethodName". We extract the last part. String methodName = method.getFullMethodName().substring(serviceName.length() + 1); - return new GrpcMethod(serviceName, methodName, method.getType()); + return new GrpcMethod(channel.authority(), serviceName, methodName, method.getType()); } + String targetName() { + return targetName; + } String serviceName() { return serviceName; diff --git a/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/Labels.java b/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/Labels.java index ba44eaf..e728a79 100644 --- a/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/Labels.java +++ b/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/Labels.java @@ -53,6 +53,7 @@ public class Labels { */ static T addLabels(SimpleCollector collector, List labels, GrpcMethod method) { List allLabels = new ArrayList<>(); + allLabels.add(method.targetName()); allLabels.add(method.type()); allLabels.add(method.serviceName()); allLabels.add(method.methodName()); diff --git a/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/MonitoringClientInterceptor.java b/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/MonitoringClientInterceptor.java index ada4761..4654487 100644 --- a/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/MonitoringClientInterceptor.java +++ b/client/src/main/java/info/frostfs/sdk/services/impl/interceptor/MonitoringClientInterceptor.java @@ -1,11 +1,8 @@ package info.frostfs.sdk.services.impl.interceptor; +import io.grpc.*; + import java.time.Clock; -import io.grpc.CallOptions; -import io.grpc.Channel; -import io.grpc.ClientCall; -import io.grpc.ClientInterceptor; -import io.grpc.MethodDescriptor; public class MonitoringClientInterceptor implements ClientInterceptor { @@ -19,6 +16,7 @@ public class MonitoringClientInterceptor implements ClientInterceptor { this.configuration = configuration; this.clientMetricsFactory = clientMetricsFactory; } + public static MonitoringClientInterceptor create(Configuration configuration) { return new MonitoringClientInterceptor( Clock.systemDefaultZone(), configuration, new ClientMetrics.Factory(configuration)); @@ -27,7 +25,7 @@ public class MonitoringClientInterceptor implements ClientInterceptor { @Override public ClientCall interceptCall( MethodDescriptor methodDescriptor, CallOptions callOptions, Channel channel) { - GrpcMethod grpcMethod = GrpcMethod.of(methodDescriptor); + GrpcMethod grpcMethod = GrpcMethod.of(methodDescriptor, channel); ClientMetrics metrics = clientMetricsFactory.createMetricsForMethod(grpcMethod); return new MonitoringClientCall<>( channel.newCall(methodDescriptor, callOptions), metrics, grpcMethod, configuration, clock); diff --git a/client/src/main/java/info/frostfs/sdk/tools/GrpcClient.java b/client/src/main/java/info/frostfs/sdk/tools/GrpcClient.java index 13cdfe5..9692e3a 100644 --- a/client/src/main/java/info/frostfs/sdk/tools/GrpcClient.java +++ b/client/src/main/java/info/frostfs/sdk/tools/GrpcClient.java @@ -3,7 +3,7 @@ package info.frostfs.sdk.tools; import info.frostfs.sdk.exceptions.ValidationFrostFSException; import info.frostfs.sdk.jdo.ClientSettings; import info.frostfs.sdk.utils.Validator; -import io.grpc.Channel; +import io.grpc.ManagedChannel; import io.grpc.netty.NettyChannelBuilder; import java.net.URI; @@ -16,7 +16,7 @@ public class GrpcClient { private GrpcClient() { } - public static Channel initGrpcChannel(ClientSettings clientSettings) { + public static ManagedChannel initGrpcChannel(ClientSettings clientSettings) { Validator.validate(clientSettings); try { @@ -32,4 +32,15 @@ public class GrpcClient { ); } } + + public static ManagedChannel initGrpcChannel(String address) { + try { + URI uri = new URI(address); + return NettyChannelBuilder.forAddress(uri.getHost(), uri.getPort()).usePlaintext().build(); + } catch (URISyntaxException exp) { + throw new ValidationFrostFSException( + String.format(INVALID_HOST_TEMPLATE, address, exp.getMessage()) + ); + } + } } diff --git a/client/src/main/java/info/frostfs/sdk/utils/FrostFSMessages.java b/client/src/main/java/info/frostfs/sdk/utils/FrostFSMessages.java new file mode 100644 index 0000000..04b7dab --- /dev/null +++ b/client/src/main/java/info/frostfs/sdk/utils/FrostFSMessages.java @@ -0,0 +1,20 @@ +package info.frostfs.sdk.utils; + +import org.slf4j.Logger; + +public class FrostFSMessages { + private FrostFSMessages() { + } + + public static void sessionCreationError(Logger logger, String address, String error) { + logger.warn("Failed to create frostfs session token for client. Address {}, {}", address, error); + } + + public static void errorThresholdReached(Logger logger, String address, long threshold) { + logger.warn("Error threshold reached. Address {}, threshold {}", address, threshold); + } + + public static void healthChanged(Logger logger, String address, boolean healthy, String error) { + logger.warn("Health has changed: {} healthy {}, reason {}", address, healthy, error); + } +} diff --git a/client/src/test/java/info/frostfs/sdk/services/AccountingClientTest.java b/client/src/test/java/info/frostfs/sdk/services/AccountingClientTest.java new file mode 100644 index 0000000..e887223 --- /dev/null +++ b/client/src/test/java/info/frostfs/sdk/services/AccountingClientTest.java @@ -0,0 +1,102 @@ +package info.frostfs.sdk.services; + +import frostfs.accounting.AccountingServiceGrpc; +import frostfs.accounting.Service; +import info.frostfs.sdk.Base58; +import info.frostfs.sdk.dto.object.OwnerId; +import info.frostfs.sdk.jdo.ClientEnvironment; +import info.frostfs.sdk.services.impl.AccountingClientImpl; +import info.frostfs.sdk.testgenerator.AccountingGenerator; +import info.frostfs.sdk.tools.RequestConstructor; +import info.frostfs.sdk.tools.RequestSigner; +import info.frostfs.sdk.tools.Verifier; +import io.grpc.Channel; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.platform.commons.util.ReflectionUtils; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.lang.reflect.Field; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +class AccountingClientTest { + private static final String OWNER_ID = "NVxUSpEEJzYXZZtUs18PrJTD9QZkLLNQ8S"; + + private AccountingClientImpl accountingClient; + + @Mock + private AccountingServiceGrpc.AccountingServiceBlockingStub AccountingServiceClient; + @Mock + private ClientEnvironment clientEnvironment; + @Mock + private Channel channel; + + private MockedStatic verifierMock; + private MockedStatic requestConstructorMock; + private MockedStatic requestSignerMock; + + @BeforeEach + void setUp() throws IllegalAccessException { + when(clientEnvironment.getChannel()).thenReturn(channel); + when(clientEnvironment.getOwnerId()).thenReturn(new OwnerId(OWNER_ID)); + + accountingClient = new AccountingClientImpl(clientEnvironment); + + Field field = ReflectionUtils.findFields(AccountingClientImpl.class, + f -> f.getName().equals("serviceBlockingStub"), + ReflectionUtils.HierarchyTraversalMode.TOP_DOWN) + .get(0); + + field.setAccessible(true); + field.set(accountingClient, AccountingServiceClient); + + verifierMock = Mockito.mockStatic(Verifier.class); + requestConstructorMock = Mockito.mockStatic(RequestConstructor.class); + requestSignerMock = Mockito.mockStatic(RequestSigner.class); + } + + @AfterEach + void cleanUp() { + verifierMock.close(); + requestConstructorMock.close(); + requestSignerMock.close(); + } + + @Test + void getBalance_success() { + //Given + var response = AccountingGenerator.generateBalanceResponse(); + + var captor = ArgumentCaptor.forClass(Service.BalanceRequest.class); + + when(AccountingServiceClient.balance(captor.capture())).thenReturn(response); + + //When + var result = accountingClient.getBalance(); + + //Then + requestConstructorMock.verify( + () -> RequestConstructor.addMetaHeader(any(Service.BalanceRequest.Builder.class), eq(null)), + times(1) + ); + requestSignerMock.verify( + () -> RequestSigner.sign(any(Service.BalanceRequest.Builder.class), eq(null)), + times(1) + ); + verifierMock.verify(() -> Verifier.checkResponse(response), times(1)); + + assertEquals(response.getBody().getBalance(), result); + + var request = captor.getValue(); + assertEquals(OWNER_ID, Base58.encode(request.getBody().getOwnerId().getValue().toByteArray())); + } +} diff --git a/client/src/test/java/info/frostfs/sdk/testgenerator/AccountingGenerator.java b/client/src/test/java/info/frostfs/sdk/testgenerator/AccountingGenerator.java new file mode 100644 index 0000000..13a59d2 --- /dev/null +++ b/client/src/test/java/info/frostfs/sdk/testgenerator/AccountingGenerator.java @@ -0,0 +1,42 @@ +package info.frostfs.sdk.testgenerator; + +import frostfs.accounting.Service; +import frostfs.accounting.Types; + +public class AccountingGenerator { + + public static Service.BalanceRequest generateBalanceRequest() { + return Service.BalanceRequest.newBuilder() + .setBody(generateBalanceRequestBody()) + .setMetaHeader(SessionGenerator.generateRequestMetaHeader()) + .setVerifyHeader(SessionGenerator.generateRequestVerificationHeader()) + .build(); + } + + public static Service.BalanceRequest.Body generateBalanceRequestBody() { + return Service.BalanceRequest.Body.newBuilder() + .setOwnerId(RefsGenerator.generateOwnerID()) + .build(); + } + + public static Service.BalanceResponse generateBalanceResponse() { + return Service.BalanceResponse.newBuilder() + .setBody(generateBalanceResponseBody()) + .setMetaHeader(SessionGenerator.generateResponseMetaHeader()) + .setVerifyHeader(SessionGenerator.generateResponseVerificationHeader()) + .build(); + } + + public static Service.BalanceResponse.Body generateBalanceResponseBody() { + return Service.BalanceResponse.Body.newBuilder() + .setBalance(generateDecimal()) + .build(); + } + + public static Types.Decimal generateDecimal() { + return Types.Decimal.newBuilder() + .setValue(1) + .setPrecision(2) + .build(); + } +} diff --git a/cryptography/pom.xml b/cryptography/pom.xml index 33bf8fb..606a193 100644 --- a/cryptography/pom.xml +++ b/cryptography/pom.xml @@ -6,7 +6,7 @@ info.frostfs.sdk frostfs-sdk-java - 0.1.0 + 0.2.0 cryptography @@ -21,7 +21,7 @@ info.frostfs.sdk exceptions - 0.1.0 + 0.2.0 com.google.protobuf diff --git a/exceptions/pom.xml b/exceptions/pom.xml index 2c80de2..a24f1fc 100644 --- a/exceptions/pom.xml +++ b/exceptions/pom.xml @@ -6,7 +6,7 @@ info.frostfs.sdk frostfs-sdk-java - 0.1.0 + 0.2.0 exceptions diff --git a/exceptions/src/main/java/info/frostfs/sdk/constants/ErrorConst.java b/exceptions/src/main/java/info/frostfs/sdk/constants/ErrorConst.java index 1bf25d4..26f3eac 100644 --- a/exceptions/src/main/java/info/frostfs/sdk/constants/ErrorConst.java +++ b/exceptions/src/main/java/info/frostfs/sdk/constants/ErrorConst.java @@ -40,6 +40,11 @@ public class ErrorConst { public static final String WRONG_UUID_SIZE_TEMPLATE = "uuid byte array length must be %s"; + public static final String POOL_CLIENT_UNHEALTHY = "pool client unhealthy"; + public static final String POOL_PEERS_IS_MISSING = "no FrostFS peers configured"; + public static final String POOL_NODES_UNHEALTHY = "at least one node must be healthy"; + public static final String POOL_CLIENTS_UNHEALTHY = "cannot find alive client"; + public static final String POOL_NOT_DIALED = "pool not dialed"; public static final String FIELDS_DELIMITER_COMMA = ", "; public static final String FIELDS_DELIMITER_OR = " or "; diff --git a/exceptions/src/main/java/info/frostfs/sdk/exceptions/FrostFSException.java b/exceptions/src/main/java/info/frostfs/sdk/exceptions/FrostFSException.java new file mode 100644 index 0000000..fc41bb9 --- /dev/null +++ b/exceptions/src/main/java/info/frostfs/sdk/exceptions/FrostFSException.java @@ -0,0 +1,15 @@ +package info.frostfs.sdk.exceptions; + +public class FrostFSException extends RuntimeException { + + public FrostFSException() { + } + + public FrostFSException(String message) { + super(message); + } + + public FrostFSException(Throwable cause) { + super(cause); + } +} diff --git a/exceptions/src/main/java/info/frostfs/sdk/exceptions/ProcessFrostFSException.java b/exceptions/src/main/java/info/frostfs/sdk/exceptions/ProcessFrostFSException.java index 47e06b3..6cd3560 100644 --- a/exceptions/src/main/java/info/frostfs/sdk/exceptions/ProcessFrostFSException.java +++ b/exceptions/src/main/java/info/frostfs/sdk/exceptions/ProcessFrostFSException.java @@ -1,6 +1,6 @@ package info.frostfs.sdk.exceptions; -public class ProcessFrostFSException extends RuntimeException { +public class ProcessFrostFSException extends FrostFSException { public ProcessFrostFSException(String message) { super(message); } diff --git a/exceptions/src/main/java/info/frostfs/sdk/exceptions/SessionExpiredFrostFSException.java b/exceptions/src/main/java/info/frostfs/sdk/exceptions/SessionExpiredFrostFSException.java new file mode 100644 index 0000000..1a3a1e8 --- /dev/null +++ b/exceptions/src/main/java/info/frostfs/sdk/exceptions/SessionExpiredFrostFSException.java @@ -0,0 +1,8 @@ +package info.frostfs.sdk.exceptions; + +public class SessionExpiredFrostFSException extends FrostFSException { + + public SessionExpiredFrostFSException(String message) { + super(message); + } +} diff --git a/exceptions/src/main/java/info/frostfs/sdk/exceptions/SessionNotFoundFrostFSException.java b/exceptions/src/main/java/info/frostfs/sdk/exceptions/SessionNotFoundFrostFSException.java new file mode 100644 index 0000000..0e8522b --- /dev/null +++ b/exceptions/src/main/java/info/frostfs/sdk/exceptions/SessionNotFoundFrostFSException.java @@ -0,0 +1,8 @@ +package info.frostfs.sdk.exceptions; + +public class SessionNotFoundFrostFSException extends FrostFSException { + + public SessionNotFoundFrostFSException(String message) { + super(message); + } +} diff --git a/exceptions/src/main/java/info/frostfs/sdk/exceptions/TimeoutFrostFSException.java b/exceptions/src/main/java/info/frostfs/sdk/exceptions/TimeoutFrostFSException.java index fca9263..5e1ee20 100644 --- a/exceptions/src/main/java/info/frostfs/sdk/exceptions/TimeoutFrostFSException.java +++ b/exceptions/src/main/java/info/frostfs/sdk/exceptions/TimeoutFrostFSException.java @@ -1,6 +1,6 @@ package info.frostfs.sdk.exceptions; -public class TimeoutFrostFSException extends RuntimeException { +public class TimeoutFrostFSException extends FrostFSException { public TimeoutFrostFSException() { } } diff --git a/exceptions/src/main/java/info/frostfs/sdk/exceptions/ValidationFrostFSException.java b/exceptions/src/main/java/info/frostfs/sdk/exceptions/ValidationFrostFSException.java index d294ef8..74e5259 100644 --- a/exceptions/src/main/java/info/frostfs/sdk/exceptions/ValidationFrostFSException.java +++ b/exceptions/src/main/java/info/frostfs/sdk/exceptions/ValidationFrostFSException.java @@ -1,6 +1,6 @@ package info.frostfs.sdk.exceptions; -public class ValidationFrostFSException extends RuntimeException { +public class ValidationFrostFSException extends FrostFSException { public ValidationFrostFSException(String message) { super(message); } diff --git a/models/pom.xml b/models/pom.xml index a5dfe24..08dbdc6 100644 --- a/models/pom.xml +++ b/models/pom.xml @@ -6,7 +6,7 @@ info.frostfs.sdk frostfs-sdk-java - 0.1.0 + 0.2.0 models @@ -21,17 +21,17 @@ info.frostfs.sdk cryptography - 0.1.0 + 0.2.0 info.frostfs.sdk protos - 0.1.0 + 0.2.0 info.frostfs.sdk exceptions - 0.1.0 + 0.2.0 diff --git a/models/src/main/java/info/frostfs/sdk/constants/AppConst.java b/models/src/main/java/info/frostfs/sdk/constants/AppConst.java index 9942aa7..67a2422 100644 --- a/models/src/main/java/info/frostfs/sdk/constants/AppConst.java +++ b/models/src/main/java/info/frostfs/sdk/constants/AppConst.java @@ -1,6 +1,8 @@ package info.frostfs.sdk.constants; public class AppConst { + public static final String RESERVED_PREFIX = "__SYSTEM__"; + public static final int DEFAULT_MAJOR_VERSION = 2; public static final int DEFAULT_MINOR_VERSION = 13; public static final int BYTE_SHIFT = 10; diff --git a/models/src/main/java/info/frostfs/sdk/constants/AttributeConst.java b/models/src/main/java/info/frostfs/sdk/constants/AttributeConst.java new file mode 100644 index 0000000..214a9e3 --- /dev/null +++ b/models/src/main/java/info/frostfs/sdk/constants/AttributeConst.java @@ -0,0 +1,10 @@ +package info.frostfs.sdk.constants; + +import static info.frostfs.sdk.constants.AppConst.RESERVED_PREFIX; + +public class AttributeConst { + public static final String DISABLE_HOMOMORPHIC_HASHING_ATTRIBUTE = RESERVED_PREFIX + "DISABLE_HOMOMORPHIC_HASHING"; + + private AttributeConst() { + } +} diff --git a/models/src/main/java/info/frostfs/sdk/constants/XHeaderConst.java b/models/src/main/java/info/frostfs/sdk/constants/XHeaderConst.java index e8b2c72..805762a 100644 --- a/models/src/main/java/info/frostfs/sdk/constants/XHeaderConst.java +++ b/models/src/main/java/info/frostfs/sdk/constants/XHeaderConst.java @@ -1,9 +1,10 @@ package info.frostfs.sdk.constants; +import static info.frostfs.sdk.constants.AppConst.RESERVED_PREFIX; + public class XHeaderConst { - public static final String RESERVED_XHEADER_PREFIX = "__SYSTEM__"; - public static final String XHEADER_NETMAP_EPOCH = RESERVED_XHEADER_PREFIX + "NETMAP_EPOCH"; - public static final String XHEADER_NETMAP_LOOKUP_DEPTH = RESERVED_XHEADER_PREFIX + "NETMAP_LOOKUP_DEPTH"; + public static final String XHEADER_NETMAP_EPOCH = RESERVED_PREFIX + "NETMAP_EPOCH"; + public static final String XHEADER_NETMAP_LOOKUP_DEPTH = RESERVED_PREFIX + "NETMAP_LOOKUP_DEPTH"; private XHeaderConst() { } diff --git a/models/src/main/java/info/frostfs/sdk/dto/container/Container.java b/models/src/main/java/info/frostfs/sdk/dto/container/Container.java index 2d0816e..6f04b2e 100644 --- a/models/src/main/java/info/frostfs/sdk/dto/container/Container.java +++ b/models/src/main/java/info/frostfs/sdk/dto/container/Container.java @@ -6,6 +6,8 @@ import info.frostfs.sdk.enums.BasicAcl; import lombok.Getter; import lombok.Setter; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; @Getter @@ -15,6 +17,7 @@ public class Container { private BasicAcl basicAcl; private PlacementPolicy placementPolicy; private Version version; + private Map attributes = new HashMap<>(); public Container(BasicAcl basicAcl, PlacementPolicy placementPolicy) { this.nonce = UUID.randomUUID(); diff --git a/models/src/main/java/info/frostfs/sdk/mappers/container/ContainerMapper.java b/models/src/main/java/info/frostfs/sdk/mappers/container/ContainerMapper.java index 76f8479..9b9dd72 100644 --- a/models/src/main/java/info/frostfs/sdk/mappers/container/ContainerMapper.java +++ b/models/src/main/java/info/frostfs/sdk/mappers/container/ContainerMapper.java @@ -7,9 +7,13 @@ import info.frostfs.sdk.enums.BasicAcl; import info.frostfs.sdk.exceptions.ProcessFrostFSException; import info.frostfs.sdk.mappers.netmap.PlacementPolicyMapper; import info.frostfs.sdk.mappers.netmap.VersionMapper; +import org.apache.commons.collections4.CollectionUtils; + +import java.util.stream.Collectors; import static info.frostfs.sdk.UuidExtension.asBytes; import static info.frostfs.sdk.UuidExtension.asUuid; +import static info.frostfs.sdk.constants.AttributeConst.DISABLE_HOMOMORPHIC_HASHING_ATTRIBUTE; import static info.frostfs.sdk.constants.ErrorConst.UNKNOWN_ENUM_VALUE_TEMPLATE; import static java.util.Objects.isNull; @@ -22,11 +26,23 @@ public class ContainerMapper { return null; } - return Types.Container.newBuilder() + var containerGrpc = Types.Container.newBuilder() .setBasicAcl(container.getBasicAcl().value) .setPlacementPolicy(PlacementPolicyMapper.toGrpcMessage(container.getPlacementPolicy())) - .setNonce(ByteString.copyFrom(asBytes(container.getNonce()))) - .build(); + .setNonce(ByteString.copyFrom(asBytes(container.getNonce()))); + + container.getAttributes().putIfAbsent(DISABLE_HOMOMORPHIC_HASHING_ATTRIBUTE, Boolean.TRUE.toString()); + var attributes = container.getAttributes().entrySet().stream() + .map(entry -> + Types.Container.Attribute.newBuilder() + .setKey(entry.getKey()) + .setValue(entry.getValue()) + .build() + ) + .collect(Collectors.toList()); + containerGrpc.addAllAttributes(attributes); + + return containerGrpc.build(); } public static Container toModel(Types.Container containerGrpc) { @@ -41,9 +57,17 @@ public class ContainerMapper { ); } + var container = new Container(basicAcl, PlacementPolicyMapper.toModel(containerGrpc.getPlacementPolicy())); container.setNonce(asUuid(containerGrpc.getNonce().toByteArray())); container.setVersion(VersionMapper.toModel(containerGrpc.getVersion())); + + if (CollectionUtils.isNotEmpty(containerGrpc.getAttributesList())) { + var attributes = containerGrpc.getAttributesList().stream() + .collect(Collectors.toMap(Types.Container.Attribute::getKey, Types.Container.Attribute::getValue)); + container.setAttributes(attributes); + } + return container; } } diff --git a/models/src/test/java/info/frostfs/sdk/mappers/container/ContainerMapperTest.java b/models/src/test/java/info/frostfs/sdk/mappers/container/ContainerMapperTest.java index d4c6dd4..4631766 100644 --- a/models/src/test/java/info/frostfs/sdk/mappers/container/ContainerMapperTest.java +++ b/models/src/test/java/info/frostfs/sdk/mappers/container/ContainerMapperTest.java @@ -15,6 +15,7 @@ import java.util.UUID; import static info.frostfs.sdk.UuidExtension.asBytes; import static info.frostfs.sdk.UuidExtension.asUuid; +import static info.frostfs.sdk.constants.AttributeConst.DISABLE_HOMOMORPHIC_HASHING_ATTRIBUTE; import static org.junit.jupiter.api.Assertions.*; public class ContainerMapperTest { @@ -24,6 +25,8 @@ public class ContainerMapperTest { //Given var placementPolicy = new PlacementPolicy(new Replica[]{new Replica(1)}, true); var container = new Container(BasicAcl.PUBLIC_RW, placementPolicy); + container.getAttributes().put("key1", "val1"); + container.getAttributes().put(DISABLE_HOMOMORPHIC_HASHING_ATTRIBUTE, "false"); //When var result = ContainerMapper.toGrpcMessage(container); @@ -42,6 +45,11 @@ public class ContainerMapperTest { container.getPlacementPolicy().getReplicas()[0].getSelector(), result.getPlacementPolicy().getReplicasList().get(0).getSelector() ); + + assertEquals("key1", result.getAttributes(0).getKey()); + assertEquals("val1", result.getAttributes(0).getValue()); + assertEquals(DISABLE_HOMOMORPHIC_HASHING_ATTRIBUTE, result.getAttributes(1).getKey()); + assertEquals("false", result.getAttributes(1).getValue()); } @Test @@ -69,11 +77,23 @@ public class ContainerMapperTest { .addReplicas(replica) .build(); + var attribute1 = Types.Container.Attribute.newBuilder() + .setKey("key1") + .setValue("val1") + .build(); + + var attribute2 = Types.Container.Attribute.newBuilder() + .setKey("key2") + .setValue("val2") + .build(); + var container = Types.Container.newBuilder() .setBasicAcl(basicAcl.value) .setNonce(ByteString.copyFrom(asBytes(UUID.randomUUID()))) .setVersion(version) .setPlacementPolicy(placementPolicy) + .addAttributes(attribute1) + .addAttributes(attribute2) .build(); //When @@ -95,6 +115,9 @@ public class ContainerMapperTest { ); assertEquals(version.getMajor(), result.getVersion().getMajor()); assertEquals(version.getMinor(), result.getVersion().getMinor()); + + assertEquals(attribute1.getValue(), result.getAttributes().get(attribute1.getKey())); + assertEquals(attribute2.getValue(), result.getAttributes().get(attribute2.getKey())); } @Test diff --git a/pom.xml b/pom.xml index bd46c8b..2c9f30e 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ info.frostfs.sdk frostfs-sdk-java - 0.1.0 + 0.2.0 pom client diff --git a/protos/pom.xml b/protos/pom.xml index 5381838..d08ae98 100644 --- a/protos/pom.xml +++ b/protos/pom.xml @@ -6,7 +6,7 @@ info.frostfs.sdk frostfs-sdk-java - 0.1.0 + 0.2.0 protos