diff --git a/api/data/info.go b/api/data/info.go index 34d9bd1..ecc1607 100644 --- a/api/data/info.go +++ b/api/data/info.go @@ -82,6 +82,15 @@ type ( VersionID string NoErrorOnDeleteMarker bool } + + // CreatedObjectInfo stores created object info. + CreatedObjectInfo struct { + ID oid.ID + Size uint64 + HashSum []byte + MD5Sum []byte + CreationEpoch uint64 + } ) // SettingsObjectName is a system name for a bucket settings file. diff --git a/api/data/tree.go b/api/data/tree.go index 0164062..3fd5d1e 100644 --- a/api/data/tree.go +++ b/api/data/tree.go @@ -72,6 +72,7 @@ type BaseNodeVersion struct { Created *time.Time Owner *user.ID IsDeleteMarker bool + CreationEpoch uint64 } func (v *BaseNodeVersion) GetETag(md5Enabled bool) string { @@ -110,6 +111,7 @@ type MultipartInfo struct { Meta map[string]string CopiesNumbers []uint32 Finished bool + CreationEpoch uint64 } // PartInfo is upload information about part. diff --git a/api/layer/cors.go b/api/layer/cors.go index ce3c0c9..6c969c4 100644 --- a/api/layer/cors.go +++ b/api/layer/cors.go @@ -44,12 +44,12 @@ func (n *Layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error { CopiesNumber: p.CopiesNumbers, } - _, objID, _, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo) + createdObj, err := n.objectPutAndHash(ctx, prm, p.BktInfo) if err != nil { return fmt.Errorf("put system object: %w", err) } - objIDToDelete, err := n.treeService.PutBucketCORS(ctx, p.BktInfo, objID) + objIDToDelete, err := n.treeService.PutBucketCORS(ctx, p.BktInfo, createdObj.ID) objIDToDeleteNotFound := errorsStd.Is(err, ErrNoNodeToRemove) if err != nil && !objIDToDeleteNotFound { return err diff --git a/api/layer/frostfs.go b/api/layer/frostfs.go index b3c2c87..eb2a728 100644 --- a/api/layer/frostfs.go +++ b/api/layer/frostfs.go @@ -148,6 +148,12 @@ type PrmObjectCreate struct { BufferMaxSize uint64 } +// CreateObjectResult is a result parameter of FrostFS.CreateObject operation. +type CreateObjectResult struct { + ObjectID oid.ID + CreationEpoch uint64 +} + // PrmObjectDelete groups parameters of FrostFS.DeleteObject operation. type PrmObjectDelete struct { // Authentication parameters. @@ -231,15 +237,15 @@ type FrostFS interface { // CreateObject creates and saves a parameterized object in the FrostFS container. // It sets 'Timestamp' attribute to the current time. - // It returns the ID of the saved object. + // It returns the ID and creation epoch of the saved object. // // Creation time should be written into the object (UTC). // // It returns ErrAccessDenied on write access violation. // - // It returns exactly one non-zero value. It returns any error encountered which - // prevented the container from being created. - CreateObject(context.Context, PrmObjectCreate) (oid.ID, error) + // It returns exactly one non-nil value. It returns any error encountered which + // prevented the object from being created. + CreateObject(context.Context, PrmObjectCreate) (*CreateObjectResult, error) // DeleteObject marks the object to be removed from the FrostFS container by identifier. // Successful return does not guarantee actual removal. @@ -265,4 +271,7 @@ type FrostFS interface { // // It returns any error encountered which prevented computing epochs. TimeToEpoch(ctx context.Context, now time.Time, future time.Time) (uint64, uint64, error) + + // NetworkInfo returns parameters of FrostFS network. + NetworkInfo(context.Context) (netmap.NetworkInfo, error) } diff --git a/api/layer/frostfs_mock.go b/api/layer/frostfs_mock.go index d360020..9552ca9 100644 --- a/api/layer/frostfs_mock.go +++ b/api/layer/frostfs_mock.go @@ -18,6 +18,7 @@ import ( apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" @@ -237,10 +238,10 @@ func (t *TestFrostFS) ReadObject(ctx context.Context, prm PrmObjectRead) (*Objec return nil, fmt.Errorf("%w: %s", &apistatus.ObjectNotFound{}, addr) } -func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID, error) { +func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (*CreateObjectResult, error) { b := make([]byte, 32) if _, err := io.ReadFull(rand.Reader, b); err != nil { - return oid.ID{}, err + return nil, err } var id oid.ID id.SetSHA256(sha256.Sum256(b)) @@ -248,7 +249,7 @@ func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid. attrs := make([]object.Attribute, 0) if err := t.objectPutErrors[prm.Filepath]; err != nil { - return oid.ID{}, err + return nil, err } if prm.Filepath != "" { @@ -293,7 +294,7 @@ func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid. if prm.Payload != nil { all, err := io.ReadAll(prm.Payload) if err != nil { - return oid.ID{}, err + return nil, err } obj.SetPayload(all) obj.SetPayloadSize(uint64(len(all))) @@ -307,7 +308,10 @@ func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid. addr := newAddress(cnrID, objID) t.objects[addr.EncodeToString()] = obj - return objID, nil + return &CreateObjectResult{ + ObjectID: objID, + CreationEpoch: t.currentEpoch - 1, + }, nil } func (t *TestFrostFS) DeleteObject(ctx context.Context, prm PrmObjectDelete) error { @@ -386,6 +390,13 @@ func (t *TestFrostFS) SearchObjects(_ context.Context, prm PrmObjectSearch) ([]o return res, nil } +func (t *TestFrostFS) NetworkInfo(context.Context) (netmap.NetworkInfo, error) { + ni := netmap.NetworkInfo{} + ni.SetCurrentEpoch(t.currentEpoch) + + return ni, nil +} + func (t *TestFrostFS) checkAccess(cnrID cid.ID, owner user.ID) bool { cnr, ok := t.containers[cnrID.EncodeToString()] if !ok { diff --git a/api/layer/lifecycle.go b/api/layer/lifecycle.go index 3d4afe9..7cc9bb7 100644 --- a/api/layer/lifecycle.go +++ b/api/layer/lifecycle.go @@ -43,7 +43,7 @@ func (n *Layer) PutBucketLifecycleConfiguration(ctx context.Context, p *PutBucke prm.Container = lifecycleBkt.CID - _, objID, _, md5, err := n.objectPutAndHash(ctx, prm, lifecycleBkt) + createdObj, err := n.objectPutAndHash(ctx, prm, lifecycleBkt) if err != nil { return err } @@ -53,13 +53,13 @@ func (n *Layer) PutBucketLifecycleConfiguration(ctx context.Context, p *PutBucke return apiErr.GetAPIError(apiErr.ErrInvalidDigest) } - if !bytes.Equal(hashBytes, md5) { - n.deleteLifecycleObject(ctx, lifecycleBkt, objID) + if !bytes.Equal(hashBytes, createdObj.MD5Sum) { + n.deleteLifecycleObject(ctx, lifecycleBkt, createdObj.ID) return apiErr.GetAPIError(apiErr.ErrInvalidDigest) } - objIDToDelete, err := n.treeService.PutBucketLifecycleConfiguration(ctx, p.BktInfo, objID) + objIDToDelete, err := n.treeService.PutBucketLifecycleConfiguration(ctx, p.BktInfo, createdObj.ID) objIDToDeleteNotFound := errors.Is(err, ErrNoNodeToRemove) if err != nil && !objIDToDeleteNotFound { return err diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index aeb6759..af30154 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -150,6 +150,11 @@ func (n *Layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartPar metaSize += len(p.Data.TagSet) } + networkInfo, err := n.frostFS.NetworkInfo(ctx) + if err != nil { + return fmt.Errorf("get network info: %w", err) + } + info := &data.MultipartInfo{ Key: p.Info.Key, UploadID: p.Info.UploadID, @@ -157,6 +162,7 @@ func (n *Layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartPar Created: TimeNow(ctx), Meta: make(map[string]string, metaSize), CopiesNumbers: p.CopiesNumbers, + CreationEpoch: networkInfo.CurrentEpoch(), } for key, val := range p.Header { @@ -229,7 +235,7 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf prm.Attributes[0][0], prm.Attributes[0][1] = UploadIDAttributeName, p.Info.UploadID prm.Attributes[1][0], prm.Attributes[1][1] = UploadPartNumberAttributeName, strconv.Itoa(p.PartNumber) - size, id, hash, md5Hash, err := n.objectPutAndHash(ctx, prm, bktInfo) + createdObj, err := n.objectPutAndHash(ctx, prm, bktInfo) if err != nil { return nil, err } @@ -238,21 +244,21 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf if err != nil { return nil, s3errors.GetAPIError(s3errors.ErrInvalidDigest) } - if hex.EncodeToString(hashBytes) != hex.EncodeToString(md5Hash) { + if hex.EncodeToString(hashBytes) != hex.EncodeToString(createdObj.MD5Sum) { prm := PrmObjectDelete{ - Object: id, + Object: createdObj.ID, Container: bktInfo.CID, } n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner) err = n.frostFS.DeleteObject(ctx, prm) if err != nil { - n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", id)) + n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", createdObj.ID)) } return nil, s3errors.GetAPIError(s3errors.ErrInvalidDigest) } } if p.Info.Encryption.Enabled() { - size = decSize + createdObj.Size = decSize } if !p.Info.Encryption.Enabled() && len(p.ContentSHA256Hash) > 0 && !auth.IsStandardContentSHA256(p.ContentSHA256Hash) { @@ -260,10 +266,10 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf if err != nil { return nil, s3errors.GetAPIError(s3errors.ErrContentSHA256Mismatch) } - if !bytes.Equal(contentHashBytes, hash) { - err = n.objectDelete(ctx, bktInfo, id) + if !bytes.Equal(contentHashBytes, createdObj.HashSum) { + err = n.objectDelete(ctx, bktInfo, createdObj.ID) if err != nil { - n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", id)) + n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", createdObj.ID)) } return nil, s3errors.GetAPIError(s3errors.ErrContentSHA256Mismatch) } @@ -271,17 +277,17 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf n.reqLogger(ctx).Debug(logs.UploadPart, zap.String("multipart upload", p.Info.UploadID), zap.Int("part number", p.PartNumber), - zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", id)) + zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", createdObj.ID)) partInfo := &data.PartInfo{ Key: p.Info.Key, UploadID: p.Info.UploadID, Number: p.PartNumber, - OID: id, - Size: size, - ETag: hex.EncodeToString(hash), + OID: createdObj.ID, + Size: createdObj.Size, + ETag: hex.EncodeToString(createdObj.HashSum), Created: prm.CreationTime, - MD5: hex.EncodeToString(md5Hash), + MD5: hex.EncodeToString(createdObj.MD5Sum), } oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo) @@ -298,7 +304,7 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf } objInfo := &data.ObjectInfo{ - ID: id, + ID: createdObj.ID, CID: bktInfo.CID, Owner: bktInfo.Owner, diff --git a/api/layer/object.go b/api/layer/object.go index 5575fba..24314ec 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -273,7 +273,7 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend prm.Attributes = append(prm.Attributes, [2]string{k, v}) } - size, id, hash, md5Hash, err := n.objectPutAndHash(ctx, prm, p.BktInfo) + createdObj, err := n.objectPutAndHash(ctx, prm, p.BktInfo) if err != nil { return nil, err } @@ -282,10 +282,10 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend if err != nil { return nil, apiErrors.GetAPIError(apiErrors.ErrInvalidDigest) } - if !bytes.Equal(headerMd5Hash, md5Hash) { - err = n.objectDelete(ctx, p.BktInfo, id) + if !bytes.Equal(headerMd5Hash, createdObj.MD5Sum) { + err = n.objectDelete(ctx, p.BktInfo, createdObj.ID) if err != nil { - n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", id)) + n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", createdObj.ID)) } return nil, apiErrors.GetAPIError(apiErrors.ErrInvalidDigest) } @@ -296,25 +296,26 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend if err != nil { return nil, apiErrors.GetAPIError(apiErrors.ErrContentSHA256Mismatch) } - if !bytes.Equal(contentHashBytes, hash) { - err = n.objectDelete(ctx, p.BktInfo, id) + if !bytes.Equal(contentHashBytes, createdObj.HashSum) { + err = n.objectDelete(ctx, p.BktInfo, createdObj.ID) if err != nil { - n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", id)) + n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", createdObj.ID)) } return nil, apiErrors.GetAPIError(apiErrors.ErrContentSHA256Mismatch) } } - n.reqLogger(ctx).Debug(logs.PutObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", id)) + n.reqLogger(ctx).Debug(logs.PutObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", createdObj.ID)) now := TimeNow(ctx) newVersion := &data.NodeVersion{ BaseNodeVersion: data.BaseNodeVersion{ - OID: id, - ETag: hex.EncodeToString(hash), - FilePath: p.Object, - Size: p.Size, - Created: &now, - Owner: &n.gateOwner, + OID: createdObj.ID, + ETag: hex.EncodeToString(createdObj.HashSum), + FilePath: p.Object, + Size: p.Size, + Created: &now, + Owner: &n.gateOwner, + CreationEpoch: createdObj.CreationEpoch, }, IsUnversioned: !bktSettings.VersioningEnabled(), IsCombined: p.Header[MultipartObjectSize] != "", @@ -322,7 +323,7 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend if len(p.CompleteMD5Hash) > 0 { newVersion.MD5 = p.CompleteMD5Hash } else { - newVersion.MD5 = hex.EncodeToString(md5Hash) + newVersion.MD5 = hex.EncodeToString(createdObj.MD5Sum) } if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil { @@ -334,7 +335,7 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend ObjVersion: &data.ObjectVersion{ BktInfo: p.BktInfo, ObjectName: p.Object, - VersionID: id.EncodeToString(), + VersionID: createdObj.ID.EncodeToString(), }, NewLock: p.Lock, CopiesNumbers: p.CopiesNumbers, @@ -349,13 +350,13 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend n.cache.CleanListCacheEntriesContainingObject(p.Object, p.BktInfo.CID) objInfo := &data.ObjectInfo{ - ID: id, + ID: createdObj.ID, CID: p.BktInfo.CID, Owner: n.gateOwner, Bucket: p.BktInfo.Name, Name: p.Object, - Size: size, + Size: createdObj.Size, Created: prm.CreationTime, Headers: p.Header, ContentType: p.Header[api.ContentType], @@ -493,8 +494,7 @@ func (n *Layer) objectDeleteBase(ctx context.Context, bktInfo *data.BucketInfo, } // objectPutAndHash prepare auth parameters and invoke frostfs.CreateObject. -// Returns object ID and payload sha256 hash. -func (n *Layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktInfo *data.BucketInfo) (uint64, oid.ID, []byte, []byte, error) { +func (n *Layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktInfo *data.BucketInfo) (*data.CreatedObjectInfo, error) { n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner) prm.ClientCut = n.features.ClientCut() prm.BufferMaxSize = n.features.BufferMaxSizeForPut() @@ -507,15 +507,21 @@ func (n *Layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktIn hash.Write(buf) md5Hash.Write(buf) }) - id, err := n.frostFS.CreateObject(ctx, prm) + res, err := n.frostFS.CreateObject(ctx, prm) if err != nil { if _, errDiscard := io.Copy(io.Discard, prm.Payload); errDiscard != nil { n.reqLogger(ctx).Warn(logs.FailedToDiscardPutPayloadProbablyGoroutineLeaks, zap.Error(errDiscard)) } - return 0, oid.ID{}, nil, nil, err + return nil, err } - return size, id, hash.Sum(nil), md5Hash.Sum(nil), nil + return &data.CreatedObjectInfo{ + ID: res.ObjectID, + Size: size, + HashSum: hash.Sum(nil), + MD5Sum: md5Hash.Sum(nil), + CreationEpoch: res.CreationEpoch, + }, nil } type logWrapper struct { diff --git a/api/layer/object_test.go b/api/layer/object_test.go index 21c8d2e..d1c668b 100644 --- a/api/layer/object_test.go +++ b/api/layer/object_test.go @@ -44,7 +44,7 @@ func TestGoroutinesDontLeakInPutAndHash(t *testing.T) { expErr := errors.New("some error") tc.testFrostFS.SetObjectPutError(tc.obj, expErr) - _, _, _, _, err = tc.layer.objectPutAndHash(tc.ctx, prm, tc.bktInfo) + _, err = tc.layer.objectPutAndHash(tc.ctx, prm, tc.bktInfo) require.ErrorIs(t, err, expErr) require.Empty(t, payload.Len(), "body must be read out otherwise goroutines can leak in wrapReader") } diff --git a/api/layer/system_object.go b/api/layer/system_object.go index aa5fa9c..18332d6 100644 --- a/api/layer/system_object.go +++ b/api/layer/system_object.go @@ -125,8 +125,12 @@ func (n *Layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, obj return oid.ID{}, err } - _, id, _, _, err := n.objectPutAndHash(ctx, prm, bktInfo) - return id, err + createdObj, err := n.objectPutAndHash(ctx, prm, bktInfo) + if err != nil { + return oid.ID{}, err + } + + return createdObj.ID, nil } func (n *Layer) GetLockInfo(ctx context.Context, objVersion *data.ObjectVersion) (*data.LockInfo, error) { diff --git a/go.mod b/go.mod index 7d3cd64..529c9a4 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240716113920-f517e3949164 git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 - git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240718141740-ce8270568d36 + git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240722061523-7e94a6adf2ba git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240611102930-ac965e8d176a git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02 github.com/aws/aws-sdk-go v1.44.6 diff --git a/go.sum b/go.sum index a3ec594..16713bd 100644 --- a/go.sum +++ b/go.sum @@ -44,8 +44,8 @@ git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSV git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0/go.mod h1:RUIKZATQLJ+TaYQa60X2fTDwfuhMfm8Ar60bQ5fr+vU= git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 h1:aGQ6QaAnTerQ5Dq5b2/f9DUQtSqPkZZ/bkMx/HKuLCo= git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6/go.mod h1:W8Nn08/l6aQ7UlIbpF7FsQou7TVpcRD1ZT1KG4TrFhE= -git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240718141740-ce8270568d36 h1:MV/vKJWLQT34RRbXYvkNKFYGNjL5bRNuCQMXkbC7fLI= -git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240718141740-ce8270568d36/go.mod h1:vluJ/+yQMcq8ZIZZSA7Te+JKClr0lgtRErjICvb8wto= +git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240722061523-7e94a6adf2ba h1:OP5fCRRQ5ndRmAYyuLr7rBOMz5nKrB+o9B1XZ+mm3XY= +git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240722061523-7e94a6adf2ba/go.mod h1:vluJ/+yQMcq8ZIZZSA7Te+JKClr0lgtRErjICvb8wto= git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc= git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM= git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240611102930-ac965e8d176a h1:Bk1fB4cQASPKgAVGCdlBOEp5ohZfDxqK6fZM8eP+Emo= diff --git a/internal/frostfs/authmate.go b/internal/frostfs/authmate.go index 008a999..a38ef6d 100644 --- a/internal/frostfs/authmate.go +++ b/internal/frostfs/authmate.go @@ -122,12 +122,17 @@ func (x *AuthmateFrostFS) CreateObject(ctx context.Context, prm tokens.PrmObject attributes = append(attributes, [2]string{attr.Key(), attr.Value()}) } - return x.frostFS.CreateObject(ctx, layer.PrmObjectCreate{ + res, err := x.frostFS.CreateObject(ctx, layer.PrmObjectCreate{ Container: prm.Container, Filepath: prm.Filepath, Attributes: attributes, Payload: bytes.NewReader(prm.Payload), }) + if err != nil { + return oid.ID{}, err + } + + return res.ObjectID, nil } func (x *AuthmateFrostFS) getCredVersions(ctx context.Context, addr oid.Address) (*crdt.ObjectVersions, error) { diff --git a/internal/frostfs/frostfs.go b/internal/frostfs/frostfs.go index a6b3214..ccdf68b 100644 --- a/internal/frostfs/frostfs.go +++ b/internal/frostfs/frostfs.go @@ -15,6 +15,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" @@ -51,7 +52,7 @@ func NewFrostFS(p *pool.Pool, key *keys.PrivateKey) *FrostFS { } } -// TimeToEpoch implements frostfs.FrostFS interface method. +// TimeToEpoch implements layer.FrostFS interface method. func (x *FrostFS) TimeToEpoch(ctx context.Context, now, futureTime time.Time) (uint64, uint64, error) { dur := futureTime.Sub(now) if dur < 0 { @@ -87,7 +88,7 @@ func (x *FrostFS) TimeToEpoch(ctx context.Context, now, futureTime time.Time) (u return curr, epoch, nil } -// Container implements frostfs.FrostFS interface method. +// Container implements layer.FrostFS interface method. func (x *FrostFS) Container(ctx context.Context, layerPrm layer.PrmContainer) (*container.Container, error) { prm := pool.PrmContainerGet{ ContainerID: layerPrm.ContainerID, @@ -102,7 +103,7 @@ func (x *FrostFS) Container(ctx context.Context, layerPrm layer.PrmContainer) (* return &res, nil } -// CreateContainer implements frostfs.FrostFS interface method. +// CreateContainer implements layer.FrostFS interface method. func (x *FrostFS) CreateContainer(ctx context.Context, prm layer.PrmContainerCreate) (*layer.ContainerCreateResult, error) { var cnr container.Container cnr.Init() @@ -150,7 +151,7 @@ func (x *FrostFS) CreateContainer(ctx context.Context, prm layer.PrmContainerCre }, handleObjectError("save container via connection pool", err) } -// UserContainers implements frostfs.FrostFS interface method. +// UserContainers implements layer.FrostFS interface method. func (x *FrostFS) UserContainers(ctx context.Context, layerPrm layer.PrmUserContainers) ([]cid.ID, error) { prm := pool.PrmContainerList{ OwnerID: layerPrm.UserID, @@ -161,7 +162,7 @@ func (x *FrostFS) UserContainers(ctx context.Context, layerPrm layer.PrmUserCont return r, handleObjectError("list user containers via connection pool", err) } -// DeleteContainer implements frostfs.FrostFS interface method. +// DeleteContainer implements layer.FrostFS interface method. func (x *FrostFS) DeleteContainer(ctx context.Context, id cid.ID, token *session.Container) error { prm := pool.PrmContainerDelete{ContainerID: id, Session: token, WaitParams: &x.await} @@ -169,8 +170,8 @@ func (x *FrostFS) DeleteContainer(ctx context.Context, id cid.ID, token *session return handleObjectError("delete container via connection pool", err) } -// CreateObject implements frostfs.FrostFS interface method. -func (x *FrostFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oid.ID, error) { +// CreateObject implements layer.FrostFS interface method. +func (x *FrostFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (*layer.CreateObjectResult, error) { attrNum := len(prm.Attributes) + 1 // + creation time if prm.Filepath != "" { @@ -237,8 +238,15 @@ func (x *FrostFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) ( prmPut.UseKey(prm.PrivateKey) } - idObj, err := x.pool.PutObject(ctx, prmPut) - return idObj, handleObjectError("save object via connection pool", err) + res, err := x.pool.PutObject(ctx, prmPut) + if err = handleObjectError("save object via connection pool", err); err != nil { + return nil, err + } + + return &layer.CreateObjectResult{ + ObjectID: res.ObjectID, + CreationEpoch: res.Epoch, + }, nil } // wraps io.ReadCloser and transforms Read errors related to access violation @@ -255,7 +263,7 @@ func (x payloadReader) Read(p []byte) (int, error) { return n, handleObjectError("read payload", err) } -// ReadObject implements frostfs.FrostFS interface method. +// ReadObject implements layer.FrostFS interface method. func (x *FrostFS) ReadObject(ctx context.Context, prm layer.PrmObjectRead) (*layer.ObjectPart, error) { var addr oid.Address addr.SetContainer(prm.Container) @@ -340,7 +348,7 @@ func (x *FrostFS) ReadObject(ctx context.Context, prm layer.PrmObjectRead) (*lay }, nil } -// DeleteObject implements frostfs.FrostFS interface method. +// DeleteObject implements layer.FrostFS interface method. func (x *FrostFS) DeleteObject(ctx context.Context, prm layer.PrmObjectDelete) error { var addr oid.Address addr.SetContainer(prm.Container) @@ -359,7 +367,7 @@ func (x *FrostFS) DeleteObject(ctx context.Context, prm layer.PrmObjectDelete) e return handleObjectError("mark object removal via connection pool", err) } -// SearchObjects implements frostfs.FrostFS interface method. +// SearchObjects implements layer.FrostFS interface method. func (x *FrostFS) SearchObjects(ctx context.Context, prm layer.PrmObjectSearch) ([]oid.ID, error) { filters := object.NewSearchFilters() filters.AddRootFilter() @@ -396,6 +404,16 @@ func (x *FrostFS) SearchObjects(ctx context.Context, prm layer.PrmObjectSearch) return buf, handleObjectError("read object list", err) } +// NetworkInfo implements layer.FrostFS interface method. +func (x *FrostFS) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) { + ni, err := x.pool.NetworkInfo(ctx) + if err != nil { + return ni, handleObjectError("get network info via connection pool", err) + } + + return ni, nil +} + // ResolverFrostFS represents virtual connection to the FrostFS network. // It implements resolver.FrostFS. type ResolverFrostFS struct { diff --git a/pkg/service/tree/tree.go b/pkg/service/tree/tree.go index 8492f75..9873e15 100644 --- a/pkg/service/tree/tree.go +++ b/pkg/service/tree/tree.go @@ -94,6 +94,7 @@ const ( etagKV = "ETag" md5KV = "MD5" finishedKV = "Finished" + creationEpochKV = "CreationEpoch" // keys for lock. isLockKV = "IsLock" @@ -266,6 +267,14 @@ func newNodeVersionFromTreeNode(log *zap.Logger, filePath string, treeNode *tree } } + if creationEpoch, ok := treeNode.Get(creationEpochKV); ok { + if epoch, err := strconv.ParseUint(creationEpoch, 10, 64); err != nil { + log.Warn(logs.InvalidTreeKV, zap.Uint64(creationEpochKV, epoch), zap.Error(err)) + } else { + version.CreationEpoch = epoch + } + } + return version, nil } @@ -308,6 +317,14 @@ func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *tr } } + if creationEpoch, ok := treeNode.Get(creationEpochKV); ok { + if epoch, err := strconv.ParseUint(creationEpoch, 10, 64); err != nil { + log.Warn(logs.InvalidTreeKV, zap.Uint64(creationEpochKV, epoch), zap.Error(err)) + } else { + multipartInfo.CreationEpoch = epoch + } + } + return multipartInfo, nil } @@ -343,6 +360,12 @@ func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo, } else { multipartInfo.Finished = isFinished } + case creationEpochKV: + if epoch, err := strconv.ParseUint(string(kv.GetValue()), 10, 64); err != nil { + log.Warn(logs.InvalidTreeKV, zap.Uint64(creationEpochKV, epoch), zap.Error(err)) + } else { + multipartInfo.CreationEpoch = epoch + } default: multipartInfo.Meta[kv.GetKey()] = string(kv.GetValue()) } @@ -642,7 +665,7 @@ func (c *Tree) GetVersions(ctx context.Context, bktInfo *data.BucketInfo, filepa } func (c *Tree) GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) { - meta := []string{oidKV, isCombinedKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV} + meta := []string{oidKV, isCombinedKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV, creationEpochKV} path := pathFromName(objectName) p := &GetNodesParams{ @@ -1493,10 +1516,11 @@ func (c *Tree) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.Bucket func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID string, version *data.NodeVersion) (uint64, error) { path := pathFromName(version.FilePath) meta := map[string]string{ - oidKV: version.OID.EncodeToString(), - FileNameKey: path[len(path)-1], - ownerKV: version.Owner.EncodeToString(), - createdKV: strconv.FormatInt(version.Created.UTC().UnixMilli(), 10), + oidKV: version.OID.EncodeToString(), + FileNameKey: path[len(path)-1], + ownerKV: version.Owner.EncodeToString(), + createdKV: strconv.FormatInt(version.Created.UTC().UnixMilli(), 10), + creationEpochKV: strconv.FormatUint(version.CreationEpoch, 10), } if version.Size > 0 { @@ -1550,7 +1574,7 @@ func (c *Tree) clearOutdatedVersionInfo(ctx context.Context, bktInfo *data.Bucke } func (c *Tree) getVersions(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string, onlyUnversioned bool) ([]*data.NodeVersion, error) { - keysToReturn := []string{oidKV, isCombinedKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV} + keysToReturn := []string{oidKV, isCombinedKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV, creationEpochKV} path := pathFromName(filepath) p := &GetNodesParams{ BktInfo: bktInfo, @@ -1608,6 +1632,7 @@ func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]str if info.Finished { info.Meta[finishedKV] = strconv.FormatBool(info.Finished) } + info.Meta[creationEpochKV] = strconv.FormatUint(info.CreationEpoch, 10) return info.Meta }