frostfs-sdk-java/client/src/main/java/info/frostfs/sdk/pool/Pool.java
Ori Bruk 3861eb0dc2
All checks were successful
DCO / DCO (pull_request) Successful in 26s
Verify code phase / Verify code (pull_request) Successful in 1m38s
[#41] Add APE rule serializer
Signed-off-by: Ori Bruk <o.bruk@yadro.com>
2025-02-13 20:57:31 +03:00

557 lines
20 KiB
Java

package info.frostfs.sdk.pool;
import frostfs.refs.Types;
import info.frostfs.sdk.dto.container.Container;
import info.frostfs.sdk.dto.container.ContainerId;
import info.frostfs.sdk.dto.netmap.NetmapSnapshot;
import info.frostfs.sdk.dto.netmap.NodeInfo;
import info.frostfs.sdk.dto.object.ObjectFrostFS;
import info.frostfs.sdk.dto.object.ObjectHeader;
import info.frostfs.sdk.dto.object.ObjectId;
import info.frostfs.sdk.dto.object.OwnerId;
import info.frostfs.sdk.dto.session.SessionToken;
import info.frostfs.sdk.exceptions.FrostFSException;
import info.frostfs.sdk.exceptions.SessionExpiredFrostFSException;
import info.frostfs.sdk.exceptions.SessionNotFoundFrostFSException;
import info.frostfs.sdk.exceptions.ValidationFrostFSException;
import info.frostfs.sdk.jdo.ECDsa;
import info.frostfs.sdk.jdo.NetworkSettings;
import info.frostfs.sdk.jdo.parameters.CallContext;
import info.frostfs.sdk.jdo.parameters.ape.PrmApeChainAdd;
import info.frostfs.sdk.jdo.parameters.ape.PrmApeChainList;
import info.frostfs.sdk.jdo.parameters.ape.PrmApeChainRemove;
import info.frostfs.sdk.jdo.parameters.container.PrmContainerCreate;
import info.frostfs.sdk.jdo.parameters.container.PrmContainerDelete;
import info.frostfs.sdk.jdo.parameters.container.PrmContainerGet;
import info.frostfs.sdk.jdo.parameters.container.PrmContainerGetAll;
import info.frostfs.sdk.jdo.parameters.object.*;
import info.frostfs.sdk.jdo.parameters.object.patch.PrmObjectPatch;
import info.frostfs.sdk.jdo.parameters.object.patch.PrmRangeGet;
import info.frostfs.sdk.jdo.parameters.object.patch.PrmRangeHashGet;
import info.frostfs.sdk.jdo.parameters.session.PrmSessionCreate;
import info.frostfs.sdk.jdo.pool.NodeParameters;
import info.frostfs.sdk.jdo.pool.PoolInitParameters;
import info.frostfs.sdk.jdo.result.ObjectHeaderResult;
import info.frostfs.sdk.services.CommonClient;
import info.frostfs.sdk.services.impl.rwhelper.ObjectWriter;
import info.frostfs.sdk.services.impl.rwhelper.RangeReader;
import info.frostfs.sdk.utils.FrostFSMessages;
import info.frostfs.sdk.utils.WaitUtil;
import lombok.Getter;
import org.slf4j.Logger;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import static info.frostfs.sdk.Helper.getHexString;
import static info.frostfs.sdk.constants.ErrorConst.*;
import static info.frostfs.sdk.constants.PoolConst.*;
@Getter
public class Pool implements CommonClient {
private final ReentrantLock lock = new ReentrantLock();
private final ECDsa key;
private final SessionCache sessionCache;
private final long sessionTokenDuration;
private final RebalanceParameters rebalanceParams;
private final Function<String, ClientWrapper> clientBuilder;
private final Logger logger;
private InnerPool[] innerPools;
private Types.OwnerID ownerID;
private OwnerId ownerId;
private boolean disposedValue;
private long maxObjectSize;
private ClientStatus clientStatus;
public Pool(PoolInitParameters options) {
if (options == null || options.getKey() == null) {
throw new ValidationFrostFSException(
String.format(
PARAMS_ARE_MISSING_TEMPLATE,
String.join(
FIELDS_DELIMITER_COMMA, PoolInitParameters.class.getName(), ECDsa.class.getName()
)
)
);
}
List<NodesParam> nodesParams = adjustNodeParams(options.getNodeParams());
SessionCache cache = new SessionCache(options.getSessionExpirationDuration());
fillDefaultInitParams(options, this);
this.key = options.getKey();
this.sessionCache = cache;
this.logger = options.getLogger();
this.sessionTokenDuration = options.getSessionExpirationDuration();
this.rebalanceParams = new RebalanceParameters(
nodesParams.toArray(new NodesParam[0]),
options.getHealthCheckTimeout(),
options.getClientRebalanceInterval(),
options.getSessionExpirationDuration());
this.clientBuilder = options.getClientBuilder();
}
private static List<NodesParam> adjustNodeParams(NodeParameters[] nodeParams) {
if (nodeParams == null || nodeParams.length == 0) {
throw new ValidationFrostFSException(POOL_PEERS_IS_MISSING);
}
Map<Integer, NodesParam> nodesParamsDict = new HashMap<>(nodeParams.length);
for (NodeParameters nodeParam : nodeParams) {
var nodesParam = nodesParamsDict
.computeIfAbsent(nodeParam.getPriority(), k -> new NodesParam(nodeParam.getPriority()));
nodesParam.getAddress().add(nodeParam.getAddress());
nodesParam.getWeight().add(nodeParam.getWeight());
}
List<NodesParam> nodesParams = new ArrayList<>(nodesParamsDict.values());
nodesParams.sort(Comparator.comparingInt(NodesParam::getPriority));
for (NodesParam nodes : nodesParams) {
double[] newWeights = adjustWeights(nodes.getWeight().stream().mapToDouble(Double::doubleValue).toArray());
nodes.getWeight().clear();
for (double weight : newWeights) {
nodes.getWeight().add(weight);
}
}
return nodesParams;
}
private static double[] adjustWeights(double[] weights) {
double[] adjusted = new double[weights.length];
double sum = Arrays.stream(weights).sum();
if (sum > 0) {
for (int i = 0; i < weights.length; i++) {
adjusted[i] = weights[i] / sum;
}
}
return adjusted;
}
private static void fillDefaultInitParams(PoolInitParameters parameters, Pool pool) {
if (parameters.getSessionExpirationDuration() == 0) {
parameters.setSessionExpirationDuration(DEFAULT_SESSION_TOKEN_EXPIRATION_DURATION);
}
if (parameters.getErrorThreshold() == 0) {
parameters.setErrorThreshold(DEFAULT_ERROR_THRESHOLD);
}
if (parameters.getClientRebalanceInterval() <= 0) {
parameters.setClientRebalanceInterval(DEFAULT_REBALANCE_INTERVAL);
}
if (parameters.getGracefulCloseOnSwitchTimeout() <= 0) {
parameters.setGracefulCloseOnSwitchTimeout(DEFAULT_GRACEFUL_CLOSE_ON_SWITCH_TIMEOUT);
}
if (parameters.getHealthCheckTimeout() <= 0) {
parameters.setHealthCheckTimeout(DEFAULT_HEALTHCHECK_TIMEOUT);
}
if (parameters.getNodeDialTimeout() <= 0) {
parameters.setNodeDialTimeout(DEFAULT_DIAL_TIMEOUT);
}
if (parameters.getNodeStreamTimeout() <= 0) {
parameters.setNodeStreamTimeout(DEFAULT_STREAM_TIMEOUT);
}
if (parameters.getSessionExpirationDuration() == 0) {
parameters.setSessionExpirationDuration(DEFAULT_SESSION_TOKEN_EXPIRATION_DURATION);
}
if (parameters.getClientBuilder() == null) {
parameters.setClientBuilder(address -> {
WrapperPrm wrapperPrm = new WrapperPrm();
wrapperPrm.setAddress(address);
wrapperPrm.setKey(parameters.getKey());
wrapperPrm.setLogger(parameters.getLogger());
wrapperPrm.setDialTimeout(parameters.getNodeDialTimeout());
wrapperPrm.setStreamTimeout(parameters.getNodeStreamTimeout());
wrapperPrm.setErrorThreshold(parameters.getErrorThreshold());
wrapperPrm.setGracefulCloseOnSwitchTimeout(parameters.getGracefulCloseOnSwitchTimeout());
wrapperPrm.setInterceptors(parameters.getInterceptors());
return new ClientWrapper(wrapperPrm, pool);
});
}
}
private static SessionToken initSessionForDuration(CallContext ctx, ClientWrapper cw, long duration) {
var client = cw.getClient();
NetworkSettings networkInfo = client.getNetworkSettings(ctx);
long epoch = networkInfo.getEpochDuration();
long exp = (Long.MAX_VALUE - epoch < duration) ? Long.MAX_VALUE : (epoch + duration);
return client.createSession(new PrmSessionCreate(exp), ctx);
}
public static String formCacheKey(String address, String key) {
return address + key;
}
@Override
public String dial(CallContext ctx) {
InnerPool[] inner = new InnerPool[rebalanceParams.getNodesParams().length];
boolean atLeastOneHealthy = false;
int i = 0;
for (NodesParam nodeParams : rebalanceParams.getNodesParams()) {
ClientWrapper[] clients = new ClientWrapper[nodeParams.getWeight().size()];
for (int j = 0; j < nodeParams.getAddress().size(); j++) {
ClientWrapper client = clients[j] = clientBuilder.apply(nodeParams.getAddress().get(j));
boolean dialed = false;
try {
client.dial(ctx);
dialed = true;
SessionToken token = initSessionForDuration(
ctx, client, rebalanceParams.getSessionExpirationDuration()
);
String cacheKey = formCacheKey(
nodeParams.getAddress().get(j),
getHexString(key.getPublicKeyByte())
);
sessionCache.setValue(cacheKey, token);
atLeastOneHealthy = true;
} catch (ValidationFrostFSException exp) {
break;
} catch (Exception exp) {
if (!dialed) {
client.setUnhealthyOnDial();
} else {
client.setUnhealthy();
}
if (logger != null) {
FrostFSMessages
.sessionCreationError(logger, client.getWrapperPrm().getAddress(), exp.getMessage());
}
}
}
Sampler sampler = new Sampler(nodeParams.getWeight().stream().mapToDouble(Double::doubleValue).toArray());
inner[i] = new InnerPool(sampler, clients);
i++;
}
if (!atLeastOneHealthy) {
return POOL_NODES_UNHEALTHY;
}
this.innerPools = inner;
NetworkSettings networkSettings = getNetworkSettings(ctx);
this.maxObjectSize = networkSettings.getMaxObjectSize();
startRebalance(ctx);
return null;
}
private ClientWrapper connection() {
for (InnerPool pool : innerPools) {
ClientWrapper client = pool.connection();
if (client != null) {
return client;
}
}
throw new FrostFSException(POOL_CLIENTS_UNHEALTHY);
}
public void close() {
if (innerPools != null) {
for (InnerPool innerPool : innerPools) {
for (ClientWrapper client : innerPool.getClients()) {
if (client.isDialed()) {
client.getClient().close();
}
}
}
}
}
public void startRebalance(CallContext ctx) {
double[][] buffers = new double[rebalanceParams.getNodesParams().length][];
for (int i = 0; i < rebalanceParams.getNodesParams().length; i++) {
NodesParam parameters = rebalanceParams.getNodesParams()[i];
buffers[i] = new double[parameters.getWeight().size()];
CompletableFuture.runAsync(() -> {
WaitUtil.sleep(rebalanceParams.getClientRebalanceInterval());
updateNodesHealth(ctx, buffers);
});
}
}
private void updateNodesHealth(CallContext ctx, double[][] buffers) {
CompletableFuture<?>[] tasks = new CompletableFuture<?>[innerPools.length];
for (int i = 0; i < innerPools.length; i++) {
double[] bufferWeights = buffers[i];
int finalI = i;
tasks[i] = CompletableFuture.runAsync(() -> updateInnerNodesHealth(ctx, finalI, bufferWeights));
}
CompletableFuture.allOf(tasks).join();
}
private void updateInnerNodesHealth(CallContext ctx, int poolIndex, double[] bufferWeights) {
if (poolIndex > innerPools.length - 1) {
return;
}
InnerPool pool = innerPools[poolIndex];
RebalanceParameters options = rebalanceParams;
int[] healthyChanged = {0};
CompletableFuture<?>[] tasks = new CompletableFuture<?>[pool.getClients().length];
for (int j = 0; j < pool.getClients().length; j++) {
ClientWrapper client = pool.getClients()[j];
AtomicBoolean healthy = new AtomicBoolean(false);
AtomicReference<String> error = new AtomicReference<>();
AtomicBoolean changed = new AtomicBoolean(false);
int finalJ = j;
tasks[j] = client.restartIfUnhealthy(ctx).handle((unused, throwable) -> {
if (throwable != null) {
error.set(throwable.getMessage());
bufferWeights[finalJ] = 0;
sessionCache.deleteByPrefix(client.getAddress());
} else {
changed.set(unused);
healthy.set(true);
bufferWeights[finalJ] = options.getNodesParams()[poolIndex].getWeight().get(finalJ);
}
return null;
}).thenRun(() -> {
if (changed.get()) {
if (error.get() != null && logger != null) {
FrostFSMessages.healthChanged(logger, client.getAddress(), healthy.get(), error.get());
}
healthyChanged[0] = 1;
}
});
}
CompletableFuture.allOf(tasks).thenRun(() -> {
if (healthyChanged[0] == 1) {
double[] probabilities = adjustWeights(bufferWeights);
lock.lock();
try {
pool.setSampler(new Sampler(probabilities));
} finally {
lock.unlock();
}
}
});
}
private boolean checkSessionTokenErr(Exception error, String address) {
if (error == null) {
return false;
}
if (error instanceof SessionNotFoundFrostFSException || error instanceof SessionExpiredFrostFSException) {
sessionCache.deleteByPrefix(address);
return true;
}
return false;
}
public Statistic statistic() {
if (innerPools == null) {
throw new ValidationFrostFSException(POOL_NOT_DIALED);
}
Statistic statistics = new Statistic();
for (InnerPool inner : innerPools) {
int valueIndex = 0;
String[] nodes = new String[inner.getClients().length];
lock.lock();
try {
for (ClientWrapper client : inner.getClients()) {
if (client.isHealthy()) {
nodes[valueIndex] = client.getAddress();
}
NodeStatistic node = new NodeStatistic();
node.setAddress(client.getAddress());
node.setMethods(client.getMethodsStatus());
node.setOverallErrors(client.getOverallErrorRate());
node.setCurrentErrors(client.getCurrentErrorRate());
statistics.getNodes().add(node);
valueIndex++;
statistics.setOverallErrors(statistics.getOverallErrors() + node.getOverallErrors());
}
if (statistics.getCurrentNodes() == null || statistics.getCurrentNodes().length == 0) {
statistics.setCurrentNodes(nodes);
}
} finally {
lock.unlock();
}
}
return statistics;
}
@Override
public Container getContainer(PrmContainerGet args, CallContext ctx) {
ClientWrapper client = connection();
return client.getClient().getContainer(args, ctx);
}
@Override
public List<ContainerId> listContainers(PrmContainerGetAll args, CallContext ctx) {
ClientWrapper client = connection();
return client.getClient().listContainers(args, ctx);
}
@Override
public ContainerId createContainer(PrmContainerCreate args, CallContext ctx) {
ClientWrapper client = connection();
return client.getClient().createContainer(args, ctx);
}
@Override
public void deleteContainer(PrmContainerDelete args, CallContext ctx) {
ClientWrapper client = connection();
client.getClient().deleteContainer(args, ctx);
}
@Override
public ObjectHeaderResult getObjectHead(PrmObjectHeadGet args, CallContext ctx) {
ClientWrapper client = connection();
return client.getClient().getObjectHead(args, ctx);
}
@Override
public ObjectFrostFS getObject(PrmObjectGet args, CallContext ctx) {
ClientWrapper client = connection();
return client.getClient().getObject(args, ctx);
}
@Override
public ObjectWriter putObject(PrmObjectPut args, CallContext ctx) {
ClientWrapper client = connection();
return client.getClient().putObject(args, ctx);
}
@Override
public ObjectId putClientCutObject(PrmObjectClientCutPut args, CallContext ctx) {
ClientWrapper client = connection();
return client.getClient().putClientCutObject(args, ctx);
}
@Override
public ObjectId putSingleObject(PrmObjectSinglePut args, CallContext ctx) {
ClientWrapper client = connection();
return client.getClient().putSingleObject(args, ctx);
}
@Override
public void deleteObject(PrmObjectDelete args, CallContext ctx) {
ClientWrapper client = connection();
client.getClient().deleteObject(args, ctx);
}
@Override
public Iterable<ObjectId> searchObjects(PrmObjectSearch args, CallContext ctx) {
ClientWrapper client = connection();
return client.getClient().searchObjects(args, ctx);
}
@Override
public RangeReader getRange(PrmRangeGet args, CallContext ctx) {
ClientWrapper client = connection();
return client.getClient().getRange(args, ctx);
}
@Override
public byte[][] getRangeHash(PrmRangeHashGet args, CallContext ctx) {
ClientWrapper client = connection();
return client.getClient().getRangeHash(args, ctx);
}
@Override
public ObjectId patchObject(PrmObjectPatch args, CallContext ctx) {
ClientWrapper client = connection();
return client.getClient().patchObject(args, ctx);
}
@Override
public byte[] addChain(PrmApeChainAdd args, CallContext ctx) {
ClientWrapper client = connection();
return client.getClient().addChain(args, ctx);
}
@Override
public void removeChain(PrmApeChainRemove args, CallContext ctx) {
ClientWrapper client = connection();
client.getClient().removeChain(args, ctx);
}
@Override
public List<frostfs.ape.Types.Chain> listChains(PrmApeChainList args, CallContext ctx) {
ClientWrapper client = connection();
return client.getClient().listChains(args, ctx);
}
@Override
public NetmapSnapshot getNetmapSnapshot(CallContext ctx) {
ClientWrapper client = connection();
return client.getClient().getNetmapSnapshot(ctx);
}
@Override
public NodeInfo getLocalNodeInfo(CallContext ctx) {
ClientWrapper client = connection();
return client.getClient().getLocalNodeInfo(ctx);
}
@Override
public NetworkSettings getNetworkSettings(CallContext ctx) {
ClientWrapper client = connection();
return client.getClient().getNetworkSettings(ctx);
}
@Override
public SessionToken createSession(PrmSessionCreate args, CallContext ctx) {
ClientWrapper client = connection();
return client.getClient().createSession(args, ctx);
}
@Override
public ObjectId calculateObjectId(ObjectHeader header) {
ClientWrapper client = connection();
return client.getClient().calculateObjectId(header);
}
@Override
public frostfs.accounting.Types.Decimal getBalance(CallContext ctx) {
ClientWrapper client = connection();
return client.getClient().getBalance(ctx);
}
}