From 2f884601726cbd9aaba1b10a8b225a1e64ce7c2d Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Tue, 27 Jun 2023 11:53:53 +0300 Subject: [PATCH] [#83] Allow to split objects in the client Signed-off-by: Anton Nikiforov --- client/object_put.go | 225 ++++++++----------------------- client/object_put_raw.go | 154 +++++++++++++++++++++ client/object_put_transformer.go | 88 ++++++++++++ pool/pool.go | 6 +- 4 files changed, 298 insertions(+), 175 deletions(-) create mode 100644 client/object_put_raw.go create mode 100644 client/object_put_transformer.go diff --git a/client/object_put.go b/client/object_put.go index d56a6fca..2861ea41 100644 --- a/client/object_put.go +++ b/client/object_put.go @@ -3,29 +3,29 @@ package client import ( "context" "crypto/ecdsa" - "errors" - "fmt" - "io" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/acl" - v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" - rpcapi "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" - "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" v2session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" - "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" - apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" ) +// defaultGRPCPayloadChunkLen default value for maxChunkLen. +// See PrmObjectPutInit.SetGRPCPayloadChunkLen for details. +const defaultGRPCPayloadChunkLen = 3 << 20 + // PrmObjectPutInit groups parameters of ObjectPutInit operation. type PrmObjectPutInit struct { - copyNum []uint32 - key *ecdsa.PrivateKey - meta v2session.RequestMetaHeader - maxChunkLen int + copyNum []uint32 + key *ecdsa.PrivateKey + meta v2session.RequestMetaHeader + maxChunkLen int + maxSize uint64 + epochSource transformer.EpochSource + withoutHomomorphicHash bool } // SetCopiesNumber sets number of object copies that is enough to consider put successful. @@ -61,31 +61,35 @@ func (x ResObjectPut) StoredObjectID() oid.ID { return x.obj } -// ObjectWriter is designed to write one object to FrostFS system. +// ObjectWriter is designed to write one object or +// multiple parts of one object to FrostFS system. // // Must be initialized using Client.ObjectPutInit, any other // usage is unsafe. -type ObjectWriter struct { - cancelCtxStream context.CancelFunc - - client *Client - stream interface { - Write(*v2object.PutRequest) error - Close() error - } - - key *ecdsa.PrivateKey - res ResObjectPut - err error - - chunkCalled bool - - respV2 v2object.PutResponse - req v2object.PutRequest - partInit v2object.PutObjectPartInit - partChunk v2object.PutObjectPartChunk - - maxChunkLen int +type ObjectWriter interface { + // WriteHeader writes header of the object. Result means success. + // Failure reason can be received via Close. + WriteHeader(context.Context, object.Object) bool + // WritePayloadChunk writes chunk of the object payload. Result means success. + // Failure reason can be received via Close. + WritePayloadChunk(context.Context, []byte) bool + // Close ends writing the object and returns the result of the operation + // along with the final results. Must be called after using the ObjectWriter. + // + // Exactly one return value is non-nil. By default, server status is returned in res structure. + // Any client's internal or transport errors are returned as Go built-in error. + // If Client is tuned to resolve FrostFS API statuses, then FrostFS failures + // codes are returned as error. + // + // Return statuses: + // - global (see Client docs); + // - *apistatus.ContainerNotFound; + // - *apistatus.ObjectAccessDenied; + // - *apistatus.ObjectLocked; + // - *apistatus.LockNonRegularObject; + // - *apistatus.SessionTokenNotFound; + // - *apistatus.SessionTokenExpired. + Close(context.Context) (*ResObjectPut, error) } // UseKey specifies private key to sign the requests. @@ -124,122 +128,21 @@ func (x *PrmObjectPutInit) WithXHeaders(hs ...string) { writeXHeadersToMeta(hs, &x.meta) } -// WriteHeader writes header of the object. Result means success. -// Failure reason can be received via Close. -func (x *ObjectWriter) WriteHeader(hdr object.Object) bool { - v2Hdr := hdr.ToV2() - - x.partInit.SetObjectID(v2Hdr.GetObjectID()) - x.partInit.SetHeader(v2Hdr.GetHeader()) - x.partInit.SetSignature(v2Hdr.GetSignature()) - - x.req.GetBody().SetObjectPart(&x.partInit) - x.req.SetVerificationHeader(nil) - - x.err = signature.SignServiceMessage(x.key, &x.req) - if x.err != nil { - x.err = fmt.Errorf("sign message: %w", x.err) - return false - } - - x.err = x.stream.Write(&x.req) - return x.err == nil +// WithObjectMaxSize specifies max object size value and use it during object splitting. +// When specified, start writing to the stream only after the object is formed. +// Continue processing the input only when the previous formed object has been successfully written. +func (x *PrmObjectPutInit) WithObjectMaxSize(maxSize uint64) { + x.maxSize = maxSize } -// WritePayloadChunk writes chunk of the object payload. Result means success. -// Failure reason can be received via Close. -func (x *ObjectWriter) WritePayloadChunk(chunk []byte) bool { - if !x.chunkCalled { - x.chunkCalled = true - x.req.GetBody().SetObjectPart(&x.partChunk) - } - - for ln := len(chunk); ln > 0; ln = len(chunk) { - if ln > x.maxChunkLen { - ln = x.maxChunkLen - } - - // we deal with size limit overflow above, but there is another case: - // what if method is called with "small" chunk many times? We write - // a message to the stream on each call. Alternatively, we could use buffering. - // In most cases, the chunk length does not vary between calls. Given this - // assumption, as well as the length of the payload from the header, it is - // possible to buffer the data of intermediate chunks, and send a message when - // the allocated buffer is filled, or when the last chunk is received. - // It is mentally assumed that allocating and filling the buffer is better than - // synchronous sending, but this needs to be tested. - x.partChunk.SetChunk(chunk[:ln]) - x.req.SetVerificationHeader(nil) - - x.err = signature.SignServiceMessage(x.key, &x.req) - if x.err != nil { - x.err = fmt.Errorf("sign message: %w", x.err) - return false - } - - x.err = x.stream.Write(&x.req) - if x.err != nil { - return false - } - - chunk = chunk[ln:] - } - - return true +// WithoutHomomorphicHash if set to true do not use Tillich-ZĂ©mor hash for payload. +func (x *PrmObjectPutInit) WithoutHomomorphicHash(v bool) { + x.withoutHomomorphicHash = v } -// Close ends writing the object and returns the result of the operation -// along with the final results. Must be called after using the ObjectWriter. -// -// Exactly one return value is non-nil. By default, server status is returned in res structure. -// Any client's internal or transport errors are returned as Go built-in error. -// If Client is tuned to resolve FrostFS API statuses, then FrostFS failures -// codes are returned as error. -// -// Return statuses: -// - global (see Client docs); -// - *apistatus.ContainerNotFound; -// - *apistatus.ObjectAccessDenied; -// - *apistatus.ObjectLocked; -// - *apistatus.LockNonRegularObject; -// - *apistatus.SessionTokenNotFound; -// - *apistatus.SessionTokenExpired. -func (x *ObjectWriter) Close() (*ResObjectPut, error) { - defer x.cancelCtxStream() - - // Ignore io.EOF error, because it is expected error for client-side - // stream termination by the server. E.g. when stream contains invalid - // message. Server returns an error in response message (in status). - if x.err != nil && !errors.Is(x.err, io.EOF) { - return nil, x.err - } - - if x.err = x.stream.Close(); x.err != nil { - return nil, x.err - } - - x.res.st, x.err = x.client.processResponse(&x.respV2) - if x.err != nil { - return nil, x.err - } - - if !apistatus.IsSuccessful(x.res.st) { - return &x.res, nil - } - - const fieldID = "ID" - - idV2 := x.respV2.GetBody().GetObjectID() - if idV2 == nil { - return nil, newErrMissingResponseField(fieldID) - } - - x.err = x.res.obj.ReadFromV2(*idV2) - if x.err != nil { - x.err = newErrInvalidResponseField(fieldID, x.err) - } - - return &x.res, nil +// WithEpochSource specifies epoch for object when split it on client side. +func (x *PrmObjectPutInit) WithEpochSource(es transformer.EpochSource) { + x.epochSource = es } // ObjectPutInit initiates writing an object through a remote server using FrostFS API protocol. @@ -249,31 +152,9 @@ func (x *ObjectWriter) Close() (*ResObjectPut, error) { // // Returns an error if parameters are set incorrectly. // Context is required and must not be nil. It is used for network communication. -func (c *Client) ObjectPutInit(ctx context.Context, prm PrmObjectPutInit) (*ObjectWriter, error) { - var w ObjectWriter - - ctx, cancel := context.WithCancel(ctx) - stream, err := rpcapi.PutObject(&c.c, &w.respV2, client.WithContext(ctx)) - if err != nil { - cancel() - return nil, fmt.Errorf("open stream: %w", err) +func (c *Client) ObjectPutInit(ctx context.Context, prm PrmObjectPutInit) (ObjectWriter, error) { + if prm.maxSize > 0 { + return c.objectPutInitTransformer(prm) } - - w.key = &c.prm.key - if prm.key != nil { - w.key = prm.key - } - w.cancelCtxStream = cancel - w.client = c - w.stream = stream - w.partInit.SetCopiesNumber(prm.copyNum) - w.req.SetBody(new(v2object.PutRequestBody)) - if prm.maxChunkLen > 0 { - w.maxChunkLen = prm.maxChunkLen - } else { - w.maxChunkLen = 3 << 20 - } - c.prepareRequest(&w.req, &prm.meta) - - return &w, nil + return c.objectPutInitRaw(ctx, prm) } diff --git a/client/object_put_raw.go b/client/object_put_raw.go new file mode 100644 index 00000000..64cb2507 --- /dev/null +++ b/client/object_put_raw.go @@ -0,0 +1,154 @@ +package client + +import ( + "context" + "crypto/ecdsa" + "errors" + "fmt" + "io" + + v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + rpcapi "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" +) + +func (c *Client) objectPutInitRaw(ctx context.Context, prm PrmObjectPutInit) (*objectWriterRaw, error) { + var w objectWriterRaw + stream, err := rpcapi.PutObject(&c.c, &w.respV2, client.WithContext(ctx)) + if err != nil { + return nil, fmt.Errorf("open stream: %w", err) + } + + w.key = &c.prm.key + if prm.key != nil { + w.key = prm.key + } + w.client = c + w.stream = stream + w.partInit.SetCopiesNumber(prm.copyNum) + w.req.SetBody(new(v2object.PutRequestBody)) + if prm.maxChunkLen > 0 { + w.maxChunkLen = prm.maxChunkLen + } else { + w.maxChunkLen = defaultGRPCPayloadChunkLen + } + c.prepareRequest(&w.req, &prm.meta) + return &w, nil +} + +type objectWriterRaw struct { + client *Client + stream interface { + Write(*v2object.PutRequest) error + Close() error + } + + key *ecdsa.PrivateKey + res ResObjectPut + err error + chunkCalled bool + respV2 v2object.PutResponse + req v2object.PutRequest + partInit v2object.PutObjectPartInit + partChunk v2object.PutObjectPartChunk + maxChunkLen int +} + +func (x *objectWriterRaw) WriteHeader(_ context.Context, hdr object.Object) bool { + v2Hdr := hdr.ToV2() + + x.partInit.SetObjectID(v2Hdr.GetObjectID()) + x.partInit.SetHeader(v2Hdr.GetHeader()) + x.partInit.SetSignature(v2Hdr.GetSignature()) + + x.req.GetBody().SetObjectPart(&x.partInit) + x.req.SetVerificationHeader(nil) + + x.err = signature.SignServiceMessage(x.key, &x.req) + if x.err != nil { + x.err = fmt.Errorf("sign message: %w", x.err) + return false + } + + x.err = x.stream.Write(&x.req) + return x.err == nil +} + +func (x *objectWriterRaw) WritePayloadChunk(_ context.Context, chunk []byte) bool { + if !x.chunkCalled { + x.chunkCalled = true + x.req.GetBody().SetObjectPart(&x.partChunk) + } + + for ln := len(chunk); ln > 0; ln = len(chunk) { + if ln > x.maxChunkLen { + ln = x.maxChunkLen + } + + // we deal with size limit overflow above, but there is another case: + // what if method is called with "small" chunk many times? We write + // a message to the stream on each call. Alternatively, we could use buffering. + // In most cases, the chunk length does not vary between calls. Given this + // assumption, as well as the length of the payload from the header, it is + // possible to buffer the data of intermediate chunks, and send a message when + // the allocated buffer is filled, or when the last chunk is received. + // It is mentally assumed that allocating and filling the buffer is better than + // synchronous sending, but this needs to be tested. + x.partChunk.SetChunk(chunk[:ln]) + x.req.SetVerificationHeader(nil) + + x.err = signature.SignServiceMessage(x.key, &x.req) + if x.err != nil { + x.err = fmt.Errorf("sign message: %w", x.err) + return false + } + + x.err = x.stream.Write(&x.req) + if x.err != nil { + return false + } + + chunk = chunk[ln:] + } + + return true +} + +func (x *objectWriterRaw) Close(_ context.Context) (*ResObjectPut, error) { + // Ignore io.EOF error, because it is expected error for client-side + // stream termination by the server. E.g. when stream contains invalid + // message. Server returns an error in response message (in status). + if x.err != nil && !errors.Is(x.err, io.EOF) { + return nil, x.err + } + + if x.err = x.stream.Close(); x.err != nil { + return nil, x.err + } + + x.res.st, x.err = x.client.processResponse(&x.respV2) + if x.err != nil { + return nil, x.err + } + + if !apistatus.IsSuccessful(x.res.st) { + return &x.res, nil + } + + const fieldID = "ID" + + idV2 := x.respV2.GetBody().GetObjectID() + if idV2 == nil { + return nil, newErrMissingResponseField(fieldID) + } + + x.err = x.res.obj.ReadFromV2(*idV2) + if x.err != nil { + x.err = newErrInvalidResponseField(fieldID, x.err) + } + + return &x.res, nil +} diff --git a/client/object_put_transformer.go b/client/object_put_transformer.go new file mode 100644 index 00000000..636e56bd --- /dev/null +++ b/client/object_put_transformer.go @@ -0,0 +1,88 @@ +package client + +import ( + "context" + + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" +) + +func (c *Client) objectPutInitTransformer(prm PrmObjectPutInit) (*objectWriterTransformer, error) { + var w objectWriterTransformer + w.it = internalTarget{ + client: c, + prm: prm, + } + key := &c.prm.key + if prm.key != nil { + key = prm.key + } + w.ot = transformer.NewPayloadSizeLimiter(transformer.Params{ + Key: key, + NextTargetInit: func() transformer.ObjectTarget { return &w.it }, + MaxSize: prm.maxSize, + WithoutHomomorphicHash: prm.withoutHomomorphicHash, + NetworkState: prm.epochSource, + }) + return &w, nil +} + +type objectWriterTransformer struct { + ot transformer.ObjectTarget + it internalTarget + err error +} + +func (x *objectWriterTransformer) WriteHeader(ctx context.Context, hdr object.Object) bool { + x.err = x.ot.WriteHeader(ctx, &hdr) + return x.err == nil +} + +func (x *objectWriterTransformer) WritePayloadChunk(ctx context.Context, chunk []byte) bool { + _, x.err = x.ot.Write(ctx, chunk) + return x.err == nil +} + +func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, error) { + if ai, err := x.ot.Close(ctx); err != nil { + return nil, err + } else { + if ai != nil && ai.ParentID != nil { + x.it.res.obj = *ai.ParentID + } + return x.it.res, nil + } +} + +type internalTarget struct { + current *object.Object + client *Client + res *ResObjectPut + prm PrmObjectPutInit + payload []byte +} + +func (it *internalTarget) WriteHeader(_ context.Context, object *object.Object) error { + it.current = object + return nil +} + +func (it *internalTarget) Write(_ context.Context, p []byte) (n int, err error) { + it.payload = append(it.payload, p...) + return len(p), nil +} + +func (it *internalTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers, error) { + it.current.SetPayload(it.payload) + wrt, err := it.client.objectPutInitRaw(ctx, it.prm) + if err != nil { + return nil, err + } + if wrt.WriteHeader(ctx, *it.current) { + wrt.WritePayloadChunk(ctx, it.current.Payload()) + } + it.res, err = wrt.Close(ctx) + it.current = nil + it.payload = nil + return nil, err +} diff --git a/pool/pool.go b/pool/pool.go index b56c574c..2f662d36 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -645,7 +645,7 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID return oid.ID{}, fmt.Errorf("init writing on API client: %w", err) } - if wObj.WriteHeader(prm.hdr) { + if wObj.WriteHeader(ctx, prm.hdr) { sz := prm.hdr.PayloadSize() if data := prm.hdr.Payload(); len(data) > 0 { @@ -672,7 +672,7 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID n, err = prm.payload.Read(buf) if n > 0 { start = time.Now() - successWrite := wObj.WritePayloadChunk(buf[:n]) + successWrite := wObj.WritePayloadChunk(ctx, buf[:n]) c.incRequests(time.Since(start), methodObjectPut) if !successWrite { break @@ -690,7 +690,7 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID } } - res, err := wObj.Close() + res, err := wObj.Close(ctx) var st apistatus.Status if res != nil { st = res.Status()