[#509] Support fallback address when getting box

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2024-10-18 16:59:52 +03:00
parent 3c7cb82553
commit 3cf27d281d
6 changed files with 182 additions and 32 deletions

View file

@ -7,6 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/bluele/gcache"
"go.uber.org/zap"
)
@ -29,6 +30,7 @@ type (
Box *accessbox.Box
Attributes []object.Attribute
PutTime time.Time
Address *oid.Address
}
)
@ -73,12 +75,7 @@ func (o *AccessBoxCache) Get(accessKeyID string) *AccessBoxCacheValue {
}
// Put stores an accessbox to cache.
func (o *AccessBoxCache) Put(accessKeyID string, box *accessbox.Box, attrs []object.Attribute) error {
val := &AccessBoxCacheValue{
Box: box,
Attributes: attrs,
PutTime: time.Now(),
}
func (o *AccessBoxCache) Put(accessKeyID string, val *AccessBoxCacheValue) error {
return o.cache.Set(accessKeyID, val)
}

View file

@ -8,7 +8,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
@ -24,15 +23,16 @@ func TestAccessBoxCacheType(t *testing.T) {
addr := oidtest.Address()
box := &accessbox.Box{}
var attrs []object.Attribute
val := &AccessBoxCacheValue{
Box: box,
}
accessKeyID := getAccessKeyID(addr)
err := cache.Put(accessKeyID, box, attrs)
err := cache.Put(accessKeyID, val)
require.NoError(t, err)
val := cache.Get(accessKeyID)
require.Equal(t, box, val.Box)
require.Equal(t, attrs, val.Attributes)
resVal := cache.Get(accessKeyID)
require.Equal(t, box, resVal.Box)
require.Equal(t, 0, observedLog.Len())
err = cache.cache.Set(accessKeyID, "tmp")

View file

@ -49,6 +49,12 @@ type (
CacheConfig *cache.Config
RemovingCheckAfterDurations time.Duration
}
Box struct {
AccessBox *accessbox.AccessBox
Attributes []object.Attribute
Address *oid.Address
}
)
// PrmObjectCreate groups parameters of objects created by credential tool.
@ -85,6 +91,10 @@ type PrmGetCredsObject struct {
// S3 access key id.
AccessKeyID string
// FallbackAddress is an address that should be used to get creds if we couldn't find it by AccessKeyID.
// Optional.
FallbackAddress *oid.Address
}
var ErrCustomAccessKeyIDNotFound = errors.New("custom AccessKeyId not found")
@ -136,19 +146,26 @@ func (c *cred) GetBox(ctx context.Context, cnrID cid.ID, accessKeyID string) (*a
return c.checkIfCredentialsAreRemoved(ctx, cnrID, accessKeyID, cachedBoxValue)
}
box, attrs, err := c.getAccessBox(ctx, cnrID, accessKeyID)
box, err := c.getAccessBox(ctx, cnrID, accessKeyID, nil)
if err != nil {
return nil, nil, fmt.Errorf("get access box: %w", err)
}
cachedBox, err := box.GetBox(c.key)
cachedBox, err := box.AccessBox.GetBox(c.key)
if err != nil {
return nil, nil, fmt.Errorf("get gate box: %w", err)
}
c.putBoxToCache(accessKeyID, cachedBox, attrs)
val := &cache.AccessBoxCacheValue{
Box: cachedBox,
Attributes: box.Attributes,
PutTime: time.Now(),
Address: box.Address,
}
return cachedBox, attrs, nil
c.putBoxToCache(accessKeyID, val)
return cachedBox, box.Attributes, nil
}
func (c *cred) checkIfCredentialsAreRemoved(ctx context.Context, cnrID cid.ID, accessKeyID string, cachedBoxValue *cache.AccessBoxCacheValue) (*accessbox.Box, []object.Attribute, error) {
@ -156,7 +173,7 @@ func (c *cred) checkIfCredentialsAreRemoved(ctx context.Context, cnrID cid.ID, a
return cachedBoxValue.Box, cachedBoxValue.Attributes, nil
}
box, attrs, err := c.getAccessBox(ctx, cnrID, accessKeyID)
box, err := c.getAccessBox(ctx, cnrID, accessKeyID, cachedBoxValue.Address)
if err != nil {
if client.IsErrObjectAlreadyRemoved(err) {
c.cache.Delete(accessKeyID)
@ -165,41 +182,61 @@ func (c *cred) checkIfCredentialsAreRemoved(ctx context.Context, cnrID cid.ID, a
return cachedBoxValue.Box, cachedBoxValue.Attributes, nil
}
cachedBox, err := box.GetBox(c.key)
cachedBox, err := box.AccessBox.GetBox(c.key)
if err != nil {
c.cache.Delete(accessKeyID)
return nil, nil, fmt.Errorf("get gate box: %w", err)
}
// we need this to reset PutTime
// to don't check for removing each time after removingCheckDuration interval
c.putBoxToCache(accessKeyID, cachedBox, attrs)
val := &cache.AccessBoxCacheValue{
Box: cachedBox,
Attributes: box.Attributes,
PutTime: time.Now(),
Address: box.Address,
}
c.putBoxToCache(accessKeyID, val)
return cachedBoxValue.Box, attrs, nil
return cachedBoxValue.Box, box.Attributes, nil
}
func (c *cred) putBoxToCache(accessKeyID string, box *accessbox.Box, attrs []object.Attribute) {
if err := c.cache.Put(accessKeyID, box, attrs); err != nil {
func (c *cred) putBoxToCache(accessKeyID string, val *cache.AccessBoxCacheValue) {
if err := c.cache.Put(accessKeyID, val); err != nil {
c.log.Warn(logs.CouldntPutAccessBoxIntoCache, zap.String("accessKeyID", accessKeyID))
}
}
func (c *cred) getAccessBox(ctx context.Context, cnrID cid.ID, accessKeyID string) (*accessbox.AccessBox, []object.Attribute, error) {
func (c *cred) getAccessBox(ctx context.Context, cnrID cid.ID, accessKeyID string, fallbackAddr *oid.Address) (*Box, error) {
prm := PrmGetCredsObject{
Container: cnrID,
AccessKeyID: accessKeyID,
FallbackAddress: fallbackAddr,
}
obj, err := c.frostFS.GetCredsObject(ctx, prm)
if err != nil {
return nil, nil, fmt.Errorf("read payload and attributes: %w", err)
return nil, fmt.Errorf("read payload and attributes: %w", err)
}
// decode access box
var box accessbox.AccessBox
if err = box.Unmarshal(obj.Payload()); err != nil {
return nil, nil, fmt.Errorf("unmarhal access box: %w", err)
return nil, fmt.Errorf("unmarhal access box: %w", err)
}
return &box, obj.Attributes(), nil
addr := &oid.Address{}
boxCnrID, cnrIDOk := obj.ContainerID()
boxObjID, objIDOk := obj.ID()
addr.SetContainer(boxCnrID)
addr.SetObject(boxObjID)
if !cnrIDOk || !objIDOk {
addr = nil
}
return &Box{
AccessBox: &box,
Attributes: obj.Attributes(),
Address: addr,
}, nil
}
func (c *cred) Put(ctx context.Context, prm CredentialsParam) (oid.Address, error) {

View file

@ -128,8 +128,21 @@ func TestRemovingAccessBox(t *testing.T) {
accessKeyID := getAccessKeyID(addr)
accessBoxCustom, _, err := accessbox.PackTokens(gateData, []byte("secret"), true)
require.NoError(t, err)
dataCustom, err := accessBoxCustom.Marshal()
require.NoError(t, err)
var objCustom object.Object
objCustom.SetPayload(dataCustom)
addrCustom := oidtest.Address()
objCustom.SetID(addrCustom.Object())
objCustom.SetContainerID(addrCustom.Container())
accessKeyIDCustom := "accessKeyID"
frostfs := &frostfsMock{
objects: map[string][]*object.Object{accessKeyID: {&obj}},
objects: map[string][]*object.Object{accessKeyID: {&obj}, accessKeyIDCustom: {&objCustom}},
errors: map[string]error{},
}
@ -148,14 +161,29 @@ func TestRemovingAccessBox(t *testing.T) {
_, _, err = creds.GetBox(ctx, addr.Container(), accessKeyID)
require.NoError(t, err)
_, _, err = creds.GetBox(ctx, addr.Container(), accessKeyIDCustom)
require.NoError(t, err)
frostfs.errors[accessKeyID] = errors.New("network error")
_, _, err = creds.GetBox(ctx, addr.Container(), accessKeyID)
require.NoError(t, err)
frostfs.errors[accessKeyIDCustom] = errors.New("network error")
_, _, err = creds.GetBox(ctx, addr.Container(), accessKeyIDCustom)
require.NoError(t, err)
frostfs.errors[accessKeyID] = &apistatus.ObjectAlreadyRemoved{}
_, _, err = creds.GetBox(ctx, addr.Container(), accessKeyID)
require.Error(t, err)
frostfs.errors[accessKeyIDCustom] = &apistatus.ObjectAlreadyRemoved{}
_, _, err = creds.GetBox(ctx, addr.Container(), accessKeyIDCustom)
require.Error(t, err)
frostfs.errors[accessKeyID] = &apistatus.ObjectAlreadyRemoved{}
_, _, err = creds.GetBox(ctx, addr.Container(), accessKeyID)
require.Error(t, err)
frostfs.errors[accessKeyIDCustom] = &apistatus.ObjectAlreadyRemoved{}
_, _, err = creds.GetBox(ctx, addr.Container(), accessKeyIDCustom)
require.Error(t, err)
}
func TestGetBox(t *testing.T) {

View file

@ -74,7 +74,17 @@ func (x *AuthmateFrostFS) CreateContainer(ctx context.Context, prm authmate.PrmC
}
// GetCredsObject implements authmate.FrostFS interface method.
func (x *AuthmateFrostFS) GetCredsObject(ctx context.Context, prm tokens.PrmGetCredsObject) (*object.Object, error) {
func (x *AuthmateFrostFS) GetCredsObject(ctx context.Context, prm tokens.PrmGetCredsObject) (obj *object.Object, err error) {
var readObjAddr *oid.Address
defer func() {
if prm.FallbackAddress == nil {
return
}
if err != nil && (readObjAddr == nil || !readObjAddr.Equals(*prm.FallbackAddress)) {
obj, err = x.readObject(ctx, *prm.FallbackAddress)
}
}()
versions, err := x.getCredVersions(ctx, prm.Container, prm.AccessKeyID)
if err != nil {
return nil, err
@ -92,9 +102,17 @@ func (x *AuthmateFrostFS) GetCredsObject(ctx context.Context, prm tokens.PrmGetC
return nil, fmt.Errorf("%w: '%s'", tokens.ErrCustomAccessKeyIDNotFound, prm.AccessKeyID)
}
readObjAddr = &oid.Address{}
readObjAddr.SetContainer(prm.Container)
readObjAddr.SetObject(credObjID)
return x.readObject(ctx, *readObjAddr)
}
func (x *AuthmateFrostFS) readObject(ctx context.Context, addr oid.Address) (*object.Object, error) {
res, err := x.frostFS.GetObject(ctx, frostfs.PrmObjectGet{
Container: prm.Container,
Object: credObjID,
Container: addr.Container(),
Object: addr.Object(),
})
if err != nil {
return nil, err

View file

@ -2,6 +2,7 @@ package frostfs
import (
"context"
"errors"
"strconv"
"strings"
"testing"
@ -13,6 +14,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/tokens"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
@ -177,6 +179,74 @@ func TestCredsObject(t *testing.T) {
require.Error(t, err)
})
})
t.Run("fallback", func(t *testing.T) {
t.Run("regular", func(t *testing.T) {
prm := tokens.PrmObjectCreate{
Container: cnrID,
Filepath: "regular-obj",
Payload: payload,
}
objID, err := frostfs.CreateObject(ctx, prm)
require.NoError(t, err)
accessKeyID := cnrID.EncodeToString() + "0" + objID.EncodeToString()
prmNew := tokens.PrmObjectCreate{
Container: cnrID,
Filepath: "regular-obj-new",
Payload: newPayload,
NewVersionForAccessKeyID: accessKeyID,
}
objIDNew, err := frostfs.CreateObject(ctx, prmNew)
require.NoError(t, err)
addr := newAddress(cnrID, objID)
prmFallback := tokens.PrmGetCredsObject{
Container: cnrID,
AccessKeyID: accessKeyID,
FallbackAddress: &addr,
}
frostfs.frostFS.(*layer.TestFrostFS).SetObjectError(newAddress(cnrID, objIDNew), errors.New("error"))
obj, err := frostfs.GetCredsObject(ctx, prmFallback)
require.NoError(t, err)
assertParamsSet(t, prm, obj, userID)
})
t.Run("custom", func(t *testing.T) {
prm := tokens.PrmObjectCreate{
Container: cnrID,
Filepath: "custom-obj",
ExpirationEpoch: 10,
Payload: payload,
CustomAccessKey: "custom-access-key-id",
}
objID, err := frostfs.CreateObject(ctx, prm)
require.NoError(t, err)
addr := newAddress(cnrID, objID)
prmFallback := tokens.PrmGetCredsObject{
Container: cnrID,
AccessKeyID: "unknown",
FallbackAddress: &addr,
}
obj, err := frostfs.GetCredsObject(ctx, prmFallback)
require.NoError(t, err)
assertParamsSet(t, prm, obj, userID)
})
})
}
func newAddress(cnr cid.ID, obj oid.ID) oid.Address {
var addr oid.Address
addr.SetContainer(cnr)
addr.SetObject(obj)
return addr
}
func assertParamsSet(t *testing.T, prm tokens.PrmObjectCreate, obj *object.Object, userID user.ID) {