[#237] pool: Return creation epoch from object put

Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
This commit is contained in:
Marina Biryukova 2024-07-12 14:35:01 +03:00 committed by Alexey Vanin
parent ce8270568d
commit 7e94a6adf2
9 changed files with 65 additions and 27 deletions

View file

@ -73,6 +73,7 @@ type ResObjectPut struct {
statusRes
obj oid.ID
epoch uint64
}
// StoredObjectID returns identifier of the saved object.
@ -80,6 +81,11 @@ func (x ResObjectPut) StoredObjectID() oid.ID {
return x.obj
}
// StoredEpoch returns creation epoch of the saved object.
func (x ResObjectPut) StoredEpoch() uint64 {
return x.epoch
}
// ObjectWriter is designed to write one object or
// multiple parts of one object to FrostFS system.
//

View file

@ -175,6 +175,7 @@ func (x *objectWriterRaw) Close(_ context.Context) (*ResObjectPut, error) {
if x.err != nil {
x.err = newErrInvalidResponseField(fieldID, x.err)
}
x.res.epoch = x.respV2.GetMetaHeader().GetEpoch()
return &x.res, nil
}

View file

@ -93,6 +93,13 @@ func (prm *PrmObjectPutSingle) SetObject(o *v2object.Object) {
// ResObjectPutSingle groups resulting values of PutSingle operation.
type ResObjectPutSingle struct {
statusRes
epoch uint64
}
// Epoch returns creation epoch of the saved object.
func (r *ResObjectPutSingle) Epoch() uint64 {
return r.epoch
}
func (prm *PrmObjectPutSingle) buildRequest(c *Client) (*v2object.PutSingleRequest, error) {
@ -162,6 +169,7 @@ func (c *Client) ObjectPutSingle(ctx context.Context, prm PrmObjectPutSingle) (*
if err != nil {
return nil, err
}
res.epoch = resp.GetMetaHeader().GetEpoch()
return &res, nil
}

View file

@ -58,9 +58,12 @@ func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, err
return nil, err
}
if ai != nil && ai.ParentID != nil {
if ai != nil {
x.it.res.epoch = ai.Epoch
if ai.ParentID != nil {
x.it.res.obj = *ai.ParentID
}
}
return x.it.res, nil
}
@ -121,6 +124,7 @@ func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (b
it.res = &ResObjectPut{
statusRes: res.statusRes,
obj: id,
epoch: res.epoch,
}
if it.client.prm.DisableFrostFSErrorResolution && !apistatus.IsSuccessful(it.res.st) {
return true, apistatus.ErrFromStatus(it.res.st)

View file

@ -247,6 +247,7 @@ func (s *payloadSizeLimiter) fillHeader() (*AccessIdentifiers, error) {
ParentID: parID,
SelfID: id,
ParentHeader: parHdr,
Epoch: curEpoch,
}, nil
}

View file

@ -14,6 +14,7 @@ type AccessIdentifiers struct {
ParentID *oid.ID
SelfID oid.ID
ParentHeader *object.Object
Epoch uint64
}
// EpochSource is a source for the current epoch.

View file

@ -17,7 +17,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"github.com/google/uuid"
)
@ -142,8 +141,8 @@ func (m *mockClient) netMapSnapshot(context.Context, prmNetMapSnapshot) (netmap.
return nm, nil
}
func (m *mockClient) objectPut(context.Context, PrmObjectPut) (oid.ID, error) {
return oid.ID{}, nil
func (m *mockClient) objectPut(context.Context, PrmObjectPut) (ResPutObject, error) {
return ResPutObject{}, nil
}
func (m *mockClient) objectDelete(context.Context, PrmObjectDelete) error {

View file

@ -72,6 +72,7 @@ func (x *objectWriterTransformer) WritePayloadChunk(ctx context.Context, chunk [
type ResObjectPut struct {
Status apistatus.Status
OID oid.ID
Epoch uint64
}
// Close return non nil result in any case. If error occurred, the result contains only buffer for further reusing.
@ -81,9 +82,12 @@ func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, err
return nil, err
}
if ai != nil && ai.ParentID != nil {
if ai != nil {
x.it.res.Epoch = ai.Epoch
if ai.ParentID != nil {
x.it.res.OID = *ai.ParentID
}
}
return &x.it.res, nil
}
@ -128,6 +132,7 @@ func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) err
if res != nil {
it.res.Status = res.Status()
it.res.OID = res.StoredObjectID()
it.res.Epoch = res.StoredEpoch()
}
return err
}
@ -154,6 +159,7 @@ func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (b
it.res = ResObjectPut{
Status: res.Status(),
OID: id,
Epoch: res.Epoch(),
}
if !it.resolveFrostFSErrors && !apistatus.IsSuccessful(it.res.Status) {
return true, apistatus.ErrFromStatus(it.res.Status)

View file

@ -67,7 +67,7 @@ type client interface {
// see clientWrapper.netMapSnapshot
netMapSnapshot(context.Context, prmNetMapSnapshot) (netmap.NetMap, error)
// see clientWrapper.objectPut.
objectPut(context.Context, PrmObjectPut) (oid.ID, error)
objectPut(context.Context, PrmObjectPut) (ResPutObject, error)
// see clientWrapper.objectDelete.
objectDelete(context.Context, PrmObjectDelete) error
// see clientWrapper.objectGet.
@ -798,7 +798,7 @@ func (c *clientWrapper) netMapSnapshot(ctx context.Context, _ prmNetMapSnapshot)
}
// objectPut writes object to FrostFS.
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) {
if prm.bufferMaxSize == 0 {
prm.bufferMaxSize = defaultBufferMaxSizeForPut
}
@ -810,10 +810,10 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID
return c.objectPutServerCut(ctx, prm)
}
func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) {
cl, err := c.getClient()
if err != nil {
return oid.ID{}, err
return ResPutObject{}, err
}
cliPrm := sdkClient.PrmObjectPutInit{
@ -827,7 +827,7 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut
wObj, err := cl.ObjectPutInit(ctx, cliPrm)
c.incRequests(time.Since(start), methodObjectPut)
if err = c.handleError(ctx, nil, err); err != nil {
return oid.ID{}, fmt.Errorf("init writing on API client: %w", err)
return ResPutObject{}, fmt.Errorf("init writing on API client: %w", err)
}
if wObj.WriteHeader(ctx, prm.hdr) {
@ -868,7 +868,7 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut
break
}
return oid.ID{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
return ResPutObject{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
}
}
}
@ -879,13 +879,16 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut
st = res.Status()
}
if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors
return oid.ID{}, fmt.Errorf("client failure: %w", err)
return ResPutObject{}, fmt.Errorf("client failure: %w", err)
}
return res.StoredObjectID(), nil
return ResPutObject{
ObjectID: res.StoredObjectID(),
Epoch: res.StoredEpoch(),
}, nil
}
func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) {
putInitPrm := PrmObjectPutClientCutInit{
PrmObjectPut: prm,
}
@ -894,7 +897,7 @@ func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut
wObj, err := c.objectPutInitTransformer(putInitPrm)
c.incRequests(time.Since(start), methodObjectPut)
if err = c.handleError(ctx, nil, err); err != nil {
return oid.ID{}, fmt.Errorf("init writing on API client: %w", err)
return ResPutObject{}, fmt.Errorf("init writing on API client: %w", err)
}
if wObj.WriteHeader(ctx, prm.hdr) {
@ -935,7 +938,7 @@ func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut
break
}
return oid.ID{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
return ResPutObject{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
}
}
}
@ -946,10 +949,13 @@ func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut
st = res.Status
}
if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors
return oid.ID{}, fmt.Errorf("client failure: %w", err)
return ResPutObject{}, fmt.Errorf("client failure: %w", err)
}
return res.OID, nil
return ResPutObject{
ObjectID: res.OID,
Epoch: res.Epoch,
}, nil
}
// objectDelete invokes sdkClient.ObjectDelete parse response status to error.
@ -2445,10 +2451,16 @@ func (p *Pool) fillAppropriateKey(prm *prmCommon) {
}
}
// ResPutObject is designed to provide identifier and creation epoch of the saved object.
type ResPutObject struct {
ObjectID oid.ID
Epoch uint64
}
// PutObject writes an object through a remote server using FrostFS API protocol.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) {
cnr, _ := prm.hdr.ContainerID()
var prmCtx prmContext
@ -2461,13 +2473,13 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
var ctxCall callContext
ctxCall.sessionClientCut = prm.clientCut
if err := p.initCallContext(&ctxCall, prm.prmCommon, prmCtx); err != nil {
return oid.ID{}, fmt.Errorf("init call context: %w", err)
return ResPutObject{}, fmt.Errorf("init call context: %w", err)
}
if ctxCall.sessionDefault {
ctxCall.sessionTarget = prm.UseSession
if err := p.openDefaultSession(ctx, &ctxCall); err != nil {
return oid.ID{}, fmt.Errorf("open default session: %w", err)
return ResPutObject{}, fmt.Errorf("open default session: %w", err)
}
}
@ -2478,14 +2490,14 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
prm.setNetworkInfo(ni)
}
id, err := ctxCall.client.objectPut(ctx, prm)
res, err := ctxCall.client.objectPut(ctx, prm)
if err != nil {
// removes session token from cache in case of token error
p.checkSessionTokenErr(err, ctxCall.endpoint)
return id, fmt.Errorf("init writing on API client %s: %w", ctxCall.endpoint, err)
return ResPutObject{}, fmt.Errorf("init writing on API client %s: %w", ctxCall.endpoint, err)
}
return id, nil
return res, nil
}
// DeleteObject marks an object for deletion from the container using FrostFS API protocol.