feature/test-configured_buffers #197
18 changed files with 93 additions and 33 deletions
|
@ -30,6 +30,7 @@ This document outlines major changes between releases.
|
|||
- Implement chunk uploading (#106)
|
||||
- Add new `kludge.bypass_content_encoding_check_in_chunks` config param (#146)
|
||||
- Add new `frostfs.client_cut` config param (#192)
|
||||
- Add new `frostfs.buffer_max_size_for_put` config param and sync TZ hash for PUT operations (#197)
|
||||
|
||||
### Changed
|
||||
- Update prometheus to v1.15.0 (#94)
|
||||
|
|
|
@ -29,6 +29,7 @@ type (
|
|||
Created time.Time
|
||||
LocationConstraint string
|
||||
ObjectLockEnabled bool
|
||||
HomomorphicHashDisabled bool
|
||||
}
|
||||
|
||||
// ObjectInfo holds S3 object data.
|
||||
|
|
|
@ -213,7 +213,7 @@ func createTestBucket(hc *handlerContext, bktName string) *data.BucketInfo {
|
|||
}
|
||||
|
||||
func createTestBucketWithLock(hc *handlerContext, bktName string, conf *data.ObjectLockConfiguration) *data.BucketInfo {
|
||||
cnrID, err := hc.MockedPool().CreateContainer(hc.Context(), layer.PrmContainerCreate{
|
||||
res, err := hc.MockedPool().CreateContainer(hc.Context(), layer.PrmContainerCreate{
|
||||
Creator: hc.owner,
|
||||
Name: bktName,
|
||||
AdditionalAttributes: [][2]string{{layer.AttributeLockEnabled, "true"}},
|
||||
|
@ -223,10 +223,11 @@ func createTestBucketWithLock(hc *handlerContext, bktName string, conf *data.Obj
|
|||
var ownerID user.ID
|
||||
|
||||
bktInfo := &data.BucketInfo{
|
||||
CID: cnrID,
|
||||
CID: res.ContainerID,
|
||||
Name: bktName,
|
||||
ObjectLockEnabled: true,
|
||||
Owner: ownerID,
|
||||
HomomorphicHashDisabled: res.HomomorphicHashDisabled,
|
||||
}
|
||||
|
||||
sp := &layer.PutSettingsParams{
|
||||
|
|
|
@ -59,6 +59,7 @@ func (n *layer) containerInfo(ctx context.Context, idCnr cid.ID) (*data.BucketIn
|
|||
}
|
||||
info.Created = container.CreatedAt(cnr)
|
||||
info.LocationConstraint = cnr.Attribute(attributeLocationConstraint)
|
||||
info.HomomorphicHashDisabled = container.IsHomomorphicHashingDisabled(cnr)
|
||||
|
||||
attrLockEnabled := cnr.Attribute(AttributeLockEnabled)
|
||||
if len(attrLockEnabled) > 0 {
|
||||
|
@ -122,7 +123,7 @@ func (n *layer) createContainer(ctx context.Context, p *CreateBucketParams) (*da
|
|||
})
|
||||
}
|
||||
|
||||
idCnr, err := n.frostFS.CreateContainer(ctx, PrmContainerCreate{
|
||||
res, err := n.frostFS.CreateContainer(ctx, PrmContainerCreate{
|
||||
Creator: bktInfo.Owner,
|
||||
Policy: p.Policy,
|
||||
Name: p.Name,
|
||||
|
@ -134,7 +135,8 @@ func (n *layer) createContainer(ctx context.Context, p *CreateBucketParams) (*da
|
|||
return nil, fmt.Errorf("create container: %w", err)
|
||||
}
|
||||
|
||||
bktInfo.CID = idCnr
|
||||
bktInfo.CID = res.ContainerID
|
||||
bktInfo.HomomorphicHashDisabled = res.HomomorphicHashDisabled
|
||||
|
||||
if err = n.setContainerEACLTable(ctx, bktInfo.CID, p.EACL, p.SessionEACL); err != nil {
|
||||
return nil, fmt.Errorf("set container eacl: %w", err)
|
||||
|
|
|
@ -43,6 +43,12 @@ type PrmContainerCreate struct {
|
|||
AdditionalAttributes [][2]string
|
||||
}
|
||||
|
||||
// ContainerCreateResult is a result parameter of FrostFS.CreateContainer operation.
|
||||
type ContainerCreateResult struct {
|
||||
ContainerID cid.ID
|
||||
HomomorphicHashDisabled bool
|
||||
}
|
||||
|
||||
// PrmAuth groups authentication parameters for the FrostFS operation.
|
||||
type PrmAuth struct {
|
||||
// Bearer token to be used for the operation. Overlaps PrivateKey. Optional.
|
||||
|
@ -114,6 +120,12 @@ type PrmObjectCreate struct {
|
|||
|
||||
// Enables client side object preparing.
|
||||
ClientCut bool
|
||||
|
||||
// Disables using Tillich-Zémor hash for payload.
|
||||
WithoutHomomorphicHash bool
|
||||
|
||||
// Sets max buffer size to read payload.
|
||||
BufferMaxSize uint64
|
||||
}
|
||||
|
||||
// PrmObjectDelete groups parameters of FrostFS.DeleteObject operation.
|
||||
|
@ -162,7 +174,7 @@ type FrostFS interface {
|
|||
//
|
||||
// It returns exactly one non-zero value. It returns any error encountered which
|
||||
// prevented the container from being created.
|
||||
CreateContainer(context.Context, PrmContainerCreate) (cid.ID, error)
|
||||
CreateContainer(context.Context, PrmContainerCreate) (*ContainerCreateResult, error)
|
||||
|
||||
// Container reads a container from FrostFS by ID.
|
||||
//
|
||||
|
|
|
@ -29,6 +29,10 @@ type FeatureSettingsMock struct {
|
|||
clientCut bool
|
||||
}
|
||||
|
||||
func (k *FeatureSettingsMock) BufferMaxSizeForPut() uint64 {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (k *FeatureSettingsMock) ClientCut() bool {
|
||||
return k.clientCut
|
||||
}
|
||||
|
@ -114,7 +118,7 @@ func (t *TestFrostFS) ContainerID(name string) (cid.ID, error) {
|
|||
return cid.ID{}, fmt.Errorf("not found")
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) CreateContainer(_ context.Context, prm PrmContainerCreate) (cid.ID, error) {
|
||||
func (t *TestFrostFS) CreateContainer(_ context.Context, prm PrmContainerCreate) (*ContainerCreateResult, error) {
|
||||
var cnr container.Container
|
||||
cnr.Init()
|
||||
cnr.SetOwner(prm.Creator)
|
||||
|
@ -141,14 +145,14 @@ func (t *TestFrostFS) CreateContainer(_ context.Context, prm PrmContainerCreate)
|
|||
|
||||
b := make([]byte, 32)
|
||||
if _, err := io.ReadFull(rand.Reader, b); err != nil {
|
||||
return cid.ID{}, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var id cid.ID
|
||||
id.SetSHA256(sha256.Sum256(b))
|
||||
t.containers[id.EncodeToString()] = &cnr
|
||||
|
||||
return id, nil
|
||||
return &ContainerCreateResult{ContainerID: id}, nil
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) DeleteContainer(_ context.Context, cnrID cid.ID, _ *session.Container) error {
|
||||
|
|
|
@ -48,6 +48,7 @@ type (
|
|||
|
||||
FeatureSettings interface {
|
||||
ClientCut() bool
|
||||
BufferMaxSizeForPut() uint64
|
||||
}
|
||||
|
||||
layer struct {
|
||||
|
|
|
@ -460,6 +460,8 @@ func (n *layer) objectDelete(ctx context.Context, bktInfo *data.BucketInfo, idOb
|
|||
func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktInfo *data.BucketInfo) (uint64, oid.ID, []byte, error) {
|
||||
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
|
||||
prm.ClientCut = n.features.ClientCut()
|
||||
prm.BufferMaxSize = n.features.BufferMaxSizeForPut()
|
||||
prm.WithoutHomomorphicHash = bktInfo.HomomorphicHashDisabled
|
||||
var size uint64
|
||||
hash := sha256.New()
|
||||
prm.Payload = wrapReader(prm.Payload, 64*1024, func(buf []byte) {
|
||||
|
|
|
@ -153,7 +153,7 @@ func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext {
|
|||
tp := NewTestFrostFS(key)
|
||||
|
||||
bktName := "testbucket1"
|
||||
bktID, err := tp.CreateContainer(ctx, PrmContainerCreate{
|
||||
res, err := tp.CreateContainer(ctx, PrmContainerCreate{
|
||||
Name: bktName,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
@ -179,7 +179,8 @@ func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext {
|
|||
bktInfo: &data.BucketInfo{
|
||||
Name: bktName,
|
||||
Owner: owner,
|
||||
CID: bktID,
|
||||
CID: res.ContainerID,
|
||||
HomomorphicHashDisabled: res.HomomorphicHashDisabled,
|
||||
},
|
||||
obj: "obj1",
|
||||
t: t,
|
||||
|
|
|
@ -85,6 +85,7 @@ type (
|
|||
defaultXMLNSForCompleteMultipart bool
|
||||
bypassContentEncodingInChunks bool
|
||||
clientCut bool
|
||||
maxBufferSizeForPut uint64
|
||||
}
|
||||
|
||||
maxClientsConfig struct {
|
||||
|
@ -189,6 +190,7 @@ func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
|
|||
settings.setBypassContentEncodingInChunks(v.GetBool(cfgKludgeBypassContentEncodingCheckInChunks))
|
||||
settings.setClientCut(v.GetBool(cfgClientCut))
|
||||
settings.initPlacementPolicy(log.logger, v)
|
||||
settings.setBufferMaxSizeForPut(v.GetUint64(cfgBufferMaxSizeForPut))
|
||||
|
||||
return settings
|
||||
}
|
||||
|
@ -217,6 +219,18 @@ func (s *appSettings) setClientCut(clientCut bool) {
|
|||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *appSettings) BufferMaxSizeForPut() uint64 {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.maxBufferSizeForPut
|
||||
}
|
||||
|
||||
func (s *appSettings) setBufferMaxSizeForPut(size uint64) {
|
||||
s.mu.Lock()
|
||||
s.maxBufferSizeForPut = size
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *appSettings) initPlacementPolicy(l *zap.Logger, v *viper.Viper) {
|
||||
defaultPolicy := fetchDefaultPolicy(l, v)
|
||||
regionMap := fetchRegionMappingPolicies(l, v)
|
||||
|
@ -589,6 +603,7 @@ func (a *App) updateSettings() {
|
|||
a.settings.useDefaultNamespaceForCompleteMultipart(a.cfg.GetBool(cfgKludgeUseDefaultXMLNSForCompleteMultipartUpload))
|
||||
a.settings.setBypassContentEncodingInChunks(a.cfg.GetBool(cfgKludgeBypassContentEncodingCheckInChunks))
|
||||
a.settings.setClientCut(a.cfg.GetBool(cfgClientCut))
|
||||
a.settings.setBufferMaxSizeForPut(a.cfg.GetUint64(cfgBufferMaxSizeForPut))
|
||||
}
|
||||
|
||||
func (a *App) startServices() {
|
||||
|
|
|
@ -149,6 +149,8 @@ const ( // Settings.
|
|||
cfgSetCopiesNumber = "frostfs.set_copies_number"
|
||||
// Enabling client side object preparing for PUT operations.
|
||||
cfgClientCut = "frostfs.client_cut"
|
||||
// Sets max buffer size for read payload in put operations.
|
||||
cfgBufferMaxSizeForPut = "frostfs.buffer_max_size_for_put"
|
||||
|
||||
// List of allowed AccessKeyID prefixes.
|
||||
cfgAllowedAccessKeyIDPrefixes = "allowed_access_key_id_prefixes"
|
||||
|
@ -528,6 +530,9 @@ func newSettings() *viper.Viper {
|
|||
v.SetDefault(cfgPProfAddress, "localhost:8085")
|
||||
v.SetDefault(cfgPrometheusAddress, "localhost:8086")
|
||||
|
||||
// frostfs
|
||||
v.SetDefault(cfgBufferMaxSizeForPut, 1024*1024) // 1mb
|
||||
|
||||
// kludge
|
||||
v.SetDefault(cfgKludgeUseDefaultXMLNSForCompleteMultipartUpload, false)
|
||||
v.SetDefault(cfgKludgeCompleteMultipartUploadKeepalive, 10*time.Second)
|
||||
|
|
|
@ -127,6 +127,8 @@ S3_GW_CORS_DEFAULT_MAX_AGE=600
|
|||
S3_GW_FROSTFS_SET_COPIES_NUMBER=0
|
||||
# This flag enables client side object preparing.
|
||||
S3_GW_FROSTFS_CLIENT_CUT=false
|
||||
# Sets max buffer size for read payload in put operations.
|
||||
S3_GW_FROSTFS_BUFFER_MAX_SIZE_FOR_PUT=1048576
|
||||
|
||||
# List of allowed AccessKeyID prefixes
|
||||
# If not set, S3 GW will accept all AccessKeyIDs
|
||||
|
|
|
@ -152,6 +152,8 @@ frostfs:
|
|||
set_copies_number: [0]
|
||||
# This flag enables client side object preparing.
|
||||
client_cut: false
|
||||
# Sets max buffer size for read payload in put operations.
|
||||
buffer_max_size_for_put: 1048576
|
||||
|
||||
# List of allowed AccessKeyID prefixes
|
||||
# If the parameter is omitted, S3 GW will accept all AccessKeyIDs
|
||||
|
|
|
@ -509,12 +509,14 @@ header for `PutObject`, `CopyObject`, `CreateMultipartUpload`.
|
|||
frostfs:
|
||||
set_copies_number: [0]
|
||||
client_cut: false
|
||||
buffer_max_size_for_put: 1048576 # 1mb
|
||||
```
|
||||
|
||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||
|---------------------|------------|---------------|---------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
|---------------------------|------------|---------------|---------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| `set_copies_number` | `[]uint32` | yes | `[0]` | Numbers of the object copies (for each replica) to consider PUT to FrostFS successful. <br/>Default value `[0]` or empty list means that object will be processed according to the container's placement policy |
|
||||
| `client_cut` | `bool` | yes | `false` | This flag enables client side object preparing. |
|
||||
| `buffer_max_size_for_put` | `uint64` | yes | `1048576` | Sets max buffer size for read payload in put operations. |
|
||||
|
||||
# `resolve_bucket` section
|
||||
|
||||
|
|
2
go.mod
2
go.mod
|
@ -5,7 +5,7 @@ go 1.20
|
|||
require (
|
||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230802075510-964c3edb3f44
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230821090303-202412230a05
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20231003164722-60463871dbc2
|
||||
github.com/aws/aws-sdk-go v1.44.6
|
||||
github.com/bluele/gcache v0.0.2
|
||||
github.com/go-chi/chi/v5 v5.0.8
|
||||
|
|
4
go.sum
4
go.sum
|
@ -44,8 +44,8 @@ git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSV
|
|||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0/go.mod h1:RUIKZATQLJ+TaYQa60X2fTDwfuhMfm8Ar60bQ5fr+vU=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 h1:aGQ6QaAnTerQ5Dq5b2/f9DUQtSqPkZZ/bkMx/HKuLCo=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6/go.mod h1:W8Nn08/l6aQ7UlIbpF7FsQou7TVpcRD1ZT1KG4TrFhE=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230821090303-202412230a05 h1:OuViMF54N87FXmaBEpYw3jhzaLrJ/EWOlPL1wUkimE0=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230821090303-202412230a05/go.mod h1:t1akKcUH7iBrFHX8rSXScYMP17k2kYQXMbZooiL5Juw=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20231003164722-60463871dbc2 h1:PHZX/Gh59ZPNG10JtTjBkmKbhKNq84CKu+dJpbzPVOc=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20231003164722-60463871dbc2/go.mod h1:t1akKcUH7iBrFHX8rSXScYMP17k2kYQXMbZooiL5Juw=
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
|
||||
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 h1:M2KR3iBj7WpY3hP10IevfIB9MURr4O9mwVfJ+SjT3HA=
|
||||
|
|
|
@ -57,12 +57,16 @@ func (x *AuthmateFrostFS) CreateContainer(ctx context.Context, prm authmate.PrmC
|
|||
basicACL.AllowOp(acl.OpObjectHead, acl.RoleOthers)
|
||||
basicACL.AllowOp(acl.OpObjectSearch, acl.RoleOthers)
|
||||
|
||||
return x.frostFS.CreateContainer(ctx, layer.PrmContainerCreate{
|
||||
res, err := x.frostFS.CreateContainer(ctx, layer.PrmContainerCreate{
|
||||
Creator: prm.Owner,
|
||||
Policy: prm.Policy,
|
||||
Name: prm.FriendlyName,
|
||||
BasicACL: basicACL,
|
||||
})
|
||||
if err != nil {
|
||||
return cid.ID{}, err
|
||||
}
|
||||
return res.ContainerID, nil
|
||||
}
|
||||
|
||||
// GetCredsPayload implements authmate.FrostFS interface method.
|
||||
|
|
|
@ -106,7 +106,7 @@ var basicACLZero acl.Basic
|
|||
// CreateContainer implements frostfs.FrostFS interface method.
|
||||
//
|
||||
// If prm.BasicACL is zero, 'eacl-public-read-write' is used.
|
||||
func (x *FrostFS) CreateContainer(ctx context.Context, prm layer.PrmContainerCreate) (cid.ID, error) {
|
||||
func (x *FrostFS) CreateContainer(ctx context.Context, prm layer.PrmContainerCreate) (*layer.ContainerCreateResult, error) {
|
||||
if prm.BasicACL == basicACLZero {
|
||||
prm.BasicACL = acl.PublicRWExtended
|
||||
}
|
||||
|
@ -137,7 +137,7 @@ func (x *FrostFS) CreateContainer(ctx context.Context, prm layer.PrmContainerCre
|
|||
|
||||
err := pool.SyncContainerWithNetwork(ctx, &cnr, x.pool)
|
||||
if err != nil {
|
||||
return cid.ID{}, handleObjectError("sync container with the network state", err)
|
||||
return nil, handleObjectError("sync container with the network state", err)
|
||||
}
|
||||
|
||||
prmPut := pool.PrmContainerPut{
|
||||
|
@ -150,7 +150,10 @@ func (x *FrostFS) CreateContainer(ctx context.Context, prm layer.PrmContainerCre
|
|||
|
||||
// send request to save the container
|
||||
idCnr, err := x.pool.PutContainer(ctx, prmPut)
|
||||
return idCnr, handleObjectError("save container via connection pool", err)
|
||||
return &layer.ContainerCreateResult{
|
||||
ContainerID: idCnr,
|
||||
HomomorphicHashDisabled: container.IsHomomorphicHashingDisabled(cnr),
|
||||
}, handleObjectError("save container via connection pool", err)
|
||||
}
|
||||
|
||||
// UserContainers implements frostfs.FrostFS interface method.
|
||||
|
@ -244,6 +247,8 @@ func (x *FrostFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (
|
|||
prmPut.SetPayload(prm.Payload)
|
||||
prmPut.SetCopiesNumberVector(prm.CopiesNumber)
|
||||
prmPut.SetClientCut(prm.ClientCut)
|
||||
prmPut.WithoutHomomorphicHash(prm.WithoutHomomorphicHash)
|
||||
prmPut.SetBufferMaxSize(prm.BufferMaxSize)
|
||||
|
||||
if prm.BearerToken != nil {
|
||||
prmPut.UseBearer(*prm.BearerToken)
|
||||
|
|
Loading…
Reference in a new issue