From f0a8e54735cb39662e0a2bfc3beb59ac3ecd7e2d Mon Sep 17 00:00:00 2001 From: Airat Arifullin Date: Mon, 5 Aug 2024 23:18:00 +0300 Subject: [PATCH] [#249] pool: Introduce `objectPatch` method Signed-off-by: Airat Arifullin --- pool/mock_test.go | 4 ++ pool/pool.go | 128 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 132 insertions(+) diff --git a/pool/mock_test.go b/pool/mock_test.go index d555afd..994fecd 100644 --- a/pool/mock_test.go +++ b/pool/mock_test.go @@ -145,6 +145,10 @@ func (m *mockClient) objectPut(context.Context, PrmObjectPut) (ResPutObject, err return ResPutObject{}, nil } +func (m *mockClient) objectPatch(context.Context, PrmObjectPatch) (ResPatchObject, error) { + return ResPatchObject{}, nil +} + func (m *mockClient) objectDelete(context.Context, PrmObjectDelete) error { return nil } diff --git a/pool/pool.go b/pool/pool.go index 8cfda26..992c6d7 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -66,6 +66,8 @@ type client interface { netMapSnapshot(context.Context, prmNetMapSnapshot) (netmap.NetMap, error) // see clientWrapper.objectPut. objectPut(context.Context, PrmObjectPut) (ResPutObject, error) + // see clientWrapper.objectPatch. + objectPatch(context.Context, PrmObjectPatch) (ResPatchObject, error) // see clientWrapper.objectDelete. objectDelete(context.Context, PrmObjectDelete) error // see clientWrapper.objectGet. @@ -164,6 +166,7 @@ const ( methodAPEManagerAddChain methodAPEManagerRemoveChain methodAPEManagerListChains + methodObjectPatch methodLast ) @@ -192,6 +195,8 @@ func (m MethodIndex) String() string { return "netMapSnapshot" case methodObjectPut: return "objectPut" + case methodObjectPatch: + return "objectPatch" case methodObjectDelete: return "objectDelete" case methodObjectGet: @@ -724,6 +729,44 @@ func (c *clientWrapper) netMapSnapshot(ctx context.Context, _ prmNetMapSnapshot) return res.NetMap(), nil } +// objectPatch patches object in FrostFS. +func (c *clientWrapper) objectPatch(ctx context.Context, prm PrmObjectPatch) (ResPatchObject, error) { + cl, err := c.getClient() + if err != nil { + return ResPatchObject{}, err + } + + start := time.Now() + pObj, err := cl.ObjectPatchInit(ctx, sdkClient.PrmObjectPatch{ + Address: prm.addr, + Session: prm.stoken, + Key: prm.key, + BearerToken: prm.btoken, + MaxChunkLength: prm.maxPayloadPatchChunkLength, + }) + if err = c.handleError(ctx, nil, err); err != nil { + return ResPatchObject{}, fmt.Errorf("init patching on API client: %w", err) + } + c.incRequests(time.Since(start), methodObjectPatch) + + start = time.Now() + attrPatchSuccess := pObj.PatchAttributes(ctx, prm.newAttrs, prm.replaceAttrs) + c.incRequests(time.Since(start), methodObjectPatch) + + if attrPatchSuccess { + start = time.Now() + _ = pObj.PatchPayload(ctx, prm.rng, prm.payload) + c.incRequests(time.Since(start), methodObjectPatch) + } + + res, err := pObj.Close(ctx) + if err = c.handleError(ctx, res.Status(), err); err != nil { + return ResPatchObject{}, fmt.Errorf("client failure: %w", err) + } + + return ResPatchObject{ObjectID: res.ObjectID()}, nil +} + // objectPut writes object to FrostFS. func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) { if prm.bufferMaxSize == 0 { @@ -1545,6 +1588,53 @@ func (x *PrmObjectPut) setNetworkInfo(ni netmap.NetworkInfo) { x.networkInfo = ni } +// PrmObjectPatch groups parameters of PatchObject operation. +type PrmObjectPatch struct { + prmCommon + + addr oid.Address + + rng *object.Range + + payload io.Reader + + newAttrs []object.Attribute + + replaceAttrs bool + + maxPayloadPatchChunkLength int +} + +// SetAddress sets the address of the object that is patched. +func (x *PrmObjectPatch) SetAddress(addr oid.Address) { + x.addr = addr +} + +// SetRange sets the patch's range. +func (x *PrmObjectPatch) SetRange(rng *object.Range) { + x.rng = rng +} + +// SetPayloadReader sets a payload reader. +func (x *PrmObjectPatch) SetPayloadReader(payload io.Reader) { + x.payload = payload +} + +// SetRange sets the new attributes to the patch. +func (x *PrmObjectPatch) SetNewAttributes(newAttrs []object.Attribute) { + x.newAttrs = newAttrs +} + +// SetRange sets the replace attributes flag to the patch. +func (x *PrmObjectPatch) SetReplaceAttributes(replaceAttrs bool) { + x.replaceAttrs = replaceAttrs +} + +// SetMaxPayloadPatchChunkSize sets a max buf size to read the patch's payload. +func (x *PrmObjectPatch) SetMaxPayloadPatchChunkSize(maxPayloadPatchChunkSize int) { + x.maxPayloadPatchChunkLength = maxPayloadPatchChunkSize +} + // PrmObjectDelete groups parameters of DeleteObject operation. type PrmObjectDelete struct { prmCommon @@ -2389,6 +2479,44 @@ type ResPutObject struct { Epoch uint64 } +// ResPatchObject is designed to provide identifier for the saved patched object. +type ResPatchObject struct { + ObjectID oid.ID +} + +// PatchObject patches an object through a remote server using FrostFS API protocol. +// +// Main return value MUST NOT be processed on an erroneous return. +func (p *Pool) PatchObject(ctx context.Context, prm PrmObjectPatch) (ResPatchObject, error) { + var prmCtx prmContext + prmCtx.useDefaultSession() + prmCtx.useVerb(session.VerbObjectPatch) + prmCtx.useContainer(prm.addr.Container()) + + p.fillAppropriateKey(&prm.prmCommon) + + var ctxCall callContext + if err := p.initCallContext(&ctxCall, prm.prmCommon, prmCtx); err != nil { + return ResPatchObject{}, fmt.Errorf("init call context: %w", err) + } + + if ctxCall.sessionDefault { + ctxCall.sessionTarget = prm.UseSession + if err := p.openDefaultSession(ctx, &ctxCall); err != nil { + return ResPatchObject{}, fmt.Errorf("open default session: %w", err) + } + } + + res, err := ctxCall.client.objectPatch(ctx, prm) + if err != nil { + // removes session token from cache in case of token error + p.checkSessionTokenErr(err, ctxCall.endpoint) + return ResPatchObject{}, fmt.Errorf("init patching on API client %s: %w", ctxCall.endpoint, err) + } + + return res, nil +} + // PutObject writes an object through a remote server using FrostFS API protocol. // // Main return value MUST NOT be processed on an erroneous return.