[#412] Store creation epoch in tree service #424
15 changed files with 172 additions and 77 deletions
|
@ -82,6 +82,15 @@ type (
|
||||||
VersionID string
|
VersionID string
|
||||||
NoErrorOnDeleteMarker bool
|
NoErrorOnDeleteMarker bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CreatedObjectInfo stores created object info.
|
||||||
|
CreatedObjectInfo struct {
|
||||||
|
ID oid.ID
|
||||||
|
Size uint64
|
||||||
|
HashSum []byte
|
||||||
|
MD5Sum []byte
|
||||||
|
CreationEpoch uint64
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
// SettingsObjectName is a system name for a bucket settings file.
|
// SettingsObjectName is a system name for a bucket settings file.
|
||||||
|
|
|
@ -72,6 +72,7 @@ type BaseNodeVersion struct {
|
||||||
Created *time.Time
|
Created *time.Time
|
||||||
Owner *user.ID
|
Owner *user.ID
|
||||||
IsDeleteMarker bool
|
IsDeleteMarker bool
|
||||||
|
CreationEpoch uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *BaseNodeVersion) GetETag(md5Enabled bool) string {
|
func (v *BaseNodeVersion) GetETag(md5Enabled bool) string {
|
||||||
|
@ -110,6 +111,7 @@ type MultipartInfo struct {
|
||||||
Meta map[string]string
|
Meta map[string]string
|
||||||
CopiesNumbers []uint32
|
CopiesNumbers []uint32
|
||||||
Finished bool
|
Finished bool
|
||||||
|
CreationEpoch uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// PartInfo is upload information about part.
|
// PartInfo is upload information about part.
|
||||||
|
|
|
@ -44,12 +44,12 @@ func (n *Layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error {
|
||||||
CopiesNumber: p.CopiesNumbers,
|
CopiesNumber: p.CopiesNumbers,
|
||||||
}
|
}
|
||||||
|
|
||||||
_, objID, _, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
|
createdObj, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("put system object: %w", err)
|
return fmt.Errorf("put system object: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
objIDToDelete, err := n.treeService.PutBucketCORS(ctx, p.BktInfo, objID)
|
objIDToDelete, err := n.treeService.PutBucketCORS(ctx, p.BktInfo, createdObj.ID)
|
||||||
objIDToDeleteNotFound := errorsStd.Is(err, ErrNoNodeToRemove)
|
objIDToDeleteNotFound := errorsStd.Is(err, ErrNoNodeToRemove)
|
||||||
if err != nil && !objIDToDeleteNotFound {
|
if err != nil && !objIDToDeleteNotFound {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -148,6 +148,12 @@ type PrmObjectCreate struct {
|
||||||
BufferMaxSize uint64
|
BufferMaxSize uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CreateObjectResult is a result parameter of FrostFS.CreateObject operation.
|
||||||
|
type CreateObjectResult struct {
|
||||||
|
ObjectID oid.ID
|
||||||
|
CreationEpoch uint64
|
||||||
|
}
|
||||||
|
|
||||||
// PrmObjectDelete groups parameters of FrostFS.DeleteObject operation.
|
// PrmObjectDelete groups parameters of FrostFS.DeleteObject operation.
|
||||||
type PrmObjectDelete struct {
|
type PrmObjectDelete struct {
|
||||||
// Authentication parameters.
|
// Authentication parameters.
|
||||||
|
@ -231,15 +237,15 @@ type FrostFS interface {
|
||||||
|
|
||||||
// CreateObject creates and saves a parameterized object in the FrostFS container.
|
// CreateObject creates and saves a parameterized object in the FrostFS container.
|
||||||
// It sets 'Timestamp' attribute to the current time.
|
// It sets 'Timestamp' attribute to the current time.
|
||||||
// It returns the ID of the saved object.
|
// It returns the ID and creation epoch of the saved object.
|
||||||
//
|
//
|
||||||
// Creation time should be written into the object (UTC).
|
// Creation time should be written into the object (UTC).
|
||||||
//
|
//
|
||||||
// It returns ErrAccessDenied on write access violation.
|
// It returns ErrAccessDenied on write access violation.
|
||||||
//
|
//
|
||||||
// It returns exactly one non-zero value. It returns any error encountered which
|
// It returns exactly one non-nil value. It returns any error encountered which
|
||||||
// prevented the container from being created.
|
// prevented the object from being created.
|
||||||
CreateObject(context.Context, PrmObjectCreate) (oid.ID, error)
|
CreateObject(context.Context, PrmObjectCreate) (*CreateObjectResult, error)
|
||||||
|
|
||||||
// DeleteObject marks the object to be removed from the FrostFS container by identifier.
|
// DeleteObject marks the object to be removed from the FrostFS container by identifier.
|
||||||
// Successful return does not guarantee actual removal.
|
// Successful return does not guarantee actual removal.
|
||||||
|
@ -265,4 +271,7 @@ type FrostFS interface {
|
||||||
//
|
//
|
||||||
// It returns any error encountered which prevented computing epochs.
|
// It returns any error encountered which prevented computing epochs.
|
||||||
TimeToEpoch(ctx context.Context, now time.Time, future time.Time) (uint64, uint64, error)
|
TimeToEpoch(ctx context.Context, now time.Time, future time.Time) (uint64, uint64, error)
|
||||||
|
|
||||||
|
// NetworkInfo returns parameters of FrostFS network.
|
||||||
|
NetworkInfo(context.Context) (netmap.NetworkInfo, error)
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ import (
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||||||
|
@ -237,10 +238,10 @@ func (t *TestFrostFS) ReadObject(ctx context.Context, prm PrmObjectRead) (*Objec
|
||||||
return nil, fmt.Errorf("%w: %s", &apistatus.ObjectNotFound{}, addr)
|
return nil, fmt.Errorf("%w: %s", &apistatus.ObjectNotFound{}, addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID, error) {
|
func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (*CreateObjectResult, error) {
|
||||||
b := make([]byte, 32)
|
b := make([]byte, 32)
|
||||||
if _, err := io.ReadFull(rand.Reader, b); err != nil {
|
if _, err := io.ReadFull(rand.Reader, b); err != nil {
|
||||||
return oid.ID{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
var id oid.ID
|
var id oid.ID
|
||||||
id.SetSHA256(sha256.Sum256(b))
|
id.SetSHA256(sha256.Sum256(b))
|
||||||
|
@ -248,7 +249,7 @@ func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.
|
||||||
attrs := make([]object.Attribute, 0)
|
attrs := make([]object.Attribute, 0)
|
||||||
|
|
||||||
if err := t.objectPutErrors[prm.Filepath]; err != nil {
|
if err := t.objectPutErrors[prm.Filepath]; err != nil {
|
||||||
return oid.ID{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if prm.Filepath != "" {
|
if prm.Filepath != "" {
|
||||||
|
@ -293,7 +294,7 @@ func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.
|
||||||
if prm.Payload != nil {
|
if prm.Payload != nil {
|
||||||
all, err := io.ReadAll(prm.Payload)
|
all, err := io.ReadAll(prm.Payload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return oid.ID{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
obj.SetPayload(all)
|
obj.SetPayload(all)
|
||||||
obj.SetPayloadSize(uint64(len(all)))
|
obj.SetPayloadSize(uint64(len(all)))
|
||||||
|
@ -307,7 +308,10 @@ func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.
|
||||||
|
|
||||||
addr := newAddress(cnrID, objID)
|
addr := newAddress(cnrID, objID)
|
||||||
t.objects[addr.EncodeToString()] = obj
|
t.objects[addr.EncodeToString()] = obj
|
||||||
return objID, nil
|
return &CreateObjectResult{
|
||||||
|
ObjectID: objID,
|
||||||
|
CreationEpoch: t.currentEpoch - 1,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TestFrostFS) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
|
func (t *TestFrostFS) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
|
||||||
|
@ -386,6 +390,13 @@ func (t *TestFrostFS) SearchObjects(_ context.Context, prm PrmObjectSearch) ([]o
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *TestFrostFS) NetworkInfo(context.Context) (netmap.NetworkInfo, error) {
|
||||||
|
ni := netmap.NetworkInfo{}
|
||||||
|
ni.SetCurrentEpoch(t.currentEpoch)
|
||||||
|
|
||||||
|
return ni, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (t *TestFrostFS) checkAccess(cnrID cid.ID, owner user.ID) bool {
|
func (t *TestFrostFS) checkAccess(cnrID cid.ID, owner user.ID) bool {
|
||||||
cnr, ok := t.containers[cnrID.EncodeToString()]
|
cnr, ok := t.containers[cnrID.EncodeToString()]
|
||||||
if !ok {
|
if !ok {
|
||||||
|
|
|
@ -43,7 +43,7 @@ func (n *Layer) PutBucketLifecycleConfiguration(ctx context.Context, p *PutBucke
|
||||||
|
|
||||||
prm.Container = lifecycleBkt.CID
|
prm.Container = lifecycleBkt.CID
|
||||||
|
|
||||||
_, objID, _, md5, err := n.objectPutAndHash(ctx, prm, lifecycleBkt)
|
createdObj, err := n.objectPutAndHash(ctx, prm, lifecycleBkt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -53,13 +53,13 @@ func (n *Layer) PutBucketLifecycleConfiguration(ctx context.Context, p *PutBucke
|
||||||
return apiErr.GetAPIError(apiErr.ErrInvalidDigest)
|
return apiErr.GetAPIError(apiErr.ErrInvalidDigest)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !bytes.Equal(hashBytes, md5) {
|
if !bytes.Equal(hashBytes, createdObj.MD5Sum) {
|
||||||
n.deleteLifecycleObject(ctx, lifecycleBkt, objID)
|
n.deleteLifecycleObject(ctx, lifecycleBkt, createdObj.ID)
|
||||||
|
|
||||||
return apiErr.GetAPIError(apiErr.ErrInvalidDigest)
|
return apiErr.GetAPIError(apiErr.ErrInvalidDigest)
|
||||||
}
|
}
|
||||||
|
|
||||||
objIDToDelete, err := n.treeService.PutBucketLifecycleConfiguration(ctx, p.BktInfo, objID)
|
objIDToDelete, err := n.treeService.PutBucketLifecycleConfiguration(ctx, p.BktInfo, createdObj.ID)
|
||||||
objIDToDeleteNotFound := errors.Is(err, ErrNoNodeToRemove)
|
objIDToDeleteNotFound := errors.Is(err, ErrNoNodeToRemove)
|
||||||
if err != nil && !objIDToDeleteNotFound {
|
if err != nil && !objIDToDeleteNotFound {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -150,6 +150,11 @@ func (n *Layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartPar
|
||||||
metaSize += len(p.Data.TagSet)
|
metaSize += len(p.Data.TagSet)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
networkInfo, err := n.frostFS.NetworkInfo(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("get network info: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
info := &data.MultipartInfo{
|
info := &data.MultipartInfo{
|
||||||
Key: p.Info.Key,
|
Key: p.Info.Key,
|
||||||
UploadID: p.Info.UploadID,
|
UploadID: p.Info.UploadID,
|
||||||
|
@ -157,6 +162,7 @@ func (n *Layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartPar
|
||||||
Created: TimeNow(ctx),
|
Created: TimeNow(ctx),
|
||||||
Meta: make(map[string]string, metaSize),
|
Meta: make(map[string]string, metaSize),
|
||||||
CopiesNumbers: p.CopiesNumbers,
|
CopiesNumbers: p.CopiesNumbers,
|
||||||
|
CreationEpoch: networkInfo.CurrentEpoch(),
|
||||||
}
|
}
|
||||||
|
|
||||||
for key, val := range p.Header {
|
for key, val := range p.Header {
|
||||||
|
@ -229,7 +235,7 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
|
||||||
prm.Attributes[0][0], prm.Attributes[0][1] = UploadIDAttributeName, p.Info.UploadID
|
prm.Attributes[0][0], prm.Attributes[0][1] = UploadIDAttributeName, p.Info.UploadID
|
||||||
prm.Attributes[1][0], prm.Attributes[1][1] = UploadPartNumberAttributeName, strconv.Itoa(p.PartNumber)
|
prm.Attributes[1][0], prm.Attributes[1][1] = UploadPartNumberAttributeName, strconv.Itoa(p.PartNumber)
|
||||||
|
|
||||||
size, id, hash, md5Hash, err := n.objectPutAndHash(ctx, prm, bktInfo)
|
createdObj, err := n.objectPutAndHash(ctx, prm, bktInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -238,21 +244,21 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, s3errors.GetAPIError(s3errors.ErrInvalidDigest)
|
return nil, s3errors.GetAPIError(s3errors.ErrInvalidDigest)
|
||||||
}
|
}
|
||||||
if hex.EncodeToString(hashBytes) != hex.EncodeToString(md5Hash) {
|
if hex.EncodeToString(hashBytes) != hex.EncodeToString(createdObj.MD5Sum) {
|
||||||
prm := PrmObjectDelete{
|
prm := PrmObjectDelete{
|
||||||
Object: id,
|
Object: createdObj.ID,
|
||||||
Container: bktInfo.CID,
|
Container: bktInfo.CID,
|
||||||
}
|
}
|
||||||
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
|
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
|
||||||
err = n.frostFS.DeleteObject(ctx, prm)
|
err = n.frostFS.DeleteObject(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", id))
|
n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", createdObj.ID))
|
||||||
}
|
}
|
||||||
return nil, s3errors.GetAPIError(s3errors.ErrInvalidDigest)
|
return nil, s3errors.GetAPIError(s3errors.ErrInvalidDigest)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if p.Info.Encryption.Enabled() {
|
if p.Info.Encryption.Enabled() {
|
||||||
size = decSize
|
createdObj.Size = decSize
|
||||||
}
|
}
|
||||||
|
|
||||||
if !p.Info.Encryption.Enabled() && len(p.ContentSHA256Hash) > 0 && !auth.IsStandardContentSHA256(p.ContentSHA256Hash) {
|
if !p.Info.Encryption.Enabled() && len(p.ContentSHA256Hash) > 0 && !auth.IsStandardContentSHA256(p.ContentSHA256Hash) {
|
||||||
|
@ -260,10 +266,10 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, s3errors.GetAPIError(s3errors.ErrContentSHA256Mismatch)
|
return nil, s3errors.GetAPIError(s3errors.ErrContentSHA256Mismatch)
|
||||||
}
|
}
|
||||||
if !bytes.Equal(contentHashBytes, hash) {
|
if !bytes.Equal(contentHashBytes, createdObj.HashSum) {
|
||||||
err = n.objectDelete(ctx, bktInfo, id)
|
err = n.objectDelete(ctx, bktInfo, createdObj.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", id))
|
n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", createdObj.ID))
|
||||||
}
|
}
|
||||||
return nil, s3errors.GetAPIError(s3errors.ErrContentSHA256Mismatch)
|
return nil, s3errors.GetAPIError(s3errors.ErrContentSHA256Mismatch)
|
||||||
}
|
}
|
||||||
|
@ -271,17 +277,17 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
|
||||||
|
|
||||||
n.reqLogger(ctx).Debug(logs.UploadPart,
|
n.reqLogger(ctx).Debug(logs.UploadPart,
|
||||||
zap.String("multipart upload", p.Info.UploadID), zap.Int("part number", p.PartNumber),
|
zap.String("multipart upload", p.Info.UploadID), zap.Int("part number", p.PartNumber),
|
||||||
zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", id))
|
zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", createdObj.ID))
|
||||||
|
|
||||||
partInfo := &data.PartInfo{
|
partInfo := &data.PartInfo{
|
||||||
Key: p.Info.Key,
|
Key: p.Info.Key,
|
||||||
UploadID: p.Info.UploadID,
|
UploadID: p.Info.UploadID,
|
||||||
Number: p.PartNumber,
|
Number: p.PartNumber,
|
||||||
OID: id,
|
OID: createdObj.ID,
|
||||||
Size: size,
|
Size: createdObj.Size,
|
||||||
ETag: hex.EncodeToString(hash),
|
ETag: hex.EncodeToString(createdObj.HashSum),
|
||||||
Created: prm.CreationTime,
|
Created: prm.CreationTime,
|
||||||
MD5: hex.EncodeToString(md5Hash),
|
MD5: hex.EncodeToString(createdObj.MD5Sum),
|
||||||
}
|
}
|
||||||
|
|
||||||
oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo)
|
oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo)
|
||||||
|
@ -298,7 +304,7 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
|
||||||
}
|
}
|
||||||
|
|
||||||
objInfo := &data.ObjectInfo{
|
objInfo := &data.ObjectInfo{
|
||||||
ID: id,
|
ID: createdObj.ID,
|
||||||
CID: bktInfo.CID,
|
CID: bktInfo.CID,
|
||||||
|
|
||||||
Owner: bktInfo.Owner,
|
Owner: bktInfo.Owner,
|
||||||
|
|
|
@ -273,7 +273,7 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
|
||||||
prm.Attributes = append(prm.Attributes, [2]string{k, v})
|
prm.Attributes = append(prm.Attributes, [2]string{k, v})
|
||||||
}
|
}
|
||||||
|
|
||||||
size, id, hash, md5Hash, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
|
createdObj, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -282,10 +282,10 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, apiErrors.GetAPIError(apiErrors.ErrInvalidDigest)
|
return nil, apiErrors.GetAPIError(apiErrors.ErrInvalidDigest)
|
||||||
}
|
}
|
||||||
if !bytes.Equal(headerMd5Hash, md5Hash) {
|
if !bytes.Equal(headerMd5Hash, createdObj.MD5Sum) {
|
||||||
err = n.objectDelete(ctx, p.BktInfo, id)
|
err = n.objectDelete(ctx, p.BktInfo, createdObj.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", id))
|
n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", createdObj.ID))
|
||||||
}
|
}
|
||||||
return nil, apiErrors.GetAPIError(apiErrors.ErrInvalidDigest)
|
return nil, apiErrors.GetAPIError(apiErrors.ErrInvalidDigest)
|
||||||
}
|
}
|
||||||
|
@ -296,25 +296,26 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, apiErrors.GetAPIError(apiErrors.ErrContentSHA256Mismatch)
|
return nil, apiErrors.GetAPIError(apiErrors.ErrContentSHA256Mismatch)
|
||||||
}
|
}
|
||||||
if !bytes.Equal(contentHashBytes, hash) {
|
if !bytes.Equal(contentHashBytes, createdObj.HashSum) {
|
||||||
err = n.objectDelete(ctx, p.BktInfo, id)
|
err = n.objectDelete(ctx, p.BktInfo, createdObj.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", id))
|
n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", createdObj.ID))
|
||||||
}
|
}
|
||||||
return nil, apiErrors.GetAPIError(apiErrors.ErrContentSHA256Mismatch)
|
return nil, apiErrors.GetAPIError(apiErrors.ErrContentSHA256Mismatch)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
n.reqLogger(ctx).Debug(logs.PutObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", id))
|
n.reqLogger(ctx).Debug(logs.PutObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", createdObj.ID))
|
||||||
now := TimeNow(ctx)
|
now := TimeNow(ctx)
|
||||||
newVersion := &data.NodeVersion{
|
newVersion := &data.NodeVersion{
|
||||||
BaseNodeVersion: data.BaseNodeVersion{
|
BaseNodeVersion: data.BaseNodeVersion{
|
||||||
OID: id,
|
OID: createdObj.ID,
|
||||||
ETag: hex.EncodeToString(hash),
|
ETag: hex.EncodeToString(createdObj.HashSum),
|
||||||
FilePath: p.Object,
|
FilePath: p.Object,
|
||||||
Size: p.Size,
|
Size: p.Size,
|
||||||
Created: &now,
|
Created: &now,
|
||||||
Owner: &n.gateOwner,
|
Owner: &n.gateOwner,
|
||||||
|
CreationEpoch: createdObj.CreationEpoch,
|
||||||
},
|
},
|
||||||
IsUnversioned: !bktSettings.VersioningEnabled(),
|
IsUnversioned: !bktSettings.VersioningEnabled(),
|
||||||
IsCombined: p.Header[MultipartObjectSize] != "",
|
IsCombined: p.Header[MultipartObjectSize] != "",
|
||||||
|
@ -322,7 +323,7 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
|
||||||
if len(p.CompleteMD5Hash) > 0 {
|
if len(p.CompleteMD5Hash) > 0 {
|
||||||
newVersion.MD5 = p.CompleteMD5Hash
|
newVersion.MD5 = p.CompleteMD5Hash
|
||||||
} else {
|
} else {
|
||||||
newVersion.MD5 = hex.EncodeToString(md5Hash)
|
newVersion.MD5 = hex.EncodeToString(createdObj.MD5Sum)
|
||||||
}
|
}
|
||||||
|
|
||||||
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
|
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
|
||||||
|
@ -334,7 +335,7 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
|
||||||
ObjVersion: &data.ObjectVersion{
|
ObjVersion: &data.ObjectVersion{
|
||||||
BktInfo: p.BktInfo,
|
BktInfo: p.BktInfo,
|
||||||
ObjectName: p.Object,
|
ObjectName: p.Object,
|
||||||
VersionID: id.EncodeToString(),
|
VersionID: createdObj.ID.EncodeToString(),
|
||||||
},
|
},
|
||||||
NewLock: p.Lock,
|
NewLock: p.Lock,
|
||||||
CopiesNumbers: p.CopiesNumbers,
|
CopiesNumbers: p.CopiesNumbers,
|
||||||
|
@ -349,13 +350,13 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
|
||||||
n.cache.CleanListCacheEntriesContainingObject(p.Object, p.BktInfo.CID)
|
n.cache.CleanListCacheEntriesContainingObject(p.Object, p.BktInfo.CID)
|
||||||
|
|
||||||
objInfo := &data.ObjectInfo{
|
objInfo := &data.ObjectInfo{
|
||||||
ID: id,
|
ID: createdObj.ID,
|
||||||
CID: p.BktInfo.CID,
|
CID: p.BktInfo.CID,
|
||||||
|
|
||||||
Owner: n.gateOwner,
|
Owner: n.gateOwner,
|
||||||
Bucket: p.BktInfo.Name,
|
Bucket: p.BktInfo.Name,
|
||||||
Name: p.Object,
|
Name: p.Object,
|
||||||
Size: size,
|
Size: createdObj.Size,
|
||||||
Created: prm.CreationTime,
|
Created: prm.CreationTime,
|
||||||
Headers: p.Header,
|
Headers: p.Header,
|
||||||
ContentType: p.Header[api.ContentType],
|
ContentType: p.Header[api.ContentType],
|
||||||
|
@ -493,8 +494,7 @@ func (n *Layer) objectDeleteBase(ctx context.Context, bktInfo *data.BucketInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
// objectPutAndHash prepare auth parameters and invoke frostfs.CreateObject.
|
// objectPutAndHash prepare auth parameters and invoke frostfs.CreateObject.
|
||||||
// Returns object ID and payload sha256 hash.
|
func (n *Layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktInfo *data.BucketInfo) (*data.CreatedObjectInfo, error) {
|
||||||
func (n *Layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktInfo *data.BucketInfo) (uint64, oid.ID, []byte, []byte, error) {
|
|
||||||
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
|
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
|
||||||
prm.ClientCut = n.features.ClientCut()
|
prm.ClientCut = n.features.ClientCut()
|
||||||
prm.BufferMaxSize = n.features.BufferMaxSizeForPut()
|
prm.BufferMaxSize = n.features.BufferMaxSizeForPut()
|
||||||
|
@ -507,15 +507,21 @@ func (n *Layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktIn
|
||||||
hash.Write(buf)
|
hash.Write(buf)
|
||||||
md5Hash.Write(buf)
|
md5Hash.Write(buf)
|
||||||
})
|
})
|
||||||
id, err := n.frostFS.CreateObject(ctx, prm)
|
res, err := n.frostFS.CreateObject(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if _, errDiscard := io.Copy(io.Discard, prm.Payload); errDiscard != nil {
|
if _, errDiscard := io.Copy(io.Discard, prm.Payload); errDiscard != nil {
|
||||||
n.reqLogger(ctx).Warn(logs.FailedToDiscardPutPayloadProbablyGoroutineLeaks, zap.Error(errDiscard))
|
n.reqLogger(ctx).Warn(logs.FailedToDiscardPutPayloadProbablyGoroutineLeaks, zap.Error(errDiscard))
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0, oid.ID{}, nil, nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return size, id, hash.Sum(nil), md5Hash.Sum(nil), nil
|
return &data.CreatedObjectInfo{
|
||||||
|
ID: res.ObjectID,
|
||||||
|
Size: size,
|
||||||
|
HashSum: hash.Sum(nil),
|
||||||
|
MD5Sum: md5Hash.Sum(nil),
|
||||||
|
CreationEpoch: res.CreationEpoch,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type logWrapper struct {
|
type logWrapper struct {
|
||||||
|
|
|
@ -44,7 +44,7 @@ func TestGoroutinesDontLeakInPutAndHash(t *testing.T) {
|
||||||
|
|
||||||
expErr := errors.New("some error")
|
expErr := errors.New("some error")
|
||||||
tc.testFrostFS.SetObjectPutError(tc.obj, expErr)
|
tc.testFrostFS.SetObjectPutError(tc.obj, expErr)
|
||||||
_, _, _, _, err = tc.layer.objectPutAndHash(tc.ctx, prm, tc.bktInfo)
|
_, err = tc.layer.objectPutAndHash(tc.ctx, prm, tc.bktInfo)
|
||||||
require.ErrorIs(t, err, expErr)
|
require.ErrorIs(t, err, expErr)
|
||||||
require.Empty(t, payload.Len(), "body must be read out otherwise goroutines can leak in wrapReader")
|
require.Empty(t, payload.Len(), "body must be read out otherwise goroutines can leak in wrapReader")
|
||||||
}
|
}
|
||||||
|
|
|
@ -125,8 +125,12 @@ func (n *Layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, obj
|
||||||
return oid.ID{}, err
|
return oid.ID{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, id, _, _, err := n.objectPutAndHash(ctx, prm, bktInfo)
|
createdObj, err := n.objectPutAndHash(ctx, prm, bktInfo)
|
||||||
return id, err
|
if err != nil {
|
||||||
|
return oid.ID{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return createdObj.ID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Layer) GetLockInfo(ctx context.Context, objVersion *data.ObjectVersion) (*data.LockInfo, error) {
|
func (n *Layer) GetLockInfo(ctx context.Context, objVersion *data.ObjectVersion) (*data.LockInfo, error) {
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -6,7 +6,7 @@ require (
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240716113920-f517e3949164
|
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240716113920-f517e3949164
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
|
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240718141740-ce8270568d36
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240722061523-7e94a6adf2ba
|
||||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240611102930-ac965e8d176a
|
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240611102930-ac965e8d176a
|
||||||
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
|
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
|
||||||
github.com/aws/aws-sdk-go v1.44.6
|
github.com/aws/aws-sdk-go v1.44.6
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -44,8 +44,8 @@ git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSV
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0/go.mod h1:RUIKZATQLJ+TaYQa60X2fTDwfuhMfm8Ar60bQ5fr+vU=
|
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0/go.mod h1:RUIKZATQLJ+TaYQa60X2fTDwfuhMfm8Ar60bQ5fr+vU=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 h1:aGQ6QaAnTerQ5Dq5b2/f9DUQtSqPkZZ/bkMx/HKuLCo=
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 h1:aGQ6QaAnTerQ5Dq5b2/f9DUQtSqPkZZ/bkMx/HKuLCo=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6/go.mod h1:W8Nn08/l6aQ7UlIbpF7FsQou7TVpcRD1ZT1KG4TrFhE=
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6/go.mod h1:W8Nn08/l6aQ7UlIbpF7FsQou7TVpcRD1ZT1KG4TrFhE=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240718141740-ce8270568d36 h1:MV/vKJWLQT34RRbXYvkNKFYGNjL5bRNuCQMXkbC7fLI=
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240722061523-7e94a6adf2ba h1:OP5fCRRQ5ndRmAYyuLr7rBOMz5nKrB+o9B1XZ+mm3XY=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240718141740-ce8270568d36/go.mod h1:vluJ/+yQMcq8ZIZZSA7Te+JKClr0lgtRErjICvb8wto=
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240722061523-7e94a6adf2ba/go.mod h1:vluJ/+yQMcq8ZIZZSA7Te+JKClr0lgtRErjICvb8wto=
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
|
||||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240611102930-ac965e8d176a h1:Bk1fB4cQASPKgAVGCdlBOEp5ohZfDxqK6fZM8eP+Emo=
|
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240611102930-ac965e8d176a h1:Bk1fB4cQASPKgAVGCdlBOEp5ohZfDxqK6fZM8eP+Emo=
|
||||||
|
|
|
@ -122,12 +122,17 @@ func (x *AuthmateFrostFS) CreateObject(ctx context.Context, prm tokens.PrmObject
|
||||||
attributes = append(attributes, [2]string{attr.Key(), attr.Value()})
|
attributes = append(attributes, [2]string{attr.Key(), attr.Value()})
|
||||||
}
|
}
|
||||||
|
|
||||||
return x.frostFS.CreateObject(ctx, layer.PrmObjectCreate{
|
res, err := x.frostFS.CreateObject(ctx, layer.PrmObjectCreate{
|
||||||
Container: prm.Container,
|
Container: prm.Container,
|
||||||
Filepath: prm.Filepath,
|
Filepath: prm.Filepath,
|
||||||
Attributes: attributes,
|
Attributes: attributes,
|
||||||
Payload: bytes.NewReader(prm.Payload),
|
Payload: bytes.NewReader(prm.Payload),
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return oid.ID{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res.ObjectID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *AuthmateFrostFS) getCredVersions(ctx context.Context, addr oid.Address) (*crdt.ObjectVersions, error) {
|
func (x *AuthmateFrostFS) getCredVersions(ctx context.Context, addr oid.Address) (*crdt.ObjectVersions, error) {
|
||||||
|
|
|
@ -15,6 +15,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
|
@ -51,7 +52,7 @@ func NewFrostFS(p *pool.Pool, key *keys.PrivateKey) *FrostFS {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TimeToEpoch implements frostfs.FrostFS interface method.
|
// TimeToEpoch implements layer.FrostFS interface method.
|
||||||
func (x *FrostFS) TimeToEpoch(ctx context.Context, now, futureTime time.Time) (uint64, uint64, error) {
|
func (x *FrostFS) TimeToEpoch(ctx context.Context, now, futureTime time.Time) (uint64, uint64, error) {
|
||||||
dur := futureTime.Sub(now)
|
dur := futureTime.Sub(now)
|
||||||
if dur < 0 {
|
if dur < 0 {
|
||||||
|
@ -87,7 +88,7 @@ func (x *FrostFS) TimeToEpoch(ctx context.Context, now, futureTime time.Time) (u
|
||||||
return curr, epoch, nil
|
return curr, epoch, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Container implements frostfs.FrostFS interface method.
|
// Container implements layer.FrostFS interface method.
|
||||||
func (x *FrostFS) Container(ctx context.Context, layerPrm layer.PrmContainer) (*container.Container, error) {
|
func (x *FrostFS) Container(ctx context.Context, layerPrm layer.PrmContainer) (*container.Container, error) {
|
||||||
prm := pool.PrmContainerGet{
|
prm := pool.PrmContainerGet{
|
||||||
ContainerID: layerPrm.ContainerID,
|
ContainerID: layerPrm.ContainerID,
|
||||||
|
@ -102,7 +103,7 @@ func (x *FrostFS) Container(ctx context.Context, layerPrm layer.PrmContainer) (*
|
||||||
return &res, nil
|
return &res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateContainer implements frostfs.FrostFS interface method.
|
// CreateContainer implements layer.FrostFS interface method.
|
||||||
func (x *FrostFS) CreateContainer(ctx context.Context, prm layer.PrmContainerCreate) (*layer.ContainerCreateResult, error) {
|
func (x *FrostFS) CreateContainer(ctx context.Context, prm layer.PrmContainerCreate) (*layer.ContainerCreateResult, error) {
|
||||||
var cnr container.Container
|
var cnr container.Container
|
||||||
cnr.Init()
|
cnr.Init()
|
||||||
|
@ -150,7 +151,7 @@ func (x *FrostFS) CreateContainer(ctx context.Context, prm layer.PrmContainerCre
|
||||||
}, handleObjectError("save container via connection pool", err)
|
}, handleObjectError("save container via connection pool", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// UserContainers implements frostfs.FrostFS interface method.
|
// UserContainers implements layer.FrostFS interface method.
|
||||||
func (x *FrostFS) UserContainers(ctx context.Context, layerPrm layer.PrmUserContainers) ([]cid.ID, error) {
|
func (x *FrostFS) UserContainers(ctx context.Context, layerPrm layer.PrmUserContainers) ([]cid.ID, error) {
|
||||||
prm := pool.PrmContainerList{
|
prm := pool.PrmContainerList{
|
||||||
OwnerID: layerPrm.UserID,
|
OwnerID: layerPrm.UserID,
|
||||||
|
@ -161,7 +162,7 @@ func (x *FrostFS) UserContainers(ctx context.Context, layerPrm layer.PrmUserCont
|
||||||
return r, handleObjectError("list user containers via connection pool", err)
|
return r, handleObjectError("list user containers via connection pool", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteContainer implements frostfs.FrostFS interface method.
|
// DeleteContainer implements layer.FrostFS interface method.
|
||||||
func (x *FrostFS) DeleteContainer(ctx context.Context, id cid.ID, token *session.Container) error {
|
func (x *FrostFS) DeleteContainer(ctx context.Context, id cid.ID, token *session.Container) error {
|
||||||
prm := pool.PrmContainerDelete{ContainerID: id, Session: token, WaitParams: &x.await}
|
prm := pool.PrmContainerDelete{ContainerID: id, Session: token, WaitParams: &x.await}
|
||||||
|
|
||||||
|
@ -169,8 +170,8 @@ func (x *FrostFS) DeleteContainer(ctx context.Context, id cid.ID, token *session
|
||||||
return handleObjectError("delete container via connection pool", err)
|
return handleObjectError("delete container via connection pool", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateObject implements frostfs.FrostFS interface method.
|
// CreateObject implements layer.FrostFS interface method.
|
||||||
func (x *FrostFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oid.ID, error) {
|
func (x *FrostFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (*layer.CreateObjectResult, error) {
|
||||||
attrNum := len(prm.Attributes) + 1 // + creation time
|
attrNum := len(prm.Attributes) + 1 // + creation time
|
||||||
|
|
||||||
if prm.Filepath != "" {
|
if prm.Filepath != "" {
|
||||||
|
@ -237,8 +238,15 @@ func (x *FrostFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (
|
||||||
prmPut.UseKey(prm.PrivateKey)
|
prmPut.UseKey(prm.PrivateKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
idObj, err := x.pool.PutObject(ctx, prmPut)
|
res, err := x.pool.PutObject(ctx, prmPut)
|
||||||
return idObj, handleObjectError("save object via connection pool", err)
|
if err = handleObjectError("save object via connection pool", err); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &layer.CreateObjectResult{
|
||||||
|
ObjectID: res.ObjectID,
|
||||||
|
CreationEpoch: res.Epoch,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// wraps io.ReadCloser and transforms Read errors related to access violation
|
// wraps io.ReadCloser and transforms Read errors related to access violation
|
||||||
|
@ -255,7 +263,7 @@ func (x payloadReader) Read(p []byte) (int, error) {
|
||||||
return n, handleObjectError("read payload", err)
|
return n, handleObjectError("read payload", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadObject implements frostfs.FrostFS interface method.
|
// ReadObject implements layer.FrostFS interface method.
|
||||||
func (x *FrostFS) ReadObject(ctx context.Context, prm layer.PrmObjectRead) (*layer.ObjectPart, error) {
|
func (x *FrostFS) ReadObject(ctx context.Context, prm layer.PrmObjectRead) (*layer.ObjectPart, error) {
|
||||||
var addr oid.Address
|
var addr oid.Address
|
||||||
addr.SetContainer(prm.Container)
|
addr.SetContainer(prm.Container)
|
||||||
|
@ -340,7 +348,7 @@ func (x *FrostFS) ReadObject(ctx context.Context, prm layer.PrmObjectRead) (*lay
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteObject implements frostfs.FrostFS interface method.
|
// DeleteObject implements layer.FrostFS interface method.
|
||||||
func (x *FrostFS) DeleteObject(ctx context.Context, prm layer.PrmObjectDelete) error {
|
func (x *FrostFS) DeleteObject(ctx context.Context, prm layer.PrmObjectDelete) error {
|
||||||
var addr oid.Address
|
var addr oid.Address
|
||||||
addr.SetContainer(prm.Container)
|
addr.SetContainer(prm.Container)
|
||||||
|
@ -359,7 +367,7 @@ func (x *FrostFS) DeleteObject(ctx context.Context, prm layer.PrmObjectDelete) e
|
||||||
return handleObjectError("mark object removal via connection pool", err)
|
return handleObjectError("mark object removal via connection pool", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SearchObjects implements frostfs.FrostFS interface method.
|
// SearchObjects implements layer.FrostFS interface method.
|
||||||
func (x *FrostFS) SearchObjects(ctx context.Context, prm layer.PrmObjectSearch) ([]oid.ID, error) {
|
func (x *FrostFS) SearchObjects(ctx context.Context, prm layer.PrmObjectSearch) ([]oid.ID, error) {
|
||||||
filters := object.NewSearchFilters()
|
filters := object.NewSearchFilters()
|
||||||
filters.AddRootFilter()
|
filters.AddRootFilter()
|
||||||
|
@ -396,6 +404,16 @@ func (x *FrostFS) SearchObjects(ctx context.Context, prm layer.PrmObjectSearch)
|
||||||
return buf, handleObjectError("read object list", err)
|
return buf, handleObjectError("read object list", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NetworkInfo implements layer.FrostFS interface method.
|
||||||
|
func (x *FrostFS) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
|
||||||
|
ni, err := x.pool.NetworkInfo(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return ni, handleObjectError("get network info via connection pool", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ni, nil
|
||||||
|
}
|
||||||
|
|
||||||
// ResolverFrostFS represents virtual connection to the FrostFS network.
|
// ResolverFrostFS represents virtual connection to the FrostFS network.
|
||||||
// It implements resolver.FrostFS.
|
// It implements resolver.FrostFS.
|
||||||
type ResolverFrostFS struct {
|
type ResolverFrostFS struct {
|
||||||
|
|
|
@ -94,6 +94,7 @@ const (
|
||||||
etagKV = "ETag"
|
etagKV = "ETag"
|
||||||
md5KV = "MD5"
|
md5KV = "MD5"
|
||||||
finishedKV = "Finished"
|
finishedKV = "Finished"
|
||||||
|
creationEpochKV = "CreationEpoch"
|
||||||
|
|
||||||
// keys for lock.
|
// keys for lock.
|
||||||
isLockKV = "IsLock"
|
isLockKV = "IsLock"
|
||||||
|
@ -266,6 +267,14 @@ func newNodeVersionFromTreeNode(log *zap.Logger, filePath string, treeNode *tree
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if creationEpoch, ok := treeNode.Get(creationEpochKV); ok {
|
||||||
|
if epoch, err := strconv.ParseUint(creationEpoch, 10, 64); err != nil {
|
||||||
|
log.Warn(logs.InvalidTreeKV, zap.Uint64(creationEpochKV, epoch), zap.Error(err))
|
||||||
|
} else {
|
||||||
|
version.CreationEpoch = epoch
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return version, nil
|
return version, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -308,6 +317,14 @@ func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *tr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if creationEpoch, ok := treeNode.Get(creationEpochKV); ok {
|
||||||
|
if epoch, err := strconv.ParseUint(creationEpoch, 10, 64); err != nil {
|
||||||
|
log.Warn(logs.InvalidTreeKV, zap.Uint64(creationEpochKV, epoch), zap.Error(err))
|
||||||
|
} else {
|
||||||
|
multipartInfo.CreationEpoch = epoch
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return multipartInfo, nil
|
return multipartInfo, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -343,6 +360,12 @@ func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo,
|
||||||
} else {
|
} else {
|
||||||
multipartInfo.Finished = isFinished
|
multipartInfo.Finished = isFinished
|
||||||
}
|
}
|
||||||
|
case creationEpochKV:
|
||||||
|
if epoch, err := strconv.ParseUint(string(kv.GetValue()), 10, 64); err != nil {
|
||||||
|
log.Warn(logs.InvalidTreeKV, zap.Uint64(creationEpochKV, epoch), zap.Error(err))
|
||||||
|
} else {
|
||||||
|
multipartInfo.CreationEpoch = epoch
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
multipartInfo.Meta[kv.GetKey()] = string(kv.GetValue())
|
multipartInfo.Meta[kv.GetKey()] = string(kv.GetValue())
|
||||||
}
|
}
|
||||||
|
@ -642,7 +665,7 @@ func (c *Tree) GetVersions(ctx context.Context, bktInfo *data.BucketInfo, filepa
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) {
|
func (c *Tree) GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) {
|
||||||
meta := []string{oidKV, isCombinedKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV}
|
meta := []string{oidKV, isCombinedKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV, creationEpochKV}
|
||||||
path := pathFromName(objectName)
|
path := pathFromName(objectName)
|
||||||
|
|
||||||
p := &GetNodesParams{
|
p := &GetNodesParams{
|
||||||
|
@ -1497,6 +1520,7 @@ func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID
|
||||||
FileNameKey: path[len(path)-1],
|
FileNameKey: path[len(path)-1],
|
||||||
ownerKV: version.Owner.EncodeToString(),
|
ownerKV: version.Owner.EncodeToString(),
|
||||||
createdKV: strconv.FormatInt(version.Created.UTC().UnixMilli(), 10),
|
createdKV: strconv.FormatInt(version.Created.UTC().UnixMilli(), 10),
|
||||||
|
creationEpochKV: strconv.FormatUint(version.CreationEpoch, 10),
|
||||||
}
|
}
|
||||||
|
|
||||||
if version.Size > 0 {
|
if version.Size > 0 {
|
||||||
|
@ -1550,7 +1574,7 @@ func (c *Tree) clearOutdatedVersionInfo(ctx context.Context, bktInfo *data.Bucke
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) getVersions(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string, onlyUnversioned bool) ([]*data.NodeVersion, error) {
|
func (c *Tree) getVersions(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string, onlyUnversioned bool) ([]*data.NodeVersion, error) {
|
||||||
keysToReturn := []string{oidKV, isCombinedKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV}
|
keysToReturn := []string{oidKV, isCombinedKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV, creationEpochKV}
|
||||||
path := pathFromName(filepath)
|
path := pathFromName(filepath)
|
||||||
p := &GetNodesParams{
|
p := &GetNodesParams{
|
||||||
BktInfo: bktInfo,
|
BktInfo: bktInfo,
|
||||||
|
@ -1608,6 +1632,7 @@ func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]str
|
||||||
if info.Finished {
|
if info.Finished {
|
||||||
info.Meta[finishedKV] = strconv.FormatBool(info.Finished)
|
info.Meta[finishedKV] = strconv.FormatBool(info.Finished)
|
||||||
}
|
}
|
||||||
|
info.Meta[creationEpochKV] = strconv.FormatUint(info.CreationEpoch, 10)
|
||||||
|
|
||||||
return info.Meta
|
return info.Meta
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue