forked from TrueCloudLab/frostfs-s3-gw
Compare commits
13 commits
master
...
support/v0
Author | SHA1 | Date | |
---|---|---|---|
7edd827706 | |||
c8f3dc17e0 | |||
efdde64c23 | |||
8b41fbeed0 | |||
02934f49e5 | |||
b4d5d84f21 | |||
b9baebbed7 | |||
0c1e17dca4 | |||
18c7d669e0 | |||
cab758d8ce | |||
b60afd88c4 | |||
5d4304e204 | |||
17b8905b24 |
18 changed files with 505 additions and 74 deletions
|
@ -8,6 +8,7 @@ This document outlines major changes between releases.
|
||||||
- Clean up List and Name caches when object is missing in Tree service (#57)
|
- Clean up List and Name caches when object is missing in Tree service (#57)
|
||||||
- Get empty bucket CORS from frostfs (TrueCloudLab#36)
|
- Get empty bucket CORS from frostfs (TrueCloudLab#36)
|
||||||
- Don't count pool error on client abort (#35)
|
- Don't count pool error on client abort (#35)
|
||||||
|
- Don't create unnecessary delete-markers (#83)
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
- Return `X-Owner-Id` in `head-bucket` response (#79)
|
- Return `X-Owner-Id` in `head-bucket` response (#79)
|
||||||
|
@ -29,6 +30,8 @@ This document outlines major changes between releases.
|
||||||
- Limit number of objects to delete at one time (TrueCloudLab#37)
|
- Limit number of objects to delete at one time (TrueCloudLab#37)
|
||||||
- CompleteMultipartUpload handler now sends whitespace characters to keep alive client's connection (#60)
|
- CompleteMultipartUpload handler now sends whitespace characters to keep alive client's connection (#60)
|
||||||
- Support new system attributes (#64)
|
- Support new system attributes (#64)
|
||||||
|
- Changed values for `frostfs_s3_gw_state_health` metric (#91)
|
||||||
|
- Support multiple tree service endpoints (#74)
|
||||||
|
|
||||||
## [0.26.0] - 2022-12-28
|
## [0.26.0] - 2022-12-28
|
||||||
|
|
||||||
|
|
|
@ -147,6 +147,81 @@ func TestRemoveDeleteMarker(t *testing.T) {
|
||||||
require.True(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object doesn't exist but should")
|
require.True(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object doesn't exist but should")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDeleteMarkerVersioned(t *testing.T) {
|
||||||
|
tc := prepareHandlerContext(t)
|
||||||
|
|
||||||
|
bktName, objName := "bucket-for-removal", "object-to-delete"
|
||||||
|
createVersionedBucketAndObject(t, tc, bktName, objName)
|
||||||
|
|
||||||
|
t.Run("not create new delete marker if last version is delete marker", func(t *testing.T) {
|
||||||
|
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
|
||||||
|
require.True(t, isDeleteMarker)
|
||||||
|
versions := listVersions(t, tc, bktName)
|
||||||
|
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
|
||||||
|
|
||||||
|
_, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion)
|
||||||
|
require.True(t, isDeleteMarker)
|
||||||
|
versions = listVersions(t, tc, bktName)
|
||||||
|
require.Len(t, versions.DeleteMarker, 1)
|
||||||
|
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("do not create delete marker if object does not exist", func(t *testing.T) {
|
||||||
|
versionsBefore := listVersions(t, tc, bktName)
|
||||||
|
_, isDeleteMarker := deleteObject(t, tc, bktName, "dummy", emptyVersion)
|
||||||
|
require.False(t, isDeleteMarker)
|
||||||
|
versionsAfter := listVersions(t, tc, bktName)
|
||||||
|
require.Equal(t, versionsBefore, versionsAfter)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDeleteMarkerSuspended(t *testing.T) {
|
||||||
|
tc := prepareHandlerContext(t)
|
||||||
|
|
||||||
|
bktName, objName := "bucket-for-removal", "object-to-delete"
|
||||||
|
bktInfo, _ := createVersionedBucketAndObject(t, tc, bktName, objName)
|
||||||
|
putBucketVersioning(t, tc, bktName, false)
|
||||||
|
|
||||||
|
t.Run("not create new delete marker if last version is delete marker", func(t *testing.T) {
|
||||||
|
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
|
||||||
|
require.True(t, isDeleteMarker)
|
||||||
|
require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion)
|
||||||
|
|
||||||
|
deleteMarkerVersion, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion)
|
||||||
|
require.True(t, isDeleteMarker)
|
||||||
|
require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion)
|
||||||
|
|
||||||
|
versions := listVersions(t, tc, bktName)
|
||||||
|
require.Len(t, versions.DeleteMarker, 1)
|
||||||
|
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("do not create delete marker if object does not exist", func(t *testing.T) {
|
||||||
|
versionsBefore := listVersions(t, tc, bktName)
|
||||||
|
_, isDeleteMarker := deleteObject(t, tc, bktName, "dummy", emptyVersion)
|
||||||
|
require.False(t, isDeleteMarker)
|
||||||
|
versionsAfter := listVersions(t, tc, bktName)
|
||||||
|
require.Equal(t, versionsBefore, versionsAfter)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("remove last unversioned non delete marker", func(t *testing.T) {
|
||||||
|
objName := "obj3"
|
||||||
|
putObject(t, tc, bktName, objName)
|
||||||
|
|
||||||
|
nodeVersion, err := tc.tree.GetUnversioned(tc.Context(), bktInfo, objName)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
|
||||||
|
require.True(t, isDeleteMarker)
|
||||||
|
require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion)
|
||||||
|
|
||||||
|
objVersions := getVersion(listVersions(t, tc, bktName), objName)
|
||||||
|
require.Len(t, objVersions, 0)
|
||||||
|
|
||||||
|
require.False(t, tc.MockedPool().ObjectExists(nodeVersion.OID))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestDeleteObjectCombined(t *testing.T) {
|
func TestDeleteObjectCombined(t *testing.T) {
|
||||||
tc := prepareHandlerContext(t)
|
tc := prepareHandlerContext(t)
|
||||||
|
|
||||||
|
@ -197,7 +272,7 @@ func TestDeleteMarkers(t *testing.T) {
|
||||||
deleteObject(t, tc, bktName, objName, emptyVersion)
|
deleteObject(t, tc, bktName, objName, emptyVersion)
|
||||||
|
|
||||||
versions := listVersions(t, tc, bktName)
|
versions := listVersions(t, tc, bktName)
|
||||||
require.Len(t, versions.DeleteMarker, 3, "invalid delete markers length")
|
require.Len(t, versions.DeleteMarker, 0, "invalid delete markers length")
|
||||||
require.Len(t, versions.Version, 0, "versions must be empty")
|
require.Len(t, versions.Version, 0, "versions must be empty")
|
||||||
|
|
||||||
require.Len(t, listOIDsFromMockedFrostFS(t, tc, bktName), 0, "shouldn't be any object in frostfs")
|
require.Len(t, listOIDsFromMockedFrostFS(t, tc, bktName), 0, "shouldn't be any object in frostfs")
|
||||||
|
@ -316,6 +391,16 @@ func listVersions(t *testing.T, tc *handlerContext, bktName string) *ListObjects
|
||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getVersion(resp *ListObjectsVersionsResponse, objName string) []*ObjectVersionResponse {
|
||||||
|
var res []*ObjectVersionResponse
|
||||||
|
for i, version := range resp.Version {
|
||||||
|
if version.Key == objName {
|
||||||
|
res = append(res, &resp.Version[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
func putObject(t *testing.T, tc *handlerContext, bktName, objName string) {
|
func putObject(t *testing.T, tc *handlerContext, bktName, objName string) {
|
||||||
body := bytes.NewReader([]byte("content"))
|
body := bytes.NewReader([]byte("content"))
|
||||||
w, r := prepareTestPayloadRequest(tc, bktName, objName, body)
|
w, r := prepareTestPayloadRequest(tc, bktName, objName, body)
|
||||||
|
|
|
@ -33,6 +33,7 @@ type handlerContext struct {
|
||||||
t *testing.T
|
t *testing.T
|
||||||
h *handler
|
h *handler
|
||||||
tp *layer.TestFrostFS
|
tp *layer.TestFrostFS
|
||||||
|
tree *tree.Tree
|
||||||
context context.Context
|
context context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,11 +86,13 @@ func prepareHandlerContext(t *testing.T) *handlerContext {
|
||||||
var owner user.ID
|
var owner user.ID
|
||||||
user.IDFromKey(&owner, key.PrivateKey.PublicKey)
|
user.IDFromKey(&owner, key.PrivateKey.PublicKey)
|
||||||
|
|
||||||
|
treeMock := NewTreeServiceMock(t)
|
||||||
|
|
||||||
layerCfg := &layer.Config{
|
layerCfg := &layer.Config{
|
||||||
Caches: layer.DefaultCachesConfigs(zap.NewExample()),
|
Caches: layer.DefaultCachesConfigs(zap.NewExample()),
|
||||||
AnonKey: layer.AnonymousKey{Key: key},
|
AnonKey: layer.AnonymousKey{Key: key},
|
||||||
Resolver: testResolver,
|
Resolver: testResolver,
|
||||||
TreeService: NewTreeServiceMock(t),
|
TreeService: treeMock,
|
||||||
}
|
}
|
||||||
|
|
||||||
var pp netmap.PlacementPolicy
|
var pp netmap.PlacementPolicy
|
||||||
|
@ -110,6 +113,7 @@ func prepareHandlerContext(t *testing.T) *handlerContext {
|
||||||
t: t,
|
t: t,
|
||||||
h: h,
|
h: h,
|
||||||
tp: tp,
|
tp: tp,
|
||||||
|
tree: treeMock,
|
||||||
context: context.WithValue(context.Background(), api.BoxData, newTestAccessBox(t, key)),
|
context: context.WithValue(context.Background(), api.BoxData, newTestAccessBox(t, key)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -117,7 +121,7 @@ func prepareHandlerContext(t *testing.T) *handlerContext {
|
||||||
func NewTreeServiceMock(t *testing.T) *tree.Tree {
|
func NewTreeServiceMock(t *testing.T) *tree.Tree {
|
||||||
memCli, err := tree.NewTreeServiceClientMemory()
|
memCli, err := tree.NewTreeServiceClientMemory()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
return tree.NewTree(memCli)
|
return tree.NewTree(memCli, zap.NewExample())
|
||||||
}
|
}
|
||||||
|
|
||||||
func createTestBucket(hc *handlerContext, bktName string) *data.BucketInfo {
|
func createTestBucket(hc *handlerContext, bktName string) *data.BucketInfo {
|
||||||
|
|
|
@ -15,6 +15,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
|
||||||
|
@ -55,6 +56,16 @@ func (t *TestFrostFS) Objects() []*object.Object {
|
||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *TestFrostFS) ObjectExists(objID oid.ID) bool {
|
||||||
|
for _, obj := range t.objects {
|
||||||
|
if id, _ := obj.ID(); id.Equals(objID) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func (t *TestFrostFS) AddObject(key string, obj *object.Object) {
|
func (t *TestFrostFS) AddObject(key string, obj *object.Object) {
|
||||||
t.objects[key] = obj
|
t.objects[key] = obj
|
||||||
}
|
}
|
||||||
|
@ -161,7 +172,7 @@ func (t *TestFrostFS) ReadObject(ctx context.Context, prm PrmObjectRead) (*Objec
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, fmt.Errorf("object not found %s", addr)
|
return nil, fmt.Errorf("%w: %s", apistatus.ObjectNotFound{}, addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TestFrostFS) CreateObject(ctx context.Context, prm PrmObjectCreate) (oid.ID, error) {
|
func (t *TestFrostFS) CreateObject(ctx context.Context, prm PrmObjectCreate) (oid.ID, error) {
|
|
@ -561,21 +561,38 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
|
||||||
return obj
|
return obj
|
||||||
}
|
}
|
||||||
|
|
||||||
var newVersion *data.NodeVersion
|
lastVersion, err := n.getLastNodeVersion(ctx, bkt, obj)
|
||||||
|
if err != nil {
|
||||||
|
obj.Error = err
|
||||||
|
return n.handleNotFoundError(bkt, obj)
|
||||||
|
}
|
||||||
|
|
||||||
if settings.VersioningSuspended() {
|
if settings.VersioningSuspended() {
|
||||||
obj.VersionID = data.UnversionedObjectVersionID
|
obj.VersionID = data.UnversionedObjectVersionID
|
||||||
|
|
||||||
var nodeVersion *data.NodeVersion
|
var nullVersionToDelete *data.NodeVersion
|
||||||
if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
|
if lastVersion.IsUnversioned {
|
||||||
return n.handleNotFoundError(bkt, obj)
|
if !lastVersion.IsDeleteMarker() {
|
||||||
|
nullVersionToDelete = lastVersion
|
||||||
|
}
|
||||||
|
} else if nullVersionToDelete, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
|
||||||
|
if !isNotFoundError(obj.Error) {
|
||||||
|
return obj
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
|
if nullVersionToDelete != nil {
|
||||||
return obj
|
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nullVersionToDelete, obj); obj.Error != nil {
|
||||||
|
return obj
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if lastVersion.IsDeleteMarker() {
|
||||||
|
obj.DeleteMarkVersion = lastVersion.OID.EncodeToString()
|
||||||
|
return obj
|
||||||
|
}
|
||||||
|
|
||||||
randOID, err := getRandomOID()
|
randOID, err := getRandomOID()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
obj.Error = fmt.Errorf("couldn't get random oid: %w", err)
|
obj.Error = fmt.Errorf("couldn't get random oid: %w", err)
|
||||||
|
@ -584,7 +601,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
|
||||||
|
|
||||||
obj.DeleteMarkVersion = randOID.EncodeToString()
|
obj.DeleteMarkVersion = randOID.EncodeToString()
|
||||||
|
|
||||||
newVersion = &data.NodeVersion{
|
newVersion := &data.NodeVersion{
|
||||||
BaseNodeVersion: data.BaseNodeVersion{
|
BaseNodeVersion: data.BaseNodeVersion{
|
||||||
OID: randOID,
|
OID: randOID,
|
||||||
FilePath: obj.Name,
|
FilePath: obj.Name,
|
||||||
|
@ -606,8 +623,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject {
|
func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject {
|
||||||
if errors.IsS3Error(obj.Error, errors.ErrNoSuchKey) ||
|
if isNotFoundError(obj.Error) {
|
||||||
errors.IsS3Error(obj.Error, errors.ErrNoSuchVersion) {
|
|
||||||
obj.Error = nil
|
obj.Error = nil
|
||||||
n.cache.CleanListCacheEntriesContainingObject(obj.Name, bkt.CID)
|
n.cache.CleanListCacheEntriesContainingObject(obj.Name, bkt.CID)
|
||||||
n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)
|
n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)
|
||||||
|
@ -616,6 +632,11 @@ func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject)
|
||||||
return obj
|
return obj
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isNotFoundError(err error) bool {
|
||||||
|
return errors.IsS3Error(err, errors.ErrNoSuchKey) ||
|
||||||
|
errors.IsS3Error(err, errors.ErrNoSuchVersion)
|
||||||
|
}
|
||||||
|
|
||||||
func (n *layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
|
func (n *layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
|
||||||
objVersion := &ObjectVersion{
|
objVersion := &ObjectVersion{
|
||||||
BktInfo: bkt,
|
BktInfo: bkt,
|
||||||
|
@ -627,6 +648,17 @@ func (n *layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo
|
||||||
return n.getNodeVersion(ctx, objVersion)
|
return n.getNodeVersion(ctx, objVersion)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *layer) getLastNodeVersion(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
|
||||||
|
objVersion := &ObjectVersion{
|
||||||
|
BktInfo: bkt,
|
||||||
|
ObjectName: obj.Name,
|
||||||
|
VersionID: "",
|
||||||
|
NoErrorOnDeleteMarker: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
return n.getNodeVersion(ctx, objVersion)
|
||||||
|
}
|
||||||
|
|
||||||
func (n *layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject) (string, error) {
|
func (n *layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject) (string, error) {
|
||||||
if nodeVersion.IsDeleteMarker() {
|
if nodeVersion.IsDeleteMarker() {
|
||||||
return obj.VersionID, nil
|
return obj.VersionID, nil
|
||||||
|
|
|
@ -113,14 +113,14 @@ func (a *App) init(ctx context.Context) {
|
||||||
func (a *App) initLayer(ctx context.Context) {
|
func (a *App) initLayer(ctx context.Context) {
|
||||||
a.initResolver()
|
a.initResolver()
|
||||||
|
|
||||||
treeServiceEndpoint := a.cfg.GetString(cfgTreeServiceEndpoint)
|
treeServiceEndpoint := a.cfg.GetStringSlice(cfgTreeServiceEndpoint)
|
||||||
grpcDialOpt := grpc.WithTransportCredentials(insecure.NewCredentials())
|
grpcDialOpt := grpc.WithTransportCredentials(insecure.NewCredentials())
|
||||||
treeGRPCClient, err := services.NewTreeServiceClientGRPC(ctx, treeServiceEndpoint, a.key, grpcDialOpt)
|
treeGRPCClient, err := services.NewTreeServiceClientGRPC(ctx, treeServiceEndpoint, a.key, a.log, grpcDialOpt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.log.Fatal("failed to create tree service", zap.Error(err))
|
a.log.Fatal("failed to create tree service", zap.Error(err))
|
||||||
}
|
}
|
||||||
treeService := tree.NewTree(treeGRPCClient)
|
treeService := tree.NewTree(treeGRPCClient, a.log)
|
||||||
a.log.Info("init tree service", zap.String("endpoint", treeServiceEndpoint))
|
a.log.Info("init tree service", zap.Strings("endpoints", treeGRPCClient.Endpoints()))
|
||||||
|
|
||||||
// prepare random key for anonymous requests
|
// prepare random key for anonymous requests
|
||||||
randomKey, err := keys.NewPrivateKey()
|
randomKey, err := keys.NewPrivateKey()
|
||||||
|
@ -182,6 +182,7 @@ func (a *App) initAPI(ctx context.Context) {
|
||||||
|
|
||||||
func (a *App) initMetrics() {
|
func (a *App) initMetrics() {
|
||||||
a.metrics = metrics.NewAppMetrics(a.log, frostfs.NewPoolStatistic(a.pool), a.cfg.GetBool(cfgPrometheusEnabled))
|
a.metrics = metrics.NewAppMetrics(a.log, frostfs.NewPoolStatistic(a.pool), a.cfg.GetBool(cfgPrometheusEnabled))
|
||||||
|
a.metrics.SetHealth(metrics.HealthStatusStarting)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) initResolver() {
|
func (a *App) initResolver() {
|
||||||
|
@ -369,7 +370,7 @@ func (a *App) Wait() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) setHealthStatus() {
|
func (a *App) setHealthStatus() {
|
||||||
a.metrics.SetHealth(1)
|
a.metrics.SetHealth(metrics.HealthStatusReady)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Serve runs HTTP server to handle S3 API requests.
|
// Serve runs HTTP server to handle S3 API requests.
|
||||||
|
|
|
@ -42,8 +42,8 @@ S3_GW_CONFIG=/path/to/config/yaml
|
||||||
# Logger
|
# Logger
|
||||||
S3_GW_LOGGER_LEVEL=debug
|
S3_GW_LOGGER_LEVEL=debug
|
||||||
|
|
||||||
# Endpoint of the tree service. Must be provided. Can be one of the node address (from the `peers` section).
|
# Endpoints of the tree service. At least one endpoint must be provided. Node addresses (from the `peers` section) can be used.
|
||||||
S3_GW_TREE_SERVICE=grpc://s01.frostfs.devenv:8080
|
S3_GW_TREE_SERVICE=grpc://s01.frostfs.devenv:8080 grpc://s02.frostfs.devenv:8080
|
||||||
|
|
||||||
# RPC endpoint and order of resolving of bucket names
|
# RPC endpoint and order of resolving of bucket names
|
||||||
S3_GW_RPC_ENDPOINT=http://morph-chain.frostfs.devenv:30333/
|
S3_GW_RPC_ENDPOINT=http://morph-chain.frostfs.devenv:30333/
|
||||||
|
|
|
@ -44,9 +44,11 @@ listen_domains:
|
||||||
logger:
|
logger:
|
||||||
level: debug
|
level: debug
|
||||||
|
|
||||||
# Endpoint of the tree service. Must be provided. Can be one of the node address (from the `peers` section).
|
# Endpoints of the tree service. At least one endpoint must be provided. Node addresses (from the `peers` section) can be used.
|
||||||
tree:
|
tree:
|
||||||
service: node1.frostfs:8080
|
service:
|
||||||
|
- node1.frostfs:8080
|
||||||
|
- node2.frostfs:8080
|
||||||
|
|
||||||
# RPC endpoint and order of resolving of bucket names
|
# RPC endpoint and order of resolving of bucket names
|
||||||
rpc_endpoint: http://morph-chain.frostfs.devenv:30333
|
rpc_endpoint: http://morph-chain.frostfs.devenv:30333
|
||||||
|
|
|
@ -337,14 +337,23 @@ logger:
|
||||||
|
|
||||||
### `tree` section
|
### `tree` section
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
tree:
|
||||||
|
service:
|
||||||
|
- s01.frostfs.devenv:8080
|
||||||
|
- s02.frostfs.devenv:8080
|
||||||
|
```
|
||||||
|
|
||||||
|
If you use only one endpoint, it can be provided as:
|
||||||
|
|
||||||
```yaml
|
```yaml
|
||||||
tree:
|
tree:
|
||||||
service: s01.frostfs.devenv:8080
|
service: s01.frostfs.devenv:8080
|
||||||
```
|
```
|
||||||
|
|
||||||
| Parameter | Type | Default value | Description |
|
| Parameter | Type | Default value | Description |
|
||||||
|-----------|----------|---------------|------------------------------------------------------------------------------------------------------------|
|
|-----------|------------|---------------|-------------------------------------------------------------------------------------------------------------------------------|
|
||||||
| `service` | `string` | | Endpoint of the tree service. Must be provided. Can be one of the node address (from the `peers` section). |
|
| `service` | `[]string` | | Endpoints of the tree service. At least one endpoint must be provided. Node addresses (from the `peers` section) can be used. |
|
||||||
|
|
||||||
### `cache` section
|
### `cache` section
|
||||||
|
|
||||||
|
|
1
go.mod
1
go.mod
|
@ -31,6 +31,7 @@ require (
|
||||||
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect
|
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect
|
||||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0 // indirect
|
git.frostfs.info/TrueCloudLab/tzhash v1.8.0 // indirect
|
||||||
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20221202181307-76fa05c21b12 // indirect
|
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20221202181307-76fa05c21b12 // indirect
|
||||||
|
github.com/benbjohnson/clock v1.1.0 // indirect
|
||||||
github.com/beorn7/perks v1.0.1 // indirect
|
github.com/beorn7/perks v1.0.1 // indirect
|
||||||
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
||||||
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
|
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
|
||||||
|
|
70
internal/frostfs/services/client/client.go
Normal file
70
internal/frostfs/services/client/client.go
Normal file
|
@ -0,0 +1,70 @@
|
||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
grpcService "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/tree"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
type TreeClient struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
address string
|
||||||
|
opts []grpc.DialOption
|
||||||
|
conn *grpc.ClientConn
|
||||||
|
service grpcService.TreeServiceClient
|
||||||
|
dialed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewTreeClient creates new tree client with auto dial.
|
||||||
|
func NewTreeClient(addr string, opts ...grpc.DialOption) *TreeClient {
|
||||||
|
return &TreeClient{
|
||||||
|
address: addr,
|
||||||
|
opts: opts,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *TreeClient) dial(ctx context.Context) error {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
if c.dialed {
|
||||||
|
return fmt.Errorf("couldn't dial '%s': connection already established", c.address)
|
||||||
|
}
|
||||||
|
|
||||||
|
conn, err := grpc.Dial(c.address, c.opts...)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("grpc dial node tree service: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
serviceClient := grpcService.NewTreeServiceClient(conn)
|
||||||
|
if _, err = serviceClient.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil {
|
||||||
|
return fmt.Errorf("healthcheck tree service: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.conn = conn
|
||||||
|
c.service = serviceClient
|
||||||
|
c.dialed = true
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *TreeClient) TreeClient(ctx context.Context) (grpcService.TreeServiceClient, error) {
|
||||||
|
c.mu.RLock()
|
||||||
|
dialed := c.dialed
|
||||||
|
c.mu.RUnlock()
|
||||||
|
|
||||||
|
if !dialed {
|
||||||
|
if err := c.dial(ctx); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.service, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *TreeClient) Address() string {
|
||||||
|
return c.address
|
||||||
|
}
|
|
@ -6,14 +6,17 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
|
||||||
|
treeClient "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/client"
|
||||||
grpcService "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/tree"
|
grpcService "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/tree"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
"go.uber.org/zap"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -65,28 +68,61 @@ func (n GetSubTreeResponseBodyWrapper) GetMeta() []tree.Meta {
|
||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
type ServiceClientGRPC struct {
|
type TreeClient interface {
|
||||||
key *keys.PrivateKey
|
TreeClient(ctx context.Context) (grpcService.TreeServiceClient, error)
|
||||||
conn *grpc.ClientConn
|
Address() string
|
||||||
service grpcService.TreeServiceClient
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTreeServiceClientGRPC(ctx context.Context, addr string, key *keys.PrivateKey, grpcOpts ...grpc.DialOption) (*ServiceClientGRPC, error) {
|
type ServiceClientGRPC struct {
|
||||||
conn, err := grpc.Dial(addr, grpcOpts...)
|
key *keys.PrivateKey
|
||||||
if err != nil {
|
log *zap.Logger
|
||||||
return nil, fmt.Errorf("did not connect: %v", err)
|
clients []TreeClient
|
||||||
|
startIndex int32
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ServiceClientGRPC) getStartIndex() int {
|
||||||
|
return int(atomic.LoadInt32(&c.startIndex))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ServiceClientGRPC) setStartIndex(index int) {
|
||||||
|
atomic.StoreInt32(&c.startIndex, int32(index))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ServiceClientGRPC) Endpoints() []string {
|
||||||
|
res := make([]string, len(c.clients))
|
||||||
|
for i, client := range c.clients {
|
||||||
|
res[i] = client.Address()
|
||||||
|
}
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTreeServiceClientGRPC(ctx context.Context, endpoints []string, key *keys.PrivateKey, log *zap.Logger, grpcOpts ...grpc.DialOption) (*ServiceClientGRPC, error) {
|
||||||
|
res := &ServiceClientGRPC{
|
||||||
|
key: key,
|
||||||
|
log: log,
|
||||||
}
|
}
|
||||||
|
|
||||||
c := grpcService.NewTreeServiceClient(conn)
|
firstHealthy := -1
|
||||||
if _, err = c.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil {
|
|
||||||
return nil, fmt.Errorf("healthcheck: %w", err)
|
res.clients = make([]TreeClient, len(endpoints))
|
||||||
|
for i, addr := range endpoints {
|
||||||
|
res.clients[i] = treeClient.NewTreeClient(addr, grpcOpts...)
|
||||||
|
if _, err := res.clients[i].TreeClient(ctx); err != nil {
|
||||||
|
log.Warn("dial tree", zap.String("address", addr), zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if firstHealthy == -1 {
|
||||||
|
firstHealthy = i
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return &ServiceClientGRPC{
|
if firstHealthy == -1 {
|
||||||
key: key,
|
return nil, errors.New("no healthy tree grpc client")
|
||||||
conn: conn,
|
}
|
||||||
service: c,
|
|
||||||
}, nil
|
res.setStartIndex(firstHealthy)
|
||||||
|
|
||||||
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ServiceClientGRPC) GetNodes(ctx context.Context, p *tree.GetNodesParams) ([]tree.NodeResponse, error) {
|
func (c *ServiceClientGRPC) GetNodes(ctx context.Context, p *tree.GetNodesParams) ([]tree.NodeResponse, error) {
|
||||||
|
@ -112,9 +148,15 @@ func (c *ServiceClientGRPC) GetNodes(ctx context.Context, p *tree.GetNodesParams
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := c.service.GetNodeByPath(ctx, request)
|
log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", p.TreeID),
|
||||||
if err != nil {
|
zap.String("method", "GetNodeByPath"))
|
||||||
return nil, handleError("failed to get node by path", err)
|
|
||||||
|
var resp *grpcService.GetNodeByPathResponse
|
||||||
|
if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||||
|
resp, inErr = client.GetNodeByPath(ctx, request)
|
||||||
|
return handleError("failed to get node by path", inErr)
|
||||||
|
}); err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
res := make([]tree.NodeResponse, len(resp.GetBody().GetNodes()))
|
res := make([]tree.NodeResponse, len(resp.GetBody().GetNodes()))
|
||||||
|
@ -145,9 +187,15 @@ func (c *ServiceClientGRPC) GetSubTree(ctx context.Context, bktInfo *data.Bucket
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
cli, err := c.service.GetSubTree(ctx, request)
|
log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID),
|
||||||
if err != nil {
|
zap.String("method", "GetSubTree"))
|
||||||
return nil, handleError("failed to get sub tree client", err)
|
|
||||||
|
var cli grpcService.TreeService_GetSubTreeClient
|
||||||
|
if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||||
|
cli, inErr = client.GetSubTree(ctx, request)
|
||||||
|
return handleError("failed to get sub tree client", inErr)
|
||||||
|
}); err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var subtree []tree.NodeResponse
|
var subtree []tree.NodeResponse
|
||||||
|
@ -183,9 +231,15 @@ func (c *ServiceClientGRPC) AddNode(ctx context.Context, bktInfo *data.BucketInf
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := c.service.Add(ctx, request)
|
log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID),
|
||||||
if err != nil {
|
zap.String("method", "Add"))
|
||||||
return 0, handleError("failed to add node", err)
|
|
||||||
|
var resp *grpcService.AddResponse
|
||||||
|
if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||||
|
resp, inErr = client.Add(ctx, request)
|
||||||
|
return handleError("failed to add node", inErr)
|
||||||
|
}); err != nil {
|
||||||
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return resp.GetBody().GetNodeId(), nil
|
return resp.GetBody().GetNodeId(), nil
|
||||||
|
@ -212,9 +266,15 @@ func (c *ServiceClientGRPC) AddNodeByPath(ctx context.Context, bktInfo *data.Buc
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := c.service.AddByPath(ctx, request)
|
log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID),
|
||||||
if err != nil {
|
zap.String("method", "AddByPath"))
|
||||||
return 0, handleError("failed to add node by path", err)
|
|
||||||
|
var resp *grpcService.AddByPathResponse
|
||||||
|
if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) {
|
||||||
|
resp, inErr = client.AddByPath(ctx, request)
|
||||||
|
return handleError("failed to add node by path", inErr)
|
||||||
|
}); err != nil {
|
||||||
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
body := resp.GetBody()
|
body := resp.GetBody()
|
||||||
|
@ -249,11 +309,15 @@ func (c *ServiceClientGRPC) MoveNode(ctx context.Context, bktInfo *data.BucketIn
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := c.service.Move(ctx, request); err != nil {
|
log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID),
|
||||||
return handleError("failed to move node", err)
|
zap.String("method", "Move"))
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) error {
|
||||||
|
if _, err := client.Move(ctx, request); err != nil {
|
||||||
|
return handleError("failed to move node", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
|
func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
|
||||||
|
@ -274,11 +338,43 @@ func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.Bucket
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := c.service.Remove(ctx, request); err != nil {
|
log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID),
|
||||||
return handleError("failed to remove node", err)
|
zap.String("method", "Remove"))
|
||||||
|
|
||||||
|
return c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) error {
|
||||||
|
if _, err := client.Remove(ctx, request); err != nil {
|
||||||
|
return handleError("failed to remove node", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ServiceClientGRPC) requestWithRetry(ctx context.Context, log *zap.Logger, fn func(client grpcService.TreeServiceClient) error) error {
|
||||||
|
var (
|
||||||
|
err error
|
||||||
|
cl grpcService.TreeServiceClient
|
||||||
|
)
|
||||||
|
|
||||||
|
start := c.getStartIndex()
|
||||||
|
for i := start; i < start+len(c.clients); i++ {
|
||||||
|
index := i % len(c.clients)
|
||||||
|
if cl, err = c.clients[index].TreeClient(ctx); err == nil {
|
||||||
|
err = fn(cl)
|
||||||
|
}
|
||||||
|
if !shouldTryAgain(err) {
|
||||||
|
c.setStartIndex(index)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
log.Debug("tree request error", zap.String("address", c.clients[index].Address()), zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func shouldTryAgain(err error) bool {
|
||||||
|
return !(err == nil ||
|
||||||
|
errors.Is(err, tree.ErrNodeNotFound) ||
|
||||||
|
errors.Is(err, tree.ErrNodeAccessDenied))
|
||||||
}
|
}
|
||||||
|
|
||||||
func metaToKV(meta map[string]string) []*grpcService.KeyValue {
|
func metaToKV(meta map[string]string) []*grpcService.KeyValue {
|
||||||
|
@ -303,6 +399,9 @@ func getBearer(ctx context.Context, bktInfo *data.BucketInfo) []byte {
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleError(msg string, err error) error {
|
func handleError(msg string, err error) error {
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
if strings.Contains(err.Error(), "not found") {
|
if strings.Contains(err.Error(), "not found") {
|
||||||
return fmt.Errorf("%w: %s", tree.ErrNodeNotFound, err.Error())
|
return fmt.Errorf("%w: %s", tree.ErrNodeNotFound, err.Error())
|
||||||
} else if strings.Contains(err.Error(), "is denied by") {
|
} else if strings.Contains(err.Error(), "is denied by") {
|
||||||
|
|
|
@ -7,8 +7,6 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func (c *ServiceClientGRPC) signData(buf []byte, f func(key, sign []byte)) error {
|
func (c *ServiceClientGRPC) signData(buf []byte, f func(key, sign []byte)) error {
|
||||||
// crypto package should not be used outside of API libraries (see neofs-node#491).
|
|
||||||
// For now tree service does not include into SDK Client nor SDK Pool, so there is no choice.
|
|
||||||
// When SDK library adopts Tree service client, this should be dropped.
|
// When SDK library adopts Tree service client, this should be dropped.
|
||||||
sign, err := crypto.Sign(&c.key.PrivateKey, buf)
|
sign, err := crypto.Sign(&c.key.PrivateKey, buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -1,13 +1,32 @@
|
||||||
package services
|
package services
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||||
|
grpcService "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/tree"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.uber.org/zap/zaptest"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type treeClientMock struct {
|
||||||
|
address string
|
||||||
|
err bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *treeClientMock) TreeClient(context.Context) (grpcService.TreeServiceClient, error) {
|
||||||
|
if t.err {
|
||||||
|
return nil, errors.New("error")
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *treeClientMock) Address() string {
|
||||||
|
return t.address
|
||||||
|
}
|
||||||
|
|
||||||
func TestHandleError(t *testing.T) {
|
func TestHandleError(t *testing.T) {
|
||||||
defaultError := errors.New("default error")
|
defaultError := errors.New("default error")
|
||||||
for _, tc := range []struct {
|
for _, tc := range []struct {
|
||||||
|
@ -33,3 +52,78 @@ func TestHandleError(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRetry(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
log := zaptest.NewLogger(t)
|
||||||
|
|
||||||
|
cl := &ServiceClientGRPC{
|
||||||
|
log: zaptest.NewLogger(t),
|
||||||
|
clients: []TreeClient{
|
||||||
|
&treeClientMock{address: "node0"},
|
||||||
|
&treeClientMock{address: "node1"},
|
||||||
|
&treeClientMock{address: "node2"},
|
||||||
|
&treeClientMock{address: "node3"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
makeFn := func(client grpcService.TreeServiceClient) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("first ok", func(t *testing.T) {
|
||||||
|
err := cl.requestWithRetry(ctx, log, makeFn)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 0, cl.getStartIndex())
|
||||||
|
resetClients(cl)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("first failed", func(t *testing.T) {
|
||||||
|
setErrors(cl.clients[:1])
|
||||||
|
err := cl.requestWithRetry(ctx, log, makeFn)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, cl.getStartIndex())
|
||||||
|
resetClients(cl)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("all failed", func(t *testing.T) {
|
||||||
|
setErrors(cl.clients)
|
||||||
|
err := cl.requestWithRetry(ctx, log, makeFn)
|
||||||
|
require.Error(t, err)
|
||||||
|
require.Equal(t, 0, cl.getStartIndex())
|
||||||
|
resetClients(cl)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("round", func(t *testing.T) {
|
||||||
|
setErrors(cl.clients[:2])
|
||||||
|
err := cl.requestWithRetry(ctx, log, makeFn)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 2, cl.getStartIndex())
|
||||||
|
resetClientsErrors(cl)
|
||||||
|
|
||||||
|
setErrors(cl.clients[2:])
|
||||||
|
err = cl.requestWithRetry(ctx, log, makeFn)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 0, cl.getStartIndex())
|
||||||
|
resetClients(cl)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func resetClients(cl *ServiceClientGRPC) {
|
||||||
|
resetClientsErrors(cl)
|
||||||
|
cl.setStartIndex(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func resetClientsErrors(cl *ServiceClientGRPC) {
|
||||||
|
for _, client := range cl.clients {
|
||||||
|
node := client.(*treeClientMock)
|
||||||
|
node.err = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func setErrors(clients []TreeClient) {
|
||||||
|
for _, client := range clients {
|
||||||
|
node := client.(*treeClientMock)
|
||||||
|
node.err = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -37,7 +37,7 @@ func (m *AppMetrics) SetEnabled(enabled bool) {
|
||||||
m.mu.Unlock()
|
m.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *AppMetrics) SetHealth(status int32) {
|
func (m *AppMetrics) SetHealth(status HealthStatus) {
|
||||||
if !m.isEnabled() {
|
if !m.isEnabled() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -48,7 +48,7 @@ func (m *AppMetrics) SetHealth(status int32) {
|
||||||
func (m *AppMetrics) Shutdown() {
|
func (m *AppMetrics) Shutdown() {
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
if m.enabled {
|
if m.enabled {
|
||||||
m.gate.State.SetHealth(0)
|
m.gate.State.SetHealth(HealthStatusShuttingDown)
|
||||||
m.enabled = false
|
m.enabled = false
|
||||||
}
|
}
|
||||||
m.gate.Unregister()
|
m.gate.Unregister()
|
||||||
|
|
|
@ -4,6 +4,16 @@ import "github.com/prometheus/client_golang/prometheus"
|
||||||
|
|
||||||
const stateSubsystem = "state"
|
const stateSubsystem = "state"
|
||||||
|
|
||||||
|
// HealthStatus of the gate application.
|
||||||
|
type HealthStatus int32
|
||||||
|
|
||||||
|
const (
|
||||||
|
HealthStatusUndefined HealthStatus = 0
|
||||||
|
HealthStatusStarting HealthStatus = 1
|
||||||
|
HealthStatusReady HealthStatus = 2
|
||||||
|
HealthStatusShuttingDown HealthStatus = 3
|
||||||
|
)
|
||||||
|
|
||||||
type stateMetrics struct {
|
type stateMetrics struct {
|
||||||
healthCheck prometheus.Gauge
|
healthCheck prometheus.Gauge
|
||||||
}
|
}
|
||||||
|
@ -27,6 +37,6 @@ func (m stateMetrics) unregister() {
|
||||||
prometheus.Unregister(m.healthCheck)
|
prometheus.Unregister(m.healthCheck)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m stateMetrics) SetHealth(s int32) {
|
func (m stateMetrics) SetHealth(s HealthStatus) {
|
||||||
m.healthCheck.Set(float64(s))
|
m.healthCheck.Set(float64(s))
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
@ -12,11 +13,13 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
Tree struct {
|
Tree struct {
|
||||||
service ServiceClient
|
service ServiceClient
|
||||||
|
log *zap.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServiceClient is a client to interact with tree service.
|
// ServiceClient is a client to interact with tree service.
|
||||||
|
@ -104,8 +107,11 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewTree creates instance of Tree using provided address and create grpc connection.
|
// NewTree creates instance of Tree using provided address and create grpc connection.
|
||||||
func NewTree(service ServiceClient) *Tree {
|
func NewTree(service ServiceClient, log *zap.Logger) *Tree {
|
||||||
return &Tree{service: service}
|
return &Tree{
|
||||||
|
service: service,
|
||||||
|
log: log,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type Meta interface {
|
type Meta interface {
|
||||||
|
@ -811,14 +817,19 @@ func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, tre
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(nodes) > 1 {
|
if len(nodes) == 0 {
|
||||||
return nil, fmt.Errorf("found more than one unversioned node")
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(nodes) != 1 {
|
|
||||||
return nil, layer.ErrNodeNotFound
|
return nil, layer.ErrNodeNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(nodes) > 1 {
|
||||||
|
c.log.Debug("found more than one unversioned node", zap.Stringer("cid", bktInfo.CID),
|
||||||
|
zap.String("treeID", treeID), zap.String("filepath", filepath))
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Slice(nodes, func(i, j int) bool {
|
||||||
|
return nodes[i].Timestamp > nodes[j].Timestamp
|
||||||
|
})
|
||||||
|
|
||||||
return nodes[0], nil
|
return nodes[0], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.uber.org/zap/zaptest"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestLockConfigurationEncoding(t *testing.T) {
|
func TestLockConfigurationEncoding(t *testing.T) {
|
||||||
|
@ -102,7 +103,7 @@ func TestTreeServiceSettings(t *testing.T) {
|
||||||
|
|
||||||
memCli, err := NewTreeServiceClientMemory()
|
memCli, err := NewTreeServiceClientMemory()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
treeService := NewTree(memCli)
|
treeService := NewTree(memCli, zaptest.NewLogger(t))
|
||||||
|
|
||||||
bktInfo := &data.BucketInfo{
|
bktInfo := &data.BucketInfo{
|
||||||
CID: cidtest.ID(),
|
CID: cidtest.ID(),
|
||||||
|
@ -134,7 +135,7 @@ func TestTreeServiceAddVersion(t *testing.T) {
|
||||||
|
|
||||||
memCli, err := NewTreeServiceClientMemory()
|
memCli, err := NewTreeServiceClientMemory()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
treeService := NewTree(memCli)
|
treeService := NewTree(memCli, zaptest.NewLogger(t))
|
||||||
|
|
||||||
bktInfo := &data.BucketInfo{
|
bktInfo := &data.BucketInfo{
|
||||||
CID: cidtest.ID(),
|
CID: cidtest.ID(),
|
||||||
|
|
Loading…
Reference in a new issue