[#249] pool: Introduce objectPatch
method
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
This commit is contained in:
parent
869d59f6bf
commit
5b77cbdc79
2 changed files with 132 additions and 0 deletions
|
@ -145,6 +145,10 @@ func (m *mockClient) objectPut(context.Context, PrmObjectPut) (ResPutObject, err
|
|||
return ResPutObject{}, nil
|
||||
}
|
||||
|
||||
func (m *mockClient) objectPatch(context.Context, PrmObjectPatch) (ResPatchObject, error) {
|
||||
return ResPatchObject{}, nil
|
||||
}
|
||||
|
||||
func (m *mockClient) objectDelete(context.Context, PrmObjectDelete) error {
|
||||
return nil
|
||||
}
|
||||
|
|
128
pool/pool.go
128
pool/pool.go
|
@ -66,6 +66,8 @@ type client interface {
|
|||
netMapSnapshot(context.Context, prmNetMapSnapshot) (netmap.NetMap, error)
|
||||
// see clientWrapper.objectPut.
|
||||
objectPut(context.Context, PrmObjectPut) (ResPutObject, error)
|
||||
// see clientWrapper.objectPatch.
|
||||
objectPatch(context.Context, PrmObjectPatch) (ResPatchObject, error)
|
||||
// see clientWrapper.objectDelete.
|
||||
objectDelete(context.Context, PrmObjectDelete) error
|
||||
// see clientWrapper.objectGet.
|
||||
|
@ -160,6 +162,7 @@ const (
|
|||
methodObjectGet
|
||||
methodObjectHead
|
||||
methodObjectRange
|
||||
methodObjectPatch
|
||||
methodSessionCreate
|
||||
methodAPEManagerAddChain
|
||||
methodAPEManagerRemoveChain
|
||||
|
@ -192,6 +195,8 @@ func (m MethodIndex) String() string {
|
|||
return "netMapSnapshot"
|
||||
case methodObjectPut:
|
||||
return "objectPut"
|
||||
case methodObjectPatch:
|
||||
return "objectPatch"
|
||||
case methodObjectDelete:
|
||||
return "objectDelete"
|
||||
case methodObjectGet:
|
||||
|
@ -724,6 +729,44 @@ func (c *clientWrapper) netMapSnapshot(ctx context.Context, _ prmNetMapSnapshot)
|
|||
return res.NetMap(), nil
|
||||
}
|
||||
|
||||
// objectPatch patches object in FrostFS.
|
||||
func (c *clientWrapper) objectPatch(ctx context.Context, prm PrmObjectPatch) (ResPatchObject, error) {
|
||||
cl, err := c.getClient()
|
||||
if err != nil {
|
||||
return ResPatchObject{}, err
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
pObj, err := cl.ObjectPatchInit(ctx, sdkClient.PrmObjectPatch{
|
||||
Address: prm.addr,
|
||||
Session: prm.stoken,
|
||||
Key: prm.key,
|
||||
BearerToken: prm.btoken,
|
||||
MaxChunkLength: prm.maxPayloadPatchChunkLength,
|
||||
})
|
||||
if err = c.handleError(ctx, nil, err); err != nil {
|
||||
return ResPatchObject{}, fmt.Errorf("init patching on API client: %w", err)
|
||||
}
|
||||
c.incRequests(time.Since(start), methodObjectPatch)
|
||||
|
||||
start = time.Now()
|
||||
attrPatchSuccess := pObj.PatchAttributes(ctx, prm.newAttrs, prm.replaceAttrs)
|
||||
c.incRequests(time.Since(start), methodObjectPatch)
|
||||
|
||||
if attrPatchSuccess {
|
||||
start = time.Now()
|
||||
_ = pObj.PatchPayload(ctx, prm.rng, prm.payload)
|
||||
c.incRequests(time.Since(start), methodObjectPatch)
|
||||
}
|
||||
|
||||
res, err := pObj.Close(ctx)
|
||||
if err = c.handleError(ctx, res.Status(), err); err != nil {
|
||||
return ResPatchObject{}, fmt.Errorf("client failure: %w", err)
|
||||
}
|
||||
|
||||
return ResPatchObject{ObjectID: res.ObjectID()}, nil
|
||||
}
|
||||
|
||||
// objectPut writes object to FrostFS.
|
||||
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) {
|
||||
if prm.bufferMaxSize == 0 {
|
||||
|
@ -1545,6 +1588,53 @@ func (x *PrmObjectPut) setNetworkInfo(ni netmap.NetworkInfo) {
|
|||
x.networkInfo = ni
|
||||
}
|
||||
|
||||
// PrmObjectPatch groups parameters of PatchObject operation.
|
||||
type PrmObjectPatch struct {
|
||||
prmCommon
|
||||
|
||||
addr oid.Address
|
||||
|
||||
rng *object.Range
|
||||
|
||||
payload io.Reader
|
||||
|
||||
newAttrs []object.Attribute
|
||||
|
||||
replaceAttrs bool
|
||||
|
||||
maxPayloadPatchChunkLength int
|
||||
}
|
||||
|
||||
// SetAddress sets the address of the object that is patched.
|
||||
func (x *PrmObjectPatch) SetAddress(addr oid.Address) {
|
||||
x.addr = addr
|
||||
}
|
||||
|
||||
// SetRange sets the patch's range.
|
||||
func (x *PrmObjectPatch) SetRange(rng *object.Range) {
|
||||
x.rng = rng
|
||||
}
|
||||
|
||||
// SetPayloadReader sets a payload reader.
|
||||
func (x *PrmObjectPatch) SetPayloadReader(payload io.Reader) {
|
||||
x.payload = payload
|
||||
}
|
||||
|
||||
// SetRange sets the new attributes to the patch.
|
||||
func (x *PrmObjectPatch) SetNewAttributes(newAttrs []object.Attribute) {
|
||||
x.newAttrs = newAttrs
|
||||
}
|
||||
|
||||
// SetRange sets the replace attributes flag to the patch.
|
||||
func (x *PrmObjectPatch) SetReplaceAttributes(replaceAttrs bool) {
|
||||
x.replaceAttrs = replaceAttrs
|
||||
}
|
||||
|
||||
// SetMaxPayloadPatchChunkSize sets a max buf size to read the patch's payload.
|
||||
func (x *PrmObjectPatch) SetMaxPayloadPatchChunkSize(maxPayloadPatchChunkSize int) {
|
||||
x.maxPayloadPatchChunkLength = maxPayloadPatchChunkSize
|
||||
}
|
||||
|
||||
// PrmObjectDelete groups parameters of DeleteObject operation.
|
||||
type PrmObjectDelete struct {
|
||||
prmCommon
|
||||
|
@ -2389,6 +2479,44 @@ type ResPutObject struct {
|
|||
Epoch uint64
|
||||
}
|
||||
|
||||
// ResPatchObject is designed to provide identifier for the saved patched object.
|
||||
type ResPatchObject struct {
|
||||
ObjectID oid.ID
|
||||
}
|
||||
|
||||
// PatchObject patches an object through a remote server using FrostFS API protocol.
|
||||
//
|
||||
// Main return value MUST NOT be processed on an erroneous return.
|
||||
func (p *Pool) PatchObject(ctx context.Context, prm PrmObjectPatch) (ResPatchObject, error) {
|
||||
var prmCtx prmContext
|
||||
prmCtx.useDefaultSession()
|
||||
prmCtx.useVerb(session.VerbObjectPatch)
|
||||
prmCtx.useContainer(prm.addr.Container())
|
||||
|
||||
p.fillAppropriateKey(&prm.prmCommon)
|
||||
|
||||
var ctxCall callContext
|
||||
if err := p.initCallContext(&ctxCall, prm.prmCommon, prmCtx); err != nil {
|
||||
return ResPatchObject{}, fmt.Errorf("init call context: %w", err)
|
||||
}
|
||||
|
||||
if ctxCall.sessionDefault {
|
||||
ctxCall.sessionTarget = prm.UseSession
|
||||
if err := p.openDefaultSession(ctx, &ctxCall); err != nil {
|
||||
return ResPatchObject{}, fmt.Errorf("open default session: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
res, err := ctxCall.client.objectPatch(ctx, prm)
|
||||
if err != nil {
|
||||
// removes session token from cache in case of token error
|
||||
p.checkSessionTokenErr(err, ctxCall.endpoint)
|
||||
return ResPatchObject{}, fmt.Errorf("init patching on API client %s: %w", ctxCall.endpoint, err)
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// PutObject writes an object through a remote server using FrostFS API protocol.
|
||||
//
|
||||
// Main return value MUST NOT be processed on an erroneous return.
|
||||
|
|
Loading…
Reference in a new issue