From 6992fb664bd29ac60c3621799d809ddf898a50fc Mon Sep 17 00:00:00 2001 From: Airat Arifullin Date: Mon, 5 Aug 2024 15:46:37 +0300 Subject: [PATCH 1/2] [#248] object: Introduce `SplitByMaxChunkLength` method * Add unit-test to test `SplitByMaxChunkLength`. Signed-off-by: Airat Arifullin --- object/patch.go | 36 ++++++++++++++++++++++++ object/patch_test.go | 67 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+) diff --git a/object/patch.go b/object/patch.go index 1cc30fc..e930d60 100644 --- a/object/patch.go +++ b/object/patch.go @@ -78,6 +78,42 @@ type PayloadPatch struct { Chunk []byte } +// SplitByMaxChunkLength splits a payload patch into a few payload patches if its `Chunk` size +// exceeds `maxChunkLen`. +func (p *PayloadPatch) SplitByMaxChunkLength(maxChunkLen int) []*PayloadPatch { + if len(p.Chunk) <= maxChunkLen { + return []*PayloadPatch{p} + } + + var result []*PayloadPatch + remainingChunk := p.Chunk + currentOffset := p.Range.GetOffset() + remainingLength := p.Range.GetLength() + + for len(remainingChunk) > 0 { + chunkSize := uint64(min(len(remainingChunk), maxChunkLen)) + newLength := min(remainingLength, chunkSize) + + rng := NewRange() + rng.SetOffset(currentOffset) + rng.SetLength(newLength) + + newPatch := &PayloadPatch{ + Range: rng, + Chunk: remainingChunk[:chunkSize], + } + result = append(result, newPatch) + + remainingChunk = remainingChunk[chunkSize:] + currentOffset += chunkSize + if remainingLength > 0 { + remainingLength -= newLength + } + } + + return result +} + func (p *PayloadPatch) ToV2() *v2object.PatchRequestBodyPatch { if p == nil { return nil diff --git a/object/patch_test.go b/object/patch_test.go index f66fd29..a0c6e60 100644 --- a/object/patch_test.go +++ b/object/patch_test.go @@ -10,6 +10,73 @@ import ( "github.com/stretchr/testify/require" ) +func newRange(offest, length uint64) *Range { + rng := &Range{} + rng.SetOffset(offest) + rng.SetLength(length) + return rng +} + +func TestSplitByMaxChunkLength(t *testing.T) { + t.Run("no break down", func(t *testing.T) { + payloadPatch := &PayloadPatch{ + Range: newRange(0, 42), + Chunk: []byte("100000|010000|001000|000100|000010|000001|"), + } + + payloadPatches := payloadPatch.SplitByMaxChunkLength(42) + + require.Len(t, payloadPatches, 1) + require.Equal(t, []byte("100000|010000|001000|000100|000010|000001|"), payloadPatches[0].Chunk) + + require.Equal(t, uint64(0), payloadPatches[0].Range.GetOffset()) + require.Equal(t, uint64(42), payloadPatches[0].Range.GetLength()) + }) + + t.Run("one replacing and two inserting patches", func(t *testing.T) { + payloadPatch := &PayloadPatch{ + Range: newRange(0, 15), + Chunk: []byte("100000|010000|001000|000100|000010|000001|"), + } + + payloadPatches := payloadPatch.SplitByMaxChunkLength(15) + + require.Len(t, payloadPatches, 3) + + require.Equal(t, []byte("100000|010000|0"), payloadPatches[0].Chunk) + require.Equal(t, uint64(0), payloadPatches[0].Range.GetOffset()) + require.Equal(t, uint64(15), payloadPatches[0].Range.GetLength()) + + require.Equal(t, []byte("01000|000100|00"), payloadPatches[1].Chunk) + require.Equal(t, uint64(15), payloadPatches[1].Range.GetOffset()) + require.Equal(t, uint64(0), payloadPatches[1].Range.GetLength()) + + require.Equal(t, []byte("0010|000001|"), payloadPatches[2].Chunk) + require.Equal(t, uint64(30), payloadPatches[2].Range.GetOffset()) + require.Equal(t, uint64(0), payloadPatches[2].Range.GetLength()) + }) + + t.Run("one replacing and one inserting patches", func(t *testing.T) { + payloadPatch := &PayloadPatch{ + Range: newRange(0, 30), + Chunk: []byte("100000|010000|001000|000100|000010|000001|"), + } + + payloadPatches := payloadPatch.SplitByMaxChunkLength(30) + + require.Len(t, payloadPatches, 2) + require.Equal(t, []byte("100000|010000|001000|000100|00"), payloadPatches[0].Chunk) + + require.Equal(t, uint64(0), payloadPatches[0].Range.GetOffset()) + require.Equal(t, uint64(30), payloadPatches[0].Range.GetLength()) + + require.Equal(t, []byte("0010|000001|"), payloadPatches[1].Chunk) + + require.Equal(t, uint64(30), payloadPatches[1].Range.GetOffset()) + require.Equal(t, uint64(0), payloadPatches[1].Range.GetLength()) + }) +} + func TestPatch(t *testing.T) { t.Run("to v2", func(t *testing.T) { t.Run("only attributes", func(t *testing.T) { -- 2.45.2 From 00547bc32dd43fd317fcbf8bee898965f6e90993 Mon Sep 17 00:00:00 2001 From: Airat Arifullin Date: Mon, 5 Aug 2024 16:29:16 +0300 Subject: [PATCH 2/2] [#248] client: Introduce object patcher Signed-off-by: Airat Arifullin --- client/object_patch.go | 249 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 249 insertions(+) create mode 100644 client/object_patch.go diff --git a/client/object_patch.go b/client/object_patch.go new file mode 100644 index 0000000..55ce220 --- /dev/null +++ b/client/object_patch.go @@ -0,0 +1,249 @@ +package client + +import ( + "context" + "crypto/ecdsa" + "errors" + "fmt" + "io" + + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/acl" + v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + rpcapi "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" + v2session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" +) + +// ObjectPatcher is designed to patch an object. +// +// Must be initialized using Client.ObjectPatchInit, any other +// usage is unsafe. +type ObjectPatcher interface { + // Patch patches the object. + // + // If object.Patch parameter sets `NewAttributes`, `ReplaceAttributes` or both fields, then the + // request is taken for patching the object's attributes. Attributes can be patched no more than once, + // otherwise, the server returns an error. + // + // If object.Patch parameter sets `PayloadPatch` with a large `Chunk`, then this patch is splitted + // into a few patches. After split each `PayloadPatch` has a `Chunk` with the size <= `MaxChunkLength`. + // + // Result means success. Failure reason can be received via Close. + Patch(_ context.Context, patch *object.Patch) bool + // Close ends patching the object and returns the result of the operation + // along with the final results. Must be called after using the ObjectPatcher. + // + // Exactly one return value is non-nil. By default, server status is returned in res structure. + // Any client's internal or transport errors are returned as Go built-in error. + // If Client is tuned to resolve FrostFS API statuses, then FrostFS failures + // codes are returned as error. + // + // Return statuses: + // - global (see Client docs); + // - *apistatus.ContainerNotFound; + // - *apistatus.ContainerAccessDenied; + // - *apistatus.ObjectAccessDenied; + // - *apistatus.ObjectAlreadyRemoved; + // - *apistatus.ObjectLocked; + // - *apistatus.ObjectOutOfRange; + // - *apistatus.SessionTokenNotFound; + // - *apistatus.SessionTokenExpired. + Close(_ context.Context) (*ResObjectPatch, error) +} + +// ResObjectPatch groups resulting values of ObjectPatch operation. +type ResObjectPatch struct { + statusRes + + obj oid.ID +} + +// ObjectID returns an object ID of the patched object. +func (r ResObjectPatch) ObjectID() oid.ID { + return r.obj +} + +// PrmObjectPatch groups parameters of ObjectPatch operation. +type PrmObjectPatch struct { + XHeaders []string + + BearerToken *bearer.Token + + Session *session.Object + + Local bool + + Address oid.Address + + Key *ecdsa.PrivateKey + + MaxChunkLength int +} + +// ObjectPatchInit initializes object patcher. +func (c *Client) ObjectPatchInit(ctx context.Context, prm PrmObjectPatch) (ObjectPatcher, error) { + if len(prm.XHeaders)%2 != 0 { + return nil, errorInvalidXHeaders + } + + var objectPatcher objectPatcher + stream, err := rpcapi.Patch(&c.c, &objectPatcher.respV2, client.WithContext(ctx)) + if err != nil { + return nil, fmt.Errorf("open stream: %w", err) + } + + objectPatcher.key = &c.prm.Key + if prm.Key != nil { + objectPatcher.key = prm.Key + } + objectPatcher.client = c + objectPatcher.stream = stream + objectPatcher.addr = prm.Address + + if prm.MaxChunkLength > 0 { + objectPatcher.maxChunkLen = prm.MaxChunkLength + } else { + objectPatcher.maxChunkLen = defaultGRPCPayloadChunkLen + } + + objectPatcher.req.SetBody(&v2object.PatchRequestBody{}) + + meta := new(v2session.RequestMetaHeader) + writeXHeadersToMeta(prm.XHeaders, meta) + + if prm.BearerToken != nil { + v2BearerToken := new(acl.BearerToken) + prm.BearerToken.WriteToV2(v2BearerToken) + meta.SetBearerToken(v2BearerToken) + } + + if prm.Session != nil { + v2SessionToken := new(v2session.Token) + prm.Session.WriteToV2(v2SessionToken) + meta.SetSessionToken(v2SessionToken) + } + + if prm.Local { + meta.SetTTL(1) + } + + c.prepareRequest(&objectPatcher.req, meta) + + return &objectPatcher, nil +} + +type objectPatcher struct { + client *Client + + stream interface { + Write(*v2object.PatchRequest) error + Close() error + } + + key *ecdsa.PrivateKey + res ResObjectPatch + err error + + addr oid.Address + + req v2object.PatchRequest + respV2 v2object.PatchResponse + + maxChunkLen int +} + +func (x *objectPatcher) Patch(_ context.Context, patch *object.Patch) bool { + if patch == nil { + x.err = errors.New("nil patch") + return false + } + + patches := []*object.Patch{patch} + if patch.PayloadPatch != nil && len(patch.PayloadPatch.Chunk) > x.maxChunkLen { + patches = x.splitPatch(patch) + } + + for _, p := range patches { + if !x.send(p) { + return false + } + } + + return true +} + +// splitPatch splits Patch into a few ones if the original Patch has a large PayloadPatch that is required to be splitted. +func (x *objectPatcher) splitPatch(patch *object.Patch) []*object.Patch { + payloadPatches := patch.PayloadPatch.SplitByMaxChunkLength(x.maxChunkLen) + result := make([]*object.Patch, len(payloadPatches)) + + for i, pp := range payloadPatches { + newPatch := &object.Patch{ + Address: patch.Address, + PayloadPatch: pp, + } + if i == 0 { + newPatch.NewAttributes = patch.NewAttributes + newPatch.ReplaceAttributes = patch.ReplaceAttributes + } + result[i] = newPatch + } + + return result +} + +func (x *objectPatcher) send(patch *object.Patch) bool { + x.req.SetBody(patch.ToV2()) + x.req.SetVerificationHeader(nil) + + x.err = signature.SignServiceMessage(x.key, &x.req) + if x.err != nil { + x.err = fmt.Errorf("sign message: %w", x.err) + return false + } + + x.err = x.stream.Write(&x.req) + return x.err == nil +} + +func (x *objectPatcher) Close(_ context.Context) (*ResObjectPatch, error) { + // Ignore io.EOF error, because it is expected error for client-side + // stream termination by the server. E.g. when stream contains invalid + // message. Server returns an error in response message (in status). + if x.err != nil && !errors.Is(x.err, io.EOF) { + return nil, x.err + } + + if x.err = x.stream.Close(); x.err != nil { + return nil, x.err + } + + x.res.st, x.err = x.client.processResponse(&x.respV2) + if x.err != nil { + return nil, x.err + } + + if !apistatus.IsSuccessful(x.res.st) { + return &x.res, nil + } + + const fieldID = "ID" + + idV2 := x.respV2.Body.ObjectID + if idV2 == nil { + return nil, newErrMissingResponseField(fieldID) + } + + x.err = x.res.obj.ReadFromV2(*idV2) + if x.err != nil { + x.err = newErrInvalidResponseField(fieldID, x.err) + } + + return &x.res, nil +} -- 2.45.2