* Introduce the `patcher` package, which declares the following interfaces: - `PatchApplier` - the main patching engine; it merges the stream of patches with the original object payload, which is read range by range, and writes the merged result to a `ChunkedObjectWriter`; - `RangeProvider` - provides the original object payload by ranges; - `HeaderProvider` - provides the original object header. * Introduce the `patcher` type that implements `PatchApplier`; * Cover all possible cases with unit tests. Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
249 lines
6.6 KiB
Go
249 lines
6.6 KiB
Go
package patcher
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
|
)
|
|
|
|
// Sentinel errors recorded by ApplyPatch and reported by Close. They are
// wrapped with extra context where noted, so match them with errors.Is.
var (
	// ErrOffsetExceedsSize is reported (wrapped) when a payload patch starts
	// beyond the end of the original object payload.
	ErrOffsetExceedsSize = errors.New("patch offset exceeds object size")

	// ErrEmptyPayloadPatch is reported when a patch other than the first one
	// carries no payload patch.
	ErrEmptyPayloadPatch = errors.New("patch must contain payload")

	// ErrInvalidPatchObjectAddress is reported (wrapped) when a patch targets
	// an address different from the one fixed by the first patch.
	ErrInvalidPatchObjectAddress = errors.New("invalid patch object address")

	// ErrInvalidPatchOffsetOrder is reported (wrapped) when a payload patch
	// starts before the offset already reached by previous patches.
	ErrInvalidPatchOffsetOrder = errors.New("invalid patch offset order")
)
|
|
|
|
// PatchRes is the result of patch application.
|
|
type PatchRes struct {
|
|
AccessIdentifiers *transformer.AccessIdentifiers
|
|
}
|
|
|
|
// PatchApplier is the interface that provides method to apply header and payload patches.
|
|
type PatchApplier interface {
|
|
// ApplyPatch applies the patch for an object. All patches must contain the same target object address.
|
|
// Patch offsets must be passed with non-descending order.
|
|
// Patching the payload is performed within all ApplyPatch ivocations.
|
|
//
|
|
// Unsuccessful call returns false. Error is checked with Close.
|
|
ApplyPatch(ctx context.Context, patch *objectSDK.Patch) bool
|
|
|
|
// Close closes PatchApplier when the patch stream is over.
|
|
Close(context.Context) (PatchRes, error)
|
|
}
|
|
|
|
// HeaderProvider is the interface that provides a method to get an original's object header.
|
|
type HeaderProvider interface {
|
|
// GetObjectHeader gets an original's object header.
|
|
GetObjectHeader(ctx context.Context, addr oid.Address) (*objectSDK.Object, error)
|
|
}
|
|
|
|
// RangeProvider is the interface that provides a method to get original object payload
|
|
// by a given range.
|
|
type RangeProvider interface {
|
|
// ReadRange reads an original object payload by the given range.
|
|
// The method returns io.Reader over the data range only. This means if the data is read out,
|
|
// then ReadRange has to be invoked to provide reader over the next range.
|
|
ReadRange(ctx context.Context, addr oid.Address, rng *objectSDK.Range) io.Reader
|
|
}
|
|
|
|
type patcher struct {
|
|
rangeProvider RangeProvider
|
|
|
|
objectWriter transformer.ChunkedObjectWriter
|
|
|
|
hdrProvider HeaderProvider
|
|
|
|
currOffset uint64
|
|
|
|
originalPayloadSize uint64
|
|
|
|
firstApplyPatchCall bool
|
|
|
|
addr oid.Address
|
|
|
|
closeErr error
|
|
}
|
|
|
|
func New(hdrProvider HeaderProvider, objectRangeSplitter RangeProvider, objectWriter transformer.ChunkedObjectWriter) PatchApplier {
|
|
return &patcher{
|
|
rangeProvider: objectRangeSplitter,
|
|
|
|
objectWriter: objectWriter,
|
|
|
|
hdrProvider: hdrProvider,
|
|
|
|
firstApplyPatchCall: true,
|
|
}
|
|
}
|
|
|
|
func (p *patcher) ApplyPatch(ctx context.Context, currPatch *objectSDK.Patch) bool {
|
|
if currPatch == nil {
|
|
return true
|
|
}
|
|
|
|
if p.firstApplyPatchCall {
|
|
p.firstApplyPatchCall = false
|
|
|
|
p.addr = currPatch.Address
|
|
|
|
hdr, err := p.hdrProvider.GetObjectHeader(ctx, currPatch.Address)
|
|
if err != nil {
|
|
p.closeErr = fmt.Errorf("get header error: %w", err)
|
|
return false
|
|
}
|
|
|
|
p.originalPayloadSize = hdr.PayloadSize()
|
|
|
|
if !currPatch.ReplaceAttributes {
|
|
mergedAttrs := mergeAttributes(currPatch.NewAttributes, hdr.Attributes())
|
|
hdr.SetAttributes(mergedAttrs...)
|
|
} else {
|
|
hdr.SetAttributes(currPatch.NewAttributes...)
|
|
}
|
|
|
|
if err = p.objectWriter.WriteHeader(ctx, hdr); err != nil {
|
|
p.closeErr = fmt.Errorf("write header error: %w", err)
|
|
return false
|
|
}
|
|
|
|
// only header patch
|
|
if currPatch.PayloadPatch == nil {
|
|
return true
|
|
}
|
|
} else {
|
|
// All patches can be applied only for the same object.
|
|
if !p.addr.Equals(currPatch.Address) {
|
|
p.closeErr = fmt.Errorf("%w: expected = %s, got = %s",
|
|
ErrInvalidPatchObjectAddress,
|
|
p.addr.EncodeToString(),
|
|
currPatch.Address.EncodeToString())
|
|
return false
|
|
}
|
|
|
|
if currPatch.PayloadPatch == nil {
|
|
p.closeErr = ErrEmptyPayloadPatch
|
|
return false
|
|
}
|
|
}
|
|
|
|
if currPatch.PayloadPatch.Range.GetOffset() < p.currOffset {
|
|
p.closeErr = fmt.Errorf("%w: current = %d, previous = %d", ErrInvalidPatchOffsetOrder, currPatch.PayloadPatch.Range.GetOffset(), p.currOffset)
|
|
return false
|
|
}
|
|
|
|
if currPatch.PayloadPatch.Range.GetOffset() > p.originalPayloadSize {
|
|
p.closeErr = fmt.Errorf("%w: offset = %d, object size = %d", ErrOffsetExceedsSize, currPatch.PayloadPatch.Range.GetOffset(), p.originalPayloadSize)
|
|
return false
|
|
}
|
|
|
|
var err error
|
|
if p.currOffset, err = p.applyPatch(ctx, currPatch, p.currOffset); err != nil {
|
|
p.closeErr = fmt.Errorf("apply patch error: %w", err)
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func (p *patcher) Close(ctx context.Context) (PatchRes, error) {
|
|
if p.closeErr != nil {
|
|
return PatchRes{}, p.closeErr
|
|
}
|
|
|
|
rng := new(objectSDK.Range)
|
|
rng.SetOffset(p.currOffset)
|
|
rng.SetLength(p.originalPayloadSize - p.currOffset)
|
|
|
|
rdr := p.rangeProvider.ReadRange(ctx, p.addr, rng)
|
|
for {
|
|
remain := make([]byte, 1024)
|
|
n, err := rdr.Read(remain)
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
return PatchRes{}, fmt.Errorf("read error: %w", err)
|
|
}
|
|
_, err = p.objectWriter.Write(ctx, remain[:n])
|
|
if err != nil {
|
|
return PatchRes{}, fmt.Errorf("write error: %w", err)
|
|
}
|
|
}
|
|
|
|
aid, err := p.objectWriter.Close(ctx)
|
|
if err != nil {
|
|
return PatchRes{}, fmt.Errorf("close object writer error: %w", err)
|
|
}
|
|
|
|
return PatchRes{
|
|
AccessIdentifiers: aid,
|
|
}, nil
|
|
}
|
|
|
|
func (p *patcher) applyPatch(ctx context.Context, currPatch *objectSDK.Patch, offset uint64) (newOffset uint64, err error) {
|
|
// write the original payload chunk before the start of the patch
|
|
if currPatch.PayloadPatch.Range.GetOffset() > offset {
|
|
rng := new(objectSDK.Range)
|
|
rng.SetOffset(offset)
|
|
rng.SetLength(currPatch.PayloadPatch.Range.GetOffset() - offset)
|
|
|
|
rdr := p.rangeProvider.ReadRange(ctx, p.addr, rng)
|
|
for {
|
|
orig := make([]byte, 1024)
|
|
var n int
|
|
n, err = rdr.Read(orig)
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
err = fmt.Errorf("read error: %w", err)
|
|
return
|
|
}
|
|
_, err = p.objectWriter.Write(ctx, orig[:n])
|
|
if err != nil {
|
|
err = fmt.Errorf("write error: %w", err)
|
|
return
|
|
}
|
|
}
|
|
|
|
newOffset = currPatch.PayloadPatch.Range.GetOffset()
|
|
}
|
|
|
|
// apply patch
|
|
if _, err = p.objectWriter.Write(ctx, currPatch.PayloadPatch.Chunk); err != nil {
|
|
return
|
|
}
|
|
|
|
if currPatch.PayloadPatch.Range.GetLength() > 0 {
|
|
newOffset += currPatch.PayloadPatch.Range.GetLength()
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func mergeAttributes(newAttrs, oldAttrs []objectSDK.Attribute) []objectSDK.Attribute {
|
|
attrMap := make(map[string]string)
|
|
|
|
for _, attr := range oldAttrs {
|
|
attrMap[attr.Key()] = attr.Value()
|
|
}
|
|
|
|
for _, newAttr := range newAttrs {
|
|
attrMap[newAttr.Key()] = newAttr.Value()
|
|
}
|
|
|
|
var mergedAttrs []objectSDK.Attribute
|
|
for key, value := range attrMap {
|
|
var attr objectSDK.Attribute
|
|
attr.SetKey(key)
|
|
attr.SetValue(value)
|
|
mergedAttrs = append(mergedAttrs, attr)
|
|
}
|
|
|
|
return mergedAttrs
|
|
}
|