[#64] transformer: Simplify interface
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
d70ef2187b
commit
c359a7465a
6 changed files with 52 additions and 112 deletions
|
@ -21,7 +21,7 @@ func (c *Client) objectPutInitTransformer(prm PrmObjectPutInit) (*objectWriterTr
|
|||
}
|
||||
w.ot = transformer.NewPayloadSizeLimiter(transformer.Params{
|
||||
Key: key,
|
||||
NextTargetInit: func() transformer.ObjectTarget { return &w.it },
|
||||
NextTargetInit: func() transformer.ObjectWriter { return &w.it },
|
||||
MaxSize: prm.maxSize,
|
||||
WithoutHomomorphicHash: prm.withoutHomomorphicHash,
|
||||
NetworkState: prm.epochSource,
|
||||
|
@ -30,7 +30,7 @@ func (c *Client) objectPutInitTransformer(prm PrmObjectPutInit) (*objectWriterTr
|
|||
}
|
||||
|
||||
type objectWriterTransformer struct {
|
||||
ot transformer.ObjectTarget
|
||||
ot transformer.ChunkedObjectWriter
|
||||
it internalTarget
|
||||
err error
|
||||
}
|
||||
|
@ -58,56 +58,40 @@ func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, err
|
|||
}
|
||||
|
||||
type internalTarget struct {
|
||||
current *object.Object
|
||||
client *Client
|
||||
res *ResObjectPut
|
||||
prm PrmObjectPutInit
|
||||
payload []byte
|
||||
useStream bool
|
||||
}
|
||||
|
||||
func (it *internalTarget) WriteHeader(_ context.Context, object *object.Object) error {
|
||||
it.current = object
|
||||
return nil
|
||||
}
|
||||
|
||||
func (it *internalTarget) Write(_ context.Context, p []byte) (n int, err error) {
|
||||
it.payload = append(it.payload, p...)
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
func (it *internalTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers, error) {
|
||||
it.current.SetPayload(it.payload)
|
||||
|
||||
putSingleImplemented, err := it.tryPutSingle(ctx)
|
||||
func (it *internalTarget) WriteObject(ctx context.Context, o *object.Object) error {
|
||||
putSingleImplemented, err := it.tryPutSingle(ctx, o)
|
||||
if putSingleImplemented {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
it.useStream = true
|
||||
return nil, it.putAsStream(ctx)
|
||||
return it.putAsStream(ctx, o)
|
||||
}
|
||||
|
||||
func (it *internalTarget) putAsStream(ctx context.Context) error {
|
||||
func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) error {
|
||||
wrt, err := it.client.objectPutInitRaw(ctx, it.prm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if wrt.WriteHeader(ctx, *it.current) {
|
||||
wrt.WritePayloadChunk(ctx, it.current.Payload())
|
||||
if wrt.WriteHeader(ctx, *o) {
|
||||
wrt.WritePayloadChunk(ctx, o.Payload())
|
||||
}
|
||||
it.res, err = wrt.Close(ctx)
|
||||
it.current = nil
|
||||
it.payload = nil
|
||||
return err
|
||||
}
|
||||
|
||||
func (it *internalTarget) tryPutSingle(ctx context.Context) (bool, error) {
|
||||
func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (bool, error) {
|
||||
if it.useStream {
|
||||
return false, nil
|
||||
}
|
||||
var prm PrmObjectPutSingle
|
||||
prm.SetCopiesNumber(it.prm.copyNum)
|
||||
prm.SetObject(it.current.ToV2())
|
||||
prm.SetObject(o.ToV2())
|
||||
prm.UseKey(prm.key)
|
||||
prm.meta = it.prm.meta
|
||||
|
||||
|
@ -117,13 +101,11 @@ func (it *internalTarget) tryPutSingle(ctx context.Context) (bool, error) {
|
|||
}
|
||||
|
||||
if err == nil {
|
||||
id, _ := it.current.ID()
|
||||
id, _ := o.ID()
|
||||
it.res = &ResObjectPut{
|
||||
statusRes: res.statusRes,
|
||||
obj: id,
|
||||
}
|
||||
}
|
||||
it.current = nil
|
||||
it.payload = nil
|
||||
return true, err
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue