From 8d764f37aa3963970ae8b7dd72585c6b8f792f29 Mon Sep 17 00:00:00 2001 From: Marina Biryukova Date: Fri, 12 Jul 2024 14:35:01 +0300 Subject: [PATCH] [#xxx] pool: Return creation epoch from object put Signed-off-by: Marina Biryukova --- client/object_put.go | 7 +++++- client/object_put_raw.go | 1 + client/object_put_single.go | 7 ++++++ client/object_put_transformer.go | 8 ++++-- object/transformer/transformer.go | 1 + object/transformer/types.go | 1 + pool/mock_test.go | 4 +-- pool/object_put_pool_transformer.go | 10 ++++++-- pool/pool.go | 38 ++++++++++++++--------------- pool/pool_test.go | 2 +- 10 files changed, 52 insertions(+), 27 deletions(-) diff --git a/client/object_put.go b/client/object_put.go index bf27f4c..691f459 100644 --- a/client/object_put.go +++ b/client/object_put.go @@ -72,7 +72,8 @@ func (x *PrmObjectPutInit) SetGRPCPayloadChunkLen(v int) { type ResObjectPut struct { statusRes - obj oid.ID + obj oid.ID + epoch uint64 } // StoredObjectID returns identifier of the saved object. @@ -80,6 +81,10 @@ func (x ResObjectPut) StoredObjectID() oid.ID { return x.obj } +func (x ResObjectPut) StoredEpoch() uint64 { + return x.epoch +} + // ObjectWriter is designed to write one object or // multiple parts of one object to FrostFS system. // diff --git a/client/object_put_raw.go b/client/object_put_raw.go index dd210a8..c22bd05 100644 --- a/client/object_put_raw.go +++ b/client/object_put_raw.go @@ -175,6 +175,7 @@ func (x *objectWriterRaw) Close(_ context.Context) (*ResObjectPut, error) { if x.err != nil { x.err = newErrInvalidResponseField(fieldID, x.err) } + x.res.epoch = x.respV2.GetMetaHeader().GetEpoch() return &x.res, nil } diff --git a/client/object_put_single.go b/client/object_put_single.go index 8eaeca9..0fbcdfa 100644 --- a/client/object_put_single.go +++ b/client/object_put_single.go @@ -93,6 +93,12 @@ func (prm *PrmObjectPutSingle) SetObject(o *v2object.Object) { // ResObjectPutSingle groups resulting values of PutSingle operation. type ResObjectPutSingle struct { statusRes + + epoch uint64 +} + +func (r *ResObjectPutSingle) Epoch() uint64 { + return r.epoch } func (prm *PrmObjectPutSingle) buildRequest(c *Client) (*v2object.PutSingleRequest, error) { @@ -162,6 +168,7 @@ func (c *Client) ObjectPutSingle(ctx context.Context, prm PrmObjectPutSingle) (* if err != nil { return nil, err } + res.epoch = resp.GetMetaHeader().GetEpoch() return &res, nil } diff --git a/client/object_put_transformer.go b/client/object_put_transformer.go index 2347a4f..5314a12 100644 --- a/client/object_put_transformer.go +++ b/client/object_put_transformer.go @@ -58,8 +58,11 @@ func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, err return nil, err } - if ai != nil && ai.ParentID != nil { - x.it.res.obj = *ai.ParentID + if ai != nil { + x.it.res.epoch = ai.Epoch + if ai.ParentID != nil { + x.it.res.obj = *ai.ParentID + } } return x.it.res, nil } @@ -121,6 +124,7 @@ func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (b it.res = &ResObjectPut{ statusRes: res.statusRes, obj: id, + epoch: res.epoch, } if it.client.prm.DisableFrostFSErrorResolution && !apistatus.IsSuccessful(it.res.st) { return true, apistatus.ErrFromStatus(it.res.st) diff --git a/object/transformer/transformer.go b/object/transformer/transformer.go index 37d5471..4cf8c30 100644 --- a/object/transformer/transformer.go +++ b/object/transformer/transformer.go @@ -247,6 +247,7 @@ func (s *payloadSizeLimiter) fillHeader() (*AccessIdentifiers, error) { ParentID: parID, SelfID: id, ParentHeader: parHdr, + Epoch: curEpoch, }, nil } diff --git a/object/transformer/types.go b/object/transformer/types.go index 212f453..1cda425 100644 --- a/object/transformer/types.go +++ b/object/transformer/types.go @@ -14,6 +14,7 @@ type AccessIdentifiers struct { ParentID *oid.ID SelfID oid.ID ParentHeader *object.Object + Epoch uint64 } // EpochSource is a source for the current epoch. diff --git a/pool/mock_test.go b/pool/mock_test.go index fef6f41..70cfe58 100644 --- a/pool/mock_test.go +++ b/pool/mock_test.go @@ -142,8 +142,8 @@ func (m *mockClient) netMapSnapshot(context.Context, prmNetMapSnapshot) (netmap. return nm, nil } -func (m *mockClient) objectPut(context.Context, PrmObjectPut) (oid.ID, error) { - return oid.ID{}, nil +func (m *mockClient) objectPut(context.Context, PrmObjectPut) (oid.ID, uint64, error) { + return oid.ID{}, 0, nil } func (m *mockClient) objectDelete(context.Context, PrmObjectDelete) error { diff --git a/pool/object_put_pool_transformer.go b/pool/object_put_pool_transformer.go index 099a12d..e596aeb 100644 --- a/pool/object_put_pool_transformer.go +++ b/pool/object_put_pool_transformer.go @@ -72,6 +72,7 @@ func (x *objectWriterTransformer) WritePayloadChunk(ctx context.Context, chunk [ type ResObjectPut struct { Status apistatus.Status OID oid.ID + Epoch uint64 } // Close return non nil result in any case. If error occurred, the result contains only buffer for further reusing. @@ -81,8 +82,11 @@ func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, err return nil, err } - if ai != nil && ai.ParentID != nil { - x.it.res.OID = *ai.ParentID + if ai != nil { + x.it.res.Epoch = ai.Epoch + if ai.ParentID != nil { + x.it.res.OID = *ai.ParentID + } } return &x.it.res, nil } @@ -128,6 +132,7 @@ func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) err if res != nil { it.res.Status = res.Status() it.res.OID = res.StoredObjectID() + it.res.Epoch = res.StoredEpoch() } return err } @@ -154,6 +159,7 @@ func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (b it.res = ResObjectPut{ Status: res.Status(), OID: id, + Epoch: res.Epoch(), } if !it.resolveFrostFSErrors && !apistatus.IsSuccessful(it.res.Status) { return true, apistatus.ErrFromStatus(it.res.Status) diff --git a/pool/pool.go b/pool/pool.go index 0cffc30..036b49f 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -67,7 +67,7 @@ type client interface { // see clientWrapper.netMapSnapshot netMapSnapshot(context.Context, prmNetMapSnapshot) (netmap.NetMap, error) // see clientWrapper.objectPut. - objectPut(context.Context, PrmObjectPut) (oid.ID, error) + objectPut(context.Context, PrmObjectPut) (oid.ID, uint64, error) // see clientWrapper.objectDelete. objectDelete(context.Context, PrmObjectDelete) error // see clientWrapper.objectGet. @@ -798,7 +798,7 @@ func (c *clientWrapper) netMapSnapshot(ctx context.Context, _ prmNetMapSnapshot) } // objectPut writes object to FrostFS. -func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) { +func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID, uint64, error) { if prm.bufferMaxSize == 0 { prm.bufferMaxSize = defaultBufferMaxSizeForPut } @@ -810,10 +810,10 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID return c.objectPutServerCut(ctx, prm) } -func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) { +func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut) (oid.ID, uint64, error) { cl, err := c.getClient() if err != nil { - return oid.ID{}, err + return oid.ID{}, 0, err } cliPrm := sdkClient.PrmObjectPutInit{ @@ -827,7 +827,7 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut wObj, err := cl.ObjectPutInit(ctx, cliPrm) c.incRequests(time.Since(start), methodObjectPut) if err = c.handleError(ctx, nil, err); err != nil { - return oid.ID{}, fmt.Errorf("init writing on API client: %w", err) + return oid.ID{}, 0, fmt.Errorf("init writing on API client: %w", err) } if wObj.WriteHeader(ctx, prm.hdr) { @@ -868,7 +868,7 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut break } - return oid.ID{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err)) + return oid.ID{}, 0, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err)) } } } @@ -879,13 +879,13 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut st = res.Status() } if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors - return oid.ID{}, fmt.Errorf("client failure: %w", err) + return oid.ID{}, 0, fmt.Errorf("client failure: %w", err) } - return res.StoredObjectID(), nil + return res.StoredObjectID(), res.StoredEpoch(), nil } -func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) { +func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut) (oid.ID, uint64, error) { putInitPrm := PrmObjectPutClientCutInit{ PrmObjectPut: prm, } @@ -894,7 +894,7 @@ func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut wObj, err := c.objectPutInitTransformer(putInitPrm) c.incRequests(time.Since(start), methodObjectPut) if err = c.handleError(ctx, nil, err); err != nil { - return oid.ID{}, fmt.Errorf("init writing on API client: %w", err) + return oid.ID{}, 0, fmt.Errorf("init writing on API client: %w", err) } if wObj.WriteHeader(ctx, prm.hdr) { @@ -935,7 +935,7 @@ func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut break } - return oid.ID{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err)) + return oid.ID{}, 0, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err)) } } } @@ -946,10 +946,10 @@ func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut st = res.Status } if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors - return oid.ID{}, fmt.Errorf("client failure: %w", err) + return oid.ID{}, 0, fmt.Errorf("client failure: %w", err) } - return res.OID, nil + return res.OID, res.Epoch, nil } // objectDelete invokes sdkClient.ObjectDelete parse response status to error. @@ -2448,7 +2448,7 @@ func (p *Pool) fillAppropriateKey(prm *prmCommon) { // PutObject writes an object through a remote server using FrostFS API protocol. // // Main return value MUST NOT be processed on an erroneous return. -func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) { +func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, uint64, error) { cnr, _ := prm.hdr.ContainerID() var prmCtx prmContext @@ -2461,13 +2461,13 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) var ctxCall callContext ctxCall.sessionClientCut = prm.clientCut if err := p.initCallContext(&ctxCall, prm.prmCommon, prmCtx); err != nil { - return oid.ID{}, fmt.Errorf("init call context: %w", err) + return oid.ID{}, 0, fmt.Errorf("init call context: %w", err) } if ctxCall.sessionDefault { ctxCall.sessionTarget = prm.UseSession if err := p.openDefaultSession(ctx, &ctxCall); err != nil { - return oid.ID{}, fmt.Errorf("open default session: %w", err) + return oid.ID{}, 0, fmt.Errorf("open default session: %w", err) } } @@ -2478,14 +2478,14 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) prm.setNetworkInfo(ni) } - id, err := ctxCall.client.objectPut(ctx, prm) + id, epoch, err := ctxCall.client.objectPut(ctx, prm) if err != nil { // removes session token from cache in case of token error p.checkSessionTokenErr(err, ctxCall.endpoint) - return id, fmt.Errorf("init writing on API client %s: %w", ctxCall.endpoint, err) + return id, epoch, fmt.Errorf("init writing on API client %s: %w", ctxCall.endpoint, err) } - return id, nil + return id, epoch, nil } // DeleteObject marks an object for deletion from the container using FrostFS API protocol. diff --git a/pool/pool_test.go b/pool/pool_test.go index c8721b9..0d54a74 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -314,7 +314,7 @@ func TestSessionCache(t *testing.T) { var prm2 PrmObjectPut prm2.SetHeader(object.Object{}) - _, err = pool.PutObject(ctx, prm2) + _, _, err = pool.PutObject(ctx, prm2) require.NoError(t, err) // cache must contain session token