Compare commits

...

9 commits

Author SHA1 Message Date
02934f49e5 [#1] Update comment lines
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2023-05-16 16:30:41 +03:00
b4d5d84f21 [#1] Rename files with mentions of previous project
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2023-05-16 16:05:42 +03:00
b9baebbed7 [#74] tree: Simplify retry
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-04-21 12:02:42 +03:00
0c1e17dca4 [#74] Add round tree retry
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-04-21 12:02:42 +03:00
18c7d669e0 [#74] Update docs
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-04-21 12:02:42 +03:00
cab758d8ce [#74] service/tree: Add logger
Log error instead of failing when multiple unversioned nodes are found

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-04-21 12:02:42 +03:00
b60afd88c4 [#74] Support multiple tree endpoints
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-04-21 12:02:42 +03:00
5d4304e204 [#83] Don't create extra delete marker
We shouldn't create delete marker if:
1. object doesn't exist at all
2. last version is already a delete marker

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-04-19 17:55:52 +03:00
17b8905b24 [#91] Update values for health metric
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-04-18 10:25:47 +03:00
17 changed files with 391 additions and 72 deletions

View file

@ -8,6 +8,7 @@ This document outlines major changes between releases.
- Clean up List and Name caches when object is missing in Tree service (#57)
- Get empty bucket CORS from frostfs (TrueCloudLab#36)
- Don't count pool error on client abort (#35)
- Don't create unnecessary delete-markers (#83)
### Added
- Return `X-Owner-Id` in `head-bucket` response (#79)
@ -29,6 +30,8 @@ This document outlines major changes between releases.
- Limit number of objects to delete at one time (TrueCloudLab#37)
- CompleteMultipartUpload handler now sends whitespace characters to keep alive client's connection (#60)
- Support new system attributes (#64)
- Changed values for `frostfs_s3_gw_state_health` metric (#91)
- Support multiple tree service endpoints (#74)
## [0.26.0] - 2022-12-28

View file

@ -147,6 +147,81 @@ func TestRemoveDeleteMarker(t *testing.T) {
require.True(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object doesn't exist but should")
}
func TestDeleteMarkerVersioned(t *testing.T) {
tc := prepareHandlerContext(t)
bktName, objName := "bucket-for-removal", "object-to-delete"
createVersionedBucketAndObject(t, tc, bktName, objName)
t.Run("not create new delete marker if last version is delete marker", func(t *testing.T) {
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
versions := listVersions(t, tc, bktName)
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
_, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
versions = listVersions(t, tc, bktName)
require.Len(t, versions.DeleteMarker, 1)
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
})
t.Run("do not create delete marker if object does not exist", func(t *testing.T) {
versionsBefore := listVersions(t, tc, bktName)
_, isDeleteMarker := deleteObject(t, tc, bktName, "dummy", emptyVersion)
require.False(t, isDeleteMarker)
versionsAfter := listVersions(t, tc, bktName)
require.Equal(t, versionsBefore, versionsAfter)
})
}
func TestDeleteMarkerSuspended(t *testing.T) {
tc := prepareHandlerContext(t)
bktName, objName := "bucket-for-removal", "object-to-delete"
bktInfo, _ := createVersionedBucketAndObject(t, tc, bktName, objName)
putBucketVersioning(t, tc, bktName, false)
t.Run("not create new delete marker if last version is delete marker", func(t *testing.T) {
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion)
deleteMarkerVersion, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion)
versions := listVersions(t, tc, bktName)
require.Len(t, versions.DeleteMarker, 1)
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
})
t.Run("do not create delete marker if object does not exist", func(t *testing.T) {
versionsBefore := listVersions(t, tc, bktName)
_, isDeleteMarker := deleteObject(t, tc, bktName, "dummy", emptyVersion)
require.False(t, isDeleteMarker)
versionsAfter := listVersions(t, tc, bktName)
require.Equal(t, versionsBefore, versionsAfter)
})
t.Run("remove last unversioned non delete marker", func(t *testing.T) {
objName := "obj3"
putObject(t, tc, bktName, objName)
nodeVersion, err := tc.tree.GetUnversioned(tc.Context(), bktInfo, objName)
require.NoError(t, err)
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion)
objVersions := getVersion(listVersions(t, tc, bktName), objName)
require.Len(t, objVersions, 0)
require.False(t, tc.MockedPool().ObjectExists(nodeVersion.OID))
})
}
func TestDeleteObjectCombined(t *testing.T) {
tc := prepareHandlerContext(t)
@ -197,7 +272,7 @@ func TestDeleteMarkers(t *testing.T) {
deleteObject(t, tc, bktName, objName, emptyVersion)
versions := listVersions(t, tc, bktName)
require.Len(t, versions.DeleteMarker, 3, "invalid delete markers length")
require.Len(t, versions.DeleteMarker, 0, "invalid delete markers length")
require.Len(t, versions.Version, 0, "versions must be empty")
require.Len(t, listOIDsFromMockedFrostFS(t, tc, bktName), 0, "shouldn't be any object in frostfs")
@ -316,6 +391,16 @@ func listVersions(t *testing.T, tc *handlerContext, bktName string) *ListObjects
return res
}
func getVersion(resp *ListObjectsVersionsResponse, objName string) []*ObjectVersionResponse {
var res []*ObjectVersionResponse
for i, version := range resp.Version {
if version.Key == objName {
res = append(res, &resp.Version[i])
}
}
return res
}
func putObject(t *testing.T, tc *handlerContext, bktName, objName string) {
body := bytes.NewReader([]byte("content"))
w, r := prepareTestPayloadRequest(tc, bktName, objName, body)

View file

@ -33,6 +33,7 @@ type handlerContext struct {
t *testing.T
h *handler
tp *layer.TestFrostFS
tree *tree.Tree
context context.Context
}
@ -85,11 +86,13 @@ func prepareHandlerContext(t *testing.T) *handlerContext {
var owner user.ID
user.IDFromKey(&owner, key.PrivateKey.PublicKey)
treeMock := NewTreeServiceMock(t)
layerCfg := &layer.Config{
Caches: layer.DefaultCachesConfigs(zap.NewExample()),
AnonKey: layer.AnonymousKey{Key: key},
Resolver: testResolver,
TreeService: NewTreeServiceMock(t),
TreeService: treeMock,
}
var pp netmap.PlacementPolicy
@ -110,6 +113,7 @@ func prepareHandlerContext(t *testing.T) *handlerContext {
t: t,
h: h,
tp: tp,
tree: treeMock,
context: context.WithValue(context.Background(), api.BoxData, newTestAccessBox(t, key)),
}
}
@ -117,7 +121,7 @@ func prepareHandlerContext(t *testing.T) *handlerContext {
func NewTreeServiceMock(t *testing.T) *tree.Tree {
memCli, err := tree.NewTreeServiceClientMemory()
require.NoError(t, err)
return tree.NewTree(memCli)
return tree.NewTree(memCli, zap.NewExample())
}
func createTestBucket(hc *handlerContext, bktName string) *data.BucketInfo {

View file

@ -15,6 +15,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
@ -55,6 +56,16 @@ func (t *TestFrostFS) Objects() []*object.Object {
return res
}
func (t *TestFrostFS) ObjectExists(objID oid.ID) bool {
for _, obj := range t.objects {
if id, _ := obj.ID(); id.Equals(objID) {
return true
}
}
return false
}
func (t *TestFrostFS) AddObject(key string, obj *object.Object) {
t.objects[key] = obj
}
@ -161,7 +172,7 @@ func (t *TestFrostFS) ReadObject(ctx context.Context, prm PrmObjectRead) (*Objec
}, nil
}
return nil, fmt.Errorf("object not found %s", addr)
return nil, fmt.Errorf("%w: %s", apistatus.ObjectNotFound{}, addr)
}
func (t *TestFrostFS) CreateObject(ctx context.Context, prm PrmObjectCreate) (oid.ID, error) {

View file

@ -561,21 +561,38 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
return obj
}
var newVersion *data.NodeVersion
lastVersion, err := n.getLastNodeVersion(ctx, bkt, obj)
if err != nil {
obj.Error = err
return n.handleNotFoundError(bkt, obj)
}
if settings.VersioningSuspended() {
obj.VersionID = data.UnversionedObjectVersionID
var nodeVersion *data.NodeVersion
if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
return n.handleNotFoundError(bkt, obj)
var nullVersionToDelete *data.NodeVersion
if lastVersion.IsUnversioned {
if !lastVersion.IsDeleteMarker() {
nullVersionToDelete = lastVersion
}
} else if nullVersionToDelete, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
if !isNotFoundError(obj.Error) {
return obj
}
}
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
return obj
if nullVersionToDelete != nil {
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nullVersionToDelete, obj); obj.Error != nil {
return obj
}
}
}
if lastVersion.IsDeleteMarker() {
obj.DeleteMarkVersion = lastVersion.OID.EncodeToString()
return obj
}
randOID, err := getRandomOID()
if err != nil {
obj.Error = fmt.Errorf("couldn't get random oid: %w", err)
@ -584,7 +601,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
obj.DeleteMarkVersion = randOID.EncodeToString()
newVersion = &data.NodeVersion{
newVersion := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
OID: randOID,
FilePath: obj.Name,
@ -606,8 +623,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
}
func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject {
if errors.IsS3Error(obj.Error, errors.ErrNoSuchKey) ||
errors.IsS3Error(obj.Error, errors.ErrNoSuchVersion) {
if isNotFoundError(obj.Error) {
obj.Error = nil
n.cache.CleanListCacheEntriesContainingObject(obj.Name, bkt.CID)
n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)
@ -616,6 +632,11 @@ func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject)
return obj
}
func isNotFoundError(err error) bool {
return errors.IsS3Error(err, errors.ErrNoSuchKey) ||
errors.IsS3Error(err, errors.ErrNoSuchVersion)
}
func (n *layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
objVersion := &ObjectVersion{
BktInfo: bkt,
@ -627,6 +648,17 @@ func (n *layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo
return n.getNodeVersion(ctx, objVersion)
}
func (n *layer) getLastNodeVersion(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
objVersion := &ObjectVersion{
BktInfo: bkt,
ObjectName: obj.Name,
VersionID: "",
NoErrorOnDeleteMarker: true,
}
return n.getNodeVersion(ctx, objVersion)
}
func (n *layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject) (string, error) {
if nodeVersion.IsDeleteMarker() {
return obj.VersionID, nil

View file

@ -113,14 +113,14 @@ func (a *App) init(ctx context.Context) {
func (a *App) initLayer(ctx context.Context) {
a.initResolver()
treeServiceEndpoint := a.cfg.GetString(cfgTreeServiceEndpoint)
treeServiceEndpoint := a.cfg.GetStringSlice(cfgTreeServiceEndpoint)
grpcDialOpt := grpc.WithTransportCredentials(insecure.NewCredentials())
treeGRPCClient, err := services.NewTreeServiceClientGRPC(ctx, treeServiceEndpoint, a.key, grpcDialOpt)
treeGRPCClient, err := services.NewTreeServiceClientGRPC(ctx, treeServiceEndpoint, a.key, a.log, grpcDialOpt)
if err != nil {
a.log.Fatal("failed to create tree service", zap.Error(err))
}
treeService := tree.NewTree(treeGRPCClient)
a.log.Info("init tree service", zap.String("endpoint", treeServiceEndpoint))
treeService := tree.NewTree(treeGRPCClient, a.log)
a.log.Info("init tree service", zap.Strings("endpoints", treeGRPCClient.Endpoints()))
// prepare random key for anonymous requests
randomKey, err := keys.NewPrivateKey()
@ -182,6 +182,7 @@ func (a *App) initAPI(ctx context.Context) {
func (a *App) initMetrics() {
a.metrics = metrics.NewAppMetrics(a.log, frostfs.NewPoolStatistic(a.pool), a.cfg.GetBool(cfgPrometheusEnabled))
a.metrics.SetHealth(metrics.HealthStatusStarting)
}
func (a *App) initResolver() {
@ -369,7 +370,7 @@ func (a *App) Wait() {
}
func (a *App) setHealthStatus() {
a.metrics.SetHealth(1)
a.metrics.SetHealth(metrics.HealthStatusReady)
}
// Serve runs HTTP server to handle S3 API requests.

View file

@ -42,8 +42,8 @@ S3_GW_CONFIG=/path/to/config/yaml
# Logger
S3_GW_LOGGER_LEVEL=debug
# Endpoint of the tree service. Must be provided. Can be one of the node address (from the `peers` section).
S3_GW_TREE_SERVICE=grpc://s01.frostfs.devenv:8080
# Endpoints of the tree service. At least one endpoint must be provided. Node addresses (from the `peers` section) can be used.
S3_GW_TREE_SERVICE=grpc://s01.frostfs.devenv:8080 grpc://s02.frostfs.devenv:8080
# RPC endpoint and order of resolving of bucket names
S3_GW_RPC_ENDPOINT=http://morph-chain.frostfs.devenv:30333/

View file

@ -44,9 +44,11 @@ listen_domains:
logger:
level: debug
# Endpoint of the tree service. Must be provided. Can be one of the node address (from the `peers` section).
# Endpoints of the tree service. At least one endpoint must be provided. Node addresses (from the `peers` section) can be used.
tree:
service: node1.frostfs:8080
service:
- node1.frostfs:8080
- node2.frostfs:8080
# RPC endpoint and order of resolving of bucket names
rpc_endpoint: http://morph-chain.frostfs.devenv:30333

View file

@ -337,14 +337,23 @@ logger:
### `tree` section
```yaml
tree:
service:
- s01.frostfs.devenv:8080
- s02.frostfs.devenv:8080
```
If you use only one endpoint, it can be provided as:
```yaml
tree:
service: s01.frostfs.devenv:8080
```
| Parameter | Type | Default value | Description |
|-----------|----------|---------------|------------------------------------------------------------------------------------------------------------|
| `service` | `string` | | Endpoint of the tree service. Must be provided. Can be one of the node address (from the `peers` section). |
| Parameter | Type | Default value | Description |
|-----------|------------|---------------|-------------------------------------------------------------------------------------------------------------------------------|
| `service` | `[]string` | | Endpoints of the tree service. At least one endpoint must be provided. Node addresses (from the `peers` section) can be used. |
### `cache` section

1
go.mod
View file

@ -31,6 +31,7 @@ require (
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect
git.frostfs.info/TrueCloudLab/tzhash v1.8.0 // indirect
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20221202181307-76fa05c21b12 // indirect
github.com/benbjohnson/clock v1.1.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect

View file

@ -6,6 +6,7 @@ import (
"fmt"
"io"
"strings"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
@ -14,7 +15,10 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type GetNodeByPathResponseInfoWrapper struct {
@ -66,27 +70,57 @@ func (n GetSubTreeResponseBodyWrapper) GetMeta() []tree.Meta {
}
type ServiceClientGRPC struct {
key *keys.PrivateKey
key *keys.PrivateKey
log *zap.Logger
clients []treeClient
startIndex int32
}
type treeClient struct {
address string
conn *grpc.ClientConn
service grpcService.TreeServiceClient
}
func NewTreeServiceClientGRPC(ctx context.Context, addr string, key *keys.PrivateKey, grpcOpts ...grpc.DialOption) (*ServiceClientGRPC, error) {
conn, err := grpc.Dial(addr, grpcOpts...)
if err != nil {
return nil, fmt.Errorf("did not connect: %v", err)
func (c *ServiceClientGRPC) Endpoints() []string {
res := make([]string, len(c.clients))
for i, client := range c.clients {
res[i] = client.address
}
return res
}
func NewTreeServiceClientGRPC(ctx context.Context, endpoints []string, key *keys.PrivateKey, log *zap.Logger, grpcOpts ...grpc.DialOption) (*ServiceClientGRPC, error) {
res := &ServiceClientGRPC{
key: key,
log: log,
}
c := grpcService.NewTreeServiceClient(conn)
if _, err = c.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil {
return nil, fmt.Errorf("healthcheck: %w", err)
for _, addr := range endpoints {
conn, err := grpc.Dial(addr, grpcOpts...)
if err != nil {
log.Warn("dial node tree service", zap.String("address", addr), zap.Error(err))
continue
}
c := grpcService.NewTreeServiceClient(conn)
if _, err = c.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil {
log.Warn("healthcheck tree service", zap.String("address", addr), zap.Error(err))
continue
}
res.clients = append(res.clients, treeClient{
address: addr,
conn: conn,
service: c,
})
}
return &ServiceClientGRPC{
key: key,
conn: conn,
service: c,
}, nil
if len(res.clients) == 0 {
return nil, errors.New("no healthy tree grpc client")
}
return res, nil
}
func (c *ServiceClientGRPC) GetNodes(ctx context.Context, p *tree.GetNodesParams) ([]tree.NodeResponse, error) {
@ -112,9 +146,12 @@ func (c *ServiceClientGRPC) GetNodes(ctx context.Context, p *tree.GetNodesParams
return nil, err
}
resp, err := c.service.GetNodeByPath(ctx, request)
if err != nil {
return nil, handleError("failed to get node by path", err)
var resp *grpcService.GetNodeByPathResponse
if err := c.requestWithRetry(func(client treeClient) (inErr error) {
resp, inErr = client.service.GetNodeByPath(ctx, request)
return handleError("failed to get node by path", inErr)
}); err != nil {
return nil, err
}
res := make([]tree.NodeResponse, len(resp.GetBody().GetNodes()))
@ -145,9 +182,12 @@ func (c *ServiceClientGRPC) GetSubTree(ctx context.Context, bktInfo *data.Bucket
return nil, err
}
cli, err := c.service.GetSubTree(ctx, request)
if err != nil {
return nil, handleError("failed to get sub tree client", err)
var cli grpcService.TreeService_GetSubTreeClient
if err := c.requestWithRetry(func(client treeClient) (inErr error) {
cli, inErr = client.service.GetSubTree(ctx, request)
return handleError("failed to get sub tree client", inErr)
}); err != nil {
return nil, err
}
var subtree []tree.NodeResponse
@ -183,9 +223,12 @@ func (c *ServiceClientGRPC) AddNode(ctx context.Context, bktInfo *data.BucketInf
return 0, err
}
resp, err := c.service.Add(ctx, request)
if err != nil {
return 0, handleError("failed to add node", err)
var resp *grpcService.AddResponse
if err := c.requestWithRetry(func(client treeClient) (inErr error) {
resp, inErr = client.service.Add(ctx, request)
return handleError("failed to add node", inErr)
}); err != nil {
return 0, err
}
return resp.GetBody().GetNodeId(), nil
@ -212,9 +255,12 @@ func (c *ServiceClientGRPC) AddNodeByPath(ctx context.Context, bktInfo *data.Buc
return 0, err
}
resp, err := c.service.AddByPath(ctx, request)
if err != nil {
return 0, handleError("failed to add node by path", err)
var resp *grpcService.AddByPathResponse
if err := c.requestWithRetry(func(client treeClient) (inErr error) {
resp, inErr = client.service.AddByPath(ctx, request)
return handleError("failed to add node by path", inErr)
}); err != nil {
return 0, err
}
body := resp.GetBody()
@ -249,11 +295,12 @@ func (c *ServiceClientGRPC) MoveNode(ctx context.Context, bktInfo *data.BucketIn
return err
}
if _, err := c.service.Move(ctx, request); err != nil {
return handleError("failed to move node", err)
}
return nil
return c.requestWithRetry(func(client treeClient) error {
if _, err := client.service.Move(ctx, request); err != nil {
return handleError("failed to move node", err)
}
return nil
})
}
func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
@ -274,11 +321,55 @@ func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.Bucket
return err
}
if _, err := c.service.Remove(ctx, request); err != nil {
return handleError("failed to remove node", err)
return c.requestWithRetry(func(client treeClient) error {
if _, err := client.service.Remove(ctx, request); err != nil {
return handleError("failed to remove node", err)
}
return nil
})
}
func (c *ServiceClientGRPC) requestWithRetry(fn func(client treeClient) error) (err error) {
start := int(atomic.LoadInt32(&c.startIndex))
for i := start; i < start+len(c.clients); i++ {
index := i % len(c.clients)
err = fn(c.clients[index])
if !shouldTryAgain(err) {
atomic.StoreInt32(&c.startIndex, int32(index))
return err
}
c.log.Debug("tree request error", zap.String("address", c.clients[index].address), zap.Error(err))
}
return nil
return err
}
func shouldTryAgain(err error) bool {
if err == nil {
return false
}
code := status.Code(unwrapErr(err))
if code == codes.Unavailable || code == codes.Unimplemented {
return true
}
errText := err.Error()
if strings.Contains(errText, "not found") ||
strings.Contains(errText, "shard is in read-only mode") ||
strings.Contains(errText, "shard is in degraded mode") {
return true
}
return false
}
func unwrapErr(err error) error {
for e := errors.Unwrap(err); e != nil; e = errors.Unwrap(err) {
err = e
}
return err
}
func metaToKV(meta map[string]string) []*grpcService.KeyValue {
@ -303,6 +394,9 @@ func getBearer(ctx context.Context, bktInfo *data.BucketInfo) []byte {
}
func handleError(msg string, err error) error {
if err == nil {
return nil
}
if strings.Contains(err.Error(), "not found") {
return fmt.Errorf("%w: %s", tree.ErrNodeNotFound, err.Error())
} else if strings.Contains(err.Error(), "is denied by") {

View file

@ -7,8 +7,6 @@ import (
)
func (c *ServiceClientGRPC) signData(buf []byte, f func(key, sign []byte)) error {
// crypto package should not be used outside of API libraries (see neofs-node#491).
// For now tree service does not include into SDK Client nor SDK Pool, so there is no choice.
// When SDK library adopts Tree service client, this should be dropped.
sign, err := crypto.Sign(&c.key.PrivateKey, buf)
if err != nil {

View file

@ -2,10 +2,12 @@ package services
import (
"errors"
"sync/atomic"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
func TestHandleError(t *testing.T) {
@ -33,3 +35,58 @@ func TestHandleError(t *testing.T) {
})
}
}
func TestRetry(t *testing.T) {
cl := &ServiceClientGRPC{
log: zaptest.NewLogger(t),
clients: []treeClient{
{address: "node0"},
{address: "node1"},
{address: "node2"},
{address: "node3"},
},
}
makeFn := func(shouldFail []string) func(treeClient) error {
return func(client treeClient) error {
for _, item := range shouldFail {
if item == client.address {
return errors.New("not found")
}
}
return nil
}
}
t.Run("first ok", func(t *testing.T) {
err := cl.requestWithRetry(makeFn([]string{}))
require.NoError(t, err)
require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex)))
atomic.StoreInt32(&cl.startIndex, 0)
})
t.Run("first failed", func(t *testing.T) {
err := cl.requestWithRetry(makeFn([]string{"node0"}))
require.NoError(t, err)
require.Equal(t, 1, int(atomic.LoadInt32(&cl.startIndex)))
atomic.StoreInt32(&cl.startIndex, 0)
})
t.Run("all failed", func(t *testing.T) {
err := cl.requestWithRetry(makeFn([]string{"node0", "node1", "node2", "node3"}))
require.Error(t, err)
require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex)))
atomic.StoreInt32(&cl.startIndex, 0)
})
t.Run("round", func(t *testing.T) {
err := cl.requestWithRetry(makeFn([]string{"node0", "node1"}))
require.NoError(t, err)
require.Equal(t, 2, int(atomic.LoadInt32(&cl.startIndex)))
err = cl.requestWithRetry(makeFn([]string{"node2", "node3"}))
require.NoError(t, err)
require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex)))
atomic.StoreInt32(&cl.startIndex, 0)
})
}

View file

@ -37,7 +37,7 @@ func (m *AppMetrics) SetEnabled(enabled bool) {
m.mu.Unlock()
}
func (m *AppMetrics) SetHealth(status int32) {
func (m *AppMetrics) SetHealth(status HealthStatus) {
if !m.isEnabled() {
return
}
@ -48,7 +48,7 @@ func (m *AppMetrics) SetHealth(status int32) {
func (m *AppMetrics) Shutdown() {
m.mu.Lock()
if m.enabled {
m.gate.State.SetHealth(0)
m.gate.State.SetHealth(HealthStatusShuttingDown)
m.enabled = false
}
m.gate.Unregister()

View file

@ -4,6 +4,16 @@ import "github.com/prometheus/client_golang/prometheus"
const stateSubsystem = "state"
// HealthStatus of the gate application.
type HealthStatus int32
const (
HealthStatusUndefined HealthStatus = 0
HealthStatusStarting HealthStatus = 1
HealthStatusReady HealthStatus = 2
HealthStatusShuttingDown HealthStatus = 3
)
type stateMetrics struct {
healthCheck prometheus.Gauge
}
@ -27,6 +37,6 @@ func (m stateMetrics) unregister() {
prometheus.Unregister(m.healthCheck)
}
func (m stateMetrics) SetHealth(s int32) {
func (m stateMetrics) SetHealth(s HealthStatus) {
m.healthCheck.Set(float64(s))
}

View file

@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sort"
"strconv"
"strings"
"time"
@ -12,11 +13,13 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"go.uber.org/zap"
)
type (
Tree struct {
service ServiceClient
log *zap.Logger
}
// ServiceClient is a client to interact with tree service.
@ -104,8 +107,11 @@ const (
)
// NewTree creates instance of Tree using provided address and create grpc connection.
func NewTree(service ServiceClient) *Tree {
return &Tree{service: service}
func NewTree(service ServiceClient, log *zap.Logger) *Tree {
return &Tree{
service: service,
log: log,
}
}
type Meta interface {
@ -811,14 +817,19 @@ func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, tre
return nil, err
}
if len(nodes) > 1 {
return nil, fmt.Errorf("found more than one unversioned node")
}
if len(nodes) != 1 {
if len(nodes) == 0 {
return nil, layer.ErrNodeNotFound
}
if len(nodes) > 1 {
c.log.Debug("found more than one unversioned node", zap.Stringer("cid", bktInfo.CID),
zap.String("treeID", treeID), zap.String("filepath", filepath))
}
sort.Slice(nodes, func(i, j int) bool {
return nodes[i].Timestamp > nodes[j].Timestamp
})
return nodes[0], nil
}

View file

@ -8,6 +8,7 @@ import (
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
func TestLockConfigurationEncoding(t *testing.T) {
@ -102,7 +103,7 @@ func TestTreeServiceSettings(t *testing.T) {
memCli, err := NewTreeServiceClientMemory()
require.NoError(t, err)
treeService := NewTree(memCli)
treeService := NewTree(memCli, zaptest.NewLogger(t))
bktInfo := &data.BucketInfo{
CID: cidtest.ID(),
@ -134,7 +135,7 @@ func TestTreeServiceAddVersion(t *testing.T) {
memCli, err := NewTreeServiceClientMemory()
require.NoError(t, err)
treeService := NewTree(memCli)
treeService := NewTree(memCli, zaptest.NewLogger(t))
bktInfo := &data.BucketInfo{
CID: cidtest.ID(),