Compare commits

...

13 commits

Author SHA1 Message Date
7edd827706 [#114] tree: Fix retry tests
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-05-19 17:24:08 +03:00
c8f3dc17e0 [#114] tree: Don't ignore unhealthy endpoints
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-05-19 17:24:02 +03:00
efdde64c23 [#110] tree: Add more logs for switching tree endpoints
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-05-19 15:22:31 +03:00
8b41fbeed0 [#110] tree: Update errors to switch endpoint
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-05-19 15:22:28 +03:00
02934f49e5 [#1] Update comment lines
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2023-05-16 16:30:41 +03:00
b4d5d84f21 [#1] Rename files with mentions of previous project
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2023-05-16 16:05:42 +03:00
b9baebbed7 [#74] tree: Simplify retry
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-04-21 12:02:42 +03:00
0c1e17dca4 [#74] Add round tree retry
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-04-21 12:02:42 +03:00
18c7d669e0 [#74] Update docs
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-04-21 12:02:42 +03:00
cab758d8ce [#74] service/tree: Add logger
Log error instead of failing when multiple unversioned nodes are found

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-04-21 12:02:42 +03:00
b60afd88c4 [#74] Support multiple tree endpoints
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-04-21 12:02:42 +03:00
5d4304e204 [#83] Don't create extra delete marker
We shouldn't create delete marker if:
1. object doesn't exist at all
2. last version is already a delete marker

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-04-19 17:55:52 +03:00
17b8905b24 [#91] Update values for health metric
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-04-18 10:25:47 +03:00
18 changed files with 505 additions and 74 deletions

View file

@ -8,6 +8,7 @@ This document outlines major changes between releases.
- Clean up List and Name caches when object is missing in Tree service (#57) - Clean up List and Name caches when object is missing in Tree service (#57)
- Get empty bucket CORS from frostfs (TrueCloudLab#36) - Get empty bucket CORS from frostfs (TrueCloudLab#36)
- Don't count pool error on client abort (#35) - Don't count pool error on client abort (#35)
- Don't create unnecessary delete-markers (#83)
### Added ### Added
- Return `X-Owner-Id` in `head-bucket` response (#79) - Return `X-Owner-Id` in `head-bucket` response (#79)
@ -29,6 +30,8 @@ This document outlines major changes between releases.
- Limit number of objects to delete at one time (TrueCloudLab#37) - Limit number of objects to delete at one time (TrueCloudLab#37)
- CompleteMultipartUpload handler now sends whitespace characters to keep alive client's connection (#60) - CompleteMultipartUpload handler now sends whitespace characters to keep alive client's connection (#60)
- Support new system attributes (#64) - Support new system attributes (#64)
- Changed values for `frostfs_s3_gw_state_health` metric (#91)
- Support multiple tree service endpoints (#74)
## [0.26.0] - 2022-12-28 ## [0.26.0] - 2022-12-28

View file

@ -147,6 +147,81 @@ func TestRemoveDeleteMarker(t *testing.T) {
require.True(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object doesn't exist but should") require.True(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object doesn't exist but should")
} }
func TestDeleteMarkerVersioned(t *testing.T) {
tc := prepareHandlerContext(t)
bktName, objName := "bucket-for-removal", "object-to-delete"
createVersionedBucketAndObject(t, tc, bktName, objName)
t.Run("not create new delete marker if last version is delete marker", func(t *testing.T) {
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
versions := listVersions(t, tc, bktName)
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
_, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
versions = listVersions(t, tc, bktName)
require.Len(t, versions.DeleteMarker, 1)
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
})
t.Run("do not create delete marker if object does not exist", func(t *testing.T) {
versionsBefore := listVersions(t, tc, bktName)
_, isDeleteMarker := deleteObject(t, tc, bktName, "dummy", emptyVersion)
require.False(t, isDeleteMarker)
versionsAfter := listVersions(t, tc, bktName)
require.Equal(t, versionsBefore, versionsAfter)
})
}
func TestDeleteMarkerSuspended(t *testing.T) {
tc := prepareHandlerContext(t)
bktName, objName := "bucket-for-removal", "object-to-delete"
bktInfo, _ := createVersionedBucketAndObject(t, tc, bktName, objName)
putBucketVersioning(t, tc, bktName, false)
t.Run("not create new delete marker if last version is delete marker", func(t *testing.T) {
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion)
deleteMarkerVersion, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion)
versions := listVersions(t, tc, bktName)
require.Len(t, versions.DeleteMarker, 1)
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
})
t.Run("do not create delete marker if object does not exist", func(t *testing.T) {
versionsBefore := listVersions(t, tc, bktName)
_, isDeleteMarker := deleteObject(t, tc, bktName, "dummy", emptyVersion)
require.False(t, isDeleteMarker)
versionsAfter := listVersions(t, tc, bktName)
require.Equal(t, versionsBefore, versionsAfter)
})
t.Run("remove last unversioned non delete marker", func(t *testing.T) {
objName := "obj3"
putObject(t, tc, bktName, objName)
nodeVersion, err := tc.tree.GetUnversioned(tc.Context(), bktInfo, objName)
require.NoError(t, err)
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion)
objVersions := getVersion(listVersions(t, tc, bktName), objName)
require.Len(t, objVersions, 0)
require.False(t, tc.MockedPool().ObjectExists(nodeVersion.OID))
})
}
func TestDeleteObjectCombined(t *testing.T) { func TestDeleteObjectCombined(t *testing.T) {
tc := prepareHandlerContext(t) tc := prepareHandlerContext(t)
@ -197,7 +272,7 @@ func TestDeleteMarkers(t *testing.T) {
deleteObject(t, tc, bktName, objName, emptyVersion) deleteObject(t, tc, bktName, objName, emptyVersion)
versions := listVersions(t, tc, bktName) versions := listVersions(t, tc, bktName)
require.Len(t, versions.DeleteMarker, 3, "invalid delete markers length") require.Len(t, versions.DeleteMarker, 0, "invalid delete markers length")
require.Len(t, versions.Version, 0, "versions must be empty") require.Len(t, versions.Version, 0, "versions must be empty")
require.Len(t, listOIDsFromMockedFrostFS(t, tc, bktName), 0, "shouldn't be any object in frostfs") require.Len(t, listOIDsFromMockedFrostFS(t, tc, bktName), 0, "shouldn't be any object in frostfs")
@ -316,6 +391,16 @@ func listVersions(t *testing.T, tc *handlerContext, bktName string) *ListObjects
return res return res
} }
func getVersion(resp *ListObjectsVersionsResponse, objName string) []*ObjectVersionResponse {
var res []*ObjectVersionResponse
for i, version := range resp.Version {
if version.Key == objName {
res = append(res, &resp.Version[i])
}
}
return res
}
func putObject(t *testing.T, tc *handlerContext, bktName, objName string) { func putObject(t *testing.T, tc *handlerContext, bktName, objName string) {
body := bytes.NewReader([]byte("content")) body := bytes.NewReader([]byte("content"))
w, r := prepareTestPayloadRequest(tc, bktName, objName, body) w, r := prepareTestPayloadRequest(tc, bktName, objName, body)

View file

@ -33,6 +33,7 @@ type handlerContext struct {
t *testing.T t *testing.T
h *handler h *handler
tp *layer.TestFrostFS tp *layer.TestFrostFS
tree *tree.Tree
context context.Context context context.Context
} }
@ -85,11 +86,13 @@ func prepareHandlerContext(t *testing.T) *handlerContext {
var owner user.ID var owner user.ID
user.IDFromKey(&owner, key.PrivateKey.PublicKey) user.IDFromKey(&owner, key.PrivateKey.PublicKey)
treeMock := NewTreeServiceMock(t)
layerCfg := &layer.Config{ layerCfg := &layer.Config{
Caches: layer.DefaultCachesConfigs(zap.NewExample()), Caches: layer.DefaultCachesConfigs(zap.NewExample()),
AnonKey: layer.AnonymousKey{Key: key}, AnonKey: layer.AnonymousKey{Key: key},
Resolver: testResolver, Resolver: testResolver,
TreeService: NewTreeServiceMock(t), TreeService: treeMock,
} }
var pp netmap.PlacementPolicy var pp netmap.PlacementPolicy
@ -110,6 +113,7 @@ func prepareHandlerContext(t *testing.T) *handlerContext {
t: t, t: t,
h: h, h: h,
tp: tp, tp: tp,
tree: treeMock,
context: context.WithValue(context.Background(), api.BoxData, newTestAccessBox(t, key)), context: context.WithValue(context.Background(), api.BoxData, newTestAccessBox(t, key)),
} }
} }
@ -117,7 +121,7 @@ func prepareHandlerContext(t *testing.T) *handlerContext {
func NewTreeServiceMock(t *testing.T) *tree.Tree { func NewTreeServiceMock(t *testing.T) *tree.Tree {
memCli, err := tree.NewTreeServiceClientMemory() memCli, err := tree.NewTreeServiceClientMemory()
require.NoError(t, err) require.NoError(t, err)
return tree.NewTree(memCli) return tree.NewTree(memCli, zap.NewExample())
} }
func createTestBucket(hc *handlerContext, bktName string) *data.BucketInfo { func createTestBucket(hc *handlerContext, bktName string) *data.BucketInfo {

View file

@ -15,6 +15,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
@ -55,6 +56,16 @@ func (t *TestFrostFS) Objects() []*object.Object {
return res return res
} }
func (t *TestFrostFS) ObjectExists(objID oid.ID) bool {
for _, obj := range t.objects {
if id, _ := obj.ID(); id.Equals(objID) {
return true
}
}
return false
}
func (t *TestFrostFS) AddObject(key string, obj *object.Object) { func (t *TestFrostFS) AddObject(key string, obj *object.Object) {
t.objects[key] = obj t.objects[key] = obj
} }
@ -161,7 +172,7 @@ func (t *TestFrostFS) ReadObject(ctx context.Context, prm PrmObjectRead) (*Objec
}, nil }, nil
} }
return nil, fmt.Errorf("object not found %s", addr) return nil, fmt.Errorf("%w: %s", apistatus.ObjectNotFound{}, addr)
} }
func (t *TestFrostFS) CreateObject(ctx context.Context, prm PrmObjectCreate) (oid.ID, error) { func (t *TestFrostFS) CreateObject(ctx context.Context, prm PrmObjectCreate) (oid.ID, error) {

View file

@ -561,21 +561,38 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
return obj return obj
} }
var newVersion *data.NodeVersion lastVersion, err := n.getLastNodeVersion(ctx, bkt, obj)
if err != nil {
obj.Error = err
return n.handleNotFoundError(bkt, obj)
}
if settings.VersioningSuspended() { if settings.VersioningSuspended() {
obj.VersionID = data.UnversionedObjectVersionID obj.VersionID = data.UnversionedObjectVersionID
var nodeVersion *data.NodeVersion var nullVersionToDelete *data.NodeVersion
if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil { if lastVersion.IsUnversioned {
return n.handleNotFoundError(bkt, obj) if !lastVersion.IsDeleteMarker() {
nullVersionToDelete = lastVersion
} }
} else if nullVersionToDelete, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil { if !isNotFoundError(obj.Error) {
return obj return obj
} }
} }
if nullVersionToDelete != nil {
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nullVersionToDelete, obj); obj.Error != nil {
return obj
}
}
}
if lastVersion.IsDeleteMarker() {
obj.DeleteMarkVersion = lastVersion.OID.EncodeToString()
return obj
}
randOID, err := getRandomOID() randOID, err := getRandomOID()
if err != nil { if err != nil {
obj.Error = fmt.Errorf("couldn't get random oid: %w", err) obj.Error = fmt.Errorf("couldn't get random oid: %w", err)
@ -584,7 +601,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
obj.DeleteMarkVersion = randOID.EncodeToString() obj.DeleteMarkVersion = randOID.EncodeToString()
newVersion = &data.NodeVersion{ newVersion := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{ BaseNodeVersion: data.BaseNodeVersion{
OID: randOID, OID: randOID,
FilePath: obj.Name, FilePath: obj.Name,
@ -606,8 +623,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
} }
func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject { func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject {
if errors.IsS3Error(obj.Error, errors.ErrNoSuchKey) || if isNotFoundError(obj.Error) {
errors.IsS3Error(obj.Error, errors.ErrNoSuchVersion) {
obj.Error = nil obj.Error = nil
n.cache.CleanListCacheEntriesContainingObject(obj.Name, bkt.CID) n.cache.CleanListCacheEntriesContainingObject(obj.Name, bkt.CID)
n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name) n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)
@ -616,6 +632,11 @@ func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject)
return obj return obj
} }
func isNotFoundError(err error) bool {
return errors.IsS3Error(err, errors.ErrNoSuchKey) ||
errors.IsS3Error(err, errors.ErrNoSuchVersion)
}
func (n *layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) { func (n *layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
objVersion := &ObjectVersion{ objVersion := &ObjectVersion{
BktInfo: bkt, BktInfo: bkt,
@ -627,6 +648,17 @@ func (n *layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo
return n.getNodeVersion(ctx, objVersion) return n.getNodeVersion(ctx, objVersion)
} }
func (n *layer) getLastNodeVersion(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
objVersion := &ObjectVersion{
BktInfo: bkt,
ObjectName: obj.Name,
VersionID: "",
NoErrorOnDeleteMarker: true,
}
return n.getNodeVersion(ctx, objVersion)
}
func (n *layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject) (string, error) { func (n *layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject) (string, error) {
if nodeVersion.IsDeleteMarker() { if nodeVersion.IsDeleteMarker() {
return obj.VersionID, nil return obj.VersionID, nil

View file

@ -113,14 +113,14 @@ func (a *App) init(ctx context.Context) {
func (a *App) initLayer(ctx context.Context) { func (a *App) initLayer(ctx context.Context) {
a.initResolver() a.initResolver()
treeServiceEndpoint := a.cfg.GetString(cfgTreeServiceEndpoint) treeServiceEndpoint := a.cfg.GetStringSlice(cfgTreeServiceEndpoint)
grpcDialOpt := grpc.WithTransportCredentials(insecure.NewCredentials()) grpcDialOpt := grpc.WithTransportCredentials(insecure.NewCredentials())
treeGRPCClient, err := services.NewTreeServiceClientGRPC(ctx, treeServiceEndpoint, a.key, grpcDialOpt) treeGRPCClient, err := services.NewTreeServiceClientGRPC(ctx, treeServiceEndpoint, a.key, a.log, grpcDialOpt)
if err != nil { if err != nil {
a.log.Fatal("failed to create tree service", zap.Error(err)) a.log.Fatal("failed to create tree service", zap.Error(err))
} }
treeService := tree.NewTree(treeGRPCClient) treeService := tree.NewTree(treeGRPCClient, a.log)
a.log.Info("init tree service", zap.String("endpoint", treeServiceEndpoint)) a.log.Info("init tree service", zap.Strings("endpoints", treeGRPCClient.Endpoints()))
// prepare random key for anonymous requests // prepare random key for anonymous requests
randomKey, err := keys.NewPrivateKey() randomKey, err := keys.NewPrivateKey()
@ -182,6 +182,7 @@ func (a *App) initAPI(ctx context.Context) {
func (a *App) initMetrics() { func (a *App) initMetrics() {
a.metrics = metrics.NewAppMetrics(a.log, frostfs.NewPoolStatistic(a.pool), a.cfg.GetBool(cfgPrometheusEnabled)) a.metrics = metrics.NewAppMetrics(a.log, frostfs.NewPoolStatistic(a.pool), a.cfg.GetBool(cfgPrometheusEnabled))
a.metrics.SetHealth(metrics.HealthStatusStarting)
} }
func (a *App) initResolver() { func (a *App) initResolver() {
@ -369,7 +370,7 @@ func (a *App) Wait() {
} }
func (a *App) setHealthStatus() { func (a *App) setHealthStatus() {
a.metrics.SetHealth(1) a.metrics.SetHealth(metrics.HealthStatusReady)
} }
// Serve runs HTTP server to handle S3 API requests. // Serve runs HTTP server to handle S3 API requests.

View file

@ -42,8 +42,8 @@ S3_GW_CONFIG=/path/to/config/yaml
# Logger # Logger
S3_GW_LOGGER_LEVEL=debug S3_GW_LOGGER_LEVEL=debug
# Endpoint of the tree service. Must be provided. Can be one of the node address (from the `peers` section). # Endpoints of the tree service. At least one endpoint must be provided. Node addresses (from the `peers` section) can be used.
S3_GW_TREE_SERVICE=grpc://s01.frostfs.devenv:8080 S3_GW_TREE_SERVICE=grpc://s01.frostfs.devenv:8080 grpc://s02.frostfs.devenv:8080
# RPC endpoint and order of resolving of bucket names # RPC endpoint and order of resolving of bucket names
S3_GW_RPC_ENDPOINT=http://morph-chain.frostfs.devenv:30333/ S3_GW_RPC_ENDPOINT=http://morph-chain.frostfs.devenv:30333/

View file

@ -44,9 +44,11 @@ listen_domains:
logger: logger:
level: debug level: debug
# Endpoint of the tree service. Must be provided. Can be one of the node address (from the `peers` section). # Endpoints of the tree service. At least one endpoint must be provided. Node addresses (from the `peers` section) can be used.
tree: tree:
service: node1.frostfs:8080 service:
- node1.frostfs:8080
- node2.frostfs:8080
# RPC endpoint and order of resolving of bucket names # RPC endpoint and order of resolving of bucket names
rpc_endpoint: http://morph-chain.frostfs.devenv:30333 rpc_endpoint: http://morph-chain.frostfs.devenv:30333

View file

@ -337,14 +337,23 @@ logger:
### `tree` section ### `tree` section
```yaml
tree:
service:
- s01.frostfs.devenv:8080
- s02.frostfs.devenv:8080
```
If you use only one endpoint, it can be provided as:
```yaml ```yaml
tree: tree:
service: s01.frostfs.devenv:8080 service: s01.frostfs.devenv:8080
``` ```
| Parameter | Type | Default value | Description | | Parameter | Type | Default value | Description |
|-----------|----------|---------------|------------------------------------------------------------------------------------------------------------| |-----------|------------|---------------|-------------------------------------------------------------------------------------------------------------------------------|
| `service` | `string` | | Endpoint of the tree service. Must be provided. Can be one of the node address (from the `peers` section). | | `service` | `[]string` | | Endpoints of the tree service. At least one endpoint must be provided. Node addresses (from the `peers` section) can be used. |
### `cache` section ### `cache` section

1
go.mod
View file

@ -31,6 +31,7 @@ require (
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect
git.frostfs.info/TrueCloudLab/tzhash v1.8.0 // indirect git.frostfs.info/TrueCloudLab/tzhash v1.8.0 // indirect
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20221202181307-76fa05c21b12 // indirect github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20221202181307-76fa05c21b12 // indirect
github.com/benbjohnson/clock v1.1.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect

View file

@ -0,0 +1,70 @@
package client
import (
"context"
"fmt"
"sync"
grpcService "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/tree"
"google.golang.org/grpc"
)
type TreeClient struct {
mu sync.RWMutex
address string
opts []grpc.DialOption
conn *grpc.ClientConn
service grpcService.TreeServiceClient
dialed bool
}
// NewTreeClient creates new tree client with auto dial.
func NewTreeClient(addr string, opts ...grpc.DialOption) *TreeClient {
return &TreeClient{
address: addr,
opts: opts,
}
}
func (c *TreeClient) dial(ctx context.Context) error {
c.mu.Lock()
defer c.mu.Unlock()
if c.dialed {
return fmt.Errorf("couldn't dial '%s': connection already established", c.address)
}
conn, err := grpc.Dial(c.address, c.opts...)
if err != nil {
return fmt.Errorf("grpc dial node tree service: %w", err)
}
serviceClient := grpcService.NewTreeServiceClient(conn)
if _, err = serviceClient.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil {
return fmt.Errorf("healthcheck tree service: %w", err)
}
c.conn = conn
c.service = serviceClient
c.dialed = true
return nil
}
func (c *TreeClient) TreeClient(ctx context.Context) (grpcService.TreeServiceClient, error) {
c.mu.RLock()
dialed := c.dialed
c.mu.RUnlock()
if !dialed {
if err := c.dial(ctx); err != nil {
return nil, err
}
}
return c.service, nil
}
func (c *TreeClient) Address() string {
return c.address
}

View file

@ -6,14 +6,17 @@ import (
"fmt" "fmt"
"io" "io"
"strings" "strings"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
treeClient "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/client"
grpcService "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/tree" grpcService "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/tree"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"go.uber.org/zap"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@ -65,28 +68,61 @@ func (n GetSubTreeResponseBodyWrapper) GetMeta() []tree.Meta {
return res return res
} }
type TreeClient interface {
TreeClient(ctx context.Context) (grpcService.TreeServiceClient, error)
Address() string
}
type ServiceClientGRPC struct { type ServiceClientGRPC struct {
key *keys.PrivateKey key *keys.PrivateKey
conn *grpc.ClientConn log *zap.Logger
service grpcService.TreeServiceClient clients []TreeClient
startIndex int32
} }
func NewTreeServiceClientGRPC(ctx context.Context, addr string, key *keys.PrivateKey, grpcOpts ...grpc.DialOption) (*ServiceClientGRPC, error) { func (c *ServiceClientGRPC) getStartIndex() int {
conn, err := grpc.Dial(addr, grpcOpts...) return int(atomic.LoadInt32(&c.startIndex))
if err != nil {
return nil, fmt.Errorf("did not connect: %v", err)
} }
c := grpcService.NewTreeServiceClient(conn) func (c *ServiceClientGRPC) setStartIndex(index int) {
if _, err = c.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil { atomic.StoreInt32(&c.startIndex, int32(index))
return nil, fmt.Errorf("healthcheck: %w", err)
} }
return &ServiceClientGRPC{ func (c *ServiceClientGRPC) Endpoints() []string {
res := make([]string, len(c.clients))
for i, client := range c.clients {
res[i] = client.Address()
}
return res
}
func NewTreeServiceClientGRPC(ctx context.Context, endpoints []string, key *keys.PrivateKey, log *zap.Logger, grpcOpts ...grpc.DialOption) (*ServiceClientGRPC, error) {
res := &ServiceClientGRPC{
key: key, key: key,
conn: conn, log: log,
service: c, }
}, nil
firstHealthy := -1
res.clients = make([]TreeClient, len(endpoints))
for i, addr := range endpoints {
res.clients[i] = treeClient.NewTreeClient(addr, grpcOpts...)
if _, err := res.clients[i].TreeClient(ctx); err != nil {
log.Warn("dial tree", zap.String("address", addr), zap.Error(err))
continue
}
if firstHealthy == -1 {
firstHealthy = i
}
}
if firstHealthy == -1 {
return nil, errors.New("no healthy tree grpc client")
}
res.setStartIndex(firstHealthy)
return res, nil
} }
func (c *ServiceClientGRPC) GetNodes(ctx context.Context, p *tree.GetNodesParams) ([]tree.NodeResponse, error) { func (c *ServiceClientGRPC) GetNodes(ctx context.Context, p *tree.GetNodesParams) ([]tree.NodeResponse, error) {
@ -112,9 +148,15 @@ func (c *ServiceClientGRPC) GetNodes(ctx context.Context, p *tree.GetNodesParams
return nil, err return nil, err
} }
resp, err := c.service.GetNodeByPath(ctx, request) log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", p.TreeID),
if err != nil { zap.String("method", "GetNodeByPath"))
return nil, handleError("failed to get node by path", err)
var resp *grpcService.GetNodeByPathResponse
if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.GetNodeByPath(ctx, request)
return handleError("failed to get node by path", inErr)
}); err != nil {
return nil, err
} }
res := make([]tree.NodeResponse, len(resp.GetBody().GetNodes())) res := make([]tree.NodeResponse, len(resp.GetBody().GetNodes()))
@ -145,9 +187,15 @@ func (c *ServiceClientGRPC) GetSubTree(ctx context.Context, bktInfo *data.Bucket
return nil, err return nil, err
} }
cli, err := c.service.GetSubTree(ctx, request) log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID),
if err != nil { zap.String("method", "GetSubTree"))
return nil, handleError("failed to get sub tree client", err)
var cli grpcService.TreeService_GetSubTreeClient
if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) {
cli, inErr = client.GetSubTree(ctx, request)
return handleError("failed to get sub tree client", inErr)
}); err != nil {
return nil, err
} }
var subtree []tree.NodeResponse var subtree []tree.NodeResponse
@ -183,9 +231,15 @@ func (c *ServiceClientGRPC) AddNode(ctx context.Context, bktInfo *data.BucketInf
return 0, err return 0, err
} }
resp, err := c.service.Add(ctx, request) log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID),
if err != nil { zap.String("method", "Add"))
return 0, handleError("failed to add node", err)
var resp *grpcService.AddResponse
if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.Add(ctx, request)
return handleError("failed to add node", inErr)
}); err != nil {
return 0, err
} }
return resp.GetBody().GetNodeId(), nil return resp.GetBody().GetNodeId(), nil
@ -212,9 +266,15 @@ func (c *ServiceClientGRPC) AddNodeByPath(ctx context.Context, bktInfo *data.Buc
return 0, err return 0, err
} }
resp, err := c.service.AddByPath(ctx, request) log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID),
if err != nil { zap.String("method", "AddByPath"))
return 0, handleError("failed to add node by path", err)
var resp *grpcService.AddByPathResponse
if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.AddByPath(ctx, request)
return handleError("failed to add node by path", inErr)
}); err != nil {
return 0, err
} }
body := resp.GetBody() body := resp.GetBody()
@ -249,11 +309,15 @@ func (c *ServiceClientGRPC) MoveNode(ctx context.Context, bktInfo *data.BucketIn
return err return err
} }
if _, err := c.service.Move(ctx, request); err != nil { log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID),
zap.String("method", "Move"))
return c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) error {
if _, err := client.Move(ctx, request); err != nil {
return handleError("failed to move node", err) return handleError("failed to move node", err)
} }
return nil return nil
})
} }
func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error { func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
@ -274,11 +338,43 @@ func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.Bucket
return err return err
} }
if _, err := c.service.Remove(ctx, request); err != nil { log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID),
zap.String("method", "Remove"))
return c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) error {
if _, err := client.Remove(ctx, request); err != nil {
return handleError("failed to remove node", err) return handleError("failed to remove node", err)
} }
return nil return nil
})
}
func (c *ServiceClientGRPC) requestWithRetry(ctx context.Context, log *zap.Logger, fn func(client grpcService.TreeServiceClient) error) error {
var (
err error
cl grpcService.TreeServiceClient
)
start := c.getStartIndex()
for i := start; i < start+len(c.clients); i++ {
index := i % len(c.clients)
if cl, err = c.clients[index].TreeClient(ctx); err == nil {
err = fn(cl)
}
if !shouldTryAgain(err) {
c.setStartIndex(index)
return err
}
log.Debug("tree request error", zap.String("address", c.clients[index].Address()), zap.Error(err))
}
return err
}
func shouldTryAgain(err error) bool {
return !(err == nil ||
errors.Is(err, tree.ErrNodeNotFound) ||
errors.Is(err, tree.ErrNodeAccessDenied))
} }
func metaToKV(meta map[string]string) []*grpcService.KeyValue { func metaToKV(meta map[string]string) []*grpcService.KeyValue {
@ -303,6 +399,9 @@ func getBearer(ctx context.Context, bktInfo *data.BucketInfo) []byte {
} }
func handleError(msg string, err error) error { func handleError(msg string, err error) error {
if err == nil {
return nil
}
if strings.Contains(err.Error(), "not found") { if strings.Contains(err.Error(), "not found") {
return fmt.Errorf("%w: %s", tree.ErrNodeNotFound, err.Error()) return fmt.Errorf("%w: %s", tree.ErrNodeNotFound, err.Error())
} else if strings.Contains(err.Error(), "is denied by") { } else if strings.Contains(err.Error(), "is denied by") {

View file

@ -7,8 +7,6 @@ import (
) )
func (c *ServiceClientGRPC) signData(buf []byte, f func(key, sign []byte)) error { func (c *ServiceClientGRPC) signData(buf []byte, f func(key, sign []byte)) error {
// crypto package should not be used outside of API libraries (see neofs-node#491).
// For now tree service does not include into SDK Client nor SDK Pool, so there is no choice.
// When SDK library adopts Tree service client, this should be dropped. // When SDK library adopts Tree service client, this should be dropped.
sign, err := crypto.Sign(&c.key.PrivateKey, buf) sign, err := crypto.Sign(&c.key.PrivateKey, buf)
if err != nil { if err != nil {

View file

@ -1,13 +1,32 @@
package services package services
import ( import (
"context"
"errors" "errors"
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
grpcService "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/tree"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
) )
type treeClientMock struct {
address string
err bool
}
func (t *treeClientMock) TreeClient(context.Context) (grpcService.TreeServiceClient, error) {
if t.err {
return nil, errors.New("error")
}
return nil, nil
}
func (t *treeClientMock) Address() string {
return t.address
}
func TestHandleError(t *testing.T) { func TestHandleError(t *testing.T) {
defaultError := errors.New("default error") defaultError := errors.New("default error")
for _, tc := range []struct { for _, tc := range []struct {
@ -33,3 +52,78 @@ func TestHandleError(t *testing.T) {
}) })
} }
} }
func TestRetry(t *testing.T) {
ctx := context.Background()
log := zaptest.NewLogger(t)
cl := &ServiceClientGRPC{
log: zaptest.NewLogger(t),
clients: []TreeClient{
&treeClientMock{address: "node0"},
&treeClientMock{address: "node1"},
&treeClientMock{address: "node2"},
&treeClientMock{address: "node3"},
},
}
makeFn := func(client grpcService.TreeServiceClient) error {
return nil
}
t.Run("first ok", func(t *testing.T) {
err := cl.requestWithRetry(ctx, log, makeFn)
require.NoError(t, err)
require.Equal(t, 0, cl.getStartIndex())
resetClients(cl)
})
t.Run("first failed", func(t *testing.T) {
setErrors(cl.clients[:1])
err := cl.requestWithRetry(ctx, log, makeFn)
require.NoError(t, err)
require.Equal(t, 1, cl.getStartIndex())
resetClients(cl)
})
t.Run("all failed", func(t *testing.T) {
setErrors(cl.clients)
err := cl.requestWithRetry(ctx, log, makeFn)
require.Error(t, err)
require.Equal(t, 0, cl.getStartIndex())
resetClients(cl)
})
t.Run("round", func(t *testing.T) {
setErrors(cl.clients[:2])
err := cl.requestWithRetry(ctx, log, makeFn)
require.NoError(t, err)
require.Equal(t, 2, cl.getStartIndex())
resetClientsErrors(cl)
setErrors(cl.clients[2:])
err = cl.requestWithRetry(ctx, log, makeFn)
require.NoError(t, err)
require.Equal(t, 0, cl.getStartIndex())
resetClients(cl)
})
}
func resetClients(cl *ServiceClientGRPC) {
resetClientsErrors(cl)
cl.setStartIndex(0)
}
func resetClientsErrors(cl *ServiceClientGRPC) {
for _, client := range cl.clients {
node := client.(*treeClientMock)
node.err = false
}
}
func setErrors(clients []TreeClient) {
for _, client := range clients {
node := client.(*treeClientMock)
node.err = true
}
}

View file

@ -37,7 +37,7 @@ func (m *AppMetrics) SetEnabled(enabled bool) {
m.mu.Unlock() m.mu.Unlock()
} }
func (m *AppMetrics) SetHealth(status int32) { func (m *AppMetrics) SetHealth(status HealthStatus) {
if !m.isEnabled() { if !m.isEnabled() {
return return
} }
@ -48,7 +48,7 @@ func (m *AppMetrics) SetHealth(status int32) {
func (m *AppMetrics) Shutdown() { func (m *AppMetrics) Shutdown() {
m.mu.Lock() m.mu.Lock()
if m.enabled { if m.enabled {
m.gate.State.SetHealth(0) m.gate.State.SetHealth(HealthStatusShuttingDown)
m.enabled = false m.enabled = false
} }
m.gate.Unregister() m.gate.Unregister()

View file

@ -4,6 +4,16 @@ import "github.com/prometheus/client_golang/prometheus"
const stateSubsystem = "state" const stateSubsystem = "state"
// HealthStatus of the gate application.
type HealthStatus int32
const (
HealthStatusUndefined HealthStatus = 0
HealthStatusStarting HealthStatus = 1
HealthStatusReady HealthStatus = 2
HealthStatusShuttingDown HealthStatus = 3
)
type stateMetrics struct { type stateMetrics struct {
healthCheck prometheus.Gauge healthCheck prometheus.Gauge
} }
@ -27,6 +37,6 @@ func (m stateMetrics) unregister() {
prometheus.Unregister(m.healthCheck) prometheus.Unregister(m.healthCheck)
} }
func (m stateMetrics) SetHealth(s int32) { func (m stateMetrics) SetHealth(s HealthStatus) {
m.healthCheck.Set(float64(s)) m.healthCheck.Set(float64(s))
} }

View file

@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"sort"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -12,11 +13,13 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"go.uber.org/zap"
) )
type ( type (
Tree struct { Tree struct {
service ServiceClient service ServiceClient
log *zap.Logger
} }
// ServiceClient is a client to interact with tree service. // ServiceClient is a client to interact with tree service.
@ -104,8 +107,11 @@ const (
) )
// NewTree creates instance of Tree using provided address and create grpc connection. // NewTree creates instance of Tree using provided address and create grpc connection.
func NewTree(service ServiceClient) *Tree { func NewTree(service ServiceClient, log *zap.Logger) *Tree {
return &Tree{service: service} return &Tree{
service: service,
log: log,
}
} }
type Meta interface { type Meta interface {
@ -811,14 +817,19 @@ func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, tre
return nil, err return nil, err
} }
if len(nodes) > 1 { if len(nodes) == 0 {
return nil, fmt.Errorf("found more than one unversioned node")
}
if len(nodes) != 1 {
return nil, layer.ErrNodeNotFound return nil, layer.ErrNodeNotFound
} }
if len(nodes) > 1 {
c.log.Debug("found more than one unversioned node", zap.Stringer("cid", bktInfo.CID),
zap.String("treeID", treeID), zap.String("filepath", filepath))
}
sort.Slice(nodes, func(i, j int) bool {
return nodes[i].Timestamp > nodes[j].Timestamp
})
return nodes[0], nil return nodes[0], nil
} }

View file

@ -8,6 +8,7 @@ import (
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
) )
func TestLockConfigurationEncoding(t *testing.T) { func TestLockConfigurationEncoding(t *testing.T) {
@ -102,7 +103,7 @@ func TestTreeServiceSettings(t *testing.T) {
memCli, err := NewTreeServiceClientMemory() memCli, err := NewTreeServiceClientMemory()
require.NoError(t, err) require.NoError(t, err)
treeService := NewTree(memCli) treeService := NewTree(memCli, zaptest.NewLogger(t))
bktInfo := &data.BucketInfo{ bktInfo := &data.BucketInfo{
CID: cidtest.ID(), CID: cidtest.ID(),
@ -134,7 +135,7 @@ func TestTreeServiceAddVersion(t *testing.T) {
memCli, err := NewTreeServiceClientMemory() memCli, err := NewTreeServiceClientMemory()
require.NoError(t, err) require.NoError(t, err)
treeService := NewTree(memCli) treeService := NewTree(memCli, zaptest.NewLogger(t))
bktInfo := &data.BucketInfo{ bktInfo := &data.BucketInfo{
CID: cidtest.ID(), CID: cidtest.ID(),