[#xxx] pool: Return creation epoch from object put
Some checks failed
DCO / DCO (pull_request) Failing after 1m22s
Tests and linters / Tests (1.21) (pull_request) Successful in 2m15s
Tests and linters / Tests (1.20) (pull_request) Successful in 2m28s
Tests and linters / Lint (pull_request) Successful in 5m35s

Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
This commit is contained in:
Marina Biryukova 2024-07-12 14:35:01 +03:00
parent 51cefd4908
commit 8d764f37aa
10 changed files with 52 additions and 27 deletions

View file

@ -72,7 +72,8 @@ func (x *PrmObjectPutInit) SetGRPCPayloadChunkLen(v int) {
type ResObjectPut struct {
statusRes
obj oid.ID
obj oid.ID
epoch uint64
}
// StoredObjectID returns identifier of the saved object.
@ -80,6 +81,10 @@ func (x ResObjectPut) StoredObjectID() oid.ID {
return x.obj
}
func (x ResObjectPut) StoredEpoch() uint64 {
return x.epoch
}
// ObjectWriter is designed to write one object or
// multiple parts of one object to FrostFS system.
//

View file

@ -175,6 +175,7 @@ func (x *objectWriterRaw) Close(_ context.Context) (*ResObjectPut, error) {
if x.err != nil {
x.err = newErrInvalidResponseField(fieldID, x.err)
}
x.res.epoch = x.respV2.GetMetaHeader().GetEpoch()
return &x.res, nil
}

View file

@ -93,6 +93,12 @@ func (prm *PrmObjectPutSingle) SetObject(o *v2object.Object) {
// ResObjectPutSingle groups resulting values of PutSingle operation.
type ResObjectPutSingle struct {
statusRes
epoch uint64
}
func (r *ResObjectPutSingle) Epoch() uint64 {
return r.epoch
}
func (prm *PrmObjectPutSingle) buildRequest(c *Client) (*v2object.PutSingleRequest, error) {
@ -162,6 +168,7 @@ func (c *Client) ObjectPutSingle(ctx context.Context, prm PrmObjectPutSingle) (*
if err != nil {
return nil, err
}
res.epoch = resp.GetMetaHeader().GetEpoch()
return &res, nil
}

View file

@ -58,8 +58,11 @@ func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, err
return nil, err
}
if ai != nil && ai.ParentID != nil {
x.it.res.obj = *ai.ParentID
if ai != nil {
x.it.res.epoch = ai.Epoch
if ai.ParentID != nil {
x.it.res.obj = *ai.ParentID
}
}
return x.it.res, nil
}
@ -121,6 +124,7 @@ func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (b
it.res = &ResObjectPut{
statusRes: res.statusRes,
obj: id,
epoch: res.epoch,
}
if it.client.prm.DisableFrostFSErrorResolution && !apistatus.IsSuccessful(it.res.st) {
return true, apistatus.ErrFromStatus(it.res.st)

View file

@ -247,6 +247,7 @@ func (s *payloadSizeLimiter) fillHeader() (*AccessIdentifiers, error) {
ParentID: parID,
SelfID: id,
ParentHeader: parHdr,
Epoch: curEpoch,
}, nil
}

View file

@ -14,6 +14,7 @@ type AccessIdentifiers struct {
ParentID *oid.ID
SelfID oid.ID
ParentHeader *object.Object
Epoch uint64
}
// EpochSource is a source for the current epoch.

View file

@ -142,8 +142,8 @@ func (m *mockClient) netMapSnapshot(context.Context, prmNetMapSnapshot) (netmap.
return nm, nil
}
func (m *mockClient) objectPut(context.Context, PrmObjectPut) (oid.ID, error) {
return oid.ID{}, nil
func (m *mockClient) objectPut(context.Context, PrmObjectPut) (oid.ID, uint64, error) {
return oid.ID{}, 0, nil
}
func (m *mockClient) objectDelete(context.Context, PrmObjectDelete) error {

View file

@ -72,6 +72,7 @@ func (x *objectWriterTransformer) WritePayloadChunk(ctx context.Context, chunk [
type ResObjectPut struct {
Status apistatus.Status
OID oid.ID
Epoch uint64
}
// Close return non nil result in any case. If error occurred, the result contains only buffer for further reusing.
@ -81,8 +82,11 @@ func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, err
return nil, err
}
if ai != nil && ai.ParentID != nil {
x.it.res.OID = *ai.ParentID
if ai != nil {
x.it.res.Epoch = ai.Epoch
if ai.ParentID != nil {
x.it.res.OID = *ai.ParentID
}
}
return &x.it.res, nil
}
@ -128,6 +132,7 @@ func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) err
if res != nil {
it.res.Status = res.Status()
it.res.OID = res.StoredObjectID()
it.res.Epoch = res.StoredEpoch()
}
return err
}
@ -154,6 +159,7 @@ func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (b
it.res = ResObjectPut{
Status: res.Status(),
OID: id,
Epoch: res.Epoch(),
}
if !it.resolveFrostFSErrors && !apistatus.IsSuccessful(it.res.Status) {
return true, apistatus.ErrFromStatus(it.res.Status)

View file

@ -67,7 +67,7 @@ type client interface {
// see clientWrapper.netMapSnapshot
netMapSnapshot(context.Context, prmNetMapSnapshot) (netmap.NetMap, error)
// see clientWrapper.objectPut.
objectPut(context.Context, PrmObjectPut) (oid.ID, error)
objectPut(context.Context, PrmObjectPut) (oid.ID, uint64, error)
// see clientWrapper.objectDelete.
objectDelete(context.Context, PrmObjectDelete) error
// see clientWrapper.objectGet.
@ -798,7 +798,7 @@ func (c *clientWrapper) netMapSnapshot(ctx context.Context, _ prmNetMapSnapshot)
}
// objectPut writes object to FrostFS.
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID, uint64, error) {
if prm.bufferMaxSize == 0 {
prm.bufferMaxSize = defaultBufferMaxSizeForPut
}
@ -810,10 +810,10 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID
return c.objectPutServerCut(ctx, prm)
}
func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut) (oid.ID, uint64, error) {
cl, err := c.getClient()
if err != nil {
return oid.ID{}, err
return oid.ID{}, 0, err
}
cliPrm := sdkClient.PrmObjectPutInit{
@ -827,7 +827,7 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut
wObj, err := cl.ObjectPutInit(ctx, cliPrm)
c.incRequests(time.Since(start), methodObjectPut)
if err = c.handleError(ctx, nil, err); err != nil {
return oid.ID{}, fmt.Errorf("init writing on API client: %w", err)
return oid.ID{}, 0, fmt.Errorf("init writing on API client: %w", err)
}
if wObj.WriteHeader(ctx, prm.hdr) {
@ -868,7 +868,7 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut
break
}
return oid.ID{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
return oid.ID{}, 0, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
}
}
}
@ -879,13 +879,13 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut
st = res.Status()
}
if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors
return oid.ID{}, fmt.Errorf("client failure: %w", err)
return oid.ID{}, 0, fmt.Errorf("client failure: %w", err)
}
return res.StoredObjectID(), nil
return res.StoredObjectID(), res.StoredEpoch(), nil
}
func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut) (oid.ID, uint64, error) {
putInitPrm := PrmObjectPutClientCutInit{
PrmObjectPut: prm,
}
@ -894,7 +894,7 @@ func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut
wObj, err := c.objectPutInitTransformer(putInitPrm)
c.incRequests(time.Since(start), methodObjectPut)
if err = c.handleError(ctx, nil, err); err != nil {
return oid.ID{}, fmt.Errorf("init writing on API client: %w", err)
return oid.ID{}, 0, fmt.Errorf("init writing on API client: %w", err)
}
if wObj.WriteHeader(ctx, prm.hdr) {
@ -935,7 +935,7 @@ func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut
break
}
return oid.ID{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
return oid.ID{}, 0, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
}
}
}
@ -946,10 +946,10 @@ func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut
st = res.Status
}
if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors
return oid.ID{}, fmt.Errorf("client failure: %w", err)
return oid.ID{}, 0, fmt.Errorf("client failure: %w", err)
}
return res.OID, nil
return res.OID, res.Epoch, nil
}
// objectDelete invokes sdkClient.ObjectDelete parse response status to error.
@ -2448,7 +2448,7 @@ func (p *Pool) fillAppropriateKey(prm *prmCommon) {
// PutObject writes an object through a remote server using FrostFS API protocol.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, uint64, error) {
cnr, _ := prm.hdr.ContainerID()
var prmCtx prmContext
@ -2461,13 +2461,13 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
var ctxCall callContext
ctxCall.sessionClientCut = prm.clientCut
if err := p.initCallContext(&ctxCall, prm.prmCommon, prmCtx); err != nil {
return oid.ID{}, fmt.Errorf("init call context: %w", err)
return oid.ID{}, 0, fmt.Errorf("init call context: %w", err)
}
if ctxCall.sessionDefault {
ctxCall.sessionTarget = prm.UseSession
if err := p.openDefaultSession(ctx, &ctxCall); err != nil {
return oid.ID{}, fmt.Errorf("open default session: %w", err)
return oid.ID{}, 0, fmt.Errorf("open default session: %w", err)
}
}
@ -2478,14 +2478,14 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
prm.setNetworkInfo(ni)
}
id, err := ctxCall.client.objectPut(ctx, prm)
id, epoch, err := ctxCall.client.objectPut(ctx, prm)
if err != nil {
// removes session token from cache in case of token error
p.checkSessionTokenErr(err, ctxCall.endpoint)
return id, fmt.Errorf("init writing on API client %s: %w", ctxCall.endpoint, err)
return id, epoch, fmt.Errorf("init writing on API client %s: %w", ctxCall.endpoint, err)
}
return id, nil
return id, epoch, nil
}
// DeleteObject marks an object for deletion from the container using FrostFS API protocol.

View file

@ -314,7 +314,7 @@ func TestSessionCache(t *testing.T) {
var prm2 PrmObjectPut
prm2.SetHeader(object.Object{})
_, err = pool.PutObject(ctx, prm2)
_, _, err = pool.PutObject(ctx, prm2)
require.NoError(t, err)
// cache must contain session token