forked from TrueCloudLab/frostfs-sdk-go
[#66] transformer: Accept constructor in NextTarget
The code of frostfs-node is not yet ready to reuse egress target for multiple objects, let's postpone until #64. Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
c42a6119ff
commit
d4fe9a193d
3 changed files with 14 additions and 11 deletions
|
@ -16,9 +16,10 @@ func TestChannelTarget(t *testing.T) {
|
||||||
|
|
||||||
ch := make(chan *objectSDK.Object, 10)
|
ch := make(chan *objectSDK.Object, 10)
|
||||||
tt := new(testTarget)
|
tt := new(testTarget)
|
||||||
|
ct := NewChannelTarget(ch)
|
||||||
|
|
||||||
chTarget, _ := newPayloadSizeLimiter(maxSize, NewChannelTarget(ch))
|
chTarget, _ := newPayloadSizeLimiter(maxSize, func() ObjectTarget { return ct })
|
||||||
testTarget, _ := newPayloadSizeLimiter(maxSize, tt)
|
testTarget, _ := newPayloadSizeLimiter(maxSize, func() ObjectTarget { return tt })
|
||||||
|
|
||||||
ver := version.Current()
|
ver := version.Current()
|
||||||
cnr := cidtest.ID()
|
cnr := cidtest.ID()
|
||||||
|
|
|
@ -28,11 +28,13 @@ type payloadSizeLimiter struct {
|
||||||
splitID *object.SplitID
|
splitID *object.SplitID
|
||||||
|
|
||||||
parAttrs []object.Attribute
|
parAttrs []object.Attribute
|
||||||
|
|
||||||
|
nextTarget ObjectTarget
|
||||||
}
|
}
|
||||||
|
|
||||||
type Params struct {
|
type Params struct {
|
||||||
Key *ecdsa.PrivateKey
|
Key *ecdsa.PrivateKey
|
||||||
NextTarget ObjectTarget
|
NextTargetInit func() ObjectTarget
|
||||||
SessionToken *session.Object
|
SessionToken *session.Object
|
||||||
NetworkState EpochSource
|
NetworkState EpochSource
|
||||||
MaxSize uint64
|
MaxSize uint64
|
||||||
|
@ -113,7 +115,7 @@ func fromObject(obj *object.Object) *object.Object {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *payloadSizeLimiter) initializeCurrent() {
|
func (s *payloadSizeLimiter) initializeCurrent() {
|
||||||
// create payload hashers
|
s.nextTarget = s.NextTargetInit()
|
||||||
s.writtenCurrent = 0
|
s.writtenCurrent = 0
|
||||||
s.initPayloadHashers()
|
s.initPayloadHashers()
|
||||||
}
|
}
|
||||||
|
@ -156,11 +158,11 @@ func (s *payloadSizeLimiter) release(ctx context.Context, finalize bool) (*Acces
|
||||||
return nil, fmt.Errorf("fillHeader: %w", err)
|
return nil, fmt.Errorf("fillHeader: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.NextTarget.WriteHeader(ctx, s.current); err != nil {
|
if err := s.nextTarget.WriteHeader(ctx, s.current); err != nil {
|
||||||
return nil, fmt.Errorf("could not write header to next target: %w", err)
|
return nil, fmt.Errorf("could not write header to next target: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := s.NextTarget.Close(ctx); err != nil {
|
if _, err := s.nextTarget.Close(ctx); err != nil {
|
||||||
return nil, fmt.Errorf("could not close next target: %w", err)
|
return nil, fmt.Errorf("could not close next target: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -275,7 +277,7 @@ func (s *payloadSizeLimiter) writeChunk(ctx context.Context, chunk []byte) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *payloadSizeLimiter) writeHashes(ctx context.Context, chunk []byte) error {
|
func (s *payloadSizeLimiter) writeHashes(ctx context.Context, chunk []byte) error {
|
||||||
_, err := s.NextTarget.Write(ctx, chunk)
|
_, err := s.nextTarget.Write(ctx, chunk)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,7 @@ func TestTransformer(t *testing.T) {
|
||||||
|
|
||||||
tt := new(testTarget)
|
tt := new(testTarget)
|
||||||
|
|
||||||
target, pk := newPayloadSizeLimiter(maxSize, tt)
|
target, pk := newPayloadSizeLimiter(maxSize, func() ObjectTarget { return tt })
|
||||||
|
|
||||||
cnr := cidtest.ID()
|
cnr := cidtest.ID()
|
||||||
hdr := newObject(cnr)
|
hdr := newObject(cnr)
|
||||||
|
@ -131,7 +131,7 @@ func benchmarkTransformer(b *testing.B, header *objectSDK.Object, payloadSize in
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
f, _ := newPayloadSizeLimiter(maxSize, benchTarget{})
|
f, _ := newPayloadSizeLimiter(maxSize, func() ObjectTarget { return benchTarget{} })
|
||||||
if err := f.WriteHeader(ctx, header); err != nil {
|
if err := f.WriteHeader(ctx, header); err != nil {
|
||||||
b.Fatalf("write header: %v", err)
|
b.Fatalf("write header: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -144,7 +144,7 @@ func benchmarkTransformer(b *testing.B, header *objectSDK.Object, payloadSize in
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPayloadSizeLimiter(maxSize uint64, nextTarget ObjectTarget) (ObjectTarget, *keys.PrivateKey) {
|
func newPayloadSizeLimiter(maxSize uint64, nextTarget func() ObjectTarget) (ObjectTarget, *keys.PrivateKey) {
|
||||||
p, err := keys.NewPrivateKey()
|
p, err := keys.NewPrivateKey()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
@ -152,7 +152,7 @@ func newPayloadSizeLimiter(maxSize uint64, nextTarget ObjectTarget) (ObjectTarge
|
||||||
|
|
||||||
return NewPayloadSizeLimiter(Params{
|
return NewPayloadSizeLimiter(Params{
|
||||||
Key: &p.PrivateKey,
|
Key: &p.PrivateKey,
|
||||||
NextTarget: nextTarget,
|
NextTargetInit: nextTarget,
|
||||||
NetworkState: dummyEpochSource(123),
|
NetworkState: dummyEpochSource(123),
|
||||||
MaxSize: maxSize,
|
MaxSize: maxSize,
|
||||||
WithoutHomomorphicHash: true,
|
WithoutHomomorphicHash: true,
|
||||||
|
|
Loading…
Reference in a new issue