From 7e94a6adf2baaade80c788226ba2b6b4fbcd90a9 Mon Sep 17 00:00:00 2001 From: Marina Biryukova Date: Fri, 12 Jul 2024 14:35:01 +0300 Subject: [PATCH] [#237] pool: Return creation epoch from object put Signed-off-by: Marina Biryukova --- client/object_put.go | 8 ++++- client/object_put_raw.go | 1 + client/object_put_single.go | 8 +++++ client/object_put_transformer.go | 8 +++-- object/transformer/transformer.go | 1 + object/transformer/types.go | 1 + pool/mock_test.go | 5 ++- pool/object_put_pool_transformer.go | 10 ++++-- pool/pool.go | 50 ++++++++++++++++++----------- 9 files changed, 65 insertions(+), 27 deletions(-) diff --git a/client/object_put.go b/client/object_put.go index bf27f4c3..07ca8409 100644 --- a/client/object_put.go +++ b/client/object_put.go @@ -72,7 +72,8 @@ func (x *PrmObjectPutInit) SetGRPCPayloadChunkLen(v int) { type ResObjectPut struct { statusRes - obj oid.ID + obj oid.ID + epoch uint64 } // StoredObjectID returns identifier of the saved object. @@ -80,6 +81,11 @@ func (x ResObjectPut) StoredObjectID() oid.ID { return x.obj } +// StoredEpoch returns creation epoch of the saved object. +func (x ResObjectPut) StoredEpoch() uint64 { + return x.epoch +} + // ObjectWriter is designed to write one object or // multiple parts of one object to FrostFS system. // diff --git a/client/object_put_raw.go b/client/object_put_raw.go index dd210a8c..c22bd052 100644 --- a/client/object_put_raw.go +++ b/client/object_put_raw.go @@ -175,6 +175,7 @@ func (x *objectWriterRaw) Close(_ context.Context) (*ResObjectPut, error) { if x.err != nil { x.err = newErrInvalidResponseField(fieldID, x.err) } + x.res.epoch = x.respV2.GetMetaHeader().GetEpoch() return &x.res, nil } diff --git a/client/object_put_single.go b/client/object_put_single.go index 8eaeca99..bdef026a 100644 --- a/client/object_put_single.go +++ b/client/object_put_single.go @@ -93,6 +93,13 @@ func (prm *PrmObjectPutSingle) SetObject(o *v2object.Object) { // ResObjectPutSingle groups resulting values of PutSingle operation. type ResObjectPutSingle struct { statusRes + + epoch uint64 +} + +// Epoch returns creation epoch of the saved object. +func (r *ResObjectPutSingle) Epoch() uint64 { + return r.epoch } func (prm *PrmObjectPutSingle) buildRequest(c *Client) (*v2object.PutSingleRequest, error) { @@ -162,6 +169,7 @@ func (c *Client) ObjectPutSingle(ctx context.Context, prm PrmObjectPutSingle) (* if err != nil { return nil, err } + res.epoch = resp.GetMetaHeader().GetEpoch() return &res, nil } diff --git a/client/object_put_transformer.go b/client/object_put_transformer.go index 2347a4fa..5314a128 100644 --- a/client/object_put_transformer.go +++ b/client/object_put_transformer.go @@ -58,8 +58,11 @@ func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, err return nil, err } - if ai != nil && ai.ParentID != nil { - x.it.res.obj = *ai.ParentID + if ai != nil { + x.it.res.epoch = ai.Epoch + if ai.ParentID != nil { + x.it.res.obj = *ai.ParentID + } } return x.it.res, nil } @@ -121,6 +124,7 @@ func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (b it.res = &ResObjectPut{ statusRes: res.statusRes, obj: id, + epoch: res.epoch, } if it.client.prm.DisableFrostFSErrorResolution && !apistatus.IsSuccessful(it.res.st) { return true, apistatus.ErrFromStatus(it.res.st) diff --git a/object/transformer/transformer.go b/object/transformer/transformer.go index 37d5471f..4cf8c30e 100644 --- a/object/transformer/transformer.go +++ b/object/transformer/transformer.go @@ -247,6 +247,7 @@ func (s *payloadSizeLimiter) fillHeader() (*AccessIdentifiers, error) { ParentID: parID, SelfID: id, ParentHeader: parHdr, + Epoch: curEpoch, }, nil } diff --git a/object/transformer/types.go b/object/transformer/types.go index 212f4537..1cda4251 100644 --- a/object/transformer/types.go +++ b/object/transformer/types.go @@ -14,6 +14,7 @@ type AccessIdentifiers struct { ParentID *oid.ID SelfID oid.ID ParentHeader *object.Object + Epoch uint64 } // EpochSource is a source for the current epoch. diff --git a/pool/mock_test.go b/pool/mock_test.go index fef6f412..03a981b3 100644 --- a/pool/mock_test.go +++ b/pool/mock_test.go @@ -17,7 +17,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "github.com/google/uuid" ) @@ -142,8 +141,8 @@ func (m *mockClient) netMapSnapshot(context.Context, prmNetMapSnapshot) (netmap. return nm, nil } -func (m *mockClient) objectPut(context.Context, PrmObjectPut) (oid.ID, error) { - return oid.ID{}, nil +func (m *mockClient) objectPut(context.Context, PrmObjectPut) (ResPutObject, error) { + return ResPutObject{}, nil } func (m *mockClient) objectDelete(context.Context, PrmObjectDelete) error { diff --git a/pool/object_put_pool_transformer.go b/pool/object_put_pool_transformer.go index 099a12dd..e596aeb3 100644 --- a/pool/object_put_pool_transformer.go +++ b/pool/object_put_pool_transformer.go @@ -72,6 +72,7 @@ func (x *objectWriterTransformer) WritePayloadChunk(ctx context.Context, chunk [ type ResObjectPut struct { Status apistatus.Status OID oid.ID + Epoch uint64 } // Close return non nil result in any case. If error occurred, the result contains only buffer for further reusing. @@ -81,8 +82,11 @@ func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, err return nil, err } - if ai != nil && ai.ParentID != nil { - x.it.res.OID = *ai.ParentID + if ai != nil { + x.it.res.Epoch = ai.Epoch + if ai.ParentID != nil { + x.it.res.OID = *ai.ParentID + } } return &x.it.res, nil } @@ -128,6 +132,7 @@ func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) err if res != nil { it.res.Status = res.Status() it.res.OID = res.StoredObjectID() + it.res.Epoch = res.StoredEpoch() } return err } @@ -154,6 +159,7 @@ func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (b it.res = ResObjectPut{ Status: res.Status(), OID: id, + Epoch: res.Epoch(), } if !it.resolveFrostFSErrors && !apistatus.IsSuccessful(it.res.Status) { return true, apistatus.ErrFromStatus(it.res.Status) diff --git a/pool/pool.go b/pool/pool.go index 0cffc303..651702b8 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -67,7 +67,7 @@ type client interface { // see clientWrapper.netMapSnapshot netMapSnapshot(context.Context, prmNetMapSnapshot) (netmap.NetMap, error) // see clientWrapper.objectPut. - objectPut(context.Context, PrmObjectPut) (oid.ID, error) + objectPut(context.Context, PrmObjectPut) (ResPutObject, error) // see clientWrapper.objectDelete. objectDelete(context.Context, PrmObjectDelete) error // see clientWrapper.objectGet. @@ -798,7 +798,7 @@ func (c *clientWrapper) netMapSnapshot(ctx context.Context, _ prmNetMapSnapshot) } // objectPut writes object to FrostFS. -func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) { +func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) { if prm.bufferMaxSize == 0 { prm.bufferMaxSize = defaultBufferMaxSizeForPut } @@ -810,10 +810,10 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID return c.objectPutServerCut(ctx, prm) } -func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) { +func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) { cl, err := c.getClient() if err != nil { - return oid.ID{}, err + return ResPutObject{}, err } cliPrm := sdkClient.PrmObjectPutInit{ @@ -827,7 +827,7 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut wObj, err := cl.ObjectPutInit(ctx, cliPrm) c.incRequests(time.Since(start), methodObjectPut) if err = c.handleError(ctx, nil, err); err != nil { - return oid.ID{}, fmt.Errorf("init writing on API client: %w", err) + return ResPutObject{}, fmt.Errorf("init writing on API client: %w", err) } if wObj.WriteHeader(ctx, prm.hdr) { @@ -868,7 +868,7 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut break } - return oid.ID{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err)) + return ResPutObject{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err)) } } } @@ -879,13 +879,16 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut st = res.Status() } if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors - return oid.ID{}, fmt.Errorf("client failure: %w", err) + return ResPutObject{}, fmt.Errorf("client failure: %w", err) } - return res.StoredObjectID(), nil + return ResPutObject{ + ObjectID: res.StoredObjectID(), + Epoch: res.StoredEpoch(), + }, nil } -func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) { +func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) { putInitPrm := PrmObjectPutClientCutInit{ PrmObjectPut: prm, } @@ -894,7 +897,7 @@ func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut wObj, err := c.objectPutInitTransformer(putInitPrm) c.incRequests(time.Since(start), methodObjectPut) if err = c.handleError(ctx, nil, err); err != nil { - return oid.ID{}, fmt.Errorf("init writing on API client: %w", err) + return ResPutObject{}, fmt.Errorf("init writing on API client: %w", err) } if wObj.WriteHeader(ctx, prm.hdr) { @@ -935,7 +938,7 @@ func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut break } - return oid.ID{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err)) + return ResPutObject{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err)) } } } @@ -946,10 +949,13 @@ func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut st = res.Status } if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors - return oid.ID{}, fmt.Errorf("client failure: %w", err) + return ResPutObject{}, fmt.Errorf("client failure: %w", err) } - return res.OID, nil + return ResPutObject{ + ObjectID: res.OID, + Epoch: res.Epoch, + }, nil } // objectDelete invokes sdkClient.ObjectDelete parse response status to error. @@ -2445,10 +2451,16 @@ func (p *Pool) fillAppropriateKey(prm *prmCommon) { } } +// ResPutObject is designed to provide identifier and creation epoch of the saved object. +type ResPutObject struct { + ObjectID oid.ID + Epoch uint64 +} + // PutObject writes an object through a remote server using FrostFS API protocol. // // Main return value MUST NOT be processed on an erroneous return. -func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) { +func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) { cnr, _ := prm.hdr.ContainerID() var prmCtx prmContext @@ -2461,13 +2473,13 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) var ctxCall callContext ctxCall.sessionClientCut = prm.clientCut if err := p.initCallContext(&ctxCall, prm.prmCommon, prmCtx); err != nil { - return oid.ID{}, fmt.Errorf("init call context: %w", err) + return ResPutObject{}, fmt.Errorf("init call context: %w", err) } if ctxCall.sessionDefault { ctxCall.sessionTarget = prm.UseSession if err := p.openDefaultSession(ctx, &ctxCall); err != nil { - return oid.ID{}, fmt.Errorf("open default session: %w", err) + return ResPutObject{}, fmt.Errorf("open default session: %w", err) } } @@ -2478,14 +2490,14 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) prm.setNetworkInfo(ni) } - id, err := ctxCall.client.objectPut(ctx, prm) + res, err := ctxCall.client.objectPut(ctx, prm) if err != nil { // removes session token from cache in case of token error p.checkSessionTokenErr(err, ctxCall.endpoint) - return id, fmt.Errorf("init writing on API client %s: %w", ctxCall.endpoint, err) + return ResPutObject{}, fmt.Errorf("init writing on API client %s: %w", ctxCall.endpoint, err) } - return id, nil + return res, nil } // DeleteObject marks an object for deletion from the container using FrostFS API protocol.