forked from TrueCloudLab/frostfs-sdk-go
[#83] Allow to split objects in the client
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
parent
66cb5dcf34
commit
2f88460172
4 changed files with 298 additions and 175 deletions
|
@ -3,29 +3,29 @@ package client
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/acl"
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/acl"
|
||||||
v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
|
||||||
rpcapi "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
|
||||||
v2session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
v2session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// defaultGRPCPayloadChunkLen default value for maxChunkLen.
|
||||||
|
// See PrmObjectPutInit.SetGRPCPayloadChunkLen for details.
|
||||||
|
const defaultGRPCPayloadChunkLen = 3 << 20
|
||||||
|
|
||||||
// PrmObjectPutInit groups parameters of ObjectPutInit operation.
|
// PrmObjectPutInit groups parameters of ObjectPutInit operation.
|
||||||
type PrmObjectPutInit struct {
|
type PrmObjectPutInit struct {
|
||||||
copyNum []uint32
|
copyNum []uint32
|
||||||
key *ecdsa.PrivateKey
|
key *ecdsa.PrivateKey
|
||||||
meta v2session.RequestMetaHeader
|
meta v2session.RequestMetaHeader
|
||||||
maxChunkLen int
|
maxChunkLen int
|
||||||
|
maxSize uint64
|
||||||
|
epochSource transformer.EpochSource
|
||||||
|
withoutHomomorphicHash bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetCopiesNumber sets number of object copies that is enough to consider put successful.
|
// SetCopiesNumber sets number of object copies that is enough to consider put successful.
|
||||||
|
@ -61,31 +61,35 @@ func (x ResObjectPut) StoredObjectID() oid.ID {
|
||||||
return x.obj
|
return x.obj
|
||||||
}
|
}
|
||||||
|
|
||||||
// ObjectWriter is designed to write one object to FrostFS system.
|
// ObjectWriter is designed to write one object or
|
||||||
|
// multiple parts of one object to FrostFS system.
|
||||||
//
|
//
|
||||||
// Must be initialized using Client.ObjectPutInit, any other
|
// Must be initialized using Client.ObjectPutInit, any other
|
||||||
// usage is unsafe.
|
// usage is unsafe.
|
||||||
type ObjectWriter struct {
|
type ObjectWriter interface {
|
||||||
cancelCtxStream context.CancelFunc
|
// WriteHeader writes header of the object. Result means success.
|
||||||
|
// Failure reason can be received via Close.
|
||||||
client *Client
|
WriteHeader(context.Context, object.Object) bool
|
||||||
stream interface {
|
// WritePayloadChunk writes chunk of the object payload. Result means success.
|
||||||
Write(*v2object.PutRequest) error
|
// Failure reason can be received via Close.
|
||||||
Close() error
|
WritePayloadChunk(context.Context, []byte) bool
|
||||||
}
|
// Close ends writing the object and returns the result of the operation
|
||||||
|
// along with the final results. Must be called after using the ObjectWriter.
|
||||||
key *ecdsa.PrivateKey
|
//
|
||||||
res ResObjectPut
|
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
||||||
err error
|
// Any client's internal or transport errors are returned as Go built-in error.
|
||||||
|
// If Client is tuned to resolve FrostFS API statuses, then FrostFS failures
|
||||||
chunkCalled bool
|
// codes are returned as error.
|
||||||
|
//
|
||||||
respV2 v2object.PutResponse
|
// Return statuses:
|
||||||
req v2object.PutRequest
|
// - global (see Client docs);
|
||||||
partInit v2object.PutObjectPartInit
|
// - *apistatus.ContainerNotFound;
|
||||||
partChunk v2object.PutObjectPartChunk
|
// - *apistatus.ObjectAccessDenied;
|
||||||
|
// - *apistatus.ObjectLocked;
|
||||||
maxChunkLen int
|
// - *apistatus.LockNonRegularObject;
|
||||||
|
// - *apistatus.SessionTokenNotFound;
|
||||||
|
// - *apistatus.SessionTokenExpired.
|
||||||
|
Close(context.Context) (*ResObjectPut, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// UseKey specifies private key to sign the requests.
|
// UseKey specifies private key to sign the requests.
|
||||||
|
@ -124,122 +128,21 @@ func (x *PrmObjectPutInit) WithXHeaders(hs ...string) {
|
||||||
writeXHeadersToMeta(hs, &x.meta)
|
writeXHeadersToMeta(hs, &x.meta)
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteHeader writes header of the object. Result means success.
|
// WithObjectMaxSize specifies max object size value and use it during object splitting.
|
||||||
// Failure reason can be received via Close.
|
// When specified, start writing to the stream only after the object is formed.
|
||||||
func (x *ObjectWriter) WriteHeader(hdr object.Object) bool {
|
// Continue processing the input only when the previous formed object has been successfully written.
|
||||||
v2Hdr := hdr.ToV2()
|
func (x *PrmObjectPutInit) WithObjectMaxSize(maxSize uint64) {
|
||||||
|
x.maxSize = maxSize
|
||||||
x.partInit.SetObjectID(v2Hdr.GetObjectID())
|
|
||||||
x.partInit.SetHeader(v2Hdr.GetHeader())
|
|
||||||
x.partInit.SetSignature(v2Hdr.GetSignature())
|
|
||||||
|
|
||||||
x.req.GetBody().SetObjectPart(&x.partInit)
|
|
||||||
x.req.SetVerificationHeader(nil)
|
|
||||||
|
|
||||||
x.err = signature.SignServiceMessage(x.key, &x.req)
|
|
||||||
if x.err != nil {
|
|
||||||
x.err = fmt.Errorf("sign message: %w", x.err)
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
x.err = x.stream.Write(&x.req)
|
// WithoutHomomorphicHash if set to true do not use Tillich-Zémor hash for payload.
|
||||||
return x.err == nil
|
func (x *PrmObjectPutInit) WithoutHomomorphicHash(v bool) {
|
||||||
|
x.withoutHomomorphicHash = v
|
||||||
}
|
}
|
||||||
|
|
||||||
// WritePayloadChunk writes chunk of the object payload. Result means success.
|
// WithEpochSource specifies epoch for object when split it on client side.
|
||||||
// Failure reason can be received via Close.
|
func (x *PrmObjectPutInit) WithEpochSource(es transformer.EpochSource) {
|
||||||
func (x *ObjectWriter) WritePayloadChunk(chunk []byte) bool {
|
x.epochSource = es
|
||||||
if !x.chunkCalled {
|
|
||||||
x.chunkCalled = true
|
|
||||||
x.req.GetBody().SetObjectPart(&x.partChunk)
|
|
||||||
}
|
|
||||||
|
|
||||||
for ln := len(chunk); ln > 0; ln = len(chunk) {
|
|
||||||
if ln > x.maxChunkLen {
|
|
||||||
ln = x.maxChunkLen
|
|
||||||
}
|
|
||||||
|
|
||||||
// we deal with size limit overflow above, but there is another case:
|
|
||||||
// what if method is called with "small" chunk many times? We write
|
|
||||||
// a message to the stream on each call. Alternatively, we could use buffering.
|
|
||||||
// In most cases, the chunk length does not vary between calls. Given this
|
|
||||||
// assumption, as well as the length of the payload from the header, it is
|
|
||||||
// possible to buffer the data of intermediate chunks, and send a message when
|
|
||||||
// the allocated buffer is filled, or when the last chunk is received.
|
|
||||||
// It is mentally assumed that allocating and filling the buffer is better than
|
|
||||||
// synchronous sending, but this needs to be tested.
|
|
||||||
x.partChunk.SetChunk(chunk[:ln])
|
|
||||||
x.req.SetVerificationHeader(nil)
|
|
||||||
|
|
||||||
x.err = signature.SignServiceMessage(x.key, &x.req)
|
|
||||||
if x.err != nil {
|
|
||||||
x.err = fmt.Errorf("sign message: %w", x.err)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
x.err = x.stream.Write(&x.req)
|
|
||||||
if x.err != nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
chunk = chunk[ln:]
|
|
||||||
}
|
|
||||||
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close ends writing the object and returns the result of the operation
|
|
||||||
// along with the final results. Must be called after using the ObjectWriter.
|
|
||||||
//
|
|
||||||
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
|
||||||
// Any client's internal or transport errors are returned as Go built-in error.
|
|
||||||
// If Client is tuned to resolve FrostFS API statuses, then FrostFS failures
|
|
||||||
// codes are returned as error.
|
|
||||||
//
|
|
||||||
// Return statuses:
|
|
||||||
// - global (see Client docs);
|
|
||||||
// - *apistatus.ContainerNotFound;
|
|
||||||
// - *apistatus.ObjectAccessDenied;
|
|
||||||
// - *apistatus.ObjectLocked;
|
|
||||||
// - *apistatus.LockNonRegularObject;
|
|
||||||
// - *apistatus.SessionTokenNotFound;
|
|
||||||
// - *apistatus.SessionTokenExpired.
|
|
||||||
func (x *ObjectWriter) Close() (*ResObjectPut, error) {
|
|
||||||
defer x.cancelCtxStream()
|
|
||||||
|
|
||||||
// Ignore io.EOF error, because it is expected error for client-side
|
|
||||||
// stream termination by the server. E.g. when stream contains invalid
|
|
||||||
// message. Server returns an error in response message (in status).
|
|
||||||
if x.err != nil && !errors.Is(x.err, io.EOF) {
|
|
||||||
return nil, x.err
|
|
||||||
}
|
|
||||||
|
|
||||||
if x.err = x.stream.Close(); x.err != nil {
|
|
||||||
return nil, x.err
|
|
||||||
}
|
|
||||||
|
|
||||||
x.res.st, x.err = x.client.processResponse(&x.respV2)
|
|
||||||
if x.err != nil {
|
|
||||||
return nil, x.err
|
|
||||||
}
|
|
||||||
|
|
||||||
if !apistatus.IsSuccessful(x.res.st) {
|
|
||||||
return &x.res, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
const fieldID = "ID"
|
|
||||||
|
|
||||||
idV2 := x.respV2.GetBody().GetObjectID()
|
|
||||||
if idV2 == nil {
|
|
||||||
return nil, newErrMissingResponseField(fieldID)
|
|
||||||
}
|
|
||||||
|
|
||||||
x.err = x.res.obj.ReadFromV2(*idV2)
|
|
||||||
if x.err != nil {
|
|
||||||
x.err = newErrInvalidResponseField(fieldID, x.err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &x.res, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ObjectPutInit initiates writing an object through a remote server using FrostFS API protocol.
|
// ObjectPutInit initiates writing an object through a remote server using FrostFS API protocol.
|
||||||
|
@ -249,31 +152,9 @@ func (x *ObjectWriter) Close() (*ResObjectPut, error) {
|
||||||
//
|
//
|
||||||
// Returns an error if parameters are set incorrectly.
|
// Returns an error if parameters are set incorrectly.
|
||||||
// Context is required and must not be nil. It is used for network communication.
|
// Context is required and must not be nil. It is used for network communication.
|
||||||
func (c *Client) ObjectPutInit(ctx context.Context, prm PrmObjectPutInit) (*ObjectWriter, error) {
|
func (c *Client) ObjectPutInit(ctx context.Context, prm PrmObjectPutInit) (ObjectWriter, error) {
|
||||||
var w ObjectWriter
|
if prm.maxSize > 0 {
|
||||||
|
return c.objectPutInitTransformer(prm)
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
|
||||||
stream, err := rpcapi.PutObject(&c.c, &w.respV2, client.WithContext(ctx))
|
|
||||||
if err != nil {
|
|
||||||
cancel()
|
|
||||||
return nil, fmt.Errorf("open stream: %w", err)
|
|
||||||
}
|
}
|
||||||
|
return c.objectPutInitRaw(ctx, prm)
|
||||||
w.key = &c.prm.key
|
|
||||||
if prm.key != nil {
|
|
||||||
w.key = prm.key
|
|
||||||
}
|
|
||||||
w.cancelCtxStream = cancel
|
|
||||||
w.client = c
|
|
||||||
w.stream = stream
|
|
||||||
w.partInit.SetCopiesNumber(prm.copyNum)
|
|
||||||
w.req.SetBody(new(v2object.PutRequestBody))
|
|
||||||
if prm.maxChunkLen > 0 {
|
|
||||||
w.maxChunkLen = prm.maxChunkLen
|
|
||||||
} else {
|
|
||||||
w.maxChunkLen = 3 << 20
|
|
||||||
}
|
|
||||||
c.prepareRequest(&w.req, &prm.meta)
|
|
||||||
|
|
||||||
return &w, nil
|
|
||||||
}
|
}
|
||||||
|
|
154
client/object_put_raw.go
Normal file
154
client/object_put_raw.go
Normal file
|
@ -0,0 +1,154 @@
|
||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/ecdsa"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
|
rpcapi "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (c *Client) objectPutInitRaw(ctx context.Context, prm PrmObjectPutInit) (*objectWriterRaw, error) {
|
||||||
|
var w objectWriterRaw
|
||||||
|
stream, err := rpcapi.PutObject(&c.c, &w.respV2, client.WithContext(ctx))
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("open stream: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
w.key = &c.prm.key
|
||||||
|
if prm.key != nil {
|
||||||
|
w.key = prm.key
|
||||||
|
}
|
||||||
|
w.client = c
|
||||||
|
w.stream = stream
|
||||||
|
w.partInit.SetCopiesNumber(prm.copyNum)
|
||||||
|
w.req.SetBody(new(v2object.PutRequestBody))
|
||||||
|
if prm.maxChunkLen > 0 {
|
||||||
|
w.maxChunkLen = prm.maxChunkLen
|
||||||
|
} else {
|
||||||
|
w.maxChunkLen = defaultGRPCPayloadChunkLen
|
||||||
|
}
|
||||||
|
c.prepareRequest(&w.req, &prm.meta)
|
||||||
|
return &w, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type objectWriterRaw struct {
|
||||||
|
client *Client
|
||||||
|
stream interface {
|
||||||
|
Write(*v2object.PutRequest) error
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
|
|
||||||
|
key *ecdsa.PrivateKey
|
||||||
|
res ResObjectPut
|
||||||
|
err error
|
||||||
|
chunkCalled bool
|
||||||
|
respV2 v2object.PutResponse
|
||||||
|
req v2object.PutRequest
|
||||||
|
partInit v2object.PutObjectPartInit
|
||||||
|
partChunk v2object.PutObjectPartChunk
|
||||||
|
maxChunkLen int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *objectWriterRaw) WriteHeader(_ context.Context, hdr object.Object) bool {
|
||||||
|
v2Hdr := hdr.ToV2()
|
||||||
|
|
||||||
|
x.partInit.SetObjectID(v2Hdr.GetObjectID())
|
||||||
|
x.partInit.SetHeader(v2Hdr.GetHeader())
|
||||||
|
x.partInit.SetSignature(v2Hdr.GetSignature())
|
||||||
|
|
||||||
|
x.req.GetBody().SetObjectPart(&x.partInit)
|
||||||
|
x.req.SetVerificationHeader(nil)
|
||||||
|
|
||||||
|
x.err = signature.SignServiceMessage(x.key, &x.req)
|
||||||
|
if x.err != nil {
|
||||||
|
x.err = fmt.Errorf("sign message: %w", x.err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
x.err = x.stream.Write(&x.req)
|
||||||
|
return x.err == nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *objectWriterRaw) WritePayloadChunk(_ context.Context, chunk []byte) bool {
|
||||||
|
if !x.chunkCalled {
|
||||||
|
x.chunkCalled = true
|
||||||
|
x.req.GetBody().SetObjectPart(&x.partChunk)
|
||||||
|
}
|
||||||
|
|
||||||
|
for ln := len(chunk); ln > 0; ln = len(chunk) {
|
||||||
|
if ln > x.maxChunkLen {
|
||||||
|
ln = x.maxChunkLen
|
||||||
|
}
|
||||||
|
|
||||||
|
// we deal with size limit overflow above, but there is another case:
|
||||||
|
// what if method is called with "small" chunk many times? We write
|
||||||
|
// a message to the stream on each call. Alternatively, we could use buffering.
|
||||||
|
// In most cases, the chunk length does not vary between calls. Given this
|
||||||
|
// assumption, as well as the length of the payload from the header, it is
|
||||||
|
// possible to buffer the data of intermediate chunks, and send a message when
|
||||||
|
// the allocated buffer is filled, or when the last chunk is received.
|
||||||
|
// It is mentally assumed that allocating and filling the buffer is better than
|
||||||
|
// synchronous sending, but this needs to be tested.
|
||||||
|
x.partChunk.SetChunk(chunk[:ln])
|
||||||
|
x.req.SetVerificationHeader(nil)
|
||||||
|
|
||||||
|
x.err = signature.SignServiceMessage(x.key, &x.req)
|
||||||
|
if x.err != nil {
|
||||||
|
x.err = fmt.Errorf("sign message: %w", x.err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
x.err = x.stream.Write(&x.req)
|
||||||
|
if x.err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
chunk = chunk[ln:]
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *objectWriterRaw) Close(_ context.Context) (*ResObjectPut, error) {
|
||||||
|
// Ignore io.EOF error, because it is expected error for client-side
|
||||||
|
// stream termination by the server. E.g. when stream contains invalid
|
||||||
|
// message. Server returns an error in response message (in status).
|
||||||
|
if x.err != nil && !errors.Is(x.err, io.EOF) {
|
||||||
|
return nil, x.err
|
||||||
|
}
|
||||||
|
|
||||||
|
if x.err = x.stream.Close(); x.err != nil {
|
||||||
|
return nil, x.err
|
||||||
|
}
|
||||||
|
|
||||||
|
x.res.st, x.err = x.client.processResponse(&x.respV2)
|
||||||
|
if x.err != nil {
|
||||||
|
return nil, x.err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !apistatus.IsSuccessful(x.res.st) {
|
||||||
|
return &x.res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
const fieldID = "ID"
|
||||||
|
|
||||||
|
idV2 := x.respV2.GetBody().GetObjectID()
|
||||||
|
if idV2 == nil {
|
||||||
|
return nil, newErrMissingResponseField(fieldID)
|
||||||
|
}
|
||||||
|
|
||||||
|
x.err = x.res.obj.ReadFromV2(*idV2)
|
||||||
|
if x.err != nil {
|
||||||
|
x.err = newErrInvalidResponseField(fieldID, x.err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &x.res, nil
|
||||||
|
}
|
88
client/object_put_transformer.go
Normal file
88
client/object_put_transformer.go
Normal file
|
@ -0,0 +1,88 @@
|
||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (c *Client) objectPutInitTransformer(prm PrmObjectPutInit) (*objectWriterTransformer, error) {
|
||||||
|
var w objectWriterTransformer
|
||||||
|
w.it = internalTarget{
|
||||||
|
client: c,
|
||||||
|
prm: prm,
|
||||||
|
}
|
||||||
|
key := &c.prm.key
|
||||||
|
if prm.key != nil {
|
||||||
|
key = prm.key
|
||||||
|
}
|
||||||
|
w.ot = transformer.NewPayloadSizeLimiter(transformer.Params{
|
||||||
|
Key: key,
|
||||||
|
NextTargetInit: func() transformer.ObjectTarget { return &w.it },
|
||||||
|
MaxSize: prm.maxSize,
|
||||||
|
WithoutHomomorphicHash: prm.withoutHomomorphicHash,
|
||||||
|
NetworkState: prm.epochSource,
|
||||||
|
})
|
||||||
|
return &w, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type objectWriterTransformer struct {
|
||||||
|
ot transformer.ObjectTarget
|
||||||
|
it internalTarget
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *objectWriterTransformer) WriteHeader(ctx context.Context, hdr object.Object) bool {
|
||||||
|
x.err = x.ot.WriteHeader(ctx, &hdr)
|
||||||
|
return x.err == nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *objectWriterTransformer) WritePayloadChunk(ctx context.Context, chunk []byte) bool {
|
||||||
|
_, x.err = x.ot.Write(ctx, chunk)
|
||||||
|
return x.err == nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, error) {
|
||||||
|
if ai, err := x.ot.Close(ctx); err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else {
|
||||||
|
if ai != nil && ai.ParentID != nil {
|
||||||
|
x.it.res.obj = *ai.ParentID
|
||||||
|
}
|
||||||
|
return x.it.res, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type internalTarget struct {
|
||||||
|
current *object.Object
|
||||||
|
client *Client
|
||||||
|
res *ResObjectPut
|
||||||
|
prm PrmObjectPutInit
|
||||||
|
payload []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (it *internalTarget) WriteHeader(_ context.Context, object *object.Object) error {
|
||||||
|
it.current = object
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (it *internalTarget) Write(_ context.Context, p []byte) (n int, err error) {
|
||||||
|
it.payload = append(it.payload, p...)
|
||||||
|
return len(p), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (it *internalTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers, error) {
|
||||||
|
it.current.SetPayload(it.payload)
|
||||||
|
wrt, err := it.client.objectPutInitRaw(ctx, it.prm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if wrt.WriteHeader(ctx, *it.current) {
|
||||||
|
wrt.WritePayloadChunk(ctx, it.current.Payload())
|
||||||
|
}
|
||||||
|
it.res, err = wrt.Close(ctx)
|
||||||
|
it.current = nil
|
||||||
|
it.payload = nil
|
||||||
|
return nil, err
|
||||||
|
}
|
|
@ -645,7 +645,7 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID
|
||||||
return oid.ID{}, fmt.Errorf("init writing on API client: %w", err)
|
return oid.ID{}, fmt.Errorf("init writing on API client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if wObj.WriteHeader(prm.hdr) {
|
if wObj.WriteHeader(ctx, prm.hdr) {
|
||||||
sz := prm.hdr.PayloadSize()
|
sz := prm.hdr.PayloadSize()
|
||||||
|
|
||||||
if data := prm.hdr.Payload(); len(data) > 0 {
|
if data := prm.hdr.Payload(); len(data) > 0 {
|
||||||
|
@ -672,7 +672,7 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID
|
||||||
n, err = prm.payload.Read(buf)
|
n, err = prm.payload.Read(buf)
|
||||||
if n > 0 {
|
if n > 0 {
|
||||||
start = time.Now()
|
start = time.Now()
|
||||||
successWrite := wObj.WritePayloadChunk(buf[:n])
|
successWrite := wObj.WritePayloadChunk(ctx, buf[:n])
|
||||||
c.incRequests(time.Since(start), methodObjectPut)
|
c.incRequests(time.Since(start), methodObjectPut)
|
||||||
if !successWrite {
|
if !successWrite {
|
||||||
break
|
break
|
||||||
|
@ -690,7 +690,7 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := wObj.Close()
|
res, err := wObj.Close(ctx)
|
||||||
var st apistatus.Status
|
var st apistatus.Status
|
||||||
if res != nil {
|
if res != nil {
|
||||||
st = res.Status()
|
st = res.Status()
|
||||||
|
|
Loading…
Reference in a new issue