[#248] pool: Introduce objectPatch
method
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
This commit is contained in:
parent
fcbf7de470
commit
c0109f796a
2 changed files with 161 additions and 0 deletions
|
@ -145,6 +145,10 @@ func (m *mockClient) objectPut(context.Context, PrmObjectPut) (ResPutObject, err
|
||||||
return ResPutObject{}, nil
|
return ResPutObject{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *mockClient) objectPatch(context.Context, PrmObjectPatch) (ResPatchObject, error) {
|
||||||
|
return ResPatchObject{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *mockClient) objectDelete(context.Context, PrmObjectDelete) error {
|
func (m *mockClient) objectDelete(context.Context, PrmObjectDelete) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
157
pool/pool.go
157
pool/pool.go
|
@ -66,6 +66,8 @@ type client interface {
|
||||||
netMapSnapshot(context.Context, prmNetMapSnapshot) (netmap.NetMap, error)
|
netMapSnapshot(context.Context, prmNetMapSnapshot) (netmap.NetMap, error)
|
||||||
// see clientWrapper.objectPut.
|
// see clientWrapper.objectPut.
|
||||||
objectPut(context.Context, PrmObjectPut) (ResPutObject, error)
|
objectPut(context.Context, PrmObjectPut) (ResPutObject, error)
|
||||||
|
// see clientWrapper.objectPatch.
|
||||||
|
objectPatch(context.Context, PrmObjectPatch) (ResPatchObject, error)
|
||||||
// see clientWrapper.objectDelete.
|
// see clientWrapper.objectDelete.
|
||||||
objectDelete(context.Context, PrmObjectDelete) error
|
objectDelete(context.Context, PrmObjectDelete) error
|
||||||
// see clientWrapper.objectGet.
|
// see clientWrapper.objectGet.
|
||||||
|
@ -164,6 +166,7 @@ const (
|
||||||
methodAPEManagerAddChain
|
methodAPEManagerAddChain
|
||||||
methodAPEManagerRemoveChain
|
methodAPEManagerRemoveChain
|
||||||
methodAPEManagerListChains
|
methodAPEManagerListChains
|
||||||
|
methodObjectPatch
|
||||||
methodLast
|
methodLast
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -192,6 +195,8 @@ func (m MethodIndex) String() string {
|
||||||
return "netMapSnapshot"
|
return "netMapSnapshot"
|
||||||
case methodObjectPut:
|
case methodObjectPut:
|
||||||
return "objectPut"
|
return "objectPut"
|
||||||
|
case methodObjectPatch:
|
||||||
|
return "objectPatch"
|
||||||
case methodObjectDelete:
|
case methodObjectDelete:
|
||||||
return "objectDelete"
|
return "objectDelete"
|
||||||
case methodObjectGet:
|
case methodObjectGet:
|
||||||
|
@ -724,6 +729,106 @@ func (c *clientWrapper) netMapSnapshot(ctx context.Context, _ prmNetMapSnapshot)
|
||||||
return res.NetMap(), nil
|
return res.NetMap(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// objectPatch patches object in FrostFS.
|
||||||
|
func (c *clientWrapper) objectPatch(ctx context.Context, prm PrmObjectPatch) (ResPatchObject, error) {
|
||||||
|
cl, err := c.getClient()
|
||||||
|
if err != nil {
|
||||||
|
return ResPatchObject{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
pObj, err := cl.ObjectPatchInit(ctx, sdkClient.PrmObjectPatch{
|
||||||
|
Session: prm.stoken, Key: prm.key, BearerToken: prm.btoken,
|
||||||
|
})
|
||||||
|
if err = c.handleError(ctx, nil, err); err != nil {
|
||||||
|
return ResPatchObject{}, fmt.Errorf("init patching on API client: %w", err)
|
||||||
|
}
|
||||||
|
c.incRequests(time.Since(time.Now()), methodObjectPatch)
|
||||||
|
|
||||||
|
if err := c.objectPatchClientCut(ctx, pObj, prm); err != nil {
|
||||||
|
return ResPatchObject{}, c.handleError(ctx, nil, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := pObj.Close(ctx)
|
||||||
|
if err = c.handleError(ctx, res.Status(), err); err != nil {
|
||||||
|
return ResPatchObject{}, fmt.Errorf("client failure: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ResPatchObject{ObjectID: res.ObjectID()}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *clientWrapper) objectPatchClientCut(ctx context.Context, pObj sdkClient.ObjectPatcher, prm PrmObjectPatch) error {
|
||||||
|
if prm.payload == nil {
|
||||||
|
if !prm.replaceAttrs && len(prm.newAttrs) == 0 {
|
||||||
|
return errors.New("invalid patch params")
|
||||||
|
}
|
||||||
|
|
||||||
|
// patch only attributes
|
||||||
|
return c.sendPatch(ctx, pObj, &object.Patch{
|
||||||
|
Address: prm.addr,
|
||||||
|
NewAttributes: prm.newAttrs,
|
||||||
|
ReplaceAttributes: prm.replaceAttrs,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
buf := make([]byte, prm.maxBufSize)
|
||||||
|
offset, length := prm.rng.GetOffset(), prm.rng.GetLength()
|
||||||
|
firstPatch := true
|
||||||
|
|
||||||
|
for {
|
||||||
|
n, err := prm.payload.Read(buf)
|
||||||
|
if err != nil && err != io.EOF {
|
||||||
|
return fmt.Errorf("read payload: %w", err)
|
||||||
|
}
|
||||||
|
if n == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
patchLength := uint64(n)
|
||||||
|
if length > 0 {
|
||||||
|
patchLength = min(uint64(n), length)
|
||||||
|
}
|
||||||
|
|
||||||
|
rng := object.NewRange()
|
||||||
|
rng.SetOffset(offset)
|
||||||
|
rng.SetLength(patchLength)
|
||||||
|
|
||||||
|
p := &object.Patch{
|
||||||
|
Address: prm.addr,
|
||||||
|
PayloadPatch: &object.PayloadPatch{
|
||||||
|
Range: rng,
|
||||||
|
Chunk: buf[:n],
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if firstPatch {
|
||||||
|
p.NewAttributes = prm.newAttrs
|
||||||
|
p.ReplaceAttributes = prm.replaceAttrs
|
||||||
|
firstPatch = false
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.sendPatch(ctx, pObj, p); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
offset += uint64(n)
|
||||||
|
if length > 0 {
|
||||||
|
length -= patchLength
|
||||||
|
}
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *clientWrapper) sendPatch(ctx context.Context, patcher sdkClient.ObjectPatcher, p *object.Patch) error {
|
||||||
|
start := time.Now()
|
||||||
|
if !patcher.Patch(ctx, p) {
|
||||||
|
return errors.New("failed to apply patch")
|
||||||
|
}
|
||||||
|
c.incRequests(time.Since(start), methodObjectPatch)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// objectPut writes object to FrostFS.
|
// objectPut writes object to FrostFS.
|
||||||
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) {
|
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) {
|
||||||
if prm.bufferMaxSize == 0 {
|
if prm.bufferMaxSize == 0 {
|
||||||
|
@ -1545,6 +1650,53 @@ func (x *PrmObjectPut) setNetworkInfo(ni netmap.NetworkInfo) {
|
||||||
x.networkInfo = ni
|
x.networkInfo = ni
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PrmObjectPatch groups parameters of PatchObject operation.
|
||||||
|
type PrmObjectPatch struct {
|
||||||
|
prmCommon
|
||||||
|
|
||||||
|
addr oid.Address
|
||||||
|
|
||||||
|
rng *object.Range
|
||||||
|
|
||||||
|
payload io.Reader
|
||||||
|
|
||||||
|
newAttrs []object.Attribute
|
||||||
|
|
||||||
|
replaceAttrs bool
|
||||||
|
|
||||||
|
maxBufSize int
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetAddress sets the address of the object that is patched.
|
||||||
|
func (x *PrmObjectPatch) SetAddress(addr oid.Address) {
|
||||||
|
x.addr = addr
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetRange sets the patch's range.
|
||||||
|
func (x *PrmObjectPatch) SetRange(rng *object.Range) {
|
||||||
|
x.rng = rng
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetPayloadReader sets a payload reader.
|
||||||
|
func (x *PrmObjectPatch) SetPayloadReader(payload io.Reader) {
|
||||||
|
x.payload = payload
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetRange sets the new attributes to the patch.
|
||||||
|
func (x *PrmObjectPatch) SetNewAttributes(newAttrs []object.Attribute) {
|
||||||
|
x.newAttrs = newAttrs
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetRange sets the replace attributes flag to the patch.
|
||||||
|
func (x *PrmObjectPatch) SetReplaceAttributes(replaceAttrs bool) {
|
||||||
|
x.replaceAttrs = replaceAttrs
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetMaxBufSize sets a max buf size to read the patch's payload.
|
||||||
|
func (x *PrmObjectPatch) SetMaxBufSize(maxBufSize int) {
|
||||||
|
x.maxBufSize = maxBufSize
|
||||||
|
}
|
||||||
|
|
||||||
// PrmObjectDelete groups parameters of DeleteObject operation.
|
// PrmObjectDelete groups parameters of DeleteObject operation.
|
||||||
type PrmObjectDelete struct {
|
type PrmObjectDelete struct {
|
||||||
prmCommon
|
prmCommon
|
||||||
|
@ -2389,6 +2541,11 @@ type ResPutObject struct {
|
||||||
Epoch uint64
|
Epoch uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ResPatchObject is designed to provide identifier for the saved patched object.
|
||||||
|
type ResPatchObject struct {
|
||||||
|
ObjectID oid.ID
|
||||||
|
}
|
||||||
|
|
||||||
// PutObject writes an object through a remote server using FrostFS API protocol.
|
// PutObject writes an object through a remote server using FrostFS API protocol.
|
||||||
//
|
//
|
||||||
// Main return value MUST NOT be processed on an erroneous return.
|
// Main return value MUST NOT be processed on an erroneous return.
|
||||||
|
|
Loading…
Reference in a new issue