diff --git a/client/object_patch.go b/client/object_patch.go new file mode 100644 index 0000000..fb6bd60 --- /dev/null +++ b/client/object_patch.go @@ -0,0 +1,270 @@ +package client + +import ( + "context" + "crypto/ecdsa" + "errors" + "fmt" + "io" + + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/acl" + v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + rpcapi "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" + v2session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" +) + +// ObjectPatcher is designed to patch an object. +// +// Must be initialized using Client.ObjectPatchInit, any other +// usage is unsafe. +type ObjectPatcher interface { + // PatchAttributes patches attributes. Attributes can be patched no more than once, + // otherwise, the server returns an error. + // + // Result means success. Failure reason can be received via Close. + PatchAttributes(ctx context.Context, newAttrs []object.Attribute, replace bool) bool + + // PatchPayload patches the object's payload. + // + // PatchPayload receives `payloadReader` and thus the payload of the patch is read and sent by chunks of + // `MaxChunkLength` length. + // + // Result means success. Failure reason can be received via Close. + PatchPayload(ctx context.Context, rng *object.Range, payloadReader io.Reader) bool + + // Close ends patching the object and returns the result of the operation + // along with the final results. Must be called after using the ObjectPatcher. + // + // Exactly one return value is non-nil. By default, server status is returned in res structure. + // Any client's internal or transport errors are returned as Go built-in error. + // If Client is tuned to resolve FrostFS API statuses, then FrostFS failures + // codes are returned as error. + // + // Return statuses: + // - global (see Client docs); + // - *apistatus.ContainerNotFound; + // - *apistatus.ContainerAccessDenied; + // - *apistatus.ObjectAccessDenied; + // - *apistatus.ObjectAlreadyRemoved; + // - *apistatus.ObjectLocked; + // - *apistatus.ObjectOutOfRange; + // - *apistatus.SessionTokenNotFound; + // - *apistatus.SessionTokenExpired. + Close(_ context.Context) (*ResObjectPatch, error) +} + +// ResObjectPatch groups resulting values of ObjectPatch operation. +type ResObjectPatch struct { + statusRes + + obj oid.ID +} + +// ObjectID returns an object ID of the patched object. +func (r ResObjectPatch) ObjectID() oid.ID { + return r.obj +} + +// PrmObjectPatch groups parameters of ObjectPatch operation. +type PrmObjectPatch struct { + XHeaders []string + + Address oid.Address + + BearerToken *bearer.Token + + Session *session.Object + + Key *ecdsa.PrivateKey + + MaxChunkLength int +} + +// ObjectPatchInit initializes object patcher. +func (c *Client) ObjectPatchInit(ctx context.Context, prm PrmObjectPatch) (ObjectPatcher, error) { + if len(prm.XHeaders)%2 != 0 { + return nil, errorInvalidXHeaders + } + + var objectPatcher objectPatcher + stream, err := rpcapi.Patch(&c.c, &objectPatcher.respV2, client.WithContext(ctx)) + if err != nil { + return nil, fmt.Errorf("open stream: %w", err) + } + + objectPatcher.addr = prm.Address + objectPatcher.key = &c.prm.Key + if prm.Key != nil { + objectPatcher.key = prm.Key + } + objectPatcher.client = c + objectPatcher.stream = stream + + if prm.MaxChunkLength > 0 { + objectPatcher.maxChunkLen = prm.MaxChunkLength + } else { + objectPatcher.maxChunkLen = defaultGRPCPayloadChunkLen + } + + objectPatcher.req.SetBody(&v2object.PatchRequestBody{}) + + meta := new(v2session.RequestMetaHeader) + writeXHeadersToMeta(prm.XHeaders, meta) + + if prm.BearerToken != nil { + v2BearerToken := new(acl.BearerToken) + prm.BearerToken.WriteToV2(v2BearerToken) + meta.SetBearerToken(v2BearerToken) + } + + if prm.Session != nil { + v2SessionToken := new(v2session.Token) + prm.Session.WriteToV2(v2SessionToken) + meta.SetSessionToken(v2SessionToken) + } + + c.prepareRequest(&objectPatcher.req, meta) + + return &objectPatcher, nil +} + +type objectPatcher struct { + client *Client + + stream interface { + Write(*v2object.PatchRequest) error + Close() error + } + + key *ecdsa.PrivateKey + res ResObjectPatch + err error + + addr oid.Address + + req v2object.PatchRequest + respV2 v2object.PatchResponse + + maxChunkLen int +} + +func (x *objectPatcher) PatchAttributes(ctx context.Context, newAttrs []object.Attribute, replace bool) bool { + return x.patch(&object.Patch{ + Address: x.addr, + + NewAttributes: newAttrs, + + ReplaceAttributes: replace, + }) +} + +func (x *objectPatcher) PatchPayload(ctx context.Context, rng *object.Range, payloadReader io.Reader) bool { + offset, length := rng.GetOffset(), rng.GetLength() + originalLength := length + + buf := make([]byte, x.maxChunkLen) + + for { + n, err := payloadReader.Read(buf) + if err != nil && err != io.EOF { + x.err = fmt.Errorf("read payload: %w", err) + return false + } + if n == 0 { + break + } + + patchLength := uint64(n) + if length > 0 { + patchLength = min(uint64(n), length) + } + + rngPart := object.NewRange() + rngPart.SetOffset(offset) + + if originalLength > 0 && offset-rng.GetOffset() >= originalLength { + rngPart.SetLength(0) + } else { + rngPart.SetLength(patchLength) + } + + if !x.patch(&object.Patch{ + Address: x.addr, + + PayloadPatch: &object.PayloadPatch{ + Range: rngPart, + + Chunk: buf[:n], + }, + }) { + return false + } + + offset += uint64(n) + if length > 0 { + length -= patchLength + } + if err == io.EOF { + break + } + } + + return true +} + +func (x *objectPatcher) patch(patch *object.Patch) bool { + x.req.SetBody(patch.ToV2()) + x.req.SetVerificationHeader(nil) + + x.err = signature.SignServiceMessage(x.key, &x.req) + if x.err != nil { + x.err = fmt.Errorf("sign message: %w", x.err) + return false + } + + x.err = x.stream.Write(&x.req) + return x.err == nil +} + +func (x *objectPatcher) Close(_ context.Context) (*ResObjectPatch, error) { + // Ignore io.EOF error, because it is expected error for client-side + // stream termination by the server. E.g. when stream contains invalid + // message. Server returns an error in response message (in status). + if x.err != nil && !errors.Is(x.err, io.EOF) { + return nil, x.err + } + + if x.err = x.stream.Close(); x.err != nil { + return nil, x.err + } + + x.res.st, x.err = x.client.processResponse(&x.respV2) + if x.err != nil { + return nil, x.err + } + + if !apistatus.IsSuccessful(x.res.st) { + return &x.res, nil + } + + const fieldID = "ID" + + idV2 := x.respV2.Body.ObjectID + if idV2 == nil { + return nil, newErrMissingResponseField(fieldID) + } + + x.err = x.res.obj.ReadFromV2(*idV2) + if x.err != nil { + x.err = newErrInvalidResponseField(fieldID, x.err) + } + + return &x.res, nil +} diff --git a/client/object_patch_test.go b/client/object_patch_test.go new file mode 100644 index 0000000..1489e82 --- /dev/null +++ b/client/object_patch_test.go @@ -0,0 +1,158 @@ +package client + +import ( + "bytes" + "context" + "crypto/ecdsa" + "crypto/elliptic" + "crypto/rand" + "testing" + + v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" + "github.com/stretchr/testify/require" +) + +type mockPatchStream struct { + streamedPayloadPatches []*object.PayloadPatch +} + +func (m *mockPatchStream) Write(r *v2object.PatchRequest) error { + pp := new(object.PayloadPatch) + pp.FromV2(r.GetBody().GetPatch()) + + if r.GetBody().GetPatch() != nil { + bodyChunk := r.GetBody().GetPatch().Chunk + pp.Chunk = make([]byte, len(bodyChunk)) + copy(pp.Chunk, bodyChunk) + } + + m.streamedPayloadPatches = append(m.streamedPayloadPatches, pp) + + return nil +} + +func (m *mockPatchStream) Close() error { + return nil +} + +func TestObjectPatcher(t *testing.T) { + t.Run("no split payload patch", func(t *testing.T) { + m := &mockPatchStream{} + + pk, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + + patcher := objectPatcher{ + client: &Client{}, + + stream: m, + + addr: oidtest.Address(), + + key: pk, + + maxChunkLen: defaultGRPCPayloadChunkLen, + } + + patchPayload := []byte("011111") + + success := patcher.PatchAttributes(context.Background(), nil, false) + require.True(t, success) + + success = patcher.PatchPayload(context.Background(), newRange(0, 6), bytes.NewReader(patchPayload)) + require.True(t, success) + + require.Len(t, m.streamedPayloadPatches, 2) + + // m.streamedPayloadPatches[0] is attribute patch, so skip it + requireRangeChunk(t, m.streamedPayloadPatches[1], 0, 6, "011111") + }) + + t.Run("splitted payload patch", func(t *testing.T) { + m := &mockPatchStream{} + + pk, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + + const maxChunkLen = 2 + + patcher := objectPatcher{ + client: &Client{}, + + stream: m, + + addr: oidtest.Address(), + + key: pk, + + maxChunkLen: maxChunkLen, + } + + patchPayload := []byte("012345") + + success := patcher.PatchAttributes(context.Background(), nil, false) + require.True(t, success) + + success = patcher.PatchPayload(context.Background(), newRange(0, 6), bytes.NewReader(patchPayload)) + require.True(t, success) + + require.Len(t, m.streamedPayloadPatches, 4) + + // m.streamedPayloadPatches[0] is attribute patch, so skip it + requireRangeChunk(t, m.streamedPayloadPatches[1], 0, 2, "01") + requireRangeChunk(t, m.streamedPayloadPatches[2], 2, 2, "23") + requireRangeChunk(t, m.streamedPayloadPatches[3], 4, 2, "45") + }) + + t.Run("splitted payload patch with zero length", func(t *testing.T) { + m := &mockPatchStream{} + + pk, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + + const maxChunkLen = 2 + + patcher := objectPatcher{ + client: &Client{}, + + stream: m, + + addr: oidtest.Address(), + + key: pk, + + maxChunkLen: maxChunkLen, + } + + patchPayload := []byte("0123456789!@") + + success := patcher.PatchAttributes(context.Background(), nil, false) + require.True(t, success) + + success = patcher.PatchPayload(context.Background(), newRange(0, 4), bytes.NewReader(patchPayload)) + require.True(t, success) + + require.Len(t, m.streamedPayloadPatches, 7) + + // m.streamedPayloadPatches[0] is attribute patch, so skip it + requireRangeChunk(t, m.streamedPayloadPatches[1], 0, 2, "01") + requireRangeChunk(t, m.streamedPayloadPatches[2], 2, 2, "23") + requireRangeChunk(t, m.streamedPayloadPatches[3], 4, 0, "45") + requireRangeChunk(t, m.streamedPayloadPatches[4], 6, 0, "67") + requireRangeChunk(t, m.streamedPayloadPatches[5], 8, 0, "89") + requireRangeChunk(t, m.streamedPayloadPatches[6], 10, 0, "!@") + }) +} + +func requireRangeChunk(t *testing.T, pp *object.PayloadPatch, offset, length int, chunk string) { + require.NotNil(t, pp) + require.Equal(t, uint64(offset), pp.Range.GetOffset()) + require.Equal(t, uint64(length), pp.Range.GetLength()) + require.Equal(t, []byte(chunk), pp.Chunk) +} + +func newRange(offest, length uint64) *object.Range { + rng := &object.Range{} + rng.SetOffset(offest) + rng.SetLength(length) + return rng +}