From c11741807b950de5dad60fcd42cecbd4d445f8b5 Mon Sep 17 00:00:00 2001 From: Airat Arifullin Date: Wed, 31 Jul 2024 17:56:45 +0300 Subject: [PATCH] [#247] object: Introduce `patcher` package * Introduce `patcher` package that contains such interfaces to be implemented: - `PatchApplier` - the main patching engine that merges the stream of patches and the stream of original object payload divided by ranges. The merged streams result is output to `ChunkedObjectWriter`; - `RangeProvider` - provides the original object payload by ranges; - `HeaderProvider` - provides the original object header. * Introduce `patcher` that implements `PatchApplier`; * Cover all possible cases with unit-tests. Signed-off-by: Airat Arifullin --- object/patcher/patcher.go | 216 +++++++++++++++ object/patcher/patcher_test.go | 471 +++++++++++++++++++++++++++++++++ 2 files changed, 687 insertions(+) create mode 100644 object/patcher/patcher.go create mode 100644 object/patcher/patcher_test.go diff --git a/object/patcher/patcher.go b/object/patcher/patcher.go new file mode 100644 index 0000000..160d8c8 --- /dev/null +++ b/object/patcher/patcher.go @@ -0,0 +1,216 @@ +package patcher + +import ( + "context" + "errors" + "fmt" + "io" + + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" +) + +var ( + ErrOffsetExceedsSize = errors.New("patch offset exceeds object size") + ErrInvalidPatchOffsetOrder = errors.New("invalid patch offset order") + ErrPayloadPatchIsNil = errors.New("payload patch is nil") +) + +// PatchRes is the result of patch application. +type PatchRes struct { + AccessIdentifiers *transformer.AccessIdentifiers +} + +// PatchApplier is the interface that provides method to apply header and payload patches. +type PatchApplier interface { + // ApplyAttributesPatch applies the patch only for the object's attributes. + // This method must be ALWAYS invoked first as attribute patch must income only with the first patch + // request message. + // + // ApplyAttributesPatch is idempotent for the original header if it's invoked with empty `newAttrs` and + // `replaceAttrs = false`. + ApplyAttributesPatch(ctx context.Context, newAttrs []objectSDK.Attribute, replaceAttrs bool) error + + // ApplyPayloadPatch applies the patch for the object's payload. + // + // If `payloadPatch` is nil, but this `ApplyPayloadPatch` is invoked first time, then the invocation returns nil. + // Otherwise, it returns `ErrPayloadPatchIsNil` error. + // + // This assumption for the first call is due to the fact that first `PatchRequest` message may be targeted only + // to patch attributes but not for payload. Non-first incoming messages are required to bear ONLY payload patches. + ApplyPayloadPatch(ctx context.Context, payloadPatch *objectSDK.PayloadPatch) error + + // Close closes PatchApplier when the patch stream is over. + Close(context.Context) (PatchRes, error) +} + +// RangeProvider is the interface that provides a method to get original object payload +// by a given range. +type RangeProvider interface { + // ReadRange reads an original object payload by the given range. + // The method returns io.Reader over the data range only. This means if the data is read out, + // then ReadRange has to be invoked to provide reader over the next range. + ReadRange(ctx context.Context, rng *objectSDK.Range) io.Reader +} + +type patcher struct { + rangeProvider RangeProvider + + objectWriter transformer.ChunkedObjectWriter + + currOffset uint64 + + originalPayloadSize uint64 + + hdr *objectSDK.Object + + firstApplyCall bool +} + +func New(hdr *objectSDK.Object, objectRangeSplitter RangeProvider, objectWriter transformer.ChunkedObjectWriter) PatchApplier { + return &patcher{ + rangeProvider: objectRangeSplitter, + + objectWriter: objectWriter, + + hdr: hdr, + + originalPayloadSize: hdr.PayloadSize(), + + firstApplyCall: true, + } +} + +func (p *patcher) ApplyAttributesPatch(ctx context.Context, newAttrs []objectSDK.Attribute, replaceAttrs bool) error { + if replaceAttrs { + mergedAttrs := mergeAttributes(newAttrs, p.hdr.Attributes()) + p.hdr.SetAttributes(mergedAttrs...) + } else { + p.hdr.SetAttributes(newAttrs...) + } + + if err := p.objectWriter.WriteHeader(ctx, p.hdr); err != nil { + return fmt.Errorf("writer header error: %w", err) + } + return nil +} + +func (p *patcher) ApplyPayloadPatch(ctx context.Context, payloadPatch *objectSDK.PayloadPatch) error { + if payloadPatch == nil { + if p.firstApplyCall { + return nil + } + return ErrPayloadPatchIsNil + } + + p.firstApplyCall = false + + if payloadPatch.Range.GetOffset() < p.currOffset { + return fmt.Errorf("%w: current = %d, previous = %d", ErrInvalidPatchOffsetOrder, payloadPatch.Range.GetOffset(), p.currOffset) + } + + if payloadPatch.Range.GetOffset() > p.originalPayloadSize { + return fmt.Errorf("%w: offset = %d, object size = %d", ErrOffsetExceedsSize, payloadPatch.Range.GetOffset(), p.originalPayloadSize) + } + + var err error + if p.currOffset, err = p.applyPatch(ctx, payloadPatch, p.currOffset); err != nil { + return fmt.Errorf("apply patch error: %w", err) + } + + return nil +} + +func (p *patcher) Close(ctx context.Context) (PatchRes, error) { + rng := new(objectSDK.Range) + rng.SetOffset(p.currOffset) + rng.SetLength(p.originalPayloadSize - p.currOffset) + + rdr := p.rangeProvider.ReadRange(ctx, rng) + for { + remain := make([]byte, 1024) + n, err := rdr.Read(remain) + if err != nil { + if err == io.EOF { + break + } + return PatchRes{}, fmt.Errorf("read error: %w", err) + } + _, err = p.objectWriter.Write(ctx, remain[:n]) + if err != nil { + return PatchRes{}, fmt.Errorf("write error: %w", err) + } + } + + aid, err := p.objectWriter.Close(ctx) + if err != nil { + return PatchRes{}, fmt.Errorf("close object writer error: %w", err) + } + + return PatchRes{ + AccessIdentifiers: aid, + }, nil +} + +func (p *patcher) applyPatch(ctx context.Context, payloadPatch *objectSDK.PayloadPatch, offset uint64) (newOffset uint64, err error) { + // write the original payload chunk before the start of the patch + if payloadPatch.Range.GetOffset() > offset { + rng := new(objectSDK.Range) + rng.SetOffset(offset) + rng.SetLength(payloadPatch.Range.GetOffset() - offset) + + rdr := p.rangeProvider.ReadRange(ctx, rng) + for { + orig := make([]byte, 1024) + var n int + n, err = rdr.Read(orig) + if err != nil { + if err == io.EOF { + break + } + err = fmt.Errorf("read error: %w", err) + return + } + _, err = p.objectWriter.Write(ctx, orig[:n]) + if err != nil { + err = fmt.Errorf("write error: %w", err) + return + } + } + + newOffset = payloadPatch.Range.GetOffset() + } + + // apply patch + if _, err = p.objectWriter.Write(ctx, payloadPatch.Chunk); err != nil { + return + } + + if payloadPatch.Range.GetLength() > 0 { + newOffset += payloadPatch.Range.GetLength() + } + + return +} + +func mergeAttributes(newAttrs, oldAttrs []objectSDK.Attribute) []objectSDK.Attribute { + attrMap := make(map[string]string) + + for _, attr := range oldAttrs { + attrMap[attr.Key()] = attr.Value() + } + + for _, newAttr := range newAttrs { + attrMap[newAttr.Key()] = newAttr.Value() + } + + var mergedAttrs []objectSDK.Attribute + for key, value := range attrMap { + var attr objectSDK.Attribute + attr.SetKey(key) + attr.SetValue(value) + mergedAttrs = append(mergedAttrs, attr) + } + + return mergedAttrs +} diff --git a/object/patcher/patcher_test.go b/object/patcher/patcher_test.go new file mode 100644 index 0000000..47f5da9 --- /dev/null +++ b/object/patcher/patcher_test.go @@ -0,0 +1,471 @@ +package patcher + +import ( + "bytes" + "context" + "io" + "testing" + + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" + "github.com/stretchr/testify/require" +) + +type mockPatchedObjectWriter struct { + obj *objectSDK.Object +} + +func (m *mockPatchedObjectWriter) Write(_ context.Context, chunk []byte) (int, error) { + res := append(m.obj.Payload(), chunk...) + + m.obj.SetPayload(res) + m.obj.SetPayloadSize(uint64(len(res))) + + return len(chunk), nil +} + +func (m *mockPatchedObjectWriter) WriteHeader(_ context.Context, hdr *objectSDK.Object) error { + m.obj.ToV2().SetHeader(hdr.ToV2().GetHeader()) + return nil +} + +func (m *mockPatchedObjectWriter) Close(context.Context) (*transformer.AccessIdentifiers, error) { + return &transformer.AccessIdentifiers{}, nil +} + +type mockRangeProvider struct { + originalObjectPayload []byte +} + +var _ RangeProvider = (*mockRangeProvider)(nil) + +func (m *mockRangeProvider) ReadRange(_ context.Context, rng *objectSDK.Range) io.Reader { + offset := rng.GetOffset() + length := rng.GetLength() + + if length == 0 { + return bytes.NewReader(m.originalObjectPayload[offset:]) + } + return bytes.NewReader(m.originalObjectPayload[offset : offset+length]) +} + +func newTestObject() (*objectSDK.Object, oid.Address) { + obj := objectSDK.New() + + addr := oidtest.Address() + obj.SetContainerID(addr.Container()) + obj.SetID(addr.Object()) + + return obj, addr +} + +func rangeWithOffestWithLength(offset, length uint64) *objectSDK.Range { + rng := new(objectSDK.Range) + rng.SetOffset(offset) + rng.SetLength(length) + return rng +} + +func TestPatchRevert(t *testing.T) { + obj, _ := newTestObject() + + modifPatch := &objectSDK.Patch{ + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(0, 0), + + Chunk: []byte("inserted"), + }, + } + + originalObjectPayload := []byte("*******************") + + obj.SetPayload(originalObjectPayload) + obj.SetPayloadSize(uint64(len(originalObjectPayload))) + + exp := []byte("inserted*******************") + + rangeProvider := &mockRangeProvider{ + originalObjectPayload: originalObjectPayload, + } + + patchedObj, _ := newTestObject() + + wr := &mockPatchedObjectWriter{ + obj: patchedObj, + } + patcher := New(obj.CutPayload(), rangeProvider, wr) + + err := patcher.ApplyAttributesPatch(context.Background(), modifPatch.NewAttributes, modifPatch.ReplaceAttributes) + require.NoError(t, err) + + err = patcher.ApplyPayloadPatch(context.Background(), modifPatch.PayloadPatch) + require.NoError(t, err) + + _, err = patcher.Close(context.Background()) + require.NoError(t, err) + + require.Equal(t, exp, patchedObj.Payload()) + + revertPatch := &objectSDK.Patch{ + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(0, uint64(len("inserted"))), + + Chunk: []byte{}, + }, + } + + rangeProvider = &mockRangeProvider{ + originalObjectPayload: exp, + } + + patchedPatchedObj, _ := newTestObject() + + wr = &mockPatchedObjectWriter{ + obj: patchedPatchedObj, + } + + patcher = New(patchedObj.CutPayload(), rangeProvider, wr) + + err = patcher.ApplyAttributesPatch(context.Background(), revertPatch.NewAttributes, revertPatch.ReplaceAttributes) + require.NoError(t, err) + + err = patcher.ApplyPayloadPatch(context.Background(), revertPatch.PayloadPatch) + require.NoError(t, err) + + _, err = patcher.Close(context.Background()) + require.NoError(t, err) + + require.Equal(t, originalObjectPayload, patchedPatchedObj.Payload()) +} + +func newTestAttribute(key, val string) objectSDK.Attribute { + var attr objectSDK.Attribute + attr.SetKey(key) + attr.SetValue(val) + return attr +} + +func TestPatch(t *testing.T) { + + for _, test := range []struct { + name string + patches []objectSDK.Patch + originalObjectPayload []byte + patched []byte + expectedPayloadPatchErr error + }{ + { + name: "invalid offset", + patches: []objectSDK.Patch{ + { + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(100, 0), + Chunk: []byte(""), + }, + }, + }, + originalObjectPayload: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + expectedPayloadPatchErr: ErrOffsetExceedsSize, + }, + { + name: "invalid following patch offset", + patches: []objectSDK.Patch{ + { + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(10, 0), + Chunk: []byte(""), + }, + }, + { + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(7, 0), + Chunk: []byte(""), + }, + }, + }, + originalObjectPayload: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + expectedPayloadPatchErr: ErrInvalidPatchOffsetOrder, + }, + { + name: "only header patch", + patches: []objectSDK.Patch{ + { + NewAttributes: []objectSDK.Attribute{ + newTestAttribute("key1", "val2"), + newTestAttribute("key2", "val2"), + }, + }, + }, + originalObjectPayload: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + patched: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + }, + { + name: "header and payload", + patches: []objectSDK.Patch{ + { + NewAttributes: []objectSDK.Attribute{ + newTestAttribute("key1", "val2"), + newTestAttribute("key2", "val2"), + }, + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(0, 0), + Chunk: []byte("inserted at the beginning"), + }, + }, + }, + originalObjectPayload: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + patched: []byte("inserted at the beginning0123456789qwertyuiopasdfghjklzxcvbnm"), + }, + { + name: "header only then payload", + patches: []objectSDK.Patch{ + { + NewAttributes: []objectSDK.Attribute{ + newTestAttribute("key1", "val2"), + newTestAttribute("key2", "val2"), + }, + }, + { + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(0, 0), + Chunk: []byte("inserted at the beginning"), + }, + }, + }, + originalObjectPayload: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + patched: []byte("inserted at the beginning0123456789qwertyuiopasdfghjklzxcvbnm"), + expectedPayloadPatchErr: ErrPayloadPatchIsNil, + }, + { + name: "no effect", + patches: []objectSDK.Patch{ + { + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(0, 0), + Chunk: []byte(""), + }, + }, + { + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(12, 0), + Chunk: []byte(""), + }, + }, + { + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(20, 0), + Chunk: []byte(""), + }, + }, + }, + originalObjectPayload: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + patched: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + }, + { + name: "insert prefix", + patches: []objectSDK.Patch{ + { + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(0, 0), + Chunk: []byte("inserted at the beginning"), + }, + }, + }, + originalObjectPayload: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + patched: []byte("inserted at the beginning0123456789qwertyuiopasdfghjklzxcvbnm"), + }, + { + name: "insert in the middle", + patches: []objectSDK.Patch{ + { + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(5, 0), + Chunk: []byte("inserted somewhere in the middle"), + }, + }, + }, + originalObjectPayload: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + patched: []byte("01234inserted somewhere in the middle56789qwertyuiopasdfghjklzxcvbnm"), + }, + { + name: "insert at the end", + patches: []objectSDK.Patch{ + { + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(36, 0), + Chunk: []byte("inserted somewhere at the end"), + }, + }, + }, + originalObjectPayload: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + patched: []byte("0123456789qwertyuiopasdfghjklzxcvbnminserted somewhere at the end"), + }, + { + name: "replace by range", + patches: []objectSDK.Patch{ + { + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(0, 12), + Chunk: []byte("just replace"), + }, + }, + }, + originalObjectPayload: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + patched: []byte("just replaceertyuiopasdfghjklzxcvbnm"), + }, + { + name: "replace and insert some bytes", + patches: []objectSDK.Patch{ + { + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(0, 11), + Chunk: []byte("replace and append in the middle"), + }, + }, + }, + originalObjectPayload: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + patched: []byte("replace and append in the middlewertyuiopasdfghjklzxcvbnm"), + }, + { + name: "replace and insert some bytes in the middle", + patches: []objectSDK.Patch{ + { + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(5, 3), + Chunk: []byte("@@@@@"), + }, + }, + }, + originalObjectPayload: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + patched: []byte("01234@@@@@89qwertyuiopasdfghjklzxcvbnm"), + }, + { + name: "a few patches: prefix, suffix", + patches: []objectSDK.Patch{ + { + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(0, 0), + Chunk: []byte("this_will_be_prefix"), + }, + }, + { + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(36, 0), + Chunk: []byte("this_will_be_suffix"), + }, + }, + }, + originalObjectPayload: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + patched: []byte("this_will_be_prefix0123456789qwertyuiopasdfghjklzxcvbnmthis_will_be_suffix"), + }, + { + name: "a few patches: replace and insert some bytes", + patches: []objectSDK.Patch{ + { + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(10, 3), + Chunk: []byte("aaaaa"), + }, + }, + { + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(16, 0), + Chunk: []byte("bbbbb"), + }, + }, + }, + originalObjectPayload: []byte("0123456789ABCDEF"), + patched: []byte("0123456789aaaaaDEFbbbbb"), + }, + { + name: "a few patches: various modifiactions", + patches: []objectSDK.Patch{ + { + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(4, 8), + Chunk: []byte("earliest"), + }, + }, + { + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(13, 0), + Chunk: []byte("known "), + }, + }, + { + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(35, 8), + Chunk: []byte("a small town"), + }, + }, + { + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(62, 6), + Chunk: []byte("tablet"), + }, + }, + { + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(87, 0), + Chunk: []byte("Shar-Kali-Sharri"), + }, + }, + }, + originalObjectPayload: []byte("The ******** mention of Babylon as [insert] appears on a clay ****** from the reign of "), + patched: []byte("The earliest known mention of Babylon as a small town appears on a clay tablet from the reign of Shar-Kali-Sharri"), + }, + } { + t.Run(test.name, func(t *testing.T) { + rangeProvider := &mockRangeProvider{ + originalObjectPayload: test.originalObjectPayload, + } + + originalObject, _ := newTestObject() + originalObject.SetPayload(test.originalObjectPayload) + originalObject.SetPayloadSize(uint64(len(test.originalObjectPayload))) + + patchedObject, _ := newTestObject() + + wr := &mockPatchedObjectWriter{ + obj: patchedObject, + } + + patcher := New(originalObject.CutPayload(), rangeProvider, wr) + + for i, patch := range test.patches { + if i == 0 { + _ = patcher.ApplyAttributesPatch(context.Background(), patch.NewAttributes, patch.ReplaceAttributes) + } + + err := patcher.ApplyPayloadPatch(context.Background(), patch.PayloadPatch) + if err != nil { + if test.expectedPayloadPatchErr != nil { + require.ErrorIs(t, err, test.expectedPayloadPatchErr) + return + } else { + require.NoError(t, err) + } + } + } + + _, err := patcher.Close(context.Background()) + require.NoError(t, err) + require.Equal(t, test.patched, patchedObject.Payload()) + + // attrOfPatchedObject := patchedObject.Attributes() + // sort.Slice(attrOfPatchedObject, func(i, j int) bool { + // return strings.Compare(attrOfPatchedObject[i].Key(), attrOfPatchedObject[j].Key()) < 0 + // }) + + // testAttrs := test.newAttrs.ToSDKAttributes() + // sort.Slice(testAttrs, func(i, j int) bool { + // return strings.Compare(testAttrs[i].Key(), testAttrs[j].Key()) < 0 + // }) + + // require.Equal(t, testAttrs, attrOfPatchedObject) + }) + } + +}