[#412] Store creation epoch in tree service

Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
This commit is contained in:
Marina Biryukova 2024-07-22 12:00:17 +03:00
parent 481520705a
commit 0644067496
13 changed files with 167 additions and 76 deletions

View file

@ -82,6 +82,15 @@ type (
VersionID string VersionID string
NoErrorOnDeleteMarker bool NoErrorOnDeleteMarker bool
} }
// CreatedObjectInfo stores created object info.
CreatedObjectInfo struct {
ID oid.ID
Size uint64
HashSum []byte
MD5Sum []byte
CreationEpoch uint64
}
) )
// SettingsObjectName is a system name for a bucket settings file. // SettingsObjectName is a system name for a bucket settings file.

View file

@ -72,6 +72,7 @@ type BaseNodeVersion struct {
Created *time.Time Created *time.Time
Owner *user.ID Owner *user.ID
IsDeleteMarker bool IsDeleteMarker bool
CreationEpoch uint64
} }
func (v *BaseNodeVersion) GetETag(md5Enabled bool) string { func (v *BaseNodeVersion) GetETag(md5Enabled bool) string {
@ -110,6 +111,7 @@ type MultipartInfo struct {
Meta map[string]string Meta map[string]string
CopiesNumbers []uint32 CopiesNumbers []uint32
Finished bool Finished bool
CreationEpoch uint64
} }
// PartInfo is upload information about part. // PartInfo is upload information about part.

View file

@ -55,12 +55,12 @@ func (n *Layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error {
prm.Container = corsBkt.CID prm.Container = corsBkt.CID
_, objID, _, _, err := n.objectPutAndHash(ctx, prm, corsBkt) createdObj, err := n.objectPutAndHash(ctx, prm, corsBkt)
if err != nil { if err != nil {
return fmt.Errorf("put cors object: %w", err) return fmt.Errorf("put cors object: %w", err)
} }
objsToDelete, err := n.treeService.PutBucketCORS(ctx, p.BktInfo, newAddress(corsBkt.CID, objID)) objsToDelete, err := n.treeService.PutBucketCORS(ctx, p.BktInfo, newAddress(corsBkt.CID, createdObj.ID))
objToDeleteNotFound := errorsStd.Is(err, ErrNoNodeToRemove) objToDeleteNotFound := errorsStd.Is(err, ErrNoNodeToRemove)
if err != nil && !objToDeleteNotFound { if err != nil && !objToDeleteNotFound {
return err return err

View file

@ -166,6 +166,12 @@ type PrmObjectCreate struct {
BufferMaxSize uint64 BufferMaxSize uint64
} }
// CreateObjectResult is a result parameter of FrostFS.CreateObject operation.
type CreateObjectResult struct {
ObjectID oid.ID
CreationEpoch uint64
}
// PrmObjectDelete groups parameters of FrostFS.DeleteObject operation. // PrmObjectDelete groups parameters of FrostFS.DeleteObject operation.
type PrmObjectDelete struct { type PrmObjectDelete struct {
// Authentication parameters. // Authentication parameters.
@ -261,15 +267,15 @@ type FrostFS interface {
// CreateObject creates and saves a parameterized object in the FrostFS container. // CreateObject creates and saves a parameterized object in the FrostFS container.
// It sets 'Timestamp' attribute to the current time. // It sets 'Timestamp' attribute to the current time.
// It returns the ID of the saved object. // It returns the ID and creation epoch of the saved object.
// //
// Creation time should be written into the object (UTC). // Creation time should be written into the object (UTC).
// //
// It returns ErrAccessDenied on write access violation. // It returns ErrAccessDenied on write access violation.
// //
// It returns exactly one non-zero value. It returns any error encountered which // It returns exactly one non-nil value. It returns any error encountered which
// prevented the container from being created. // prevented the object from being created.
CreateObject(context.Context, PrmObjectCreate) (oid.ID, error) CreateObject(context.Context, PrmObjectCreate) (*CreateObjectResult, error)
// DeleteObject marks the object to be removed from the FrostFS container by identifier. // DeleteObject marks the object to be removed from the FrostFS container by identifier.
// Successful return does not guarantee actual removal. // Successful return does not guarantee actual removal.
@ -295,4 +301,7 @@ type FrostFS interface {
// //
// It returns any error encountered which prevented computing epochs. // It returns any error encountered which prevented computing epochs.
TimeToEpoch(ctx context.Context, now time.Time, future time.Time) (uint64, uint64, error) TimeToEpoch(ctx context.Context, now time.Time, future time.Time) (uint64, uint64, error)
// NetworkInfo returns parameters of FrostFS network.
NetworkInfo(context.Context) (netmap.NetworkInfo, error)
} }

View file

@ -18,6 +18,7 @@ import (
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
@ -255,10 +256,10 @@ func (t *TestFrostFS) RangeObject(ctx context.Context, prm PrmObjectRange) (io.R
return io.NopCloser(bytes.NewReader(payload)), nil return io.NopCloser(bytes.NewReader(payload)), nil
} }
func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID, error) { func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (*CreateObjectResult, error) {
b := make([]byte, 32) b := make([]byte, 32)
if _, err := io.ReadFull(rand.Reader, b); err != nil { if _, err := io.ReadFull(rand.Reader, b); err != nil {
return oid.ID{}, err return nil, err
} }
var id oid.ID var id oid.ID
id.SetSHA256(sha256.Sum256(b)) id.SetSHA256(sha256.Sum256(b))
@ -266,7 +267,7 @@ func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.
attrs := make([]object.Attribute, 0) attrs := make([]object.Attribute, 0)
if err := t.objectPutErrors[prm.Filepath]; err != nil { if err := t.objectPutErrors[prm.Filepath]; err != nil {
return oid.ID{}, err return nil, err
} }
if prm.Filepath != "" { if prm.Filepath != "" {
@ -311,7 +312,7 @@ func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.
if prm.Payload != nil { if prm.Payload != nil {
all, err := io.ReadAll(prm.Payload) all, err := io.ReadAll(prm.Payload)
if err != nil { if err != nil {
return oid.ID{}, err return nil, err
} }
obj.SetPayload(all) obj.SetPayload(all)
obj.SetPayloadSize(uint64(len(all))) obj.SetPayloadSize(uint64(len(all)))
@ -325,7 +326,10 @@ func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.
addr := newAddress(cnrID, objID) addr := newAddress(cnrID, objID)
t.objects[addr.EncodeToString()] = obj t.objects[addr.EncodeToString()] = obj
return objID, nil return &CreateObjectResult{
ObjectID: objID,
CreationEpoch: t.currentEpoch - 1,
}, nil
} }
func (t *TestFrostFS) DeleteObject(ctx context.Context, prm PrmObjectDelete) error { func (t *TestFrostFS) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
@ -404,6 +408,13 @@ func (t *TestFrostFS) SearchObjects(_ context.Context, prm PrmObjectSearch) ([]o
return res, nil return res, nil
} }
func (t *TestFrostFS) NetworkInfo(context.Context) (netmap.NetworkInfo, error) {
ni := netmap.NetworkInfo{}
ni.SetCurrentEpoch(t.currentEpoch)
return ni, nil
}
func (t *TestFrostFS) checkAccess(cnrID cid.ID, owner user.ID) bool { func (t *TestFrostFS) checkAccess(cnrID cid.ID, owner user.ID) bool {
cnr, ok := t.containers[cnrID.EncodeToString()] cnr, ok := t.containers[cnrID.EncodeToString()]
if !ok { if !ok {

View file

@ -42,7 +42,7 @@ func (n *Layer) PutBucketLifecycleConfiguration(ctx context.Context, p *PutBucke
prm.Container = lifecycleBkt.CID prm.Container = lifecycleBkt.CID
_, objID, _, md5, err := n.objectPutAndHash(ctx, prm, lifecycleBkt) createdObj, err := n.objectPutAndHash(ctx, prm, lifecycleBkt)
if err != nil { if err != nil {
return fmt.Errorf("put lifecycle object: %w", err) return fmt.Errorf("put lifecycle object: %w", err)
} }
@ -52,13 +52,13 @@ func (n *Layer) PutBucketLifecycleConfiguration(ctx context.Context, p *PutBucke
return apiErr.GetAPIError(apiErr.ErrInvalidDigest) return apiErr.GetAPIError(apiErr.ErrInvalidDigest)
} }
if !bytes.Equal(hashBytes, md5) { if !bytes.Equal(hashBytes, createdObj.MD5Sum) {
n.deleteLifecycleObject(ctx, p.BktInfo, newAddress(lifecycleBkt.CID, objID)) n.deleteLifecycleObject(ctx, p.BktInfo, newAddress(lifecycleBkt.CID, createdObj.ID))
return apiErr.GetAPIError(apiErr.ErrInvalidDigest) return apiErr.GetAPIError(apiErr.ErrInvalidDigest)
} }
objsToDelete, err := n.treeService.PutBucketLifecycleConfiguration(ctx, p.BktInfo, newAddress(lifecycleBkt.CID, objID)) objsToDelete, err := n.treeService.PutBucketLifecycleConfiguration(ctx, p.BktInfo, newAddress(lifecycleBkt.CID, createdObj.ID))
objsToDeleteNotFound := errors.Is(err, ErrNoNodeToRemove) objsToDeleteNotFound := errors.Is(err, ErrNoNodeToRemove)
if err != nil && !objsToDeleteNotFound { if err != nil && !objsToDeleteNotFound {
return err return err

View file

@ -150,6 +150,11 @@ func (n *Layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartPar
metaSize += len(p.Data.TagSet) metaSize += len(p.Data.TagSet)
} }
networkInfo, err := n.frostFS.NetworkInfo(ctx)
if err != nil {
return fmt.Errorf("get network info: %w", err)
}
info := &data.MultipartInfo{ info := &data.MultipartInfo{
Key: p.Info.Key, Key: p.Info.Key,
UploadID: p.Info.UploadID, UploadID: p.Info.UploadID,
@ -157,6 +162,7 @@ func (n *Layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartPar
Created: TimeNow(ctx), Created: TimeNow(ctx),
Meta: make(map[string]string, metaSize), Meta: make(map[string]string, metaSize),
CopiesNumbers: p.CopiesNumbers, CopiesNumbers: p.CopiesNumbers,
CreationEpoch: networkInfo.CurrentEpoch(),
} }
for key, val := range p.Header { for key, val := range p.Header {
@ -229,7 +235,7 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
prm.Attributes[0][0], prm.Attributes[0][1] = UploadIDAttributeName, p.Info.UploadID prm.Attributes[0][0], prm.Attributes[0][1] = UploadIDAttributeName, p.Info.UploadID
prm.Attributes[1][0], prm.Attributes[1][1] = UploadPartNumberAttributeName, strconv.Itoa(p.PartNumber) prm.Attributes[1][0], prm.Attributes[1][1] = UploadPartNumberAttributeName, strconv.Itoa(p.PartNumber)
size, id, hash, md5Hash, err := n.objectPutAndHash(ctx, prm, bktInfo) createdObj, err := n.objectPutAndHash(ctx, prm, bktInfo)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -238,21 +244,21 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
if err != nil { if err != nil {
return nil, s3errors.GetAPIError(s3errors.ErrInvalidDigest) return nil, s3errors.GetAPIError(s3errors.ErrInvalidDigest)
} }
if hex.EncodeToString(hashBytes) != hex.EncodeToString(md5Hash) { if hex.EncodeToString(hashBytes) != hex.EncodeToString(createdObj.MD5Sum) {
prm := PrmObjectDelete{ prm := PrmObjectDelete{
Object: id, Object: createdObj.ID,
Container: bktInfo.CID, Container: bktInfo.CID,
} }
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner) n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
err = n.frostFS.DeleteObject(ctx, prm) err = n.frostFS.DeleteObject(ctx, prm)
if err != nil { if err != nil {
n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", id)) n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", createdObj.ID))
} }
return nil, s3errors.GetAPIError(s3errors.ErrInvalidDigest) return nil, s3errors.GetAPIError(s3errors.ErrInvalidDigest)
} }
} }
if p.Info.Encryption.Enabled() { if p.Info.Encryption.Enabled() {
size = decSize createdObj.Size = decSize
} }
if !p.Info.Encryption.Enabled() && len(p.ContentSHA256Hash) > 0 && !auth.IsStandardContentSHA256(p.ContentSHA256Hash) { if !p.Info.Encryption.Enabled() && len(p.ContentSHA256Hash) > 0 && !auth.IsStandardContentSHA256(p.ContentSHA256Hash) {
@ -260,10 +266,10 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
if err != nil { if err != nil {
return nil, s3errors.GetAPIError(s3errors.ErrContentSHA256Mismatch) return nil, s3errors.GetAPIError(s3errors.ErrContentSHA256Mismatch)
} }
if !bytes.Equal(contentHashBytes, hash) { if !bytes.Equal(contentHashBytes, createdObj.HashSum) {
err = n.objectDelete(ctx, bktInfo, id) err = n.objectDelete(ctx, bktInfo, createdObj.ID)
if err != nil { if err != nil {
n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", id)) n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", createdObj.ID))
} }
return nil, s3errors.GetAPIError(s3errors.ErrContentSHA256Mismatch) return nil, s3errors.GetAPIError(s3errors.ErrContentSHA256Mismatch)
} }
@ -271,17 +277,17 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
n.reqLogger(ctx).Debug(logs.UploadPart, n.reqLogger(ctx).Debug(logs.UploadPart,
zap.String("multipart upload", p.Info.UploadID), zap.Int("part number", p.PartNumber), zap.String("multipart upload", p.Info.UploadID), zap.Int("part number", p.PartNumber),
zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", id)) zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", createdObj.ID))
partInfo := &data.PartInfo{ partInfo := &data.PartInfo{
Key: p.Info.Key, Key: p.Info.Key,
UploadID: p.Info.UploadID, UploadID: p.Info.UploadID,
Number: p.PartNumber, Number: p.PartNumber,
OID: id, OID: createdObj.ID,
Size: size, Size: createdObj.Size,
ETag: hex.EncodeToString(hash), ETag: hex.EncodeToString(createdObj.HashSum),
Created: prm.CreationTime, Created: prm.CreationTime,
MD5: hex.EncodeToString(md5Hash), MD5: hex.EncodeToString(createdObj.MD5Sum),
} }
oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo) oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo)
@ -298,7 +304,7 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
} }
objInfo := &data.ObjectInfo{ objInfo := &data.ObjectInfo{
ID: id, ID: createdObj.ID,
CID: bktInfo.CID, CID: bktInfo.CID,
Owner: bktInfo.Owner, Owner: bktInfo.Owner,

View file

@ -271,7 +271,7 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
prm.Attributes = append(prm.Attributes, [2]string{k, v}) prm.Attributes = append(prm.Attributes, [2]string{k, v})
} }
size, id, hash, md5Hash, err := n.objectPutAndHash(ctx, prm, p.BktInfo) createdObj, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -280,10 +280,10 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
if err != nil { if err != nil {
return nil, apiErrors.GetAPIError(apiErrors.ErrInvalidDigest) return nil, apiErrors.GetAPIError(apiErrors.ErrInvalidDigest)
} }
if !bytes.Equal(headerMd5Hash, md5Hash) { if !bytes.Equal(headerMd5Hash, createdObj.MD5Sum) {
err = n.objectDelete(ctx, p.BktInfo, id) err = n.objectDelete(ctx, p.BktInfo, createdObj.ID)
if err != nil { if err != nil {
n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", id)) n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", createdObj.ID))
} }
return nil, apiErrors.GetAPIError(apiErrors.ErrInvalidDigest) return nil, apiErrors.GetAPIError(apiErrors.ErrInvalidDigest)
} }
@ -294,25 +294,26 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
if err != nil { if err != nil {
return nil, apiErrors.GetAPIError(apiErrors.ErrContentSHA256Mismatch) return nil, apiErrors.GetAPIError(apiErrors.ErrContentSHA256Mismatch)
} }
if !bytes.Equal(contentHashBytes, hash) { if !bytes.Equal(contentHashBytes, createdObj.HashSum) {
err = n.objectDelete(ctx, p.BktInfo, id) err = n.objectDelete(ctx, p.BktInfo, createdObj.ID)
if err != nil { if err != nil {
n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", id)) n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", createdObj.ID))
} }
return nil, apiErrors.GetAPIError(apiErrors.ErrContentSHA256Mismatch) return nil, apiErrors.GetAPIError(apiErrors.ErrContentSHA256Mismatch)
} }
} }
n.reqLogger(ctx).Debug(logs.PutObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", id)) n.reqLogger(ctx).Debug(logs.PutObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", createdObj.ID))
now := TimeNow(ctx) now := TimeNow(ctx)
newVersion := &data.NodeVersion{ newVersion := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{ BaseNodeVersion: data.BaseNodeVersion{
OID: id, OID: createdObj.ID,
ETag: hex.EncodeToString(hash), ETag: hex.EncodeToString(createdObj.HashSum),
FilePath: p.Object, FilePath: p.Object,
Size: p.Size, Size: p.Size,
Created: &now, Created: &now,
Owner: &n.gateOwner, Owner: &n.gateOwner,
CreationEpoch: createdObj.CreationEpoch,
}, },
IsUnversioned: !bktSettings.VersioningEnabled(), IsUnversioned: !bktSettings.VersioningEnabled(),
IsCombined: p.Header[MultipartObjectSize] != "", IsCombined: p.Header[MultipartObjectSize] != "",
@ -320,7 +321,7 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
if len(p.CompleteMD5Hash) > 0 { if len(p.CompleteMD5Hash) > 0 {
newVersion.MD5 = p.CompleteMD5Hash newVersion.MD5 = p.CompleteMD5Hash
} else { } else {
newVersion.MD5 = hex.EncodeToString(md5Hash) newVersion.MD5 = hex.EncodeToString(createdObj.MD5Sum)
} }
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil { if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
@ -332,7 +333,7 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
ObjVersion: &data.ObjectVersion{ ObjVersion: &data.ObjectVersion{
BktInfo: p.BktInfo, BktInfo: p.BktInfo,
ObjectName: p.Object, ObjectName: p.Object,
VersionID: id.EncodeToString(), VersionID: createdObj.ID.EncodeToString(),
}, },
NewLock: p.Lock, NewLock: p.Lock,
CopiesNumbers: p.CopiesNumbers, CopiesNumbers: p.CopiesNumbers,
@ -347,13 +348,13 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
n.cache.CleanListCacheEntriesContainingObject(p.Object, p.BktInfo.CID) n.cache.CleanListCacheEntriesContainingObject(p.Object, p.BktInfo.CID)
objInfo := &data.ObjectInfo{ objInfo := &data.ObjectInfo{
ID: id, ID: createdObj.ID,
CID: p.BktInfo.CID, CID: p.BktInfo.CID,
Owner: n.gateOwner, Owner: n.gateOwner,
Bucket: p.BktInfo.Name, Bucket: p.BktInfo.Name,
Name: p.Object, Name: p.Object,
Size: size, Size: createdObj.Size,
Created: prm.CreationTime, Created: prm.CreationTime,
Headers: p.Header, Headers: p.Header,
ContentType: p.Header[api.ContentType], ContentType: p.Header[api.ContentType],
@ -491,8 +492,7 @@ func (n *Layer) objectDeleteBase(ctx context.Context, bktInfo *data.BucketInfo,
} }
// objectPutAndHash prepare auth parameters and invoke frostfs.CreateObject. // objectPutAndHash prepare auth parameters and invoke frostfs.CreateObject.
// Returns object ID and payload sha256 hash. func (n *Layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktInfo *data.BucketInfo) (*data.CreatedObjectInfo, error) {
func (n *Layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktInfo *data.BucketInfo) (uint64, oid.ID, []byte, []byte, error) {
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner) n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
prm.ClientCut = n.features.ClientCut() prm.ClientCut = n.features.ClientCut()
prm.BufferMaxSize = n.features.BufferMaxSizeForPut() prm.BufferMaxSize = n.features.BufferMaxSizeForPut()
@ -505,15 +505,21 @@ func (n *Layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktIn
hash.Write(buf) hash.Write(buf)
md5Hash.Write(buf) md5Hash.Write(buf)
}) })
id, err := n.frostFS.CreateObject(ctx, prm) res, err := n.frostFS.CreateObject(ctx, prm)
if err != nil { if err != nil {
if _, errDiscard := io.Copy(io.Discard, prm.Payload); errDiscard != nil { if _, errDiscard := io.Copy(io.Discard, prm.Payload); errDiscard != nil {
n.reqLogger(ctx).Warn(logs.FailedToDiscardPutPayloadProbablyGoroutineLeaks, zap.Error(errDiscard)) n.reqLogger(ctx).Warn(logs.FailedToDiscardPutPayloadProbablyGoroutineLeaks, zap.Error(errDiscard))
} }
return 0, oid.ID{}, nil, nil, err return nil, err
} }
return size, id, hash.Sum(nil), md5Hash.Sum(nil), nil return &data.CreatedObjectInfo{
ID: res.ObjectID,
Size: size,
HashSum: hash.Sum(nil),
MD5Sum: md5Hash.Sum(nil),
CreationEpoch: res.CreationEpoch,
}, nil
} }
type logWrapper struct { type logWrapper struct {

View file

@ -44,7 +44,7 @@ func TestGoroutinesDontLeakInPutAndHash(t *testing.T) {
expErr := errors.New("some error") expErr := errors.New("some error")
tc.testFrostFS.SetObjectPutError(tc.obj, expErr) tc.testFrostFS.SetObjectPutError(tc.obj, expErr)
_, _, _, _, err = tc.layer.objectPutAndHash(tc.ctx, prm, tc.bktInfo) _, err = tc.layer.objectPutAndHash(tc.ctx, prm, tc.bktInfo)
require.ErrorIs(t, err, expErr) require.ErrorIs(t, err, expErr)
require.Empty(t, payload.Len(), "body must be read out otherwise goroutines can leak in wrapReader") require.Empty(t, payload.Len(), "body must be read out otherwise goroutines can leak in wrapReader")
} }

View file

@ -126,8 +126,12 @@ func (n *Layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, obj
return oid.ID{}, err return oid.ID{}, err
} }
_, id, _, _, err := n.objectPutAndHash(ctx, prm, bktInfo) createdObj, err := n.objectPutAndHash(ctx, prm, bktInfo)
return id, err if err != nil {
return oid.ID{}, err
}
return createdObj.ID, nil
} }
func (n *Layer) GetLockInfo(ctx context.Context, objVersion *data.ObjectVersion) (*data.LockInfo, error) { func (n *Layer) GetLockInfo(ctx context.Context, objVersion *data.ObjectVersion) (*data.LockInfo, error) {

View file

@ -137,12 +137,17 @@ func (x *AuthmateFrostFS) CreateObject(ctx context.Context, prm tokens.PrmObject
attributes = append(attributes, [2]string{attr.Key(), attr.Value()}) attributes = append(attributes, [2]string{attr.Key(), attr.Value()})
} }
return x.frostFS.CreateObject(ctx, layer.PrmObjectCreate{ res, err := x.frostFS.CreateObject(ctx, layer.PrmObjectCreate{
Container: prm.Container, Container: prm.Container,
Filepath: prm.Filepath, Filepath: prm.Filepath,
Attributes: attributes, Attributes: attributes,
Payload: bytes.NewReader(prm.Payload), Payload: bytes.NewReader(prm.Payload),
}) })
if err != nil {
return oid.ID{}, err
}
return res.ObjectID, nil
} }
func (x *AuthmateFrostFS) getCredVersions(ctx context.Context, addr oid.Address) (*crdt.ObjectVersions, error) { func (x *AuthmateFrostFS) getCredVersions(ctx context.Context, addr oid.Address) (*crdt.ObjectVersions, error) {

View file

@ -15,6 +15,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
@ -51,7 +52,7 @@ func NewFrostFS(p *pool.Pool, key *keys.PrivateKey) *FrostFS {
} }
} }
// TimeToEpoch implements frostfs.FrostFS interface method. // TimeToEpoch implements layer.FrostFS interface method.
func (x *FrostFS) TimeToEpoch(ctx context.Context, now, futureTime time.Time) (uint64, uint64, error) { func (x *FrostFS) TimeToEpoch(ctx context.Context, now, futureTime time.Time) (uint64, uint64, error) {
dur := futureTime.Sub(now) dur := futureTime.Sub(now)
if dur < 0 { if dur < 0 {
@ -87,7 +88,7 @@ func (x *FrostFS) TimeToEpoch(ctx context.Context, now, futureTime time.Time) (u
return curr, epoch, nil return curr, epoch, nil
} }
// Container implements frostfs.FrostFS interface method. // Container implements layer.FrostFS interface method.
func (x *FrostFS) Container(ctx context.Context, layerPrm layer.PrmContainer) (*container.Container, error) { func (x *FrostFS) Container(ctx context.Context, layerPrm layer.PrmContainer) (*container.Container, error) {
prm := pool.PrmContainerGet{ prm := pool.PrmContainerGet{
ContainerID: layerPrm.ContainerID, ContainerID: layerPrm.ContainerID,
@ -102,7 +103,7 @@ func (x *FrostFS) Container(ctx context.Context, layerPrm layer.PrmContainer) (*
return &res, nil return &res, nil
} }
// CreateContainer implements frostfs.FrostFS interface method. // CreateContainer implements layer.FrostFS interface method.
func (x *FrostFS) CreateContainer(ctx context.Context, prm layer.PrmContainerCreate) (*layer.ContainerCreateResult, error) { func (x *FrostFS) CreateContainer(ctx context.Context, prm layer.PrmContainerCreate) (*layer.ContainerCreateResult, error) {
var cnr container.Container var cnr container.Container
cnr.Init() cnr.Init()
@ -150,7 +151,7 @@ func (x *FrostFS) CreateContainer(ctx context.Context, prm layer.PrmContainerCre
}, handleObjectError("save container via connection pool", err) }, handleObjectError("save container via connection pool", err)
} }
// UserContainers implements frostfs.FrostFS interface method. // UserContainers implements layer.FrostFS interface method.
func (x *FrostFS) UserContainers(ctx context.Context, layerPrm layer.PrmUserContainers) ([]cid.ID, error) { func (x *FrostFS) UserContainers(ctx context.Context, layerPrm layer.PrmUserContainers) ([]cid.ID, error) {
prm := pool.PrmContainerList{ prm := pool.PrmContainerList{
OwnerID: layerPrm.UserID, OwnerID: layerPrm.UserID,
@ -161,7 +162,7 @@ func (x *FrostFS) UserContainers(ctx context.Context, layerPrm layer.PrmUserCont
return r, handleObjectError("list user containers via connection pool", err) return r, handleObjectError("list user containers via connection pool", err)
} }
// DeleteContainer implements frostfs.FrostFS interface method. // DeleteContainer implements layer.FrostFS interface method.
func (x *FrostFS) DeleteContainer(ctx context.Context, id cid.ID, token *session.Container) error { func (x *FrostFS) DeleteContainer(ctx context.Context, id cid.ID, token *session.Container) error {
prm := pool.PrmContainerDelete{ContainerID: id, Session: token, WaitParams: &x.await} prm := pool.PrmContainerDelete{ContainerID: id, Session: token, WaitParams: &x.await}
@ -169,8 +170,8 @@ func (x *FrostFS) DeleteContainer(ctx context.Context, id cid.ID, token *session
return handleObjectError("delete container via connection pool", err) return handleObjectError("delete container via connection pool", err)
} }
// CreateObject implements frostfs.FrostFS interface method. // CreateObject implements layer.FrostFS interface method.
func (x *FrostFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oid.ID, error) { func (x *FrostFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (*layer.CreateObjectResult, error) {
attrNum := len(prm.Attributes) + 1 // + creation time attrNum := len(prm.Attributes) + 1 // + creation time
if prm.Filepath != "" { if prm.Filepath != "" {
@ -239,10 +240,13 @@ func (x *FrostFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (
res, err := x.pool.PutObject(ctx, prmPut) res, err := x.pool.PutObject(ctx, prmPut)
if err = handleObjectError("save object via connection pool", err); err != nil { if err = handleObjectError("save object via connection pool", err); err != nil {
return oid.ID{}, err return nil, err
} }
return res.ObjectID, nil return &layer.CreateObjectResult{
ObjectID: res.ObjectID,
CreationEpoch: res.Epoch,
}, nil
} }
// wraps io.ReadCloser and transforms Read errors related to access violation // wraps io.ReadCloser and transforms Read errors related to access violation
@ -259,7 +263,7 @@ func (x payloadReader) Read(p []byte) (int, error) {
return n, handleObjectError("read payload", err) return n, handleObjectError("read payload", err)
} }
// HeadObject implements frostfs.FrostFS interface method. // HeadObject implements layer.FrostFS interface method.
func (x *FrostFS) HeadObject(ctx context.Context, prm layer.PrmObjectHead) (*object.Object, error) { func (x *FrostFS) HeadObject(ctx context.Context, prm layer.PrmObjectHead) (*object.Object, error) {
var addr oid.Address var addr oid.Address
addr.SetContainer(prm.Container) addr.SetContainer(prm.Container)
@ -282,7 +286,7 @@ func (x *FrostFS) HeadObject(ctx context.Context, prm layer.PrmObjectHead) (*obj
return &res, nil return &res, nil
} }
// GetObject implements frostfs.FrostFS interface method. // GetObject implements layer.FrostFS interface method.
func (x *FrostFS) GetObject(ctx context.Context, prm layer.PrmObjectGet) (*layer.Object, error) { func (x *FrostFS) GetObject(ctx context.Context, prm layer.PrmObjectGet) (*layer.Object, error) {
var addr oid.Address var addr oid.Address
addr.SetContainer(prm.Container) addr.SetContainer(prm.Container)
@ -308,7 +312,7 @@ func (x *FrostFS) GetObject(ctx context.Context, prm layer.PrmObjectGet) (*layer
}, nil }, nil
} }
// RangeObject implements frostfs.FrostFS interface method. // RangeObject implements layer.FrostFS interface method.
func (x *FrostFS) RangeObject(ctx context.Context, prm layer.PrmObjectRange) (io.ReadCloser, error) { func (x *FrostFS) RangeObject(ctx context.Context, prm layer.PrmObjectRange) (io.ReadCloser, error) {
var addr oid.Address var addr oid.Address
addr.SetContainer(prm.Container) addr.SetContainer(prm.Container)
@ -333,7 +337,7 @@ func (x *FrostFS) RangeObject(ctx context.Context, prm layer.PrmObjectRange) (io
return payloadReader{&res}, nil return payloadReader{&res}, nil
} }
// DeleteObject implements frostfs.FrostFS interface method. // DeleteObject implements layer.FrostFS interface method.
func (x *FrostFS) DeleteObject(ctx context.Context, prm layer.PrmObjectDelete) error { func (x *FrostFS) DeleteObject(ctx context.Context, prm layer.PrmObjectDelete) error {
var addr oid.Address var addr oid.Address
addr.SetContainer(prm.Container) addr.SetContainer(prm.Container)
@ -352,7 +356,7 @@ func (x *FrostFS) DeleteObject(ctx context.Context, prm layer.PrmObjectDelete) e
return handleObjectError("mark object removal via connection pool", err) return handleObjectError("mark object removal via connection pool", err)
} }
// SearchObjects implements frostfs.FrostFS interface method. // SearchObjects implements layer.FrostFS interface method.
func (x *FrostFS) SearchObjects(ctx context.Context, prm layer.PrmObjectSearch) ([]oid.ID, error) { func (x *FrostFS) SearchObjects(ctx context.Context, prm layer.PrmObjectSearch) ([]oid.ID, error) {
filters := object.NewSearchFilters() filters := object.NewSearchFilters()
filters.AddRootFilter() filters.AddRootFilter()
@ -389,6 +393,16 @@ func (x *FrostFS) SearchObjects(ctx context.Context, prm layer.PrmObjectSearch)
return buf, handleObjectError("read object list", err) return buf, handleObjectError("read object list", err)
} }
// NetworkInfo implements layer.FrostFS interface method.
func (x *FrostFS) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
ni, err := x.pool.NetworkInfo(ctx)
if err != nil {
return ni, handleObjectError("get network info via connection pool", err)
}
return ni, nil
}
// ResolverFrostFS represents virtual connection to the FrostFS network. // ResolverFrostFS represents virtual connection to the FrostFS network.
// It implements resolver.FrostFS. // It implements resolver.FrostFS.
type ResolverFrostFS struct { type ResolverFrostFS struct {

View file

@ -101,6 +101,7 @@ const (
etagKV = "ETag" etagKV = "ETag"
md5KV = "MD5" md5KV = "MD5"
finishedKV = "Finished" finishedKV = "Finished"
creationEpochKV = "CreationEpoch"
// keys for lock. // keys for lock.
isLockKV = "IsLock" isLockKV = "IsLock"
@ -273,6 +274,14 @@ func newNodeVersionFromTreeNode(log *zap.Logger, filePath string, treeNode *tree
} }
} }
if creationEpoch, ok := treeNode.Get(creationEpochKV); ok {
if epoch, err := strconv.ParseUint(creationEpoch, 10, 64); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(creationEpochKV, creationEpoch), zap.Error(err))
} else {
version.CreationEpoch = epoch
}
}
return version, nil return version, nil
} }
@ -354,6 +363,14 @@ func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *tr
} }
} }
if creationEpoch, ok := treeNode.Get(creationEpochKV); ok {
if epoch, err := strconv.ParseUint(creationEpoch, 10, 64); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(creationEpochKV, creationEpoch), zap.Error(err))
} else {
multipartInfo.CreationEpoch = epoch
}
}
return multipartInfo, nil return multipartInfo, nil
} }
@ -389,6 +406,12 @@ func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo,
} else { } else {
multipartInfo.Finished = isFinished multipartInfo.Finished = isFinished
} }
case creationEpochKV:
if epoch, err := strconv.ParseUint(string(kv.GetValue()), 10, 64); err != nil {
log.Warn(logs.InvalidTreeKV, zap.String(creationEpochKV, string(kv.GetValue())), zap.Error(err))
} else {
multipartInfo.CreationEpoch = epoch
}
default: default:
multipartInfo.Meta[kv.GetKey()] = string(kv.GetValue()) multipartInfo.Meta[kv.GetKey()] = string(kv.GetValue())
} }
@ -759,7 +782,7 @@ func (c *Tree) GetVersions(ctx context.Context, bktInfo *data.BucketInfo, filepa
} }
func (c *Tree) GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) { func (c *Tree) GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) {
meta := []string{oidKV, isCombinedKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV} meta := []string{oidKV, isCombinedKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV, creationEpochKV}
path := pathFromName(objectName) path := pathFromName(objectName)
p := &GetNodesParams{ p := &GetNodesParams{
@ -1609,6 +1632,7 @@ func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID
FileNameKey: path[len(path)-1], FileNameKey: path[len(path)-1],
ownerKV: version.Owner.EncodeToString(), ownerKV: version.Owner.EncodeToString(),
createdKV: strconv.FormatInt(version.Created.UTC().UnixMilli(), 10), createdKV: strconv.FormatInt(version.Created.UTC().UnixMilli(), 10),
creationEpochKV: strconv.FormatUint(version.CreationEpoch, 10),
} }
if version.Size > 0 { if version.Size > 0 {
@ -1662,7 +1686,7 @@ func (c *Tree) clearOutdatedVersionInfo(ctx context.Context, bktInfo *data.Bucke
} }
func (c *Tree) getVersions(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string, onlyUnversioned bool) ([]*data.NodeVersion, error) { func (c *Tree) getVersions(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string, onlyUnversioned bool) ([]*data.NodeVersion, error) {
keysToReturn := []string{oidKV, isCombinedKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV} keysToReturn := []string{oidKV, isCombinedKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV, creationEpochKV}
path := pathFromName(filepath) path := pathFromName(filepath)
p := &GetNodesParams{ p := &GetNodesParams{
BktInfo: bktInfo, BktInfo: bktInfo,
@ -1720,6 +1744,7 @@ func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]str
if info.Finished { if info.Finished {
info.Meta[finishedKV] = strconv.FormatBool(info.Finished) info.Meta[finishedKV] = strconv.FormatBool(info.Finished)
} }
info.Meta[creationEpochKV] = strconv.FormatUint(info.CreationEpoch, 10)
return info.Meta return info.Meta
} }