forked from TrueCloudLab/frostfs-node
[#1064] putsvc: Refactor distributed target
Extract object builder. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
92569b9bbf
commit
39da643354
3 changed files with 60 additions and 48 deletions
54
pkg/services/object/put/builder.go
Normal file
54
pkg/services/object/put/builder.go
Normal file
|
@ -0,0 +1,54 @@
|
||||||
|
package putsvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ transformer.ChunkedObjectWriter = (*inMemoryObjectBuilder)(nil)
|
||||||
|
|
||||||
|
type inMemoryObjectBuilder struct {
|
||||||
|
objectWriter transformer.ObjectWriter
|
||||||
|
payload *payload
|
||||||
|
|
||||||
|
obj *objectSDK.Object
|
||||||
|
}
|
||||||
|
|
||||||
|
func newInMemoryObjectBuilder(objectWriter transformer.ObjectWriter) *inMemoryObjectBuilder {
|
||||||
|
return &inMemoryObjectBuilder{
|
||||||
|
objectWriter: objectWriter,
|
||||||
|
payload: getPayload(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *inMemoryObjectBuilder) Close(ctx context.Context) (*transformer.AccessIdentifiers, error) {
|
||||||
|
defer func() {
|
||||||
|
putPayload(b.payload)
|
||||||
|
b.payload = nil
|
||||||
|
}()
|
||||||
|
|
||||||
|
b.obj.SetPayload(b.payload.Data)
|
||||||
|
|
||||||
|
if err := b.objectWriter.WriteObject(ctx, b.obj); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
id, _ := b.obj.ID()
|
||||||
|
return &transformer.AccessIdentifiers{
|
||||||
|
SelfID: id,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *inMemoryObjectBuilder) Write(_ context.Context, p []byte) (int, error) {
|
||||||
|
b.payload.Data = append(b.payload.Data, p...)
|
||||||
|
|
||||||
|
return len(p), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *inMemoryObjectBuilder) WriteHeader(_ context.Context, obj *objectSDK.Object) error {
|
||||||
|
b.obj = obj
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -7,7 +7,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type preparedObjectTarget interface {
|
type preparedObjectTarget interface {
|
||||||
|
@ -16,15 +15,12 @@ type preparedObjectTarget interface {
|
||||||
|
|
||||||
type distributedTarget struct {
|
type distributedTarget struct {
|
||||||
placementOpts []placement.Option
|
placementOpts []placement.Option
|
||||||
extraBroadcastEnabled bool
|
|
||||||
|
|
||||||
obj *objectSDK.Object
|
obj *objectSDK.Object
|
||||||
objMeta object.ContentMeta
|
objMeta object.ContentMeta
|
||||||
|
|
||||||
*cfg
|
*cfg
|
||||||
|
|
||||||
payload *payload
|
|
||||||
|
|
||||||
nodeTargetInitializer func(nodeDesc) preparedObjectTarget
|
nodeTargetInitializer func(nodeDesc) preparedObjectTarget
|
||||||
|
|
||||||
relay func(context.Context, nodeDesc) error
|
relay func(context.Context, nodeDesc) error
|
||||||
|
@ -91,36 +87,6 @@ func (x errIncompletePut) Error() string {
|
||||||
return commonMsg
|
return commonMsg
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *distributedTarget) WriteHeader(_ context.Context, obj *objectSDK.Object) error {
|
|
||||||
t.obj = obj
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *distributedTarget) Write(_ context.Context, p []byte) (n int, err error) {
|
|
||||||
t.payload.Data = append(t.payload.Data, p...)
|
|
||||||
|
|
||||||
return len(p), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *distributedTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers, error) {
|
|
||||||
defer func() {
|
|
||||||
putPayload(t.payload)
|
|
||||||
t.payload = nil
|
|
||||||
}()
|
|
||||||
|
|
||||||
t.obj.SetPayload(t.payload.Data)
|
|
||||||
|
|
||||||
if err := t.WriteObject(ctx, t.obj); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
id, _ := t.obj.ID()
|
|
||||||
return &transformer.AccessIdentifiers{
|
|
||||||
SelfID: id,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// WriteObject implements the transformer.ObjectWriter interface.
|
// WriteObject implements the transformer.ObjectWriter interface.
|
||||||
func (t *distributedTarget) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
|
func (t *distributedTarget) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||||
t.obj = obj
|
t.obj = obj
|
||||||
|
|
|
@ -12,7 +12,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||||
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
)
|
)
|
||||||
|
@ -79,7 +78,7 @@ func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error {
|
||||||
|
|
||||||
// prepare untrusted-Put object target
|
// prepare untrusted-Put object target
|
||||||
p.target = &validatingPreparedTarget{
|
p.target = &validatingPreparedTarget{
|
||||||
nextTarget: p.newCommonTarget(prm),
|
nextTarget: newInMemoryObjectBuilder(p.newObjectWriter(prm)),
|
||||||
fmt: p.fmtValidator,
|
fmt: p.fmtValidator,
|
||||||
|
|
||||||
maxPayloadSz: p.maxPayloadSz,
|
maxPayloadSz: p.maxPayloadSz,
|
||||||
|
@ -133,7 +132,7 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
|
||||||
fmt: p.fmtValidator,
|
fmt: p.fmtValidator,
|
||||||
nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{
|
nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{
|
||||||
Key: sessionKey,
|
Key: sessionKey,
|
||||||
NextTargetInit: func() transformer.ObjectWriter { return p.newCommonTarget(prm) },
|
NextTargetInit: func() transformer.ObjectWriter { return p.newObjectWriter(prm) },
|
||||||
NetworkState: p.networkState,
|
NetworkState: p.networkState,
|
||||||
MaxSize: p.maxPayloadSz,
|
MaxSize: p.maxPayloadSz,
|
||||||
WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.cnr),
|
WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.cnr),
|
||||||
|
@ -196,7 +195,7 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Streamer) newCommonTarget(prm *PutInitPrm) *distributedTarget {
|
func (p *Streamer) newObjectWriter(prm *PutInitPrm) transformer.ObjectWriter {
|
||||||
var relay func(context.Context, nodeDesc) error
|
var relay func(context.Context, nodeDesc) error
|
||||||
if p.relay != nil {
|
if p.relay != nil {
|
||||||
relay = func(ctx context.Context, node nodeDesc) error {
|
relay = func(ctx context.Context, node nodeDesc) error {
|
||||||
|
@ -213,16 +212,9 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) *distributedTarget {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// enable additional container broadcast on non-local operation
|
|
||||||
// if object has TOMBSTONE or LOCK type.
|
|
||||||
typ := prm.hdr.Type()
|
|
||||||
withBroadcast := !prm.common.LocalOnly() && (typ == objectSDK.TypeTombstone || typ == objectSDK.TypeLock)
|
|
||||||
|
|
||||||
return &distributedTarget{
|
return &distributedTarget{
|
||||||
cfg: p.cfg,
|
cfg: p.cfg,
|
||||||
placementOpts: prm.traverseOpts,
|
placementOpts: prm.traverseOpts,
|
||||||
extraBroadcastEnabled: withBroadcast,
|
|
||||||
payload: getPayload(),
|
|
||||||
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
|
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
|
||||||
if node.local {
|
if node.local {
|
||||||
return localTarget{
|
return localTarget{
|
||||||
|
|
Loading…
Reference in a new issue