[#237] pool: Return creation epoch from object put #237
9 changed files with 65 additions and 27 deletions
|
@ -72,7 +72,8 @@ func (x *PrmObjectPutInit) SetGRPCPayloadChunkLen(v int) {
|
|||
type ResObjectPut struct {
|
||||
statusRes
|
||||
|
||||
obj oid.ID
|
||||
obj oid.ID
|
||||
epoch uint64
|
||||
}
|
||||
|
||||
// StoredObjectID returns identifier of the saved object.
|
||||
|
@ -80,6 +81,11 @@ func (x ResObjectPut) StoredObjectID() oid.ID {
|
|||
return x.obj
|
||||
}
|
||||
|
||||
// StoredEpoch returns creation epoch of the saved object.
|
||||
dkirillov marked this conversation as resolved
Outdated
|
||||
func (x ResObjectPut) StoredEpoch() uint64 {
|
||||
return x.epoch
|
||||
}
|
||||
|
||||
// ObjectWriter is designed to write one object or
|
||||
// multiple parts of one object to FrostFS system.
|
||||
//
|
||||
|
|
|
@ -175,6 +175,7 @@ func (x *objectWriterRaw) Close(_ context.Context) (*ResObjectPut, error) {
|
|||
if x.err != nil {
|
||||
x.err = newErrInvalidResponseField(fieldID, x.err)
|
||||
}
|
||||
x.res.epoch = x.respV2.GetMetaHeader().GetEpoch()
|
||||
|
||||
return &x.res, nil
|
||||
}
|
||||
|
|
|
@ -93,6 +93,13 @@ func (prm *PrmObjectPutSingle) SetObject(o *v2object.Object) {
|
|||
// ResObjectPutSingle groups resulting values of PutSingle operation.
|
||||
type ResObjectPutSingle struct {
|
||||
statusRes
|
||||
|
||||
epoch uint64
|
||||
}
|
||||
|
||||
// Epoch returns creation epoch of the saved object.
|
||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Also need comment Also need comment
|
||||
func (r *ResObjectPutSingle) Epoch() uint64 {
|
||||
return r.epoch
|
||||
}
|
||||
|
||||
func (prm *PrmObjectPutSingle) buildRequest(c *Client) (*v2object.PutSingleRequest, error) {
|
||||
|
@ -162,6 +169,7 @@ func (c *Client) ObjectPutSingle(ctx context.Context, prm PrmObjectPutSingle) (*
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res.epoch = resp.GetMetaHeader().GetEpoch()
|
||||
|
||||
return &res, nil
|
||||
}
|
||||
|
|
|
@ -58,8 +58,11 @@ func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, err
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if ai != nil && ai.ParentID != nil {
|
||||
x.it.res.obj = *ai.ParentID
|
||||
if ai != nil {
|
||||
x.it.res.epoch = ai.Epoch
|
||||
if ai.ParentID != nil {
|
||||
x.it.res.obj = *ai.ParentID
|
||||
}
|
||||
}
|
||||
return x.it.res, nil
|
||||
}
|
||||
|
@ -121,6 +124,7 @@ func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (b
|
|||
it.res = &ResObjectPut{
|
||||
statusRes: res.statusRes,
|
||||
obj: id,
|
||||
epoch: res.epoch,
|
||||
}
|
||||
if it.client.prm.DisableFrostFSErrorResolution && !apistatus.IsSuccessful(it.res.st) {
|
||||
return true, apistatus.ErrFromStatus(it.res.st)
|
||||
|
|
|
@ -247,6 +247,7 @@ func (s *payloadSizeLimiter) fillHeader() (*AccessIdentifiers, error) {
|
|||
ParentID: parID,
|
||||
SelfID: id,
|
||||
ParentHeader: parHdr,
|
||||
Epoch: curEpoch,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@ type AccessIdentifiers struct {
|
|||
ParentID *oid.ID
|
||||
SelfID oid.ID
|
||||
ParentHeader *object.Object
|
||||
Epoch uint64
|
||||
}
|
||||
|
||||
// EpochSource is a source for the current epoch.
|
||||
|
|
|
@ -17,7 +17,6 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
@ -142,8 +141,8 @@ func (m *mockClient) netMapSnapshot(context.Context, prmNetMapSnapshot) (netmap.
|
|||
return nm, nil
|
||||
}
|
||||
|
||||
func (m *mockClient) objectPut(context.Context, PrmObjectPut) (oid.ID, error) {
|
||||
return oid.ID{}, nil
|
||||
func (m *mockClient) objectPut(context.Context, PrmObjectPut) (ResPutObject, error) {
|
||||
return ResPutObject{}, nil
|
||||
}
|
||||
|
||||
func (m *mockClient) objectDelete(context.Context, PrmObjectDelete) error {
|
||||
|
|
|
@ -72,6 +72,7 @@ func (x *objectWriterTransformer) WritePayloadChunk(ctx context.Context, chunk [
|
|||
type ResObjectPut struct {
|
||||
Status apistatus.Status
|
||||
OID oid.ID
|
||||
Epoch uint64
|
||||
}
|
||||
|
||||
// Close return non nil result in any case. If error occurred, the result contains only buffer for further reusing.
|
||||
|
@ -81,8 +82,11 @@ func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, err
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if ai != nil && ai.ParentID != nil {
|
||||
x.it.res.OID = *ai.ParentID
|
||||
if ai != nil {
|
||||
x.it.res.Epoch = ai.Epoch
|
||||
if ai.ParentID != nil {
|
||||
x.it.res.OID = *ai.ParentID
|
||||
}
|
||||
}
|
||||
return &x.it.res, nil
|
||||
}
|
||||
|
@ -128,6 +132,7 @@ func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) err
|
|||
if res != nil {
|
||||
it.res.Status = res.Status()
|
||||
it.res.OID = res.StoredObjectID()
|
||||
it.res.Epoch = res.StoredEpoch()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@ -154,6 +159,7 @@ func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (b
|
|||
it.res = ResObjectPut{
|
||||
Status: res.Status(),
|
||||
OID: id,
|
||||
Epoch: res.Epoch(),
|
||||
}
|
||||
if !it.resolveFrostFSErrors && !apistatus.IsSuccessful(it.res.Status) {
|
||||
return true, apistatus.ErrFromStatus(it.res.Status)
|
||||
|
|
50
pool/pool.go
50
pool/pool.go
|
@ -67,7 +67,7 @@ type client interface {
|
|||
// see clientWrapper.netMapSnapshot
|
||||
netMapSnapshot(context.Context, prmNetMapSnapshot) (netmap.NetMap, error)
|
||||
// see clientWrapper.objectPut.
|
||||
objectPut(context.Context, PrmObjectPut) (oid.ID, error)
|
||||
objectPut(context.Context, PrmObjectPut) (ResPutObject, error)
|
||||
// see clientWrapper.objectDelete.
|
||||
objectDelete(context.Context, PrmObjectDelete) error
|
||||
// see clientWrapper.objectGet.
|
||||
|
@ -798,7 +798,7 @@ func (c *clientWrapper) netMapSnapshot(ctx context.Context, _ prmNetMapSnapshot)
|
|||
}
|
||||
|
||||
// objectPut writes object to FrostFS.
|
||||
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
|
||||
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) {
|
||||
if prm.bufferMaxSize == 0 {
|
||||
prm.bufferMaxSize = defaultBufferMaxSizeForPut
|
||||
}
|
||||
|
@ -810,10 +810,10 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID
|
|||
return c.objectPutServerCut(ctx, prm)
|
||||
}
|
||||
|
||||
func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
|
||||
func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) {
|
||||
cl, err := c.getClient()
|
||||
if err != nil {
|
||||
return oid.ID{}, err
|
||||
return ResPutObject{}, err
|
||||
}
|
||||
|
||||
cliPrm := sdkClient.PrmObjectPutInit{
|
||||
|
@ -827,7 +827,7 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut
|
|||
wObj, err := cl.ObjectPutInit(ctx, cliPrm)
|
||||
c.incRequests(time.Since(start), methodObjectPut)
|
||||
if err = c.handleError(ctx, nil, err); err != nil {
|
||||
return oid.ID{}, fmt.Errorf("init writing on API client: %w", err)
|
||||
return ResPutObject{}, fmt.Errorf("init writing on API client: %w", err)
|
||||
}
|
||||
|
||||
if wObj.WriteHeader(ctx, prm.hdr) {
|
||||
|
@ -868,7 +868,7 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut
|
|||
break
|
||||
}
|
||||
|
||||
return oid.ID{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
|
||||
return ResPutObject{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -879,13 +879,16 @@ func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut
|
|||
st = res.Status()
|
||||
}
|
||||
if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors
|
||||
return oid.ID{}, fmt.Errorf("client failure: %w", err)
|
||||
return ResPutObject{}, fmt.Errorf("client failure: %w", err)
|
||||
}
|
||||
|
||||
return res.StoredObjectID(), nil
|
||||
return ResPutObject{
|
||||
ObjectID: res.StoredObjectID(),
|
||||
Epoch: res.StoredEpoch(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
|
||||
func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) {
|
||||
putInitPrm := PrmObjectPutClientCutInit{
|
||||
PrmObjectPut: prm,
|
||||
}
|
||||
|
@ -894,7 +897,7 @@ func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut
|
|||
wObj, err := c.objectPutInitTransformer(putInitPrm)
|
||||
c.incRequests(time.Since(start), methodObjectPut)
|
||||
if err = c.handleError(ctx, nil, err); err != nil {
|
||||
return oid.ID{}, fmt.Errorf("init writing on API client: %w", err)
|
||||
return ResPutObject{}, fmt.Errorf("init writing on API client: %w", err)
|
||||
}
|
||||
|
||||
if wObj.WriteHeader(ctx, prm.hdr) {
|
||||
|
@ -935,7 +938,7 @@ func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut
|
|||
break
|
||||
}
|
||||
|
||||
return oid.ID{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
|
||||
return ResPutObject{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -946,10 +949,13 @@ func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut
|
|||
st = res.Status
|
||||
}
|
||||
if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors
|
||||
return oid.ID{}, fmt.Errorf("client failure: %w", err)
|
||||
return ResPutObject{}, fmt.Errorf("client failure: %w", err)
|
||||
}
|
||||
|
||||
return res.OID, nil
|
||||
return ResPutObject{
|
||||
ObjectID: res.OID,
|
||||
Epoch: res.Epoch,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// objectDelete invokes sdkClient.ObjectDelete parse response status to error.
|
||||
|
@ -2445,10 +2451,16 @@ func (p *Pool) fillAppropriateKey(prm *prmCommon) {
|
|||
}
|
||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Let's introduce new struct that contains both id and epoch rather than return these values individually Let's introduce new struct that contains both id and epoch rather than return these values individually
|
||||
}
|
||||
|
||||
// ResPutObject is designed to provide identifier and creation epoch of the saved object.
|
||||
type ResPutObject struct {
|
||||
ObjectID oid.ID
|
||||
Epoch uint64
|
||||
}
|
||||
|
||||
// PutObject writes an object through a remote server using FrostFS API protocol.
|
||||
//
|
||||
// Main return value MUST NOT be processed on an erroneous return.
|
||||
func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
|
||||
func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (ResPutObject, error) {
|
||||
cnr, _ := prm.hdr.ContainerID()
|
||||
|
||||
var prmCtx prmContext
|
||||
|
@ -2461,13 +2473,13 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
|
|||
var ctxCall callContext
|
||||
ctxCall.sessionClientCut = prm.clientCut
|
||||
dkirillov
commented
(non-blocking): Can we don't introduce this variable? I suggest:
(non-blocking): Can we don't introduce this variable?
I suggest:
```diff
diff --git a/pool/pool.go b/pool/pool.go
index 6e435a1..651702b 100644
--- a/pool/pool.go
+++ b/pool/pool.go
@@ -2470,19 +2470,16 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (ResPutObject, e
p.fillAppropriateKey(&prm.prmCommon)
- var (
- res ResPutObject
- ctxCall callContext
- )
+ var ctxCall callContext
ctxCall.sessionClientCut = prm.clientCut
if err := p.initCallContext(&ctxCall, prm.prmCommon, prmCtx); err != nil {
- return res, fmt.Errorf("init call context: %w", err)
+ return ResPutObject{}, fmt.Errorf("init call context: %w", err)
}
if ctxCall.sessionDefault {
ctxCall.sessionTarget = prm.UseSession
if err := p.openDefaultSession(ctx, &ctxCall); err != nil {
- return res, fmt.Errorf("open default session: %w", err)
+ return ResPutObject{}, fmt.Errorf("open default session: %w", err)
}
}
@@ -2493,12 +2490,11 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (ResPutObject, e
prm.setNetworkInfo(ni)
}
- var err error
- res, err = ctxCall.client.objectPut(ctx, prm)
+ res, err := ctxCall.client.objectPut(ctx, prm)
if err != nil {
// removes session token from cache in case of token error
p.checkSessionTokenErr(err, ctxCall.endpoint)
- return res, fmt.Errorf("init writing on API client %s: %w", ctxCall.endpoint, err)
+ return ResPutObject{}, fmt.Errorf("init writing on API client %s: %w", ctxCall.endpoint, err)
}
return res, nil
```
|
||||
if err := p.initCallContext(&ctxCall, prm.prmCommon, prmCtx); err != nil {
|
||||
return oid.ID{}, fmt.Errorf("init call context: %w", err)
|
||||
return ResPutObject{}, fmt.Errorf("init call context: %w", err)
|
||||
}
|
||||
|
||||
if ctxCall.sessionDefault {
|
||||
ctxCall.sessionTarget = prm.UseSession
|
||||
if err := p.openDefaultSession(ctx, &ctxCall); err != nil {
|
||||
return oid.ID{}, fmt.Errorf("open default session: %w", err)
|
||||
return ResPutObject{}, fmt.Errorf("open default session: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2478,14 +2490,14 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
|
|||
prm.setNetworkInfo(ni)
|
||||
}
|
||||
|
||||
id, err := ctxCall.client.objectPut(ctx, prm)
|
||||
res, err := ctxCall.client.objectPut(ctx, prm)
|
||||
if err != nil {
|
||||
// removes session token from cache in case of token error
|
||||
p.checkSessionTokenErr(err, ctxCall.endpoint)
|
||||
return id, fmt.Errorf("init writing on API client %s: %w", ctxCall.endpoint, err)
|
||||
return ResPutObject{}, fmt.Errorf("init writing on API client %s: %w", ctxCall.endpoint, err)
|
||||
}
|
||||
|
||||
return id, nil
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// DeleteObject marks an object for deletion from the container using FrostFS API protocol.
|
||||
|
|
Loading…
Add table
Reference in a new issue
We need comment I suppose (to be consistent with other methods)