diff --git a/object/patcher/patcher.go b/object/patcher/patcher.go new file mode 100644 index 0000000..54ee484 --- /dev/null +++ b/object/patcher/patcher.go @@ -0,0 +1,249 @@ +package patcher + +import ( + "context" + "errors" + "fmt" + "io" + + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" +) + +var ( + ErrOffsetExceedsSize = errors.New("patch offset exceeds object size") + ErrEmptyPayloadPatch = errors.New("patch must contain payload") + ErrInvalidPatchObjectAddress = errors.New("invalid patch object address") + ErrInvalidPatchOffsetOrder = errors.New("invalid patch offset order") +) + +// PatchRes is the result of patch application. +type PatchRes struct { + AccessIdentifiers *transformer.AccessIdentifiers +} + +// PatchApplier is the interface that provides method to apply header and payload patches. +type PatchApplier interface { + // ApplyPatch applies the patch for an object. All patches must contain the same target object address. + // Patch offsets must be passed with non-descending order. + // Patching the payload is performed within all ApplyPatch ivocations. + // + // Unsuccessful call returns false. Error is checked with Close. + ApplyPatch(ctx context.Context, patch *objectSDK.Patch) bool + + // Close closes PatchApplier when the patch stream is over. + Close(context.Context) (PatchRes, error) +} + +// HeaderProvider is the interface that provides a method to get an original's object header. +type HeaderProvider interface { + // GetObjectHeader gets an original's object header. + GetObjectHeader(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) +} + +// RangeProvider is the interface that provides a method to get original object payload +// by a given range. +type RangeProvider interface { + // ReadRange reads an original object payload by the given range. + // The method returns io.Reader over the data range only. This means if the data is read out, + // then ReadRange has to be invoked to provide reader over the next range. + ReadRange(ctx context.Context, addr oid.Address, rng *objectSDK.Range) io.Reader +} + +type patcher struct { + rangeProvider RangeProvider + + objectWriter transformer.ChunkedObjectWriter + + hdrProvider HeaderProvider + + currOffset uint64 + + originalPayloadSize uint64 + + firstApplyPatchCall bool + + addr oid.Address + + closeErr error +} + +func New(hdrProvider HeaderProvider, objectRangeSplitter RangeProvider, objectWriter transformer.ChunkedObjectWriter) PatchApplier { + return &patcher{ + rangeProvider: objectRangeSplitter, + + objectWriter: objectWriter, + + hdrProvider: hdrProvider, + + firstApplyPatchCall: true, + } +} + +func (p *patcher) ApplyPatch(ctx context.Context, currPatch *objectSDK.Patch) bool { + if currPatch == nil { + return true + } + + if p.firstApplyPatchCall { + p.firstApplyPatchCall = false + + p.addr = currPatch.Address + + hdr, err := p.hdrProvider.GetObjectHeader(ctx, currPatch.Address) + if err != nil { + p.closeErr = fmt.Errorf("get header error: %w", err) + return false + } + + p.originalPayloadSize = hdr.PayloadSize() + + if !currPatch.ReplaceAttributes { + mergedAttrs := mergeAttributes(currPatch.NewAttributes, hdr.Attributes()) + hdr.SetAttributes(mergedAttrs...) + } else { + hdr.SetAttributes(currPatch.NewAttributes...) + } + + if err = p.objectWriter.WriteHeader(ctx, hdr); err != nil { + p.closeErr = fmt.Errorf("write header error: %w", err) + return false + } + + // only header patch + if currPatch.PayloadPatch == nil { + return true + } + } else { + // All patches can be applied only for the same object. + if !p.addr.Equals(currPatch.Address) { + p.closeErr = fmt.Errorf("%w: expected = %s, got = %s", + ErrInvalidPatchObjectAddress, + p.addr.EncodeToString(), + currPatch.Address.EncodeToString()) + return false + } + + if currPatch.PayloadPatch == nil { + p.closeErr = ErrEmptyPayloadPatch + return false + } + } + + if currPatch.PayloadPatch.Range.GetOffset() < p.currOffset { + p.closeErr = fmt.Errorf("%w: current = %d, previous = %d", ErrInvalidPatchOffsetOrder, currPatch.PayloadPatch.Range.GetOffset(), p.currOffset) + return false + } + + if currPatch.PayloadPatch.Range.GetOffset() > p.originalPayloadSize { + p.closeErr = fmt.Errorf("%w: offset = %d, object size = %d", ErrOffsetExceedsSize, currPatch.PayloadPatch.Range.GetOffset(), p.originalPayloadSize) + return false + } + + var err error + if p.currOffset, err = p.applyPatch(ctx, currPatch, p.currOffset); err != nil { + p.closeErr = fmt.Errorf("apply patch error: %w", err) + return false + } + + return true +} + +func (p *patcher) Close(ctx context.Context) (PatchRes, error) { + if p.closeErr != nil { + return PatchRes{}, p.closeErr + } + + rng := new(objectSDK.Range) + rng.SetOffset(p.currOffset) + rng.SetLength(p.originalPayloadSize - p.currOffset) + + rdr := p.rangeProvider.ReadRange(ctx, p.addr, rng) + for { + remain := make([]byte, 1024) + n, err := rdr.Read(remain) + if err != nil { + if err == io.EOF { + break + } + return PatchRes{}, fmt.Errorf("read error: %w", err) + } + _, err = p.objectWriter.Write(ctx, remain[:n]) + if err != nil { + return PatchRes{}, fmt.Errorf("write error: %w", err) + } + } + + aid, err := p.objectWriter.Close(ctx) + if err != nil { + return PatchRes{}, fmt.Errorf("close object writer error: %w", err) + } + + return PatchRes{ + AccessIdentifiers: aid, + }, nil +} + +func (p *patcher) applyPatch(ctx context.Context, currPatch *objectSDK.Patch, offset uint64) (newOffset uint64, err error) { + // write the original payload chunk before the start of the patch + if currPatch.PayloadPatch.Range.GetOffset() > offset { + rng := new(objectSDK.Range) + rng.SetOffset(offset) + rng.SetLength(currPatch.PayloadPatch.Range.GetOffset() - offset) + + rdr := p.rangeProvider.ReadRange(ctx, p.addr, rng) + for { + orig := make([]byte, 1024) + var n int + n, err = rdr.Read(orig) + if err != nil { + if err == io.EOF { + break + } + err = fmt.Errorf("read error: %w", err) + return + } + _, err = p.objectWriter.Write(ctx, orig[:n]) + if err != nil { + err = fmt.Errorf("write error: %w", err) + return + } + } + + newOffset = currPatch.PayloadPatch.Range.GetOffset() + } + + // apply patch + if _, err = p.objectWriter.Write(ctx, currPatch.PayloadPatch.Chunk); err != nil { + return + } + + if currPatch.PayloadPatch.Range.GetLength() > 0 { + newOffset += currPatch.PayloadPatch.Range.GetLength() + } + + return +} + +func mergeAttributes(newAttrs, oldAttrs []objectSDK.Attribute) []objectSDK.Attribute { + attrMap := make(map[string]string) + + for _, attr := range oldAttrs { + attrMap[attr.Key()] = attr.Value() + } + + for _, newAttr := range newAttrs { + attrMap[newAttr.Key()] = newAttr.Value() + } + + var mergedAttrs []objectSDK.Attribute + for key, value := range attrMap { + var attr objectSDK.Attribute + attr.SetKey(key) + attr.SetValue(value) + mergedAttrs = append(mergedAttrs, attr) + } + + return mergedAttrs +} diff --git a/object/patcher/patcher_test.go b/object/patcher/patcher_test.go new file mode 100644 index 0000000..95c313e --- /dev/null +++ b/object/patcher/patcher_test.go @@ -0,0 +1,556 @@ +package patcher + +import ( + "context" + "errors" + "io" + "testing" + + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" + "github.com/stretchr/testify/require" +) + +type mockPatchedObjectWriter struct { + obj *objectSDK.Object +} + +func (m *mockPatchedObjectWriter) Write(_ context.Context, chunk []byte) (int, error) { + res := append(m.obj.Payload(), chunk...) + + m.obj.SetPayload(res) + m.obj.SetPayloadSize(uint64(len(res))) + + return len(chunk), nil +} + +func (m *mockPatchedObjectWriter) WriteHeader(_ context.Context, hdr *objectSDK.Object) error { + m.obj.ToV2().SetHeader(hdr.ToV2().GetHeader()) + return nil +} + +func (m *mockPatchedObjectWriter) Close(context.Context) (*transformer.AccessIdentifiers, error) { + return &transformer.AccessIdentifiers{}, nil +} + +type mockHeaderProvider struct { + obj *objectSDK.Object +} + +var _ HeaderProvider = (*mockHeaderProvider)(nil) + +func (m *mockHeaderProvider) GetObjectHeader(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) { + return m.obj.CutPayload(), nil +} + +type mockRangeProvider struct { + originalObjectPayload []byte +} + +var _ RangeProvider = (*mockRangeProvider)(nil) + +func (m *mockRangeProvider) ReadRange(_ context.Context, addr oid.Address, rng *objectSDK.Range) io.Reader { + rdr, wrt := io.Pipe() + + if rng == nil { + wrt.CloseWithError(errors.New("no range provided")) + } + + if m.originalObjectPayload == nil { + wrt.CloseWithError(errors.New("no original payload provided")) + } + + offset := rng.GetOffset() + length := rng.GetLength() + + go func() { + var err error + + defer func() { + wrt.CloseWithError(err) + }() + + if rng == nil { + err = errors.New("no range provided") + return + } + + if m.originalObjectPayload == nil { + err = errors.New("no original payload provided") + return + } + + if length == 0 { + wrt.Write(m.originalObjectPayload[offset:]) + } + wrt.Write(m.originalObjectPayload[offset : offset+length]) + }() + + return rdr +} + +func newTestObject() (*objectSDK.Object, oid.Address) { + obj := objectSDK.New() + + addr := oidtest.Address() + obj.SetContainerID(addr.Container()) + obj.SetID(addr.Object()) + + return obj, addr +} + +func TestPatchRevert(t *testing.T) { + obj, addr := newTestObject() + + modifPatch := &objectSDK.Patch{ + Address: addr, + + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(0, 0), + + Chunk: []byte("inserted"), + }, + } + + originalObjectPayload := []byte("*******************") + + obj.SetPayload(originalObjectPayload) + obj.SetPayloadSize(uint64(len(originalObjectPayload))) + + exp := []byte("inserted*******************") + + rangeProvider := &mockRangeProvider{ + originalObjectPayload: originalObjectPayload, + } + + hdrProvider := &mockHeaderProvider{ + obj: obj, + } + + patchedObj, patchedAddr := newTestObject() + + wr := &mockPatchedObjectWriter{ + obj: patchedObj, + } + patcher := New(hdrProvider, rangeProvider, wr) + + applyRes := patcher.ApplyPatch(context.Background(), modifPatch) + require.True(t, applyRes) + _, err := patcher.Close(context.Background()) + require.NoError(t, err) + require.Equal(t, exp, patchedObj.Payload()) + + revertPatch := &objectSDK.Patch{ + Address: patchedAddr, + + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(0, uint64(len("inserted"))), + + Chunk: []byte{}, + }, + } + + rangeProvider = &mockRangeProvider{ + originalObjectPayload: exp, + } + + patchedPatchedObj, _ := newTestObject() + + wr = &mockPatchedObjectWriter{ + obj: patchedPatchedObj, + } + + hdrProvider = &mockHeaderProvider{ + obj: patchedObj, + } + + patcher = New(hdrProvider, rangeProvider, wr) + applyRes = patcher.ApplyPatch(context.Background(), revertPatch) + require.True(t, applyRes) + _, err = patcher.Close(context.Background()) + require.NoError(t, err) + require.Equal(t, originalObjectPayload, patchedPatchedObj.Payload()) +} + +func TestDifferentPatchTargetAddresses(t *testing.T) { + obj, addr := newTestObject() + + _, anyOtherAddr := newTestObject() + + modifPatch := &objectSDK.Patch{ + Address: addr, + + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(0, 0), + + Chunk: []byte("inserted"), + }, + } + + invalidPatch := &objectSDK.Patch{ + Address: anyOtherAddr, + + PayloadPatch: &objectSDK.PayloadPatch{ + Range: rangeWithOffestWithLength(0, 0), + + Chunk: []byte("inserted"), + }, + } + + originalObjectPayload := []byte("*******************") + + obj.SetPayload(originalObjectPayload) + obj.SetPayloadSize(uint64(len(originalObjectPayload))) + + rangeProvider := &mockRangeProvider{ + originalObjectPayload: originalObjectPayload, + } + + hdrProvider := &mockHeaderProvider{ + obj: obj, + } + + patchedObj, _ := newTestObject() + + wr := &mockPatchedObjectWriter{ + obj: patchedObj, + } + patcher := New(hdrProvider, rangeProvider, wr) + + applyRes := patcher.ApplyPatch(context.Background(), modifPatch) + require.True(t, applyRes) + + applyRes = patcher.ApplyPatch(context.Background(), invalidPatch) + require.False(t, applyRes) + + _, err := patcher.Close(context.Background()) + require.ErrorIs(t, err, ErrInvalidPatchObjectAddress) +} + +func rangeWithOffestWithLength(offset, length uint64) *objectSDK.Range { + rng := new(objectSDK.Range) + rng.SetOffset(offset) + rng.SetLength(length) + return rng +} + +type attr struct { + key string + val string +} + +type attrs []attr + +func (a attrs) ToSDKAttributes() []objectSDK.Attribute { + res := make([]objectSDK.Attribute, len(a)) + for i := range a { + res[i].SetKey(a[i].key) + res[i].SetValue(a[i].val) + } + return res +} + +func TestPatch(t *testing.T) { + + for _, test := range []struct { + name string + newAttrs attrs + replaceAttrs bool + patchPayloads []*objectSDK.PayloadPatch + originalObjectPayload []byte + patched []byte + expectedErr error + }{ + { + name: "invalid offset", + patchPayloads: []*objectSDK.PayloadPatch{ + { + Range: rangeWithOffestWithLength(100, 0), + Chunk: []byte(""), + }, + }, + originalObjectPayload: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + expectedErr: ErrOffsetExceedsSize, + }, + { + name: "empty payload patch in the second patch", + patchPayloads: []*objectSDK.PayloadPatch{ + { + Range: rangeWithOffestWithLength(0, 0), + Chunk: []byte(""), + }, + nil, + }, + originalObjectPayload: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + expectedErr: ErrEmptyPayloadPatch, + }, + { + name: "invalid following patch offset", + patchPayloads: []*objectSDK.PayloadPatch{ + { + Range: rangeWithOffestWithLength(10, 0), + Chunk: []byte(""), + }, + { + Range: rangeWithOffestWithLength(7, 0), + Chunk: []byte(""), + }, + }, + originalObjectPayload: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + expectedErr: ErrInvalidPatchOffsetOrder, + }, + { + name: "only header patch", + newAttrs: attrs{ + attr{ + key: "key1", + val: "val1", + }, + attr{ + key: "key2", + val: "val2", + }, + }, + patchPayloads: []*objectSDK.PayloadPatch{ + nil, + }, + originalObjectPayload: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + patched: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + }, + { + name: "header and payload", + newAttrs: attrs{ + attr{ + key: "key1", + val: "val1", + }, + attr{ + key: "key2", + val: "val2", + }, + }, + patchPayloads: []*objectSDK.PayloadPatch{ + { + Range: rangeWithOffestWithLength(0, 0), + Chunk: []byte("inserted at the beginning"), + }, + }, + originalObjectPayload: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + patched: []byte("inserted at the beginning0123456789qwertyuiopasdfghjklzxcvbnm"), + }, + { + name: "header, then payload", + newAttrs: attrs{ + attr{ + key: "key1", + val: "val1", + }, + attr{ + key: "key2", + val: "val2", + }, + }, + patchPayloads: []*objectSDK.PayloadPatch{ + nil, + { + Range: rangeWithOffestWithLength(0, 0), + Chunk: []byte("inserted at the beginning"), + }, + }, + originalObjectPayload: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + patched: []byte("inserted at the beginning0123456789qwertyuiopasdfghjklzxcvbnm"), + }, + { + name: "no effect", + patchPayloads: []*objectSDK.PayloadPatch{ + { + Range: rangeWithOffestWithLength(0, 0), + Chunk: []byte(""), + }, + { + Range: rangeWithOffestWithLength(12, 0), + Chunk: []byte(""), + }, + { + Range: rangeWithOffestWithLength(20, 0), + Chunk: []byte(""), + }, + }, + originalObjectPayload: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + patched: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + }, + { + name: "insert prefix", + patchPayloads: []*objectSDK.PayloadPatch{ + { + Range: rangeWithOffestWithLength(0, 0), + Chunk: []byte("inserted at the beginning"), + }, + }, + originalObjectPayload: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + patched: []byte("inserted at the beginning0123456789qwertyuiopasdfghjklzxcvbnm"), + }, + { + name: "insert in the middle", + patchPayloads: []*objectSDK.PayloadPatch{ + { + Range: rangeWithOffestWithLength(5, 0), + Chunk: []byte("inserted somewhere in the middle"), + }, + }, + originalObjectPayload: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + patched: []byte("01234inserted somewhere in the middle56789qwertyuiopasdfghjklzxcvbnm"), + }, + { + name: "insert at the end", + patchPayloads: []*objectSDK.PayloadPatch{ + { + Range: rangeWithOffestWithLength(36, 0), + Chunk: []byte("inserted somewhere at the end"), + }, + }, + originalObjectPayload: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + patched: []byte("0123456789qwertyuiopasdfghjklzxcvbnminserted somewhere at the end"), + }, + { + name: "replace by range", + patchPayloads: []*objectSDK.PayloadPatch{ + { + Range: rangeWithOffestWithLength(0, 12), + Chunk: []byte("just replace"), + }, + }, + originalObjectPayload: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + patched: []byte("just replaceertyuiopasdfghjklzxcvbnm"), + }, + { + name: "replace and insert some bytes", + patchPayloads: []*objectSDK.PayloadPatch{ + { + Range: rangeWithOffestWithLength(0, 11), + Chunk: []byte("replace and append in the middle"), + }, + }, + originalObjectPayload: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + patched: []byte("replace and append in the middlewertyuiopasdfghjklzxcvbnm"), + }, + { + name: "replace and insert some bytes in the middle", + patchPayloads: []*objectSDK.PayloadPatch{ + { + Range: rangeWithOffestWithLength(5, 3), + Chunk: []byte("@@@@@"), + }, + }, + originalObjectPayload: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + patched: []byte("01234@@@@@89qwertyuiopasdfghjklzxcvbnm"), + }, + { + name: "a few patches: prefix, suffix", + patchPayloads: []*objectSDK.PayloadPatch{ + { + Range: rangeWithOffestWithLength(0, 0), + Chunk: []byte("this_will_be_prefix"), + }, + { + Range: rangeWithOffestWithLength(36, 0), + Chunk: []byte("this_will_be_suffix"), + }, + }, + originalObjectPayload: []byte("0123456789qwertyuiopasdfghjklzxcvbnm"), + patched: []byte("this_will_be_prefix0123456789qwertyuiopasdfghjklzxcvbnmthis_will_be_suffix"), + }, + { + name: "a few patches: replace and insert some bytes", + patchPayloads: []*objectSDK.PayloadPatch{ + { + Range: rangeWithOffestWithLength(10, 3), + Chunk: []byte("aaaaa"), + }, + { + Range: rangeWithOffestWithLength(16, 0), + Chunk: []byte("bbbbb"), + }, + }, + originalObjectPayload: []byte("0123456789ABCDEF"), + patched: []byte("0123456789aaaaaDEFbbbbb"), + }, + { + name: "a few patches: various modifiactions", + patchPayloads: []*objectSDK.PayloadPatch{ + { + Range: rangeWithOffestWithLength(4, 8), + Chunk: []byte("earliest"), + }, + { + Range: rangeWithOffestWithLength(13, 0), + Chunk: []byte("known "), + }, + { + Range: rangeWithOffestWithLength(35, 8), + Chunk: []byte("a small town"), + }, + { + Range: rangeWithOffestWithLength(62, 6), + Chunk: []byte("tablet"), + }, + { + Range: rangeWithOffestWithLength(87, 0), + Chunk: []byte("Shar-Kali-Sharri"), + }, + }, + originalObjectPayload: []byte("The ******** mention of Babylon as [insert] appears on a clay ****** from the reign of "), + patched: []byte("The earliest known mention of Babylon as a small town appears on a clay tablet from the reign of Shar-Kali-Sharri"), + }, + } { + t.Run(test.name, func(t *testing.T) { + rangeProvider := &mockRangeProvider{ + originalObjectPayload: test.originalObjectPayload, + } + + originalObject, originalAddr := newTestObject() + originalObject.SetPayload(test.originalObjectPayload) + originalObject.SetPayloadSize(uint64(len(test.originalObjectPayload))) + + patchedObject, _ := newTestObject() + + wr := &mockPatchedObjectWriter{ + obj: patchedObject, + } + + hdrProvider := &mockHeaderProvider{ + obj: originalObject, + } + + patcher := New(hdrProvider, rangeProvider, wr) + + for i, pp := range test.patchPayloads { + patch := &objectSDK.Patch{ + Address: originalAddr, + PayloadPatch: pp, + } + + if i == 0 { + patch.NewAttributes = test.newAttrs.ToSDKAttributes() + patch.ReplaceAttributes = test.replaceAttrs + } + + if !patcher.ApplyPatch(context.Background(), patch) { + break + } + } + + _, err := patcher.Close(context.Background()) + if err != nil { + require.ErrorIs(t, err, test.expectedErr) + } else { + require.NoError(t, err) + require.Equal(t, test.patched, patchedObject.Payload()) + require.Equal(t, test.newAttrs.ToSDKAttributes(), patchedObject.Attributes()) + } + }) + } + +}