[#349] object: Make patcher apply patching for split header
* Change `PatchApplier` interface: `ApplyAttributesPatch` -> `ApplyHeaderPatch`. Make `ApplyHeaderPatch` receive `ApplyHeaderPatchPrm` as parameter; * Fix `patcher`: apply patch for split header; * Fix `patcher` unit-tests. Add test-case for split header; * Extend `Patch` struct with `NewSplitHeader`; * Change `ObjectPatcher` interface for client: `PatchAttributes` -> `PatchHeader`. Fix `objectPatcher`. * Fix object transformer: since object header sets `SplitHeader` if it's passed. Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
This commit is contained in:
parent
76265fe9be
commit
4d36a49d39
6 changed files with 259 additions and 20 deletions
|
@ -26,11 +26,19 @@ import (
|
|||
// usage is unsafe.
|
||||
type ObjectPatcher interface {
|
||||
// PatchAttributes patches attributes. Attributes can be patched no more than once,
|
||||
// otherwise, the server returns an error.
|
||||
// otherwise, the server returns an error. `PatchAttributes` and `PatchHeader` are mutually
|
||||
// exclusive - only one method can be used.
|
||||
//
|
||||
// Result means success. Failure reason can be received via Close.
|
||||
PatchAttributes(ctx context.Context, newAttrs []object.Attribute, replace bool) bool
|
||||
|
||||
// PatchHeader patches object's header. Header can be patched no more than once,
|
||||
// otherwise, the server returns an error. `PatchAttributes` and `PatchHeader` are mutually
|
||||
// exclusive - only one method can be used.
|
||||
//
|
||||
// Result means success. Failure reason can be received via Close.
|
||||
PatchHeader(ctx context.Context, prm PatchHeaderPrm) bool
|
||||
|
||||
// PatchPayload patches the object's payload.
|
||||
//
|
||||
// PatchPayload receives `payloadReader` and thus the payload of the patch is read and sent by chunks of
|
||||
|
@ -60,6 +68,14 @@ type ObjectPatcher interface {
|
|||
Close(_ context.Context) (*ResObjectPatch, error)
|
||||
}
|
||||
|
||||
type PatchHeaderPrm struct {
|
||||
NewSplitHeader *object.SplitHeader
|
||||
|
||||
NewAttributes []object.Attribute
|
||||
|
||||
ReplaceAttributes bool
|
||||
}
|
||||
|
||||
// ResObjectPatch groups resulting values of ObjectPatch operation.
|
||||
type ResObjectPatch struct {
|
||||
statusRes
|
||||
|
@ -163,6 +179,15 @@ func (x *objectPatcher) PatchAttributes(_ context.Context, newAttrs []object.Att
|
|||
})
|
||||
}
|
||||
|
||||
func (x *objectPatcher) PatchHeader(_ context.Context, prm PatchHeaderPrm) bool {
|
||||
return x.patch(&object.Patch{
|
||||
Address: x.addr,
|
||||
NewAttributes: prm.NewAttributes,
|
||||
ReplaceAttributes: prm.ReplaceAttributes,
|
||||
NewSplitHeader: prm.NewSplitHeader,
|
||||
})
|
||||
}
|
||||
|
||||
func (x *objectPatcher) PatchPayload(_ context.Context, rng *object.Range, payloadReader io.Reader) bool {
|
||||
offset := rng.GetOffset()
|
||||
|
||||
|
|
|
@ -177,7 +177,7 @@ func TestObjectPatcher(t *testing.T) {
|
|||
maxChunkLen: test.maxChunkLen,
|
||||
}
|
||||
|
||||
success := patcher.PatchAttributes(context.Background(), nil, false)
|
||||
success := patcher.PatchHeader(context.Background(), PatchHeaderPrm{})
|
||||
require.True(t, success)
|
||||
|
||||
success = patcher.PatchPayload(context.Background(), test.rng, bytes.NewReader([]byte(test.patchPayload)))
|
||||
|
|
|
@ -18,6 +18,10 @@ type Patch struct {
|
|||
// filled with NewAttributes. Otherwise, the attributes are just merged.
|
||||
ReplaceAttributes bool
|
||||
|
||||
// A new split header which is set to object's header. If `nil`, then split header patching
|
||||
// is ignored.
|
||||
NewSplitHeader *SplitHeader
|
||||
|
||||
// Payload patch. If this field is not set, then it assumed such Patch patches only
|
||||
// header (see NewAttributes, ReplaceAttributes).
|
||||
PayloadPatch *PayloadPatch
|
||||
|
@ -41,6 +45,8 @@ func (p *Patch) ToV2() *v2object.PatchRequestBody {
|
|||
v2.SetNewAttributes(attrs)
|
||||
v2.SetReplaceAttributes(p.ReplaceAttributes)
|
||||
|
||||
v2.SetNewSplitHeader(p.NewSplitHeader.ToV2())
|
||||
|
||||
v2.SetPatch(p.PayloadPatch.ToV2())
|
||||
|
||||
return v2
|
||||
|
@ -63,6 +69,8 @@ func (p *Patch) FromV2(patch *v2object.PatchRequestBody) {
|
|||
|
||||
p.ReplaceAttributes = patch.GetReplaceAttributes()
|
||||
|
||||
p.NewSplitHeader = NewSplitHeaderFromV2(patch.GetNewSplitHeader())
|
||||
|
||||
if v2patch := patch.GetPatch(); v2patch != nil {
|
||||
p.PayloadPatch = new(PayloadPatch)
|
||||
p.PayloadPatch.FromV2(v2patch)
|
||||
|
|
|
@ -11,10 +11,12 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
ErrOffsetExceedsSize = errors.New("patch offset exceeds object size")
|
||||
ErrInvalidPatchOffsetOrder = errors.New("invalid patch offset order")
|
||||
ErrPayloadPatchIsNil = errors.New("nil payload patch")
|
||||
ErrAttrPatchAlreadyApplied = errors.New("attribute patch already applied")
|
||||
ErrOffsetExceedsSize = errors.New("patch offset exceeds object size")
|
||||
ErrInvalidPatchOffsetOrder = errors.New("invalid patch offset order")
|
||||
ErrPayloadPatchIsNil = errors.New("nil payload patch")
|
||||
ErrAttrPatchAlreadyApplied = errors.New("attribute patch already applied")
|
||||
ErrHeaderPatchAlreadyApplied = errors.New("header patch already applied")
|
||||
ErrSplitHeaderPatchAppliedWithPayloadPatch = errors.New("split header patch applied with payload patch")
|
||||
)
|
||||
|
||||
// PatchRes is the result of patch application.
|
||||
|
@ -27,13 +29,24 @@ type PatchApplier interface {
|
|||
// ApplyAttributesPatch applies the patch only for the object's attributes.
|
||||
//
|
||||
// ApplyAttributesPatch can't be invoked few times, otherwise it returns `ErrAttrPatchAlreadyApplied` error.
|
||||
// `ApplyHeaderPatch` and `ApplyAttributesPatch` are mutually exclusive - only one method can be used.
|
||||
//
|
||||
// The call is idempotent for the original header if it's invoked with empty `newAttrs` and
|
||||
// `replaceAttrs = false`.
|
||||
ApplyAttributesPatch(ctx context.Context, newAttrs []objectSDK.Attribute, replaceAttrs bool) error
|
||||
|
||||
// ApplyHeaderPatch applies the patch only for the object's attributes.
|
||||
//
|
||||
// ApplyHeaderPatch can't be invoked few times, otherwise it returns `ErrHeaderPatchAlreadyApplied` error.
|
||||
// `ApplyHeaderPatch` and `ApplyAttributesPatch` are mutually exclusive - only one method can be used.
|
||||
//
|
||||
// The call is idempotent for the original header if it's invoked with `ApplyHeaderPatchPrm` with not set fields.
|
||||
ApplyHeaderPatch(ctx context.Context, prm ApplyHeaderPatchPrm) error
|
||||
|
||||
// ApplyPayloadPatch applies the patch for the object's payload.
|
||||
//
|
||||
// ApplyPayloadPatch returns `ErrSplitHeaderPatchAppliedWithPayloadPatch` when attempting to apply it with a split header patch.
|
||||
//
|
||||
// ApplyPayloadPatch returns `ErrPayloadPatchIsNil` error if patch is nil.
|
||||
ApplyPayloadPatch(ctx context.Context, payloadPatch *objectSDK.PayloadPatch) error
|
||||
|
||||
|
@ -41,6 +54,14 @@ type PatchApplier interface {
|
|||
Close(context.Context) (PatchRes, error)
|
||||
}
|
||||
|
||||
type ApplyHeaderPatchPrm struct {
|
||||
NewSplitHeader *objectSDK.SplitHeader
|
||||
|
||||
NewAttributes []objectSDK.Attribute
|
||||
|
||||
ReplaceAttributes bool
|
||||
}
|
||||
|
||||
// RangeProvider is the interface that provides a method to get original object payload
|
||||
// by a given range.
|
||||
type RangeProvider interface {
|
||||
|
@ -61,7 +82,9 @@ type patcher struct {
|
|||
|
||||
hdr *objectSDK.Object
|
||||
|
||||
attrPatchAlreadyApplied bool
|
||||
hdrPatchAlreadyApplied bool
|
||||
|
||||
splitHeaderPatchAlreadyApplied bool
|
||||
|
||||
readerBuffSize int
|
||||
}
|
||||
|
@ -107,10 +130,10 @@ func New(prm Params) PatchApplier {
|
|||
|
||||
func (p *patcher) ApplyAttributesPatch(ctx context.Context, newAttrs []objectSDK.Attribute, replaceAttrs bool) error {
|
||||
defer func() {
|
||||
p.attrPatchAlreadyApplied = true
|
||||
p.hdrPatchAlreadyApplied = true
|
||||
}()
|
||||
|
||||
if p.attrPatchAlreadyApplied {
|
||||
if p.hdrPatchAlreadyApplied {
|
||||
return ErrAttrPatchAlreadyApplied
|
||||
}
|
||||
|
||||
|
@ -127,7 +150,38 @@ func (p *patcher) ApplyAttributesPatch(ctx context.Context, newAttrs []objectSDK
|
|||
return nil
|
||||
}
|
||||
|
||||
func (p *patcher) ApplyHeaderPatch(ctx context.Context, prm ApplyHeaderPatchPrm) error {
|
||||
defer func() {
|
||||
p.hdrPatchAlreadyApplied = true
|
||||
}()
|
||||
|
||||
if p.hdrPatchAlreadyApplied {
|
||||
return ErrHeaderPatchAlreadyApplied
|
||||
}
|
||||
|
||||
if prm.NewSplitHeader != nil {
|
||||
p.hdr.SetSplitHeader(prm.NewSplitHeader)
|
||||
|
||||
p.splitHeaderPatchAlreadyApplied = true
|
||||
}
|
||||
|
||||
if prm.ReplaceAttributes {
|
||||
p.hdr.SetAttributes(prm.NewAttributes...)
|
||||
} else if len(prm.NewAttributes) > 0 {
|
||||
mergedAttrs := mergeAttributes(prm.NewAttributes, p.hdr.Attributes())
|
||||
p.hdr.SetAttributes(mergedAttrs...)
|
||||
}
|
||||
|
||||
if err := p.objectWriter.WriteHeader(ctx, p.hdr); err != nil {
|
||||
return fmt.Errorf("writer header: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *patcher) ApplyPayloadPatch(ctx context.Context, payloadPatch *objectSDK.PayloadPatch) error {
|
||||
if p.splitHeaderPatchAlreadyApplied {
|
||||
return ErrSplitHeaderPatchAppliedWithPayloadPatch
|
||||
}
|
||||
if payloadPatch == nil {
|
||||
return ErrPayloadPatchIsNil
|
||||
}
|
||||
|
|
|
@ -106,7 +106,11 @@ func TestPatchRevert(t *testing.T) {
|
|||
|
||||
patcher := New(prm)
|
||||
|
||||
err := patcher.ApplyAttributesPatch(context.Background(), modifPatch.NewAttributes, modifPatch.ReplaceAttributes)
|
||||
err := patcher.ApplyHeaderPatch(context.Background(), ApplyHeaderPatchPrm{
|
||||
NewSplitHeader: modifPatch.NewSplitHeader,
|
||||
NewAttributes: modifPatch.NewAttributes,
|
||||
ReplaceAttributes: modifPatch.ReplaceAttributes,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = patcher.ApplyPayloadPatch(context.Background(), modifPatch.PayloadPatch)
|
||||
|
@ -145,7 +149,11 @@ func TestPatchRevert(t *testing.T) {
|
|||
|
||||
patcher = New(prm)
|
||||
|
||||
err = patcher.ApplyAttributesPatch(context.Background(), revertPatch.NewAttributes, revertPatch.ReplaceAttributes)
|
||||
err = patcher.ApplyHeaderPatch(context.Background(), ApplyHeaderPatchPrm{
|
||||
NewSplitHeader: revertPatch.NewSplitHeader,
|
||||
NewAttributes: revertPatch.NewAttributes,
|
||||
ReplaceAttributes: revertPatch.ReplaceAttributes,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = patcher.ApplyPayloadPatch(context.Background(), revertPatch.PayloadPatch)
|
||||
|
@ -157,7 +165,7 @@ func TestPatchRevert(t *testing.T) {
|
|||
require.Equal(t, originalObjectPayload, patchedPatchedObj.Payload())
|
||||
}
|
||||
|
||||
func TestPatchRepeatAttributePatch(t *testing.T) {
|
||||
func TestPatchRepeatHeaderPatch(t *testing.T) {
|
||||
obj, _ := newTestObject()
|
||||
|
||||
modifPatch := &objectSDK.Patch{}
|
||||
|
@ -187,11 +195,142 @@ func TestPatchRepeatAttributePatch(t *testing.T) {
|
|||
|
||||
patcher := New(prm)
|
||||
|
||||
err := patcher.ApplyAttributesPatch(context.Background(), modifPatch.NewAttributes, modifPatch.ReplaceAttributes)
|
||||
err := patcher.ApplyHeaderPatch(context.Background(), ApplyHeaderPatchPrm{
|
||||
NewSplitHeader: modifPatch.NewSplitHeader,
|
||||
NewAttributes: modifPatch.NewAttributes,
|
||||
ReplaceAttributes: modifPatch.ReplaceAttributes,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = patcher.ApplyAttributesPatch(context.Background(), modifPatch.NewAttributes, modifPatch.ReplaceAttributes)
|
||||
require.ErrorIs(t, err, ErrAttrPatchAlreadyApplied)
|
||||
err = patcher.ApplyHeaderPatch(context.Background(), ApplyHeaderPatchPrm{
|
||||
NewSplitHeader: modifPatch.NewSplitHeader,
|
||||
NewAttributes: modifPatch.NewAttributes,
|
||||
ReplaceAttributes: modifPatch.ReplaceAttributes,
|
||||
})
|
||||
require.ErrorIs(t, err, ErrHeaderPatchAlreadyApplied)
|
||||
}
|
||||
|
||||
func TestPatchSplitHeader(t *testing.T) {
|
||||
obj, _ := newTestObject()
|
||||
|
||||
const (
|
||||
splitIDStr = "a59c9f87-14bc-4a61-95d1-7eb10f036163"
|
||||
parentStr = "9cRjAaPqUt5zaDAjBkSCqFfPdkE8dHJ7mtRupRjPWp6E"
|
||||
previosStr = "6WaTd9HobT4Z52NnKWHAtjqtQu2Ww5xZwNdT4ptshkKE"
|
||||
)
|
||||
|
||||
splitID := objectSDK.NewSplitID()
|
||||
require.NoError(t, splitID.Parse(splitIDStr))
|
||||
|
||||
var par, prev oid.ID
|
||||
require.NoError(t, par.DecodeString(parentStr))
|
||||
require.NoError(t, prev.DecodeString(previosStr))
|
||||
|
||||
splitHdr := objectSDK.NewSplitHeader()
|
||||
splitHdr.SetSplitID(splitID)
|
||||
splitHdr.SetParentID(par)
|
||||
splitHdr.SetPreviousID(prev)
|
||||
|
||||
originalObjectPayload := []byte("*******************")
|
||||
|
||||
obj.SetPayload(originalObjectPayload)
|
||||
obj.SetPayloadSize(uint64(len(originalObjectPayload)))
|
||||
|
||||
rangeProvider := &mockRangeProvider{
|
||||
originalObjectPayload: originalObjectPayload,
|
||||
}
|
||||
|
||||
t.Run("no payload patch", func(t *testing.T) {
|
||||
patchedObj, _ := newTestObject()
|
||||
|
||||
wr := &mockPatchedObjectWriter{
|
||||
obj: patchedObj,
|
||||
}
|
||||
|
||||
modifPatch := &objectSDK.Patch{
|
||||
NewSplitHeader: splitHdr,
|
||||
}
|
||||
|
||||
prm := Params{
|
||||
Header: obj.CutPayload(),
|
||||
|
||||
RangeProvider: rangeProvider,
|
||||
|
||||
ObjectWriter: wr,
|
||||
}
|
||||
|
||||
patcher := New(prm)
|
||||
|
||||
err := patcher.ApplyHeaderPatch(context.Background(), ApplyHeaderPatchPrm{
|
||||
NewSplitHeader: modifPatch.NewSplitHeader,
|
||||
NewAttributes: modifPatch.NewAttributes,
|
||||
ReplaceAttributes: modifPatch.ReplaceAttributes,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
splitHdrFromPatchedObj := patchedObj.SplitHeader()
|
||||
require.NotNil(t, splitHdrFromPatchedObj)
|
||||
|
||||
patchObjParID, isSet := splitHdrFromPatchedObj.ParentID()
|
||||
require.True(t, isSet)
|
||||
require.True(t, patchObjParID.Equals(par))
|
||||
|
||||
patchObjPrevID, isSet := splitHdrFromPatchedObj.PreviousID()
|
||||
require.True(t, isSet)
|
||||
require.True(t, patchObjPrevID.Equals(prev))
|
||||
|
||||
require.Equal(t, splitHdrFromPatchedObj.SplitID().String(), splitID.String())
|
||||
})
|
||||
|
||||
t.Run("with payload patch", func(t *testing.T) {
|
||||
patchedObj, _ := newTestObject()
|
||||
|
||||
wr := &mockPatchedObjectWriter{
|
||||
obj: patchedObj,
|
||||
}
|
||||
|
||||
modifPatch := &objectSDK.Patch{
|
||||
NewSplitHeader: splitHdr,
|
||||
PayloadPatch: &objectSDK.PayloadPatch{
|
||||
Range: rangeWithOffestWithLength(10, 0),
|
||||
Chunk: []byte(""),
|
||||
},
|
||||
}
|
||||
|
||||
prm := Params{
|
||||
Header: obj.CutPayload(),
|
||||
|
||||
RangeProvider: rangeProvider,
|
||||
|
||||
ObjectWriter: wr,
|
||||
}
|
||||
|
||||
patcher := New(prm)
|
||||
|
||||
err := patcher.ApplyHeaderPatch(context.Background(), ApplyHeaderPatchPrm{
|
||||
NewSplitHeader: modifPatch.NewSplitHeader,
|
||||
NewAttributes: modifPatch.NewAttributes,
|
||||
ReplaceAttributes: modifPatch.ReplaceAttributes,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
splitHdrFromPatchedObj := patchedObj.SplitHeader()
|
||||
require.NotNil(t, splitHdrFromPatchedObj)
|
||||
|
||||
patchObjParID, isSet := splitHdrFromPatchedObj.ParentID()
|
||||
require.True(t, isSet)
|
||||
require.True(t, patchObjParID.Equals(par))
|
||||
|
||||
patchObjPrevID, isSet := splitHdrFromPatchedObj.PreviousID()
|
||||
require.True(t, isSet)
|
||||
require.True(t, patchObjPrevID.Equals(prev))
|
||||
|
||||
require.Equal(t, splitHdrFromPatchedObj.SplitID().String(), splitID.String())
|
||||
|
||||
err = patcher.ApplyPayloadPatch(context.Background(), modifPatch.PayloadPatch)
|
||||
require.Error(t, err, ErrSplitHeaderPatchAppliedWithPayloadPatch)
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func TestPatchEmptyPayloadPatch(t *testing.T) {
|
||||
|
@ -224,7 +363,11 @@ func TestPatchEmptyPayloadPatch(t *testing.T) {
|
|||
|
||||
patcher := New(prm)
|
||||
|
||||
err := patcher.ApplyAttributesPatch(context.Background(), modifPatch.NewAttributes, modifPatch.ReplaceAttributes)
|
||||
err := patcher.ApplyHeaderPatch(context.Background(), ApplyHeaderPatchPrm{
|
||||
NewSplitHeader: modifPatch.NewSplitHeader,
|
||||
NewAttributes: modifPatch.NewAttributes,
|
||||
ReplaceAttributes: modifPatch.ReplaceAttributes,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = patcher.ApplyPayloadPatch(context.Background(), nil)
|
||||
|
@ -599,7 +742,11 @@ func TestPatch(t *testing.T) {
|
|||
|
||||
for i, patch := range test.patches {
|
||||
if i == 0 {
|
||||
_ = patcher.ApplyAttributesPatch(context.Background(), patch.NewAttributes, patch.ReplaceAttributes)
|
||||
_ = patcher.ApplyHeaderPatch(context.Background(), ApplyHeaderPatchPrm{
|
||||
NewSplitHeader: patch.NewSplitHeader,
|
||||
NewAttributes: patch.NewAttributes,
|
||||
ReplaceAttributes: patch.ReplaceAttributes,
|
||||
})
|
||||
}
|
||||
|
||||
if patch.PayloadPatch == nil {
|
||||
|
|
|
@ -116,9 +116,14 @@ func fromObject(obj *object.Object) *object.Object {
|
|||
res.SetAttributes(obj.Attributes()...)
|
||||
res.SetType(obj.Type())
|
||||
|
||||
// obj.SetSplitID creates splitHeader but we don't need to do it in case
|
||||
// of small objects, so we should make nil check.
|
||||
if obj.SplitID() != nil {
|
||||
// There are two ways to specify split information:
|
||||
// 1. Using explicit SplitHeader. Thus, we only propagate whole split information
|
||||
// if it's already set in the source object (use-case: Patch method).
|
||||
// 2. Using SplitID - will automatically generate a SplitHeader, but this is not requiered for
|
||||
// small objects.
|
||||
if obj.SplitHeader() != nil {
|
||||
res.SetSplitHeader(obj.SplitHeader())
|
||||
} else if obj.SplitID() != nil {
|
||||
res.SetSplitID(obj.SplitID())
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue