forked from TrueCloudLab/frostfs-sdk-java
131 lines
3.7 KiB
Java
131 lines
3.7 KiB
Java
package info.frostfs.sdk.pool;
|
|
|
|
import info.frostfs.sdk.FrostFSClient;
|
|
import info.frostfs.sdk.enums.MethodIndex;
|
|
import info.frostfs.sdk.exceptions.ResponseFrostFSException;
|
|
import info.frostfs.sdk.exceptions.ValidationFrostFSException;
|
|
import info.frostfs.sdk.jdo.parameters.CallContext;
|
|
import info.frostfs.sdk.utils.WaitUtil;
|
|
import lombok.AccessLevel;
|
|
import lombok.Getter;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.locks.Lock;
|
|
import java.util.concurrent.locks.ReentrantLock;
|
|
|
|
import static info.frostfs.sdk.constants.ErrorConst.POOL_CLIENT_UNHEALTHY;
|
|
|
|
@Getter
|
|
public class ClientWrapper extends ClientStatusMonitor {
|
|
@Getter(value = AccessLevel.NONE)
|
|
private final Lock lock = new ReentrantLock();
|
|
private final SessionCache sessionCache;
|
|
private final WrapperPrm wrapperPrm;
|
|
private FrostFSClient client;
|
|
|
|
public ClientWrapper(WrapperPrm wrapperPrm, Pool pool) {
|
|
super(wrapperPrm.getLogger(), wrapperPrm.getAddress());
|
|
this.wrapperPrm = wrapperPrm;
|
|
setErrorThreshold(wrapperPrm.getErrorThreshold());
|
|
|
|
this.sessionCache = pool.getSessionCache();
|
|
this.client = new FrostFSClient(wrapperPrm, sessionCache);
|
|
}
|
|
|
|
|
|
public FrostFSClient getClient() {
|
|
lock.lock();
|
|
try {
|
|
if (isHealthy()) {
|
|
return client;
|
|
}
|
|
return null;
|
|
} finally {
|
|
lock.unlock();
|
|
}
|
|
}
|
|
|
|
public void dial(CallContext ctx) {
|
|
FrostFSClient client = getClient();
|
|
if (client == null) {
|
|
throw new ValidationFrostFSException(POOL_CLIENT_UNHEALTHY);
|
|
}
|
|
client.dial(ctx);
|
|
}
|
|
|
|
public void handleError(Exception exp) {
|
|
if (exp instanceof ResponseFrostFSException && ((ResponseFrostFSException) exp).getStatus() != null) {
|
|
switch (((ResponseFrostFSException) exp).getStatus().getCode()) {
|
|
case INTERNAL:
|
|
case WRONG_MAGIC_NUMBER:
|
|
case SIGNATURE_VERIFICATION_FAILURE:
|
|
case NODE_UNDER_MAINTENANCE:
|
|
incErrorRate();
|
|
}
|
|
return;
|
|
}
|
|
|
|
incErrorRate();
|
|
}
|
|
|
|
private void scheduleGracefulClose() {
|
|
if (client == null) {
|
|
return;
|
|
}
|
|
|
|
WaitUtil.sleep(wrapperPrm.getGracefulCloseOnSwitchTimeout());
|
|
client.close();
|
|
}
|
|
|
|
public CompletableFuture<Boolean> restartIfUnhealthy(CallContext ctx) {
|
|
try {
|
|
client.getLocalNodeInfo(ctx);
|
|
return CompletableFuture.completedFuture(false);
|
|
} catch (Exception ignored) {
|
|
}
|
|
|
|
if (isDialed()) {
|
|
scheduleGracefulClose();
|
|
}
|
|
|
|
return CompletableFuture.completedFuture(restartClient(ctx));
|
|
}
|
|
|
|
private boolean restartClient(CallContext ctx) {
|
|
FrostFSClient newClient = null;
|
|
try {
|
|
newClient = new FrostFSClient(wrapperPrm, sessionCache);
|
|
|
|
var error = newClient.dial(ctx);
|
|
if (StringUtils.isNotBlank(error)) {
|
|
setUnhealthyOnDial();
|
|
newClient.close();
|
|
return true;
|
|
}
|
|
|
|
lock.lock();
|
|
client = newClient;
|
|
lock.unlock();
|
|
} catch (Exception exp) {
|
|
if (newClient != null) {
|
|
newClient.close();
|
|
}
|
|
}
|
|
|
|
try {
|
|
client.getLocalNodeInfo(ctx);
|
|
} catch (Exception exp) {
|
|
setUnhealthy();
|
|
return true;
|
|
}
|
|
|
|
setHealthy();
|
|
return false;
|
|
}
|
|
|
|
public void incRequests(long elapsed, MethodIndex method) {
|
|
var methodStat = getMethods()[method.ordinal()];
|
|
methodStat.incRequests(elapsed);
|
|
}
|
|
}
|