feature/test-configured_buffers #197

Merged
alexvanin merged 4 commits from dkirillov/frostfs-s3-gw:feature/test-configured_buffers into master 2023-10-11 12:32:50 +00:00
9 changed files with 54 additions and 26 deletions
Showing only changes of commit 4fc23ffa99 - Show all commits

View file

@ -22,13 +22,14 @@ const (
type ( type (
// BucketInfo stores basic bucket data. // BucketInfo stores basic bucket data.
BucketInfo struct { BucketInfo struct {
Name string // container name from system attribute Name string // container name from system attribute
Zone string // container zone from system attribute Zone string // container zone from system attribute
CID cid.ID CID cid.ID
Owner user.ID Owner user.ID
Created time.Time Created time.Time
LocationConstraint string LocationConstraint string
ObjectLockEnabled bool ObjectLockEnabled bool
HomomorphicHashDisabled bool
} }
// ObjectInfo holds S3 object data. // ObjectInfo holds S3 object data.

View file

@ -213,7 +213,7 @@ func createTestBucket(hc *handlerContext, bktName string) *data.BucketInfo {
} }
func createTestBucketWithLock(hc *handlerContext, bktName string, conf *data.ObjectLockConfiguration) *data.BucketInfo { func createTestBucketWithLock(hc *handlerContext, bktName string, conf *data.ObjectLockConfiguration) *data.BucketInfo {
cnrID, err := hc.MockedPool().CreateContainer(hc.Context(), layer.PrmContainerCreate{ res, err := hc.MockedPool().CreateContainer(hc.Context(), layer.PrmContainerCreate{
Creator: hc.owner, Creator: hc.owner,
Name: bktName, Name: bktName,
AdditionalAttributes: [][2]string{{layer.AttributeLockEnabled, "true"}}, AdditionalAttributes: [][2]string{{layer.AttributeLockEnabled, "true"}},
@ -223,10 +223,11 @@ func createTestBucketWithLock(hc *handlerContext, bktName string, conf *data.Obj
var ownerID user.ID var ownerID user.ID
bktInfo := &data.BucketInfo{ bktInfo := &data.BucketInfo{
CID: cnrID, CID: res.ContainerID,
Name: bktName, Name: bktName,
ObjectLockEnabled: true, ObjectLockEnabled: true,
Owner: ownerID, Owner: ownerID,
HomomorphicHashDisabled: res.HomomorphicHashDisabled,
} }
sp := &layer.PutSettingsParams{ sp := &layer.PutSettingsParams{

View file

@ -59,6 +59,7 @@ func (n *layer) containerInfo(ctx context.Context, idCnr cid.ID) (*data.BucketIn
} }
info.Created = container.CreatedAt(cnr) info.Created = container.CreatedAt(cnr)
info.LocationConstraint = cnr.Attribute(attributeLocationConstraint) info.LocationConstraint = cnr.Attribute(attributeLocationConstraint)
info.HomomorphicHashDisabled = container.IsHomomorphicHashingDisabled(cnr)
attrLockEnabled := cnr.Attribute(AttributeLockEnabled) attrLockEnabled := cnr.Attribute(AttributeLockEnabled)
if len(attrLockEnabled) > 0 { if len(attrLockEnabled) > 0 {
@ -122,7 +123,7 @@ func (n *layer) createContainer(ctx context.Context, p *CreateBucketParams) (*da
}) })
} }
idCnr, err := n.frostFS.CreateContainer(ctx, PrmContainerCreate{ res, err := n.frostFS.CreateContainer(ctx, PrmContainerCreate{
Creator: bktInfo.Owner, Creator: bktInfo.Owner,
Policy: p.Policy, Policy: p.Policy,
Name: p.Name, Name: p.Name,
@ -134,7 +135,8 @@ func (n *layer) createContainer(ctx context.Context, p *CreateBucketParams) (*da
return nil, fmt.Errorf("create container: %w", err) return nil, fmt.Errorf("create container: %w", err)
} }
bktInfo.CID = idCnr bktInfo.CID = res.ContainerID
bktInfo.HomomorphicHashDisabled = res.HomomorphicHashDisabled
if err = n.setContainerEACLTable(ctx, bktInfo.CID, p.EACL, p.SessionEACL); err != nil { if err = n.setContainerEACLTable(ctx, bktInfo.CID, p.EACL, p.SessionEACL); err != nil {
return nil, fmt.Errorf("set container eacl: %w", err) return nil, fmt.Errorf("set container eacl: %w", err)

View file

@ -43,6 +43,12 @@ type PrmContainerCreate struct {
AdditionalAttributes [][2]string AdditionalAttributes [][2]string
} }
// ContainerCreateResult is a result parameter of FrostFS.CreateContainer operation.
type ContainerCreateResult struct {
ContainerID cid.ID
HomomorphicHashDisabled bool
}
// PrmAuth groups authentication parameters for the FrostFS operation. // PrmAuth groups authentication parameters for the FrostFS operation.
type PrmAuth struct { type PrmAuth struct {
// Bearer token to be used for the operation. Overlaps PrivateKey. Optional. // Bearer token to be used for the operation. Overlaps PrivateKey. Optional.
@ -114,6 +120,9 @@ type PrmObjectCreate struct {
// Enables client side object preparing. // Enables client side object preparing.
ClientCut bool ClientCut bool
// Disables using Tillich-Zémor hash for payload.
WithoutHomomorphicHash bool
} }
// PrmObjectDelete groups parameters of FrostFS.DeleteObject operation. // PrmObjectDelete groups parameters of FrostFS.DeleteObject operation.
@ -162,7 +171,7 @@ type FrostFS interface {
// //
// It returns exactly one non-zero value. It returns any error encountered which // It returns exactly one non-zero value. It returns any error encountered which
// prevented the container from being created. // prevented the container from being created.
CreateContainer(context.Context, PrmContainerCreate) (cid.ID, error) CreateContainer(context.Context, PrmContainerCreate) (*ContainerCreateResult, error)
// Container reads a container from FrostFS by ID. // Container reads a container from FrostFS by ID.
// //

View file

@ -29,6 +29,10 @@ type FeatureSettingsMock struct {
clientCut bool clientCut bool
} }
func (k *FeatureSettingsMock) BufferMaxSizeForPut() uint64 {
return 0
}
func (k *FeatureSettingsMock) ClientCut() bool { func (k *FeatureSettingsMock) ClientCut() bool {
return k.clientCut return k.clientCut
} }
@ -114,7 +118,7 @@ func (t *TestFrostFS) ContainerID(name string) (cid.ID, error) {
return cid.ID{}, fmt.Errorf("not found") return cid.ID{}, fmt.Errorf("not found")
} }
func (t *TestFrostFS) CreateContainer(_ context.Context, prm PrmContainerCreate) (cid.ID, error) { func (t *TestFrostFS) CreateContainer(_ context.Context, prm PrmContainerCreate) (*ContainerCreateResult, error) {
var cnr container.Container var cnr container.Container
cnr.Init() cnr.Init()
cnr.SetOwner(prm.Creator) cnr.SetOwner(prm.Creator)
@ -141,14 +145,14 @@ func (t *TestFrostFS) CreateContainer(_ context.Context, prm PrmContainerCreate)
b := make([]byte, 32) b := make([]byte, 32)
if _, err := io.ReadFull(rand.Reader, b); err != nil { if _, err := io.ReadFull(rand.Reader, b); err != nil {
return cid.ID{}, err return nil, err
} }
var id cid.ID var id cid.ID
id.SetSHA256(sha256.Sum256(b)) id.SetSHA256(sha256.Sum256(b))
t.containers[id.EncodeToString()] = &cnr t.containers[id.EncodeToString()] = &cnr
return id, nil return &ContainerCreateResult{ContainerID: id}, nil
} }
func (t *TestFrostFS) DeleteContainer(_ context.Context, cnrID cid.ID, _ *session.Container) error { func (t *TestFrostFS) DeleteContainer(_ context.Context, cnrID cid.ID, _ *session.Container) error {

View file

@ -460,6 +460,7 @@ func (n *layer) objectDelete(ctx context.Context, bktInfo *data.BucketInfo, idOb
func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktInfo *data.BucketInfo) (uint64, oid.ID, []byte, error) { func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktInfo *data.BucketInfo) (uint64, oid.ID, []byte, error) {
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner) n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
prm.ClientCut = n.features.ClientCut() prm.ClientCut = n.features.ClientCut()
prm.WithoutHomomorphicHash = bktInfo.HomomorphicHashDisabled
var size uint64 var size uint64
hash := sha256.New() hash := sha256.New()
prm.Payload = wrapReader(prm.Payload, 64*1024, func(buf []byte) { prm.Payload = wrapReader(prm.Payload, 64*1024, func(buf []byte) {

View file

@ -153,7 +153,7 @@ func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext {
tp := NewTestFrostFS(key) tp := NewTestFrostFS(key)
bktName := "testbucket1" bktName := "testbucket1"
bktID, err := tp.CreateContainer(ctx, PrmContainerCreate{ res, err := tp.CreateContainer(ctx, PrmContainerCreate{
Name: bktName, Name: bktName,
}) })
require.NoError(t, err) require.NoError(t, err)
@ -177,9 +177,10 @@ func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext {
ctx: ctx, ctx: ctx,
layer: NewLayer(logger, tp, layerCfg), layer: NewLayer(logger, tp, layerCfg),
bktInfo: &data.BucketInfo{ bktInfo: &data.BucketInfo{
Name: bktName, Name: bktName,
Owner: owner, Owner: owner,
CID: bktID, CID: res.ContainerID,
HomomorphicHashDisabled: res.HomomorphicHashDisabled,
}, },
obj: "obj1", obj: "obj1",
t: t, t: t,

View file

@ -57,12 +57,16 @@ func (x *AuthmateFrostFS) CreateContainer(ctx context.Context, prm authmate.PrmC
basicACL.AllowOp(acl.OpObjectHead, acl.RoleOthers) basicACL.AllowOp(acl.OpObjectHead, acl.RoleOthers)
basicACL.AllowOp(acl.OpObjectSearch, acl.RoleOthers) basicACL.AllowOp(acl.OpObjectSearch, acl.RoleOthers)
return x.frostFS.CreateContainer(ctx, layer.PrmContainerCreate{ res, err := x.frostFS.CreateContainer(ctx, layer.PrmContainerCreate{
Creator: prm.Owner, Creator: prm.Owner,
Policy: prm.Policy, Policy: prm.Policy,
Name: prm.FriendlyName, Name: prm.FriendlyName,
BasicACL: basicACL, BasicACL: basicACL,
}) })
if err != nil {
return cid.ID{}, err
}
return res.ContainerID, nil
} }
// GetCredsPayload implements authmate.FrostFS interface method. // GetCredsPayload implements authmate.FrostFS interface method.

View file

@ -106,7 +106,7 @@ var basicACLZero acl.Basic
// CreateContainer implements frostfs.FrostFS interface method. // CreateContainer implements frostfs.FrostFS interface method.
// //
// If prm.BasicACL is zero, 'eacl-public-read-write' is used. // If prm.BasicACL is zero, 'eacl-public-read-write' is used.
func (x *FrostFS) CreateContainer(ctx context.Context, prm layer.PrmContainerCreate) (cid.ID, error) { func (x *FrostFS) CreateContainer(ctx context.Context, prm layer.PrmContainerCreate) (*layer.ContainerCreateResult, error) {
if prm.BasicACL == basicACLZero { if prm.BasicACL == basicACLZero {
prm.BasicACL = acl.PublicRWExtended prm.BasicACL = acl.PublicRWExtended
} }
@ -137,7 +137,7 @@ func (x *FrostFS) CreateContainer(ctx context.Context, prm layer.PrmContainerCre
err := pool.SyncContainerWithNetwork(ctx, &cnr, x.pool) err := pool.SyncContainerWithNetwork(ctx, &cnr, x.pool)
if err != nil { if err != nil {
return cid.ID{}, handleObjectError("sync container with the network state", err) return nil, handleObjectError("sync container with the network state", err)
} }
prmPut := pool.PrmContainerPut{ prmPut := pool.PrmContainerPut{
@ -150,7 +150,10 @@ func (x *FrostFS) CreateContainer(ctx context.Context, prm layer.PrmContainerCre
// send request to save the container // send request to save the container
idCnr, err := x.pool.PutContainer(ctx, prmPut) idCnr, err := x.pool.PutContainer(ctx, prmPut)
return idCnr, handleObjectError("save container via connection pool", err) return &layer.ContainerCreateResult{
ContainerID: idCnr,
HomomorphicHashDisabled: container.IsHomomorphicHashingDisabled(cnr),
}, handleObjectError("save container via connection pool", err)
} }
// UserContainers implements frostfs.FrostFS interface method. // UserContainers implements frostfs.FrostFS interface method.
@ -244,6 +247,8 @@ func (x *FrostFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (
prmPut.SetPayload(prm.Payload) prmPut.SetPayload(prm.Payload)
prmPut.SetCopiesNumberVector(prm.CopiesNumber) prmPut.SetCopiesNumberVector(prm.CopiesNumber)
prmPut.SetClientCut(prm.ClientCut) prmPut.SetClientCut(prm.ClientCut)
prmPut.WithoutHomomorphicHash(prm.WithoutHomomorphicHash)
prmPut.SetBufferMaxSize(prm.BufferMaxSize)
if prm.BearerToken != nil { if prm.BearerToken != nil {
prmPut.UseBearer(*prm.BearerToken) prmPut.UseBearer(*prm.BearerToken)