forked from TrueCloudLab/frostfs-s3-gw
Denis Kirillov
5d6d5e41d0
Add two strategy for PutBucketSettings request retryer:
* exponential backoff (increasing up to `max_backoff` delays with jitter)
* constant backoff (always the same `max_backoff` delay between requests)
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
(cherry picked from commit bb81afc14a
)
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
460 lines
12 KiB
Go
460 lines
12 KiB
Go
package handler
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/rand"
|
|
"encoding/xml"
|
|
"errors"
|
|
"io"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"net/url"
|
|
"strconv"
|
|
"testing"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/resolver"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
|
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
|
|
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
|
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
|
"github.com/stretchr/testify/require"
|
|
"go.uber.org/zap"
|
|
"golang.org/x/exp/slices"
|
|
)
|
|
|
|
type handlerContext struct {
|
|
owner user.ID
|
|
t *testing.T
|
|
h *handler
|
|
tp *layer.TestFrostFS
|
|
tree *tree.Tree
|
|
context context.Context
|
|
config *configMock
|
|
|
|
layerFeatures *layer.FeatureSettingsMock
|
|
treeMock *tree.ServiceClientMemory
|
|
cache *layer.Cache
|
|
}
|
|
|
|
func (hc *handlerContext) Handler() *handler {
|
|
return hc.h
|
|
}
|
|
|
|
func (hc *handlerContext) MockedPool() *layer.TestFrostFS {
|
|
return hc.tp
|
|
}
|
|
|
|
func (hc *handlerContext) Layer() layer.Client {
|
|
return hc.h.obj
|
|
}
|
|
|
|
func (hc *handlerContext) Context() context.Context {
|
|
return hc.context
|
|
}
|
|
|
|
type configMock struct {
|
|
defaultPolicy netmap.PlacementPolicy
|
|
copiesNumbers map[string][]uint32
|
|
defaultCopiesNumbers []uint32
|
|
bypassContentEncodingInChunks bool
|
|
md5Enabled bool
|
|
aclEnabled bool
|
|
}
|
|
|
|
func (c *configMock) DefaultPlacementPolicy(_ string) netmap.PlacementPolicy {
|
|
return c.defaultPolicy
|
|
}
|
|
|
|
func (c *configMock) PlacementPolicy(_, _ string) (netmap.PlacementPolicy, bool) {
|
|
return netmap.PlacementPolicy{}, false
|
|
}
|
|
|
|
func (c *configMock) CopiesNumbers(_, locationConstraint string) ([]uint32, bool) {
|
|
result, ok := c.copiesNumbers[locationConstraint]
|
|
return result, ok
|
|
}
|
|
|
|
func (c *configMock) DefaultCopiesNumbers(_ string) []uint32 {
|
|
return c.defaultCopiesNumbers
|
|
}
|
|
|
|
func (c *configMock) NewXMLDecoder(r io.Reader) *xml.Decoder {
|
|
return xml.NewDecoder(r)
|
|
}
|
|
|
|
func (c *configMock) BypassContentEncodingInChunks() bool {
|
|
return c.bypassContentEncodingInChunks
|
|
}
|
|
|
|
func (c *configMock) DefaultMaxAge() int {
|
|
return 0
|
|
}
|
|
|
|
func (c *configMock) NotificatorEnabled() bool {
|
|
return false
|
|
}
|
|
|
|
func (c *configMock) ResolveZoneList() []string {
|
|
return []string{}
|
|
}
|
|
|
|
func (c *configMock) IsResolveListAllow() bool {
|
|
return false
|
|
}
|
|
|
|
func (c *configMock) CompleteMultipartKeepalive() time.Duration {
|
|
return time.Duration(0)
|
|
}
|
|
|
|
func (c *configMock) MD5Enabled() bool {
|
|
return c.md5Enabled
|
|
}
|
|
|
|
func (c *configMock) ACLEnabled() bool {
|
|
return c.aclEnabled
|
|
}
|
|
|
|
func (c *configMock) ResolveNamespaceAlias(ns string) string {
|
|
return ns
|
|
}
|
|
|
|
func (c *configMock) RetryMaxAttempts() int {
|
|
return 1
|
|
}
|
|
|
|
func (c *configMock) RetryMaxBackoff() time.Duration {
|
|
return 0
|
|
}
|
|
|
|
func (c *configMock) RetryStrategy() RetryStrategy {
|
|
return RetryStrategyConstant
|
|
}
|
|
|
|
func prepareHandlerContext(t *testing.T) *handlerContext {
|
|
return prepareHandlerContextBase(t, layer.DefaultCachesConfigs(zap.NewExample()))
|
|
}
|
|
|
|
func prepareHandlerContextWithMinCache(t *testing.T) *handlerContext {
|
|
return prepareHandlerContextBase(t, getMinCacheConfig(zap.NewExample()))
|
|
}
|
|
|
|
func prepareHandlerContextBase(t *testing.T, cacheCfg *layer.CachesConfig) *handlerContext {
|
|
key, err := keys.NewPrivateKey()
|
|
require.NoError(t, err)
|
|
|
|
l := zap.NewExample()
|
|
tp := layer.NewTestFrostFS(key)
|
|
|
|
testResolver := &resolver.Resolver{Name: "test_resolver"}
|
|
testResolver.SetResolveFunc(func(_ context.Context, name string) (cid.ID, error) {
|
|
return tp.ContainerID(name)
|
|
})
|
|
|
|
var owner user.ID
|
|
user.IDFromKey(&owner, key.PrivateKey.PublicKey)
|
|
|
|
memCli, err := tree.NewTreeServiceClientMemory()
|
|
require.NoError(t, err)
|
|
|
|
treeMock := tree.NewTree(memCli, zap.NewExample())
|
|
|
|
features := &layer.FeatureSettingsMock{}
|
|
|
|
layerCfg := &layer.Config{
|
|
Cache: layer.NewCache(cacheCfg),
|
|
AnonKey: layer.AnonymousKey{Key: key},
|
|
Resolver: testResolver,
|
|
TreeService: treeMock,
|
|
Features: features,
|
|
GateOwner: owner,
|
|
}
|
|
|
|
var pp netmap.PlacementPolicy
|
|
err = pp.DecodeString("REP 1")
|
|
require.NoError(t, err)
|
|
|
|
cfg := &configMock{
|
|
defaultPolicy: pp,
|
|
}
|
|
h := &handler{
|
|
log: l,
|
|
obj: layer.NewLayer(l, tp, layerCfg),
|
|
cfg: cfg,
|
|
ape: newAPEMock(),
|
|
}
|
|
|
|
return &handlerContext{
|
|
owner: owner,
|
|
t: t,
|
|
h: h,
|
|
tp: tp,
|
|
tree: treeMock,
|
|
context: middleware.SetBoxData(context.Background(), newTestAccessBox(t, key)),
|
|
config: cfg,
|
|
|
|
layerFeatures: features,
|
|
treeMock: memCli,
|
|
cache: layerCfg.Cache,
|
|
}
|
|
}
|
|
|
|
func getMinCacheConfig(logger *zap.Logger) *layer.CachesConfig {
|
|
minCacheCfg := &cache.Config{
|
|
Size: 1,
|
|
Lifetime: 1,
|
|
Logger: logger,
|
|
}
|
|
return &layer.CachesConfig{
|
|
Logger: logger,
|
|
Objects: minCacheCfg,
|
|
ObjectsList: minCacheCfg,
|
|
SessionList: minCacheCfg,
|
|
Names: minCacheCfg,
|
|
Buckets: minCacheCfg,
|
|
System: minCacheCfg,
|
|
AccessControl: minCacheCfg,
|
|
}
|
|
}
|
|
|
|
type apeMock struct {
|
|
chainMap map[engine.Target][]*chain.Chain
|
|
policyMap map[string][]byte
|
|
}
|
|
|
|
func newAPEMock() *apeMock {
|
|
return &apeMock{
|
|
chainMap: map[engine.Target][]*chain.Chain{},
|
|
policyMap: map[string][]byte{},
|
|
}
|
|
}
|
|
|
|
func (a *apeMock) AddChain(target engine.Target, c *chain.Chain) error {
|
|
list := a.chainMap[target]
|
|
|
|
ind := slices.IndexFunc(list, func(item *chain.Chain) bool { return bytes.Equal(item.ID, c.ID) })
|
|
if ind != -1 {
|
|
list[ind] = c
|
|
} else {
|
|
list = append(list, c)
|
|
}
|
|
|
|
a.chainMap[target] = list
|
|
return nil
|
|
}
|
|
|
|
func (a *apeMock) RemoveChain(target engine.Target, chainID chain.ID) error {
|
|
a.chainMap[target] = slices.DeleteFunc(a.chainMap[target], func(item *chain.Chain) bool { return bytes.Equal(item.ID, chainID) })
|
|
return nil
|
|
}
|
|
|
|
func (a *apeMock) ListChains(target engine.Target) ([]*chain.Chain, error) {
|
|
return a.chainMap[target], nil
|
|
}
|
|
|
|
func (a *apeMock) PutPolicy(namespace string, cnrID cid.ID, policy []byte) error {
|
|
a.policyMap[namespace+cnrID.EncodeToString()] = policy
|
|
return nil
|
|
}
|
|
|
|
func (a *apeMock) DeletePolicy(namespace string, cnrID cid.ID) error {
|
|
delete(a.policyMap, namespace+cnrID.EncodeToString())
|
|
return nil
|
|
}
|
|
|
|
func (a *apeMock) PutBucketPolicy(ns string, cnrID cid.ID, policy []byte, chain []*chain.Chain) error {
|
|
if err := a.PutPolicy(ns, cnrID, policy); err != nil {
|
|
return err
|
|
}
|
|
|
|
for i := range chain {
|
|
if err := a.AddChain(engine.ContainerTarget(cnrID.EncodeToString()), chain[i]); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (a *apeMock) DeleteBucketPolicy(ns string, cnrID cid.ID, chainIDs []chain.ID) error {
|
|
if err := a.DeletePolicy(ns, cnrID); err != nil {
|
|
return err
|
|
}
|
|
for i := range chainIDs {
|
|
if err := a.RemoveChain(engine.ContainerTarget(cnrID.EncodeToString()), chainIDs[i]); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (a *apeMock) GetBucketPolicy(ns string, cnrID cid.ID) ([]byte, error) {
|
|
policy, ok := a.policyMap[ns+cnrID.EncodeToString()]
|
|
if !ok {
|
|
return nil, errors.New("not found")
|
|
}
|
|
|
|
return policy, nil
|
|
}
|
|
|
|
func (a *apeMock) SaveACLChains(cid string, chains []*chain.Chain) error {
|
|
for i := range chains {
|
|
if err := a.AddChain(engine.ContainerTarget(cid), chains[i]); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func createTestBucket(hc *handlerContext, bktName string) *data.BucketInfo {
|
|
info := createBucket(hc, bktName)
|
|
return info.BktInfo
|
|
}
|
|
|
|
func createTestBucketWithLock(hc *handlerContext, bktName string, conf *data.ObjectLockConfiguration) *data.BucketInfo {
|
|
res, err := hc.MockedPool().CreateContainer(hc.Context(), layer.PrmContainerCreate{
|
|
Creator: hc.owner,
|
|
Name: bktName,
|
|
AdditionalAttributes: [][2]string{{layer.AttributeLockEnabled, "true"}},
|
|
})
|
|
require.NoError(hc.t, err)
|
|
|
|
var ownerID user.ID
|
|
|
|
bktInfo := &data.BucketInfo{
|
|
CID: res.ContainerID,
|
|
Name: bktName,
|
|
ObjectLockEnabled: true,
|
|
Owner: ownerID,
|
|
HomomorphicHashDisabled: res.HomomorphicHashDisabled,
|
|
}
|
|
|
|
key, err := keys.NewPrivateKey()
|
|
require.NoError(hc.t, err)
|
|
|
|
sp := &layer.PutSettingsParams{
|
|
BktInfo: bktInfo,
|
|
Settings: &data.BucketSettings{
|
|
Versioning: data.VersioningEnabled,
|
|
LockConfiguration: conf,
|
|
OwnerKey: key.PublicKey(),
|
|
},
|
|
}
|
|
|
|
err = hc.Layer().PutBucketSettings(hc.Context(), sp)
|
|
require.NoError(hc.t, err)
|
|
|
|
return bktInfo
|
|
}
|
|
|
|
func createTestObject(hc *handlerContext, bktInfo *data.BucketInfo, objName string, encryption encryption.Params) *data.ObjectInfo {
|
|
content := make([]byte, 1024)
|
|
_, err := rand.Read(content)
|
|
require.NoError(hc.t, err)
|
|
|
|
header := map[string]string{
|
|
object.AttributeTimestamp: strconv.FormatInt(time.Now().UTC().Unix(), 10),
|
|
}
|
|
|
|
extObjInfo, err := hc.Layer().PutObject(hc.Context(), &layer.PutObjectParams{
|
|
BktInfo: bktInfo,
|
|
Object: objName,
|
|
Size: uint64(len(content)),
|
|
Reader: bytes.NewReader(content),
|
|
Header: header,
|
|
Encryption: encryption,
|
|
})
|
|
require.NoError(hc.t, err)
|
|
|
|
return extObjInfo.ObjectInfo
|
|
}
|
|
|
|
func prepareTestRequest(hc *handlerContext, bktName, objName string, body interface{}) (*httptest.ResponseRecorder, *http.Request) {
|
|
return prepareTestFullRequest(hc, bktName, objName, make(url.Values), body)
|
|
}
|
|
|
|
func prepareTestFullRequest(hc *handlerContext, bktName, objName string, query url.Values, body interface{}) (*httptest.ResponseRecorder, *http.Request) {
|
|
rawBody, err := xml.Marshal(body)
|
|
require.NoError(hc.t, err)
|
|
|
|
return prepareTestRequestWithQuery(hc, bktName, objName, query, rawBody)
|
|
}
|
|
|
|
func prepareTestRequestWithQuery(hc *handlerContext, bktName, objName string, query url.Values, body []byte) (*httptest.ResponseRecorder, *http.Request) {
|
|
w := httptest.NewRecorder()
|
|
r := httptest.NewRequest(http.MethodPut, defaultURL, bytes.NewReader(body))
|
|
r.URL.RawQuery = query.Encode()
|
|
|
|
reqInfo := middleware.NewReqInfo(w, r, middleware.ObjectRequest{Bucket: bktName, Object: objName})
|
|
r = r.WithContext(middleware.SetReqInfo(hc.Context(), reqInfo))
|
|
|
|
return w, r
|
|
}
|
|
|
|
func prepareTestPayloadRequest(hc *handlerContext, bktName, objName string, payload io.Reader) (*httptest.ResponseRecorder, *http.Request) {
|
|
w := httptest.NewRecorder()
|
|
r := httptest.NewRequest(http.MethodPut, defaultURL, payload)
|
|
|
|
reqInfo := middleware.NewReqInfo(w, r, middleware.ObjectRequest{Bucket: bktName, Object: objName})
|
|
r = r.WithContext(middleware.SetReqInfo(hc.Context(), reqInfo))
|
|
|
|
return w, r
|
|
}
|
|
|
|
func parseTestResponse(t *testing.T, response *httptest.ResponseRecorder, body interface{}) {
|
|
assertStatus(t, response, http.StatusOK)
|
|
err := xml.NewDecoder(response.Result().Body).Decode(body)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
func existInMockedFrostFS(tc *handlerContext, bktInfo *data.BucketInfo, objInfo *data.ObjectInfo) bool {
|
|
p := &layer.GetObjectParams{
|
|
BucketInfo: bktInfo,
|
|
ObjectInfo: objInfo,
|
|
}
|
|
|
|
objPayload, err := tc.Layer().GetObject(tc.Context(), p)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
_, err = io.ReadAll(objPayload)
|
|
require.NoError(tc.t, err)
|
|
return true
|
|
}
|
|
|
|
func listOIDsFromMockedFrostFS(t *testing.T, tc *handlerContext, bktName string) []oid.ID {
|
|
bktInfo, err := tc.Layer().GetBucketInfo(tc.Context(), bktName)
|
|
require.NoError(t, err)
|
|
|
|
return tc.MockedPool().AllObjects(bktInfo.CID)
|
|
}
|
|
|
|
func assertStatus(t *testing.T, w *httptest.ResponseRecorder, status int) {
|
|
if w.Code != status {
|
|
resp, err := io.ReadAll(w.Result().Body)
|
|
require.NoError(t, err)
|
|
require.Failf(t, "unexpected status", "expected: %d, actual: %d, resp: '%s'", status, w.Code, string(resp))
|
|
}
|
|
}
|
|
|
|
func readResponse(t *testing.T, w *httptest.ResponseRecorder, status int, model interface{}) {
|
|
assertStatus(t, w, status)
|
|
if status == http.StatusOK {
|
|
err := xml.NewDecoder(w.Result().Body).Decode(model)
|
|
require.NoError(t, err)
|
|
}
|
|
}
|