Allow to split objects in the client #94

Merged
fyrchik merged 2 commits from acid-ant/frostfs-sdk-go:feature/83-cut-obj-client into master 2023-06-28 12:12:39 +00:00
5 changed files with 302 additions and 176 deletions

3
.gitignore vendored
View file

@ -24,3 +24,6 @@ coverage.html
# antlr tool jar
antlr-*.jar
# tempfiles
.cache

View file

@ -3,29 +3,29 @@ package client
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/acl"
v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
rpcapi "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
v2session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
)
// defaultGRPCPayloadChunkLen default value for maxChunkLen.
// See PrmObjectPutInit.SetGRPCPayloadChunkLen for details.
const defaultGRPCPayloadChunkLen = 3 << 20
// PrmObjectPutInit groups parameters of ObjectPutInit operation.
type PrmObjectPutInit struct {
copyNum []uint32
key *ecdsa.PrivateKey
meta v2session.RequestMetaHeader
maxChunkLen int
maxSize uint64
epochSource transformer.EpochSource
withoutHomomorphicHash bool
}
// SetCopiesNumber sets number of object copies that is enough to consider put successful.
@ -61,31 +61,35 @@ func (x ResObjectPut) StoredObjectID() oid.ID {
return x.obj
}
// ObjectWriter is designed to write one object to FrostFS system.
// ObjectWriter is designed to write one object or
// multiple parts of one object to FrostFS system.
//
// Must be initialized using Client.ObjectPutInit, any other
// usage is unsafe.
type ObjectWriter struct {
cancelCtxStream context.CancelFunc
client *Client
stream interface {
Write(*v2object.PutRequest) error
Close() error
}
key *ecdsa.PrivateKey
res ResObjectPut
err error
chunkCalled bool
respV2 v2object.PutResponse
req v2object.PutRequest
partInit v2object.PutObjectPartInit
partChunk v2object.PutObjectPartChunk
maxChunkLen int
type ObjectWriter interface {
// WriteHeader writes header of the object. Result means success.
// Failure reason can be received via Close.
WriteHeader(context.Context, object.Object) bool
// WritePayloadChunk writes chunk of the object payload. Result means success.
// Failure reason can be received via Close.
WritePayloadChunk(context.Context, []byte) bool
alexvanin marked this conversation as resolved
Review

As far as I understand, this interface hides object transformation details and should be used as a replacement for current payload streaming approach, with the difference that object is created on the client side.

So objectWriter implementation is not suitable for more complex cases when client application wants to resend the object in case of transmission failures. Should client application use transformers directly in the code to do that and then use this interface to upload and reupload created object?

As far as I understand, this interface hides object transformation details and should be used as a replacement for current payload streaming approach, with the difference that object is created on the client side. So `objectWriter` implementation is not suitable for more complex cases when client application wants to resend the object in case of transmission failures. Should client application use transformers directly in the code to do that and _then_ use this interface to upload and reupload created object?
Review

Thought about this, too. There is no mechanism to resend part of the already formed object. Currently, implemented happy path only. So client should transformer directly.
Do you think we need to add some retries when sending parts of the big object?

Thought about this, too. There is no mechanism to resend part of the already formed object. Currently, implemented happy path only. So client should `transformer` directly. Do you think we need to add some retries when sending parts of the big object?
Review

Do you think we need to add some retries when sending parts of the big object?

Probably no. But client library will benefit from having an interface to put already prepared object, in my opinion.

> Do you think we need to add some retries when sending parts of the big object? Probably no. But client library will benefit from having an interface to put already prepared object, in my opinion.
Review

You talk about a new method WriteObject which should wrap WriteHeader and WritePayloadChunk?
And it looks like raw writer only should implement it.

You talk about a new method `WriteObject` which should wrap `WriteHeader` and `WritePayloadChunk`? And it looks like raw writer only should implement it.
Review

Oh yes. I see there is an separate issue for that #64, so we can postpone it. Also we are going to have one more RPC to upload whole object in one request: TrueCloudLab/frostfs-api#9.

Oh yes. I see there is an separate issue for that https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/issues/64, so we can postpone it. Also we are going to have one more RPC to upload whole object in one request: https://git.frostfs.info/TrueCloudLab/frostfs-api/issues/9.
// Close ends writing the object and returns the result of the operation
// along with the final results. Must be called after using the ObjectWriter.
//
// Exactly one return value is non-nil. By default, server status is returned in res structure.
// Any client's internal or transport errors are returned as Go built-in error.
// If Client is tuned to resolve FrostFS API statuses, then FrostFS failures
// codes are returned as error.
//
// Return statuses:
// - global (see Client docs);
// - *apistatus.ContainerNotFound;
// - *apistatus.ObjectAccessDenied;
// - *apistatus.ObjectLocked;
// - *apistatus.LockNonRegularObject;
// - *apistatus.SessionTokenNotFound;
// - *apistatus.SessionTokenExpired.
Close(context.Context) (*ResObjectPut, error)
}
// UseKey specifies private key to sign the requests.
@ -124,122 +128,21 @@ func (x *PrmObjectPutInit) WithXHeaders(hs ...string) {
writeXHeadersToMeta(hs, &x.meta)
}
// WriteHeader writes header of the object. Result means success.
// Failure reason can be received via Close.
func (x *ObjectWriter) WriteHeader(hdr object.Object) bool {
v2Hdr := hdr.ToV2()
x.partInit.SetObjectID(v2Hdr.GetObjectID())
x.partInit.SetHeader(v2Hdr.GetHeader())
x.partInit.SetSignature(v2Hdr.GetSignature())
x.req.GetBody().SetObjectPart(&x.partInit)
x.req.SetVerificationHeader(nil)
x.err = signature.SignServiceMessage(x.key, &x.req)
if x.err != nil {
x.err = fmt.Errorf("sign message: %w", x.err)
return false
}
x.err = x.stream.Write(&x.req)
return x.err == nil
// WithObjectMaxSize specifies max object size value and use it during object splitting.
// When specified, start writing to the stream only after the object is formed.
// Continue processing the input only when the previous formed object has been successfully written.
func (x *PrmObjectPutInit) WithObjectMaxSize(maxSize uint64) {
x.maxSize = maxSize
}
// WritePayloadChunk writes chunk of the object payload. Result means success.
// Failure reason can be received via Close.
func (x *ObjectWriter) WritePayloadChunk(chunk []byte) bool {
if !x.chunkCalled {
x.chunkCalled = true
x.req.GetBody().SetObjectPart(&x.partChunk)
}
for ln := len(chunk); ln > 0; ln = len(chunk) {
if ln > x.maxChunkLen {
ln = x.maxChunkLen
}
// we deal with size limit overflow above, but there is another case:
// what if method is called with "small" chunk many times? We write
// a message to the stream on each call. Alternatively, we could use buffering.
// In most cases, the chunk length does not vary between calls. Given this
// assumption, as well as the length of the payload from the header, it is
// possible to buffer the data of intermediate chunks, and send a message when
// the allocated buffer is filled, or when the last chunk is received.
// It is mentally assumed that allocating and filling the buffer is better than
// synchronous sending, but this needs to be tested.
x.partChunk.SetChunk(chunk[:ln])
x.req.SetVerificationHeader(nil)
x.err = signature.SignServiceMessage(x.key, &x.req)
if x.err != nil {
x.err = fmt.Errorf("sign message: %w", x.err)
return false
}
x.err = x.stream.Write(&x.req)
if x.err != nil {
return false
}
chunk = chunk[ln:]
}
return true
// WithoutHomomorphicHash if set to true do not use Tillich-Zémor hash for payload.
func (x *PrmObjectPutInit) WithoutHomomorphicHash(v bool) {
x.withoutHomomorphicHash = v
}
// Close ends writing the object and returns the result of the operation
// along with the final results. Must be called after using the ObjectWriter.
//
// Exactly one return value is non-nil. By default, server status is returned in res structure.
// Any client's internal or transport errors are returned as Go built-in error.
// If Client is tuned to resolve FrostFS API statuses, then FrostFS failures
// codes are returned as error.
//
// Return statuses:
// - global (see Client docs);
// - *apistatus.ContainerNotFound;
// - *apistatus.ObjectAccessDenied;
// - *apistatus.ObjectLocked;
// - *apistatus.LockNonRegularObject;
// - *apistatus.SessionTokenNotFound;
// - *apistatus.SessionTokenExpired.
func (x *ObjectWriter) Close() (*ResObjectPut, error) {
defer x.cancelCtxStream()
// Ignore io.EOF error, because it is expected error for client-side
// stream termination by the server. E.g. when stream contains invalid
// message. Server returns an error in response message (in status).
if x.err != nil && !errors.Is(x.err, io.EOF) {
return nil, x.err
}
if x.err = x.stream.Close(); x.err != nil {
return nil, x.err
}
x.res.st, x.err = x.client.processResponse(&x.respV2)
if x.err != nil {
return nil, x.err
}
if !apistatus.IsSuccessful(x.res.st) {
return &x.res, nil
}
const fieldID = "ID"
idV2 := x.respV2.GetBody().GetObjectID()
if idV2 == nil {
return nil, newErrMissingResponseField(fieldID)
}
x.err = x.res.obj.ReadFromV2(*idV2)
if x.err != nil {
x.err = newErrInvalidResponseField(fieldID, x.err)
}
return &x.res, nil
// WithEpochSource specifies epoch for object when split it on client side.
func (x *PrmObjectPutInit) WithEpochSource(es transformer.EpochSource) {
x.epochSource = es
}
// ObjectPutInit initiates writing an object through a remote server using FrostFS API protocol.
@ -249,31 +152,9 @@ func (x *ObjectWriter) Close() (*ResObjectPut, error) {
//
// Returns an error if parameters are set incorrectly.
// Context is required and must not be nil. It is used for network communication.
func (c *Client) ObjectPutInit(ctx context.Context, prm PrmObjectPutInit) (*ObjectWriter, error) {
var w ObjectWriter
ctx, cancel := context.WithCancel(ctx)
stream, err := rpcapi.PutObject(&c.c, &w.respV2, client.WithContext(ctx))
if err != nil {
cancel()
return nil, fmt.Errorf("open stream: %w", err)
func (c *Client) ObjectPutInit(ctx context.Context, prm PrmObjectPutInit) (ObjectWriter, error) {
if prm.maxSize > 0 {
return c.objectPutInitTransformer(prm)
}
w.key = &c.prm.key
if prm.key != nil {
w.key = prm.key
}
w.cancelCtxStream = cancel
w.client = c
w.stream = stream
w.partInit.SetCopiesNumber(prm.copyNum)
w.req.SetBody(new(v2object.PutRequestBody))
if prm.maxChunkLen > 0 {
w.maxChunkLen = prm.maxChunkLen
} else {
w.maxChunkLen = 3 << 20
}
c.prepareRequest(&w.req, &prm.meta)
return &w, nil
return c.objectPutInitRaw(ctx, prm)
}

154
client/object_put_raw.go Normal file
View file

@ -0,0 +1,154 @@
package client
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io"
v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
rpcapi "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)
func (c *Client) objectPutInitRaw(ctx context.Context, prm PrmObjectPutInit) (*objectWriterRaw, error) {
var w objectWriterRaw
stream, err := rpcapi.PutObject(&c.c, &w.respV2, client.WithContext(ctx))
if err != nil {
return nil, fmt.Errorf("open stream: %w", err)
}
w.key = &c.prm.key
if prm.key != nil {
w.key = prm.key
}
w.client = c
w.stream = stream
w.partInit.SetCopiesNumber(prm.copyNum)
w.req.SetBody(new(v2object.PutRequestBody))
if prm.maxChunkLen > 0 {
w.maxChunkLen = prm.maxChunkLen
} else {
w.maxChunkLen = defaultGRPCPayloadChunkLen
}
c.prepareRequest(&w.req, &prm.meta)
return &w, nil
}
type objectWriterRaw struct {
client *Client
stream interface {
Write(*v2object.PutRequest) error
Close() error
}
key *ecdsa.PrivateKey
res ResObjectPut
err error
chunkCalled bool
respV2 v2object.PutResponse
req v2object.PutRequest
partInit v2object.PutObjectPartInit
partChunk v2object.PutObjectPartChunk
maxChunkLen int
}
func (x *objectWriterRaw) WriteHeader(_ context.Context, hdr object.Object) bool {
v2Hdr := hdr.ToV2()
x.partInit.SetObjectID(v2Hdr.GetObjectID())
x.partInit.SetHeader(v2Hdr.GetHeader())
x.partInit.SetSignature(v2Hdr.GetSignature())
x.req.GetBody().SetObjectPart(&x.partInit)
x.req.SetVerificationHeader(nil)
x.err = signature.SignServiceMessage(x.key, &x.req)
if x.err != nil {
x.err = fmt.Errorf("sign message: %w", x.err)
return false
}
x.err = x.stream.Write(&x.req)
return x.err == nil
}
func (x *objectWriterRaw) WritePayloadChunk(_ context.Context, chunk []byte) bool {
if !x.chunkCalled {
x.chunkCalled = true
x.req.GetBody().SetObjectPart(&x.partChunk)
}
for ln := len(chunk); ln > 0; ln = len(chunk) {
if ln > x.maxChunkLen {
ln = x.maxChunkLen
}
// we deal with size limit overflow above, but there is another case:
// what if method is called with "small" chunk many times? We write
// a message to the stream on each call. Alternatively, we could use buffering.
// In most cases, the chunk length does not vary between calls. Given this
// assumption, as well as the length of the payload from the header, it is
// possible to buffer the data of intermediate chunks, and send a message when
// the allocated buffer is filled, or when the last chunk is received.
// It is mentally assumed that allocating and filling the buffer is better than
// synchronous sending, but this needs to be tested.
x.partChunk.SetChunk(chunk[:ln])
x.req.SetVerificationHeader(nil)
x.err = signature.SignServiceMessage(x.key, &x.req)
if x.err != nil {
x.err = fmt.Errorf("sign message: %w", x.err)
return false
}
x.err = x.stream.Write(&x.req)
if x.err != nil {
return false
}
chunk = chunk[ln:]
}
return true
}
func (x *objectWriterRaw) Close(_ context.Context) (*ResObjectPut, error) {
// Ignore io.EOF error, because it is expected error for client-side
// stream termination by the server. E.g. when stream contains invalid
// message. Server returns an error in response message (in status).
if x.err != nil && !errors.Is(x.err, io.EOF) {
return nil, x.err
}
if x.err = x.stream.Close(); x.err != nil {
return nil, x.err
}
x.res.st, x.err = x.client.processResponse(&x.respV2)
if x.err != nil {
return nil, x.err
}
if !apistatus.IsSuccessful(x.res.st) {
return &x.res, nil
}
const fieldID = "ID"
idV2 := x.respV2.GetBody().GetObjectID()
if idV2 == nil {
return nil, newErrMissingResponseField(fieldID)
}
x.err = x.res.obj.ReadFromV2(*idV2)
if x.err != nil {
x.err = newErrInvalidResponseField(fieldID, x.err)
}
return &x.res, nil
}

View file

@ -0,0 +1,88 @@
package client
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
)
func (c *Client) objectPutInitTransformer(prm PrmObjectPutInit) (*objectWriterTransformer, error) {
var w objectWriterTransformer
w.it = internalTarget{
client: c,
prm: prm,
}
key := &c.prm.key
if prm.key != nil {
key = prm.key
}
w.ot = transformer.NewPayloadSizeLimiter(transformer.Params{
Key: key,
NextTargetInit: func() transformer.ObjectTarget { return &w.it },
MaxSize: prm.maxSize,
WithoutHomomorphicHash: prm.withoutHomomorphicHash,
NetworkState: prm.epochSource,
})
return &w, nil
}
type objectWriterTransformer struct {
ot transformer.ObjectTarget
it internalTarget
err error
}
func (x *objectWriterTransformer) WriteHeader(ctx context.Context, hdr object.Object) bool {
x.err = x.ot.WriteHeader(ctx, &hdr)
return x.err == nil
}
func (x *objectWriterTransformer) WritePayloadChunk(ctx context.Context, chunk []byte) bool {
_, x.err = x.ot.Write(ctx, chunk)
return x.err == nil
}
func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, error) {
if ai, err := x.ot.Close(ctx); err != nil {
return nil, err
} else {
if ai != nil && ai.ParentID != nil {
x.it.res.obj = *ai.ParentID
}
return x.it.res, nil
}
}
type internalTarget struct {
current *object.Object
client *Client
res *ResObjectPut
prm PrmObjectPutInit
payload []byte
}
func (it *internalTarget) WriteHeader(_ context.Context, object *object.Object) error {
it.current = object
return nil
}
func (it *internalTarget) Write(_ context.Context, p []byte) (n int, err error) {
it.payload = append(it.payload, p...)
return len(p), nil
}
func (it *internalTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers, error) {
it.current.SetPayload(it.payload)
wrt, err := it.client.objectPutInitRaw(ctx, it.prm)
if err != nil {
return nil, err
}
if wrt.WriteHeader(ctx, *it.current) {
wrt.WritePayloadChunk(ctx, it.current.Payload())
}
it.res, err = wrt.Close(ctx)
it.current = nil
it.payload = nil
return nil, err
}

View file

@ -645,7 +645,7 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID
return oid.ID{}, fmt.Errorf("init writing on API client: %w", err)
}
if wObj.WriteHeader(prm.hdr) {
if wObj.WriteHeader(ctx, prm.hdr) {
sz := prm.hdr.PayloadSize()
if data := prm.hdr.Payload(); len(data) > 0 {
@ -672,7 +672,7 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID
n, err = prm.payload.Read(buf)
if n > 0 {
start = time.Now()
successWrite := wObj.WritePayloadChunk(buf[:n])
successWrite := wObj.WritePayloadChunk(ctx, buf[:n])
c.incRequests(time.Since(start), methodObjectPut)
if !successWrite {
break
@ -690,7 +690,7 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID
}
}
res, err := wObj.Close()
res, err := wObj.Close(ctx)
var st apistatus.Status
if res != nil {
st = res.Status()