client: Introduce ObjectPatch
method #249
259
client/object_patch.go
Normal file
|
@ -0,0 +1,259 @@
|
||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/ecdsa"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/acl"
|
||||||
|
v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
|
rpcapi "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||||
|
v2session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ObjectPatcher is designed to patch an object.
|
||||||
|
//
|
||||||
|
// Must be initialized using Client.ObjectPatchInit, any other
|
||||||
|
// usage is unsafe.
|
||||||
|
type ObjectPatcher interface {
|
||||||
|
// PatchAttributes patches attributes. Attributes can be patched no more than once,
|
||||||
|
// otherwise, the server returns an error.
|
||||||
|
//
|
||||||
|
// Result means success. Failure reason can be received via Close.
|
||||||
|
PatchAttributes(ctx context.Context, newAttrs []object.Attribute, replace bool) bool
|
||||||
|
|
||||||
|
// PatchPayload patches the object's payload.
|
||||||
|
//
|
||||||
|
// PatchPayload receives `payloadReader` and thus the payload of the patch is read and sent by chunks of
|
||||||
|
// `MaxChunkLength` length.
|
||||||
|
//
|
||||||
|
|||||||
|
// Result means success. Failure reason can be received via Close.
|
||||||
|
PatchPayload(ctx context.Context, rng *object.Range, payloadReader io.Reader) bool
|
||||||
|
|
||||||
|
// Close ends patching the object and returns the result of the operation
|
||||||
|
// along with the final results. Must be called after using the ObjectPatcher.
|
||||||
|
//
|
||||||
|
// Exactly one return value is non-nil. By default, server status is returned in res structure.
|
||||||
|
// Any client's internal or transport errors are returned as Go built-in error.
|
||||||
|
// If Client is tuned to resolve FrostFS API statuses, then FrostFS failures
|
||||||
|
// codes are returned as error.
|
||||||
|
//
|
||||||
|
// Return statuses:
|
||||||
|
// - global (see Client docs);
|
||||||
|
// - *apistatus.ContainerNotFound;
|
||||||
|
// - *apistatus.ContainerAccessDenied;
|
||||||
|
// - *apistatus.ObjectAccessDenied;
|
||||||
|
// - *apistatus.ObjectAlreadyRemoved;
|
||||||
|
// - *apistatus.ObjectLocked;
|
||||||
|
// - *apistatus.ObjectOutOfRange;
|
||||||
|
// - *apistatus.SessionTokenNotFound;
|
||||||
|
// - *apistatus.SessionTokenExpired.
|
||||||
|
Close(_ context.Context) (*ResObjectPatch, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResObjectPatch groups resulting values of ObjectPatch operation.
|
||||||
|
type ResObjectPatch struct {
|
||||||
|
statusRes
|
||||||
|
|
||||||
|
obj oid.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
// ObjectID returns an object ID of the patched object.
|
||||||
|
func (r ResObjectPatch) ObjectID() oid.ID {
|
||||||
|
return r.obj
|
||||||
|
}
|
||||||
|
|
||||||
|
// PrmObjectPatch groups parameters of ObjectPatch operation.
|
||||||
|
type PrmObjectPatch struct {
|
||||||
|
XHeaders []string
|
||||||
|
|
||||||
|
Address oid.Address
|
||||||
|
|
||||||
|
BearerToken *bearer.Token
|
||||||
|
|
||||||
|
Session *session.Object
|
||||||
|
|
||||||
|
Key *ecdsa.PrivateKey
|
||||||
|
|
||||||
|
MaxChunkLength int
|
||||||
|
}
|
||||||
|
|
||||||
|
// ObjectPatchInit initializes object patcher.
|
||||||
|
func (c *Client) ObjectPatchInit(ctx context.Context, prm PrmObjectPatch) (ObjectPatcher, error) {
|
||||||
|
if len(prm.XHeaders)%2 != 0 {
|
||||||
|
return nil, errorInvalidXHeaders
|
||||||
|
}
|
||||||
|
|
||||||
|
var objectPatcher objectPatcher
|
||||||
|
stream, err := rpcapi.Patch(&c.c, &objectPatcher.respV2, client.WithContext(ctx))
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("open stream: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
objectPatcher.addr = prm.Address
|
||||||
|
objectPatcher.key = &c.prm.Key
|
||||||
|
if prm.Key != nil {
|
||||||
|
objectPatcher.key = prm.Key
|
||||||
|
}
|
||||||
|
objectPatcher.client = c
|
||||||
|
objectPatcher.stream = stream
|
||||||
|
objectPatcher.firstPatchPayload = true
|
||||||
|
|
||||||
|
if prm.MaxChunkLength > 0 {
|
||||||
|
objectPatcher.maxChunkLen = prm.MaxChunkLength
|
||||||
|
} else {
|
||||||
|
objectPatcher.maxChunkLen = defaultGRPCPayloadChunkLen
|
||||||
|
}
|
||||||
|
|
||||||
|
objectPatcher.req.SetBody(&v2object.PatchRequestBody{})
|
||||||
|
|
||||||
|
meta := new(v2session.RequestMetaHeader)
|
||||||
|
writeXHeadersToMeta(prm.XHeaders, meta)
|
||||||
|
|
||||||
|
if prm.BearerToken != nil {
|
||||||
|
v2BearerToken := new(acl.BearerToken)
|
||||||
|
prm.BearerToken.WriteToV2(v2BearerToken)
|
||||||
|
meta.SetBearerToken(v2BearerToken)
|
||||||
|
}
|
||||||
|
|
||||||
|
if prm.Session != nil {
|
||||||
|
v2SessionToken := new(v2session.Token)
|
||||||
|
prm.Session.WriteToV2(v2SessionToken)
|
||||||
|
meta.SetSessionToken(v2SessionToken)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.prepareRequest(&objectPatcher.req, meta)
|
||||||
|
|
||||||
|
return &objectPatcher, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type objectPatcher struct {
|
||||||
|
client *Client
|
||||||
|
|
||||||
|
stream interface {
|
||||||
|
Write(*v2object.PatchRequest) error
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
|
|
||||||
|
key *ecdsa.PrivateKey
|
||||||
|
res ResObjectPatch
|
||||||
|
err error
|
||||||
|
|
||||||
fyrchik
commented
The interface requires us to perform splitting, even though it also performs it internally. The interface requires us to perform splitting, even though it also performs it internally.
Can we avoid the duplication here?
We might have `Init` function return a more convenient interface on top of it (with `PatchHeader` and `PatchAttributes`) and make this `patch` private (or allow the user to cast to this "raw" interface via another method).
|
|||||||
|
addr oid.Address
|
||||||
|
|
||||||
|
req v2object.PatchRequest
|
||||||
|
respV2 v2object.PatchResponse
|
||||||
|
|
||||||
|
maxChunkLen int
|
||||||
|
|
||||||
|
firstPatchPayload bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *objectPatcher) PatchAttributes(_ context.Context, newAttrs []object.Attribute, replace bool) bool {
|
||||||
|
return x.patch(&object.Patch{
|
||||||
|
Address: x.addr,
|
||||||
|
NewAttributes: newAttrs,
|
||||||
|
ReplaceAttributes: replace,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *objectPatcher) PatchPayload(_ context.Context, rng *object.Range, payloadReader io.Reader) bool {
|
||||||
|
offset := rng.GetOffset()
|
||||||
|
|
||||||
|
buf := make([]byte, x.maxChunkLen)
|
||||||
|
|
||||||
|
for {
|
||||||
|
n, err := payloadReader.Read(buf)
|
||||||
|
if err != nil && err != io.EOF {
|
||||||
|
x.err = fmt.Errorf("read payload: %w", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if n == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
rngPart := object.NewRange()
|
||||||
|
if x.firstPatchPayload {
|
||||||
|
x.firstPatchPayload = false
|
||||||
|
rngPart.SetOffset(offset)
|
||||||
|
rngPart.SetLength(rng.GetLength())
|
||||||
|
} else {
|
||||||
|
rngPart.SetOffset(offset + rng.GetLength())
|
||||||
|
}
|
||||||
|
|
||||||
|
if !x.patch(&object.Patch{
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
It is the default value, no need to set. It is the default value, no need to set.
aarifullin
commented
Fixed Fixed
|
|||||||
|
Address: x.addr,
|
||||||
|
PayloadPatch: &object.PayloadPatch{
|
||||||
|
Range: rngPart,
|
||||||
|
Chunk: buf[:n],
|
||||||
|
},
|
||||||
|
}) {
|
||||||
|
return false
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Why are there spaces in every struct literal? Why are there spaces in every struct literal?
aarifullin
commented
Fixed Fixed
|
|||||||
|
}
|
||||||
|
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *objectPatcher) patch(patch *object.Patch) bool {
|
||||||
|
x.req.SetBody(patch.ToV2())
|
||||||
fyrchik
commented
The whole offset tracking thing is rather tedious. The whole offset tracking thing is rather tedious.
I suggest using `(offset, length)` for the first message and `(offset+length,0)` for the rest. Should be less code, because we don't need any calculations.
aarifullin
commented
Sorry but I don't get the idea and how it helps to recude code. These calculations are needed because we can't just simply break down to > I suggest using (offset, length) for the first message and (offset+length,0)
Sorry but I don't get the idea and how it helps to recude code. These calculations are needed because we can't just simply break down to `(offset, length)` and `(offset + length, 0)` because `chunk` can be very large. This leads to the point when we need to split `(offset, length - max_buf_size)` and `(offset + length - max_buf_size, length - max_buf_size)`
fyrchik
commented
I do not see any problem here. Where do we need to take >we can't just simply break down to (offset, length) and (offset + length, 0) because chunk can be very large
I do not see any problem here.
You replace `(offset,length)` with the first part (of any possible length, you pick).
You replace `(offset+length,0)` with the second part (of any possible length, you pick).
Where do we need to take `chunk` into account?
aarifullin
commented
Let's consider such a patch:
Let then the patch should be splitted like that:
if the first read message is like
So, we need to track the length because it's not always Let's consider such a patch:
```
{ .offset: 0, .length: 4, .payload: '0123456789!@' }
```
Let `maxChunkLen = 2` -> the size for the buff to read data
then the patch should be splitted like that:
```
{ .offset = 0, .length = 2, .payload = '01' } -> replaces 'length' bytes of the original payload
{ .offset = 2, .length = 2, .payload = '23' } -> replaces 'length' bytes of the original payload
{ .offset = 4, .length = 0, .payload = '45' } -> inserts bytes at the original payload
{ .offset = 6, .length = 0, .payload = '67' } -> inserts bytes at the original payload
{ .offset = 8, .length = 0, .payload = '89' } -> inserts bytes at the original payload
{ .offset = 10, .length = 0, .payload = '!@' } -> inserts bytes at the original payload
```
if the first read message is like `(offset, length)` and the rest are `(offset + length, 0)`, then
```
{ .offset = 0, .length = 2, .payload = '01' }
{ .offset = 2, .length = 0, .payload = '23' } -> incorrect
```
So, we need to track the length because it's not always `0` in `(offset + length, 0)`
fyrchik
commented
Why is this correct Why is this correct `{ .offset = 4, .length = 0, .payload = '45' } -> inserts bytes at the original payload `
And this is not `{ .offset = 2, .length = 0, .payload = '23' } -> incorrect` ?
fyrchik
commented
The misunderstanding can stem from the fact that patch size CAN differ from the part in the original payload, so you can replace The misunderstanding can stem from the fact that patch size CAN differ from the part in the original payload, so you can replace `X` bytes with another `Y` bytes, and there is no relation between X and Y.
aarifullin
commented
We replacing first
So, consider the buffer size is
then we insert
So, that how I assumed how it should work 1. Currently it works like that:
```
[0][1][2][3][4][5][6][7][8][9][!][@] {.offset = 0, .length = 4 }
[o][r][i][g][i][n][a][l][p][a][y][l][o][a][d]
```
We replacing first `.length = 4` bytes and just insert the rest
```
[0][1][2][3][4][5][6][7][8][9][!][@] {.offset = 0, .length = 4 }
↓ ↓ ↓ ↓
[o][r][i][g] _ _ _ _ _ _ _ _ [i][n][a][l][p][a][y][l][o][a][d]
[0][1][2][3][4][5][6][7][8][9][!][@][i][n][a][l][p][a][y][l][o][a][d]
```
So, consider the buffer size is `2`, so each subpatch's payload can be only `<=2`
```
[0][1] { .offset = 0, .length = 2, .payload = '01' }
↓ ↓
[o][r][i][g][i][n][a][l][p][a][y][l][o][a][d] -> [0][1][i][g][i][n][a][l][p][a][y][l][o][a][d]
[2][3] { .offset = 2, .length = 2, .payload = '23' }
↓ ↓
[0][1][i][g][i][n][a][l][p][a][y][l][o][a][d] -> [0][1][2][3][i][n][a][l][p][a][y][l][o][a][d]
[4][5] { .offset = 2, .length = 0, .payload = '45' }
↓ ↓
[0][1][2][3] _ _ [i][n][a][l][p][a][y][l][o][a][d] -> [0][1][2][3][4][5][i][n][a][l][p][a][y][l][o][a][d]
...so on...
```
2. If `{ .offset = 2, .length = 0, .payload = '23' }` whould be correct
then we insert `23`, not replace
```
[0][1] { .offset = 0, .length = 2, .payload = '01' }
↓ ↓
[o][r][i][g][i][n][a][l][p][a][y][l][o][a][d] -> [0][1][i][g][i][n][a][l][p][a][y][l][o][a][d]
[2][3] { .offset = 2, .length = 0, .payload = '23' }
↓ ↓
[0][1] _ _ [i][g][i][n][a][l][p][a][y][l][o][a][d] -> [0][1][2][3][i][g][i][n][a][l][p][a][y][l][o][a][d]
```
So, that how I assumed how it should work
fyrchik
commented
You need to replace After After You need to replace `{ .offset = 0, .length = 2, .payload = '01' }` with `{ .offset = 0, .length = 4, .payload = '01' }`.
It _should_ work according to spec, irregardless of what is implemented now.
After `{ .offset = 0, .length = 4, .payload = '01' }`:
[0][1][i][n][a][l][p][a][y][l][o][a][d]
After `{.offset = 4, .length = 0, .payload = '23' }`:
[0][1][2][3][i][n][a][l][p][a][y][l][o][a][d]
aarifullin
commented
Finally, I get your point. Alright, let me check Finally, I get your point. Alright, let me check `patcher` then, because I didn't take this fact in account when I was designing it
aarifullin
commented
I've checked I've checked `patcher`. It correctly works. So, I fixed `PatchPayload` according to your suggestion - it's really perfect
aarifullin
commented
For those who don't get the idea we have been discussing:
For those who don't get the idea we have been discussing:
`ε - empty character, it erases a character with the same position`
```
The original payload
[0][1][2][3][4][5][6][7][8][9][!][@] {.offset = 0, .length = 4 }
[0][1] ε ε { .offset = 0, .length = 4, .payload = '01' } -- length is greater than payload
↓ ↓ ↓ ↓
[o][r][i][g][i][n][a][l][p][a][y][l][o][a][d] -> [0][1][i][n][a][l][p][a][y][l][o][a][d]
[2][3] { .offset = 0 + 4, .length = 0, .payload = '23' }
↓
____________[i][n][a][l][p][a][y][l][o][a][d]
replaced inserted
↓ ↓ ↓ ↓
[0][1] ε ε [2][3] [i][n][a][l][p][a][y][l][o][a][d] -> [0][1][2][3][i][n][a][l][p][a][y][l][o][a][d]
```
fyrchik
commented
I don't see where it is changed, you still have I don't see where it is changed, you still have `length -= patchLength` and patchLength depends on the number of items you have read (`n, err`)
aarifullin
commented
Um... sorry for the confusion - forgot to push after local rebase. Please, check this out Um... sorry for the confusion - forgot to push after local rebase. Please, check this out
|
|||||||
|
x.req.SetVerificationHeader(nil)
|
||||||
|
|
||||||
|
x.err = signature.SignServiceMessage(x.key, &x.req)
|
||||||
|
if x.err != nil {
|
||||||
|
x.err = fmt.Errorf("sign message: %w", x.err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
x.err = x.stream.Write(&x.req)
|
||||||
|
return x.err == nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *objectPatcher) Close(_ context.Context) (*ResObjectPatch, error) {
|
||||||
|
// Ignore io.EOF error, because it is expected error for client-side
|
||||||
|
// stream termination by the server. E.g. when stream contains invalid
|
||||||
|
// message. Server returns an error in response message (in status).
|
||||||
|
if x.err != nil && !errors.Is(x.err, io.EOF) {
|
||||||
|
return nil, x.err
|
||||||
|
}
|
||||||
|
|
||||||
|
if x.err = x.stream.Close(); x.err != nil {
|
||||||
|
return nil, x.err
|
||||||
|
}
|
||||||
|
|
||||||
|
x.res.st, x.err = x.client.processResponse(&x.respV2)
|
||||||
|
if x.err != nil {
|
||||||
|
return nil, x.err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !apistatus.IsSuccessful(x.res.st) {
|
||||||
|
return &x.res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
const fieldID = "ID"
|
||||||
|
|
||||||
|
idV2 := x.respV2.Body.ObjectID
|
||||||
|
if idV2 == nil {
|
||||||
|
return nil, newErrMissingResponseField(fieldID)
|
||||||
|
}
|
||||||
|
|
||||||
|
x.err = x.res.obj.ReadFromV2(*idV2)
|
||||||
|
if x.err != nil {
|
||||||
|
x.err = newErrInvalidResponseField(fieldID, x.err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &x.res, nil
|
||||||
|
}
|
209
client/object_patch_test.go
Normal file
|
@ -0,0 +1,209 @@
|
||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"crypto/ecdsa"
|
||||||
|
"crypto/elliptic"
|
||||||
|
"crypto/rand"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
type mockPatchStream struct {
|
||||||
|
streamedPayloadPatches []*object.PayloadPatch
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockPatchStream) Write(r *v2object.PatchRequest) error {
|
||||||
|
pp := new(object.PayloadPatch)
|
||||||
|
pp.FromV2(r.GetBody().GetPatch())
|
||||||
|
|
||||||
|
if r.GetBody().GetPatch() != nil {
|
||||||
|
bodyChunk := r.GetBody().GetPatch().Chunk
|
||||||
|
pp.Chunk = make([]byte, len(bodyChunk))
|
||||||
|
copy(pp.Chunk, bodyChunk)
|
||||||
|
}
|
||||||
|
|
||||||
|
m.streamedPayloadPatches = append(m.streamedPayloadPatches, pp)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockPatchStream) Close() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestObjectPatcher(t *testing.T) {
|
||||||
|
type part struct {
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
It seems this suit of tests could be less in size by using table tests pattern: we have payload, we have range and that's all. It seems this suit of tests could be less in size by using table tests pattern: we have payload, we have range and that's all.
What do you think?
aarifullin
commented
Fixed Fixed
|
|||||||
|
offset int
|
||||||
|
length int
|
||||||
|
chunk string
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range []struct {
|
||||||
|
name string
|
||||||
|
patchPayload string
|
||||||
|
rng *object.Range
|
||||||
|
maxChunkLen int
|
||||||
|
expectParts []part
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "no split payload patch",
|
||||||
|
patchPayload: "011111",
|
||||||
|
rng: newRange(0, 6),
|
||||||
|
maxChunkLen: defaultGRPCPayloadChunkLen,
|
||||||
|
expectParts: []part{
|
||||||
|
{
|
||||||
|
offset: 0,
|
||||||
|
length: 6,
|
||||||
|
chunk: "011111",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "splitted payload patch",
|
||||||
|
patchPayload: "012345",
|
||||||
|
rng: newRange(0, 6),
|
||||||
|
maxChunkLen: 2,
|
||||||
|
expectParts: []part{
|
||||||
|
{
|
||||||
|
offset: 0,
|
||||||
|
length: 6,
|
||||||
|
chunk: "01",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
offset: 6,
|
||||||
|
length: 0,
|
||||||
|
chunk: "23",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
offset: 6,
|
||||||
|
length: 0,
|
||||||
|
chunk: "45",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "splitted payload patch with zero-length subpatches",
|
||||||
|
patchPayload: "0123456789!@",
|
||||||
|
rng: newRange(0, 4),
|
||||||
|
maxChunkLen: 2,
|
||||||
|
expectParts: []part{
|
||||||
|
{
|
||||||
|
offset: 0,
|
||||||
|
length: 4,
|
||||||
|
chunk: "01",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
offset: 4,
|
||||||
|
length: 0,
|
||||||
|
chunk: "23",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
offset: 4,
|
||||||
|
length: 0,
|
||||||
|
chunk: "45",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
offset: 4,
|
||||||
|
length: 0,
|
||||||
|
chunk: "67",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
offset: 4,
|
||||||
|
length: 0,
|
||||||
|
chunk: "89",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
offset: 4,
|
||||||
|
length: 0,
|
||||||
|
chunk: "!@",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "splitted payload patch with zero-length subpatches only",
|
||||||
|
patchPayload: "0123456789!@",
|
||||||
|
rng: newRange(0, 0),
|
||||||
|
maxChunkLen: 2,
|
||||||
|
expectParts: []part{
|
||||||
|
{
|
||||||
|
offset: 0,
|
||||||
|
length: 0,
|
||||||
|
chunk: "01",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
offset: 0,
|
||||||
|
length: 0,
|
||||||
|
chunk: "23",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
offset: 0,
|
||||||
|
length: 0,
|
||||||
|
chunk: "45",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
offset: 0,
|
||||||
|
length: 0,
|
||||||
|
chunk: "67",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
offset: 0,
|
||||||
|
length: 0,
|
||||||
|
chunk: "89",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
offset: 0,
|
||||||
|
length: 0,
|
||||||
|
chunk: "!@",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
m := &mockPatchStream{}
|
||||||
|
|
||||||
|
pk, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
|
||||||
|
|
||||||
|
patcher := objectPatcher{
|
||||||
|
client: &Client{},
|
||||||
|
stream: m,
|
||||||
|
addr: oidtest.Address(),
|
||||||
|
key: pk,
|
||||||
|
maxChunkLen: test.maxChunkLen,
|
||||||
|
firstPatchPayload: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
success := patcher.PatchAttributes(context.Background(), nil, false)
|
||||||
|
require.True(t, success)
|
||||||
|
|
||||||
|
success = patcher.PatchPayload(context.Background(), test.rng, bytes.NewReader([]byte(test.patchPayload)))
|
||||||
|
require.True(t, success)
|
||||||
|
|
||||||
|
require.Len(t, m.streamedPayloadPatches, len(test.expectParts)+1)
|
||||||
|
|
||||||
|
// m.streamedPayloadPatches[0] is attribute patch, so skip it
|
||||||
|
for i, part := range test.expectParts {
|
||||||
|
requireRangeChunk(t, m.streamedPayloadPatches[i+1], part.offset, part.length, part.chunk)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func requireRangeChunk(t *testing.T, pp *object.PayloadPatch, offset, length int, chunk string) {
|
||||||
|
require.NotNil(t, pp)
|
||||||
|
require.Equal(t, uint64(offset), pp.Range.GetOffset())
|
||||||
|
require.Equal(t, uint64(length), pp.Range.GetLength())
|
||||||
|
require.Equal(t, []byte(chunk), pp.Chunk)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newRange(offest, length uint64) *object.Range {
|
||||||
|
rng := &object.Range{}
|
||||||
|
rng.SetOffset(offest)
|
||||||
|
rng.SetLength(length)
|
||||||
|
return rng
|
||||||
|
}
|
2
go.mod
|
@ -3,7 +3,7 @@ module git.frostfs.info/TrueCloudLab/frostfs-sdk-go
|
||||||
go 1.21
|
go 1.21
|
||||||
|
|
||||||
require (
|
require (
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240730145254-c27b978770a3
|
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240806093111-ebaf78c8faab
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
|
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
||||||
|
|
BIN
go.sum
|
@ -145,6 +145,10 @@ func (m *mockClient) objectPut(context.Context, PrmObjectPut) (ResPutObject, err
|
||||||
return ResPutObject{}, nil
|
return ResPutObject{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *mockClient) objectPatch(context.Context, PrmObjectPatch) (ResPatchObject, error) {
|
||||||
|
return ResPatchObject{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *mockClient) objectDelete(context.Context, PrmObjectDelete) error {
|
func (m *mockClient) objectDelete(context.Context, PrmObjectDelete) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
128
pool/pool.go
|
@ -66,6 +66,8 @@ type client interface {
|
||||||
netMapSnapshot(context.Context, prmNetMapSnapshot) (netmap.NetMap, error)
|
netMapSnapshot(context.Context, prmNetMapSnapshot) (netmap.NetMap, error)
|
||||||
// see clientWrapper.objectPut.
|
// see clientWrapper.objectPut.
|
||||||
objectPut(context.Context, PrmObjectPut) (ResPutObject, error)
|
objectPut(context.Context, PrmObjectPut) (ResPutObject, error)
|
||||||
|
// see clientWrapper.objectPatch.
|
||||||
|
objectPatch(context.Context, PrmObjectPatch) (ResPatchObject, error)
|
||||||
// see clientWrapper.objectDelete.
|
// see clientWrapper.objectDelete.
|
||||||
objectDelete(context.Context, PrmObjectDelete) error
|
objectDelete(context.Context, PrmObjectDelete) error
|
||||||
// see clientWrapper.objectGet.
|
// see clientWrapper.objectGet.
|
||||||
|
@ -160,6 +162,7 @@ const (
|
||||||
methodObjectGet
|
methodObjectGet
|
||||||
methodObjectHead
|
methodObjectHead
|
||||||
methodObjectRange
|
methodObjectRange
|
||||||
|
methodObjectPatch
|
||||||
methodSessionCreate
|
methodSessionCreate
|
||||||
methodAPEManagerAddChain
|
methodAPEManagerAddChain
|
||||||
methodAPEManagerRemoveChain
|
methodAPEManagerRemoveChain
|
||||||
|
@ -192,6 +195,8 @@ func (m MethodIndex) String() string {
|
||||||
return "netMapSnapshot"
|
return "netMapSnapshot"
|
||||||
case methodObjectPut:
|
case methodObjectPut:
|
||||||
return "objectPut"
|
return "objectPut"
|
||||||
|
case methodObjectPatch:
|
||||||
|
return "objectPatch"
|
||||||
case methodObjectDelete:
|
case methodObjectDelete:
|
||||||
return "objectDelete"
|
return "objectDelete"
|
||||||
case methodObjectGet:
|
case methodObjectGet:
|
||||||
|
@ -724,6 +729,44 @@ func (c *clientWrapper) netMapSnapshot(ctx context.Context, _ prmNetMapSnapshot)
|
||||||
return res.NetMap(), nil
|
return res.NetMap(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// objectPatch patches object in FrostFS.
|
||||||
|
func (c *clientWrapper) objectPatch(ctx context.Context, prm PrmObjectPatch) (ResPatchObject, error) {
|
||||||
|
cl, err := c.getClient()
|
||||||
|
if err != nil {
|
||||||
|
return ResPatchObject{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
pObj, err := cl.ObjectPatchInit(ctx, sdkClient.PrmObjectPatch{
|
||||||
|
Address: prm.addr,
|
||||||
|
Session: prm.stoken,
|
||||||
|
Key: prm.key,
|
||||||
|
BearerToken: prm.btoken,
|
||||||
|
MaxChunkLength: prm.maxPayloadPatchChunkLength,
|
||||||
|
})
|
||||||
|
if err = c.handleError(ctx, nil, err); err != nil {
|
||||||
|
return ResPatchObject{}, fmt.Errorf("init patching on API client: %w", err)
|
||||||
|
}
|
||||||
|
c.incRequests(time.Since(start), methodObjectPatch)
|
||||||
|
|
||||||
|
start = time.Now()
|
||||||
|
attrPatchSuccess := pObj.PatchAttributes(ctx, prm.newAttrs, prm.replaceAttrs)
|
||||||
|
c.incRequests(time.Since(start), methodObjectPatch)
|
||||||
|
|
||||||
|
if attrPatchSuccess {
|
||||||
|
start = time.Now()
|
||||||
|
_ = pObj.PatchPayload(ctx, prm.rng, prm.payload)
|
||||||
|
c.incRequests(time.Since(start), methodObjectPatch)
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := pObj.Close(ctx)
|
||||||
|
if err = c.handleError(ctx, res.Status(), err); err != nil {
|
||||||
|
return ResPatchObject{}, fmt.Errorf("client failure: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ResPatchObject{ObjectID: res.ObjectID()}, nil
|
||||||
|
}
|
||||||
|
|
||||||
// objectPut writes object to FrostFS.
|
// objectPut writes object to FrostFS.
|
||||||
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) {
|
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) {
|
||||||
if prm.bufferMaxSize == 0 {
|
if prm.bufferMaxSize == 0 {
|
||||||
|
@ -1545,6 +1588,53 @@ func (x *PrmObjectPut) setNetworkInfo(ni netmap.NetworkInfo) {
|
||||||
x.networkInfo = ni
|
x.networkInfo = ni
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PrmObjectPatch groups parameters of PatchObject operation.
|
||||||
|
type PrmObjectPatch struct {
|
||||||
|
prmCommon
|
||||||
|
|
||||||
|
addr oid.Address
|
||||||
|
|
||||||
|
rng *object.Range
|
||||||
|
|
||||||
|
payload io.Reader
|
||||||
|
|
||||||
|
newAttrs []object.Attribute
|
||||||
|
|
||||||
|
replaceAttrs bool
|
||||||
|
|
||||||
|
maxPayloadPatchChunkLength int
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetAddress sets the address of the object that is patched.
|
||||||
|
func (x *PrmObjectPatch) SetAddress(addr oid.Address) {
|
||||||
|
x.addr = addr
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetRange sets the patch's range.
|
||||||
|
func (x *PrmObjectPatch) SetRange(rng *object.Range) {
|
||||||
|
x.rng = rng
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetPayloadReader sets a payload reader.
|
||||||
|
func (x *PrmObjectPatch) SetPayloadReader(payload io.Reader) {
|
||||||
|
x.payload = payload
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetRange sets the new attributes to the patch.
|
||||||
|
func (x *PrmObjectPatch) SetNewAttributes(newAttrs []object.Attribute) {
|
||||||
|
x.newAttrs = newAttrs
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetRange sets the replace attributes flag to the patch.
|
||||||
|
func (x *PrmObjectPatch) SetReplaceAttributes(replaceAttrs bool) {
|
||||||
|
x.replaceAttrs = replaceAttrs
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetMaxPayloadPatchChunkSize sets a max buf size to read the patch's payload.
|
||||||
|
func (x *PrmObjectPatch) SetMaxPayloadPatchChunkSize(maxPayloadPatchChunkSize int) {
|
||||||
|
x.maxPayloadPatchChunkLength = maxPayloadPatchChunkSize
|
||||||
|
}
|
||||||
|
|
||||||
// PrmObjectDelete groups parameters of DeleteObject operation.
|
// PrmObjectDelete groups parameters of DeleteObject operation.
|
||||||
type PrmObjectDelete struct {
|
type PrmObjectDelete struct {
|
||||||
prmCommon
|
prmCommon
|
||||||
|
@ -2389,6 +2479,44 @@ type ResPutObject struct {
|
||||||
Epoch uint64
|
Epoch uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ResPatchObject is designed to provide identifier for the saved patched object.
|
||||||
|
type ResPatchObject struct {
|
||||||
|
ObjectID oid.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
// PatchObject patches an object through a remote server using FrostFS API protocol.
|
||||||
|
//
|
||||||
|
// Main return value MUST NOT be processed on an erroneous return.
|
||||||
|
func (p *Pool) PatchObject(ctx context.Context, prm PrmObjectPatch) (ResPatchObject, error) {
|
||||||
|
var prmCtx prmContext
|
||||||
|
prmCtx.useDefaultSession()
|
||||||
|
prmCtx.useVerb(session.VerbObjectPatch)
|
||||||
|
prmCtx.useContainer(prm.addr.Container())
|
||||||
|
|
||||||
|
p.fillAppropriateKey(&prm.prmCommon)
|
||||||
|
|
||||||
|
var ctxCall callContext
|
||||||
|
if err := p.initCallContext(&ctxCall, prm.prmCommon, prmCtx); err != nil {
|
||||||
|
return ResPatchObject{}, fmt.Errorf("init call context: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if ctxCall.sessionDefault {
|
||||||
|
ctxCall.sessionTarget = prm.UseSession
|
||||||
|
if err := p.openDefaultSession(ctx, &ctxCall); err != nil {
|
||||||
|
return ResPatchObject{}, fmt.Errorf("open default session: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := ctxCall.client.objectPatch(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
// removes session token from cache in case of token error
|
||||||
|
p.checkSessionTokenErr(err, ctxCall.endpoint)
|
||||||
|
return ResPatchObject{}, fmt.Errorf("init patching on API client %s: %w", ctxCall.endpoint, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
// PutObject writes an object through a remote server using FrostFS API protocol.
|
// PutObject writes an object through a remote server using FrostFS API protocol.
|
||||||
//
|
//
|
||||||
// Main return value MUST NOT be processed on an erroneous return.
|
// Main return value MUST NOT be processed on an erroneous return.
|
||||||
|
|
|
@ -237,6 +237,7 @@ const (
|
||||||
VerbObjectDelete // Delete rpc
|
VerbObjectDelete // Delete rpc
|
||||||
VerbObjectRange // GetRange rpc
|
VerbObjectRange // GetRange rpc
|
||||||
VerbObjectRangeHash // GetRangeHash rpc
|
VerbObjectRangeHash // GetRangeHash rpc
|
||||||
|
VerbObjectPatch // Patch rpc
|
||||||
)
|
)
|
||||||
|
|
||||||
// ForVerb specifies the object operation of the session scope. Each
|
// ForVerb specifies the object operation of the session scope. Each
|
||||||
|
|
|
@ -599,6 +599,7 @@ func TestObject_ForVerb(t *testing.T) {
|
||||||
session.VerbObjectRangeHash: v2session.ObjectVerbRangeHash,
|
session.VerbObjectRangeHash: v2session.ObjectVerbRangeHash,
|
||||||
session.VerbObjectRange: v2session.ObjectVerbRange,
|
session.VerbObjectRange: v2session.ObjectVerbRange,
|
||||||
session.VerbObjectDelete: v2session.ObjectVerbDelete,
|
session.VerbObjectDelete: v2session.ObjectVerbDelete,
|
||||||
|
session.VerbObjectPatch: v2session.ObjectVerbPatch,
|
||||||
dstepanov-yadro marked this conversation as resolved
Outdated
dstepanov-yadro
commented
Why Why `v2session.ObjectPatch`, but not `v2session.ObjectVerbPatch` like other methods?
aarifullin
commented
https://git.frostfs.info/TrueCloudLab/frostfs-api-go/pulls/100#issuecomment-46447
|
|||||||
} {
|
} {
|
||||||
val.ForVerb(from)
|
val.ForVerb(from)
|
||||||
|
|
||||||
|
|
@fyrchik, please, let's go on discussing here
(from #248 (comment))
let's introduce complexity when it is really needed
Oh, I see now, it is similar to
ObjectWriter
. Then OK.Can we make it more similar to
ObjectWriter
then? It hasWriteHeader
andWriteChunk
as separate methods, similar to what you have done for patcher implementation.Please, check the refactored interface out