From 3ba7446157d4007d8e34ea0e3fdb9429b823f2cc Mon Sep 17 00:00:00 2001 From: Airat Arifullin Date: Mon, 5 Aug 2024 23:05:22 +0300 Subject: [PATCH] [#249] client: Introduce `ObjectPatch` Signed-off-by: Airat Arifullin --- client/object_patch.go | 259 ++++++++++++++++++++++++++++++++++++ client/object_patch_test.go | 209 +++++++++++++++++++++++++++++ 2 files changed, 468 insertions(+) create mode 100644 client/object_patch.go create mode 100644 client/object_patch_test.go diff --git a/client/object_patch.go b/client/object_patch.go new file mode 100644 index 0000000..b30bebe --- /dev/null +++ b/client/object_patch.go @@ -0,0 +1,259 @@ +package client + +import ( + "context" + "crypto/ecdsa" + "errors" + "fmt" + "io" + + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/acl" + v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + rpcapi "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" + v2session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" +) + +// ObjectPatcher is designed to patch an object. +// +// Must be initialized using Client.ObjectPatchInit, any other +// usage is unsafe. +type ObjectPatcher interface { + // PatchAttributes patches attributes. Attributes can be patched no more than once, + // otherwise, the server returns an error. + // + // Result means success. Failure reason can be received via Close. + PatchAttributes(ctx context.Context, newAttrs []object.Attribute, replace bool) bool + + // PatchPayload patches the object's payload. + // + // PatchPayload receives `payloadReader` and thus the payload of the patch is read and sent by chunks of + // `MaxChunkLength` length. + // + // Result means success. Failure reason can be received via Close. + PatchPayload(ctx context.Context, rng *object.Range, payloadReader io.Reader) bool + + // Close ends patching the object and returns the result of the operation + // along with the final results. Must be called after using the ObjectPatcher. + // + // Exactly one return value is non-nil. By default, server status is returned in res structure. + // Any client's internal or transport errors are returned as Go built-in error. + // If Client is tuned to resolve FrostFS API statuses, then FrostFS failures + // codes are returned as error. + // + // Return statuses: + // - global (see Client docs); + // - *apistatus.ContainerNotFound; + // - *apistatus.ContainerAccessDenied; + // - *apistatus.ObjectAccessDenied; + // - *apistatus.ObjectAlreadyRemoved; + // - *apistatus.ObjectLocked; + // - *apistatus.ObjectOutOfRange; + // - *apistatus.SessionTokenNotFound; + // - *apistatus.SessionTokenExpired. + Close(_ context.Context) (*ResObjectPatch, error) +} + +// ResObjectPatch groups resulting values of ObjectPatch operation. +type ResObjectPatch struct { + statusRes + + obj oid.ID +} + +// ObjectID returns an object ID of the patched object. +func (r ResObjectPatch) ObjectID() oid.ID { + return r.obj +} + +// PrmObjectPatch groups parameters of ObjectPatch operation. +type PrmObjectPatch struct { + XHeaders []string + + Address oid.Address + + BearerToken *bearer.Token + + Session *session.Object + + Key *ecdsa.PrivateKey + + MaxChunkLength int +} + +// ObjectPatchInit initializes object patcher. +func (c *Client) ObjectPatchInit(ctx context.Context, prm PrmObjectPatch) (ObjectPatcher, error) { + if len(prm.XHeaders)%2 != 0 { + return nil, errorInvalidXHeaders + } + + var objectPatcher objectPatcher + stream, err := rpcapi.Patch(&c.c, &objectPatcher.respV2, client.WithContext(ctx)) + if err != nil { + return nil, fmt.Errorf("open stream: %w", err) + } + + objectPatcher.addr = prm.Address + objectPatcher.key = &c.prm.Key + if prm.Key != nil { + objectPatcher.key = prm.Key + } + objectPatcher.client = c + objectPatcher.stream = stream + objectPatcher.firstPatchPayload = true + + if prm.MaxChunkLength > 0 { + objectPatcher.maxChunkLen = prm.MaxChunkLength + } else { + objectPatcher.maxChunkLen = defaultGRPCPayloadChunkLen + } + + objectPatcher.req.SetBody(&v2object.PatchRequestBody{}) + + meta := new(v2session.RequestMetaHeader) + writeXHeadersToMeta(prm.XHeaders, meta) + + if prm.BearerToken != nil { + v2BearerToken := new(acl.BearerToken) + prm.BearerToken.WriteToV2(v2BearerToken) + meta.SetBearerToken(v2BearerToken) + } + + if prm.Session != nil { + v2SessionToken := new(v2session.Token) + prm.Session.WriteToV2(v2SessionToken) + meta.SetSessionToken(v2SessionToken) + } + + c.prepareRequest(&objectPatcher.req, meta) + + return &objectPatcher, nil +} + +type objectPatcher struct { + client *Client + + stream interface { + Write(*v2object.PatchRequest) error + Close() error + } + + key *ecdsa.PrivateKey + res ResObjectPatch + err error + + addr oid.Address + + req v2object.PatchRequest + respV2 v2object.PatchResponse + + maxChunkLen int + + firstPatchPayload bool +} + +func (x *objectPatcher) PatchAttributes(_ context.Context, newAttrs []object.Attribute, replace bool) bool { + return x.patch(&object.Patch{ + Address: x.addr, + NewAttributes: newAttrs, + ReplaceAttributes: replace, + }) +} + +func (x *objectPatcher) PatchPayload(_ context.Context, rng *object.Range, payloadReader io.Reader) bool { + offset := rng.GetOffset() + + buf := make([]byte, x.maxChunkLen) + + for { + n, err := payloadReader.Read(buf) + if err != nil && err != io.EOF { + x.err = fmt.Errorf("read payload: %w", err) + return false + } + if n == 0 { + break + } + + rngPart := object.NewRange() + if x.firstPatchPayload { + x.firstPatchPayload = false + rngPart.SetOffset(offset) + rngPart.SetLength(rng.GetLength()) + } else { + rngPart.SetOffset(offset + rng.GetLength()) + } + + if !x.patch(&object.Patch{ + Address: x.addr, + PayloadPatch: &object.PayloadPatch{ + Range: rngPart, + Chunk: buf[:n], + }, + }) { + return false + } + + if err == io.EOF { + break + } + } + + return true +} + +func (x *objectPatcher) patch(patch *object.Patch) bool { + x.req.SetBody(patch.ToV2()) + x.req.SetVerificationHeader(nil) + + x.err = signature.SignServiceMessage(x.key, &x.req) + if x.err != nil { + x.err = fmt.Errorf("sign message: %w", x.err) + return false + } + + x.err = x.stream.Write(&x.req) + return x.err == nil +} + +func (x *objectPatcher) Close(_ context.Context) (*ResObjectPatch, error) { + // Ignore io.EOF error, because it is expected error for client-side + // stream termination by the server. E.g. when stream contains invalid + // message. Server returns an error in response message (in status). + if x.err != nil && !errors.Is(x.err, io.EOF) { + return nil, x.err + } + + if x.err = x.stream.Close(); x.err != nil { + return nil, x.err + } + + x.res.st, x.err = x.client.processResponse(&x.respV2) + if x.err != nil { + return nil, x.err + } + + if !apistatus.IsSuccessful(x.res.st) { + return &x.res, nil + } + + const fieldID = "ID" + + idV2 := x.respV2.Body.ObjectID + if idV2 == nil { + return nil, newErrMissingResponseField(fieldID) + } + + x.err = x.res.obj.ReadFromV2(*idV2) + if x.err != nil { + x.err = newErrInvalidResponseField(fieldID, x.err) + } + + return &x.res, nil +} diff --git a/client/object_patch_test.go b/client/object_patch_test.go new file mode 100644 index 0000000..9c87820 --- /dev/null +++ b/client/object_patch_test.go @@ -0,0 +1,209 @@ +package client + +import ( + "bytes" + "context" + "crypto/ecdsa" + "crypto/elliptic" + "crypto/rand" + "testing" + + v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" + "github.com/stretchr/testify/require" +) + +type mockPatchStream struct { + streamedPayloadPatches []*object.PayloadPatch +} + +func (m *mockPatchStream) Write(r *v2object.PatchRequest) error { + pp := new(object.PayloadPatch) + pp.FromV2(r.GetBody().GetPatch()) + + if r.GetBody().GetPatch() != nil { + bodyChunk := r.GetBody().GetPatch().Chunk + pp.Chunk = make([]byte, len(bodyChunk)) + copy(pp.Chunk, bodyChunk) + } + + m.streamedPayloadPatches = append(m.streamedPayloadPatches, pp) + + return nil +} + +func (m *mockPatchStream) Close() error { + return nil +} + +func TestObjectPatcher(t *testing.T) { + type part struct { + offset int + length int + chunk string + } + + for _, test := range []struct { + name string + patchPayload string + rng *object.Range + maxChunkLen int + expectParts []part + }{ + { + name: "no split payload patch", + patchPayload: "011111", + rng: newRange(0, 6), + maxChunkLen: defaultGRPCPayloadChunkLen, + expectParts: []part{ + { + offset: 0, + length: 6, + chunk: "011111", + }, + }, + }, + { + name: "splitted payload patch", + patchPayload: "012345", + rng: newRange(0, 6), + maxChunkLen: 2, + expectParts: []part{ + { + offset: 0, + length: 6, + chunk: "01", + }, + { + offset: 6, + length: 0, + chunk: "23", + }, + { + offset: 6, + length: 0, + chunk: "45", + }, + }, + }, + { + name: "splitted payload patch with zero-length subpatches", + patchPayload: "0123456789!@", + rng: newRange(0, 4), + maxChunkLen: 2, + expectParts: []part{ + { + offset: 0, + length: 4, + chunk: "01", + }, + { + offset: 4, + length: 0, + chunk: "23", + }, + { + offset: 4, + length: 0, + chunk: "45", + }, + { + offset: 4, + length: 0, + chunk: "67", + }, + { + offset: 4, + length: 0, + chunk: "89", + }, + { + offset: 4, + length: 0, + chunk: "!@", + }, + }, + }, + { + name: "splitted payload patch with zero-length subpatches only", + patchPayload: "0123456789!@", + rng: newRange(0, 0), + maxChunkLen: 2, + expectParts: []part{ + { + offset: 0, + length: 0, + chunk: "01", + }, + { + offset: 0, + length: 0, + chunk: "23", + }, + { + offset: 0, + length: 0, + chunk: "45", + }, + { + offset: 0, + length: 0, + chunk: "67", + }, + { + offset: 0, + length: 0, + chunk: "89", + }, + { + offset: 0, + length: 0, + chunk: "!@", + }, + }, + }, + } { + t.Run(test.name, func(t *testing.T) { + m := &mockPatchStream{} + + pk, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + + patcher := objectPatcher{ + client: &Client{}, + stream: m, + addr: oidtest.Address(), + key: pk, + maxChunkLen: test.maxChunkLen, + firstPatchPayload: true, + } + + success := patcher.PatchAttributes(context.Background(), nil, false) + require.True(t, success) + + success = patcher.PatchPayload(context.Background(), test.rng, bytes.NewReader([]byte(test.patchPayload))) + require.True(t, success) + + require.Len(t, m.streamedPayloadPatches, len(test.expectParts)+1) + + // m.streamedPayloadPatches[0] is attribute patch, so skip it + for i, part := range test.expectParts { + requireRangeChunk(t, m.streamedPayloadPatches[i+1], part.offset, part.length, part.chunk) + } + }) + } +} + +func requireRangeChunk(t *testing.T, pp *object.PayloadPatch, offset, length int, chunk string) { + require.NotNil(t, pp) + require.Equal(t, uint64(offset), pp.Range.GetOffset()) + require.Equal(t, uint64(length), pp.Range.GetLength()) + require.Equal(t, []byte(chunk), pp.Chunk) +} + +func newRange(offest, length uint64) *object.Range { + rng := &object.Range{} + rng.SetOffset(offest) + rng.SetLength(length) + return rng +}