[#412] Store creation epoch in tree service
All checks were successful
/ DCO (pull_request) Successful in 52s
/ Builds (1.20) (pull_request) Successful in 2m38s
/ Builds (1.21) (pull_request) Successful in 2m37s
/ Vulncheck (pull_request) Successful in 2m48s
/ Lint (pull_request) Successful in 4m17s
/ Tests (1.20) (pull_request) Successful in 2m58s
/ Tests (1.21) (pull_request) Successful in 2m45s

Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
This commit is contained in:
Marina Biryukova 2024-07-12 15:04:41 +03:00
parent 0e1ab11a1b
commit 5a2a2c2326
14 changed files with 106 additions and 47 deletions

View file

@ -72,6 +72,7 @@ type BaseNodeVersion struct {
Created *time.Time
Owner *user.ID
IsDeleteMarker bool
CreatedEpoch uint64
}
func (v *BaseNodeVersion) GetETag(md5Enabled bool) string {
@ -110,6 +111,7 @@ type MultipartInfo struct {
Meta map[string]string
CopiesNumbers []uint32
Finished bool
CreatedEpoch uint64
}
// PartInfo is upload information about part.

View file

@ -44,7 +44,7 @@ func (n *Layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error {
CopiesNumber: p.CopiesNumbers,
}
_, objID, _, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
_, objID, _, _, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
if err != nil {
return fmt.Errorf("put system object: %w", err)
}

View file

@ -231,15 +231,14 @@ type FrostFS interface {
// CreateObject creates and saves a parameterized object in the FrostFS container.
// It sets 'Timestamp' attribute to the current time.
// It returns the ID of the saved object.
// It returns the ID and creation epoch of the saved object.
//
// Creation time should be written into the object (UTC).
//
// It returns ErrAccessDenied on write access violation.
//
// It returns exactly one non-zero value. It returns any error encountered which
// prevented the container from being created.
CreateObject(context.Context, PrmObjectCreate) (oid.ID, error)
// It returns any error encountered which prevented the object from being created.
CreateObject(context.Context, PrmObjectCreate) (oid.ID, uint64, error)
// DeleteObject marks the object to be removed from the FrostFS container by identifier.
// Successful return does not guarantee actual removal.
@ -265,4 +264,7 @@ type FrostFS interface {
//
// It returns any error encountered which prevented computing epochs.
TimeToEpoch(ctx context.Context, now time.Time, future time.Time) (uint64, uint64, error)
// NetworkInfo returns parameters of FrostFS network.
NetworkInfo(context.Context) (netmap.NetworkInfo, error)
}

View file

@ -18,6 +18,7 @@ import (
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
@ -237,10 +238,10 @@ func (t *TestFrostFS) ReadObject(ctx context.Context, prm PrmObjectRead) (*Objec
return nil, fmt.Errorf("%w: %s", &apistatus.ObjectNotFound{}, addr)
}
func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID, error) {
func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID, uint64, error) {
b := make([]byte, 32)
if _, err := io.ReadFull(rand.Reader, b); err != nil {
return oid.ID{}, err
return oid.ID{}, 0, err
}
var id oid.ID
id.SetSHA256(sha256.Sum256(b))
@ -248,7 +249,7 @@ func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.
attrs := make([]object.Attribute, 0)
if err := t.objectPutErrors[prm.Filepath]; err != nil {
return oid.ID{}, err
return oid.ID{}, 0, err
}
if prm.Filepath != "" {
@ -293,7 +294,7 @@ func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.
if prm.Payload != nil {
all, err := io.ReadAll(prm.Payload)
if err != nil {
return oid.ID{}, err
return oid.ID{}, 0, err
}
obj.SetPayload(all)
obj.SetPayloadSize(uint64(len(all)))
@ -307,7 +308,7 @@ func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.
addr := newAddress(cnrID, objID)
t.objects[addr.EncodeToString()] = obj
return objID, nil
return objID, t.currentEpoch - 1, nil
}
func (t *TestFrostFS) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
@ -386,6 +387,13 @@ func (t *TestFrostFS) SearchObjects(_ context.Context, prm PrmObjectSearch) ([]o
return res, nil
}
func (t *TestFrostFS) NetworkInfo(context.Context) (netmap.NetworkInfo, error) {
ni := netmap.NetworkInfo{}
ni.SetCurrentEpoch(t.currentEpoch)
return ni, nil
}
func (t *TestFrostFS) checkAccess(cnrID cid.ID, owner user.ID) bool {
cnr, ok := t.containers[cnrID.EncodeToString()]
if !ok {

View file

@ -43,7 +43,7 @@ func (n *Layer) PutBucketLifecycleConfiguration(ctx context.Context, p *PutBucke
prm.Container = lifecycleBkt.CID
_, objID, _, md5, err := n.objectPutAndHash(ctx, prm, lifecycleBkt)
_, objID, _, md5, _, err := n.objectPutAndHash(ctx, prm, lifecycleBkt)
if err != nil {
return err
}

View file

@ -150,6 +150,11 @@ func (n *Layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartPar
metaSize += len(p.Data.TagSet)
}
networkInfo, err := n.frostFS.NetworkInfo(ctx)
if err != nil {
return fmt.Errorf("get network info: %w", err)
}
info := &data.MultipartInfo{
Key: p.Info.Key,
UploadID: p.Info.UploadID,
@ -157,6 +162,7 @@ func (n *Layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartPar
Created: TimeNow(ctx),
Meta: make(map[string]string, metaSize),
CopiesNumbers: p.CopiesNumbers,
CreatedEpoch: networkInfo.CurrentEpoch(),
}
for key, val := range p.Header {
@ -229,7 +235,7 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
prm.Attributes[0][0], prm.Attributes[0][1] = UploadIDAttributeName, p.Info.UploadID
prm.Attributes[1][0], prm.Attributes[1][1] = UploadPartNumberAttributeName, strconv.Itoa(p.PartNumber)
size, id, hash, md5Hash, err := n.objectPutAndHash(ctx, prm, bktInfo)
size, id, hash, md5Hash, _, err := n.objectPutAndHash(ctx, prm, bktInfo)
if err != nil {
return nil, err
}

View file

@ -273,7 +273,7 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
prm.Attributes = append(prm.Attributes, [2]string{k, v})
}
size, id, hash, md5Hash, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
size, id, hash, md5Hash, epoch, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
if err != nil {
return nil, err
}
@ -309,12 +309,13 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
now := TimeNow(ctx)
newVersion := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
OID: id,
ETag: hex.EncodeToString(hash),
FilePath: p.Object,
Size: p.Size,
Created: &now,
Owner: &n.gateOwner,
OID: id,
ETag: hex.EncodeToString(hash),
FilePath: p.Object,
Size: p.Size,
Created: &now,
Owner: &n.gateOwner,
CreatedEpoch: epoch,
},
IsUnversioned: !bktSettings.VersioningEnabled(),
IsCombined: p.Header[MultipartObjectSize] != "",
@ -493,8 +494,8 @@ func (n *Layer) objectDeleteBase(ctx context.Context, bktInfo *data.BucketInfo,
}
// objectPutAndHash prepare auth parameters and invoke frostfs.CreateObject.
// Returns object ID and payload sha256 hash.
func (n *Layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktInfo *data.BucketInfo) (uint64, oid.ID, []byte, []byte, error) {
// Returns object size, ID, payload sha256 and md5 hashes, creation epoch.
func (n *Layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktInfo *data.BucketInfo) (uint64, oid.ID, []byte, []byte, uint64, error) {
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
prm.ClientCut = n.features.ClientCut()
prm.BufferMaxSize = n.features.BufferMaxSizeForPut()
@ -507,15 +508,15 @@ func (n *Layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktIn
hash.Write(buf)
md5Hash.Write(buf)
})
id, err := n.frostFS.CreateObject(ctx, prm)
id, epoch, err := n.frostFS.CreateObject(ctx, prm)
if err != nil {
if _, errDiscard := io.Copy(io.Discard, prm.Payload); errDiscard != nil {
n.reqLogger(ctx).Warn(logs.FailedToDiscardPutPayloadProbablyGoroutineLeaks, zap.Error(errDiscard))
}
return 0, oid.ID{}, nil, nil, err
return 0, oid.ID{}, nil, nil, 0, err
}
return size, id, hash.Sum(nil), md5Hash.Sum(nil), nil
return size, id, hash.Sum(nil), md5Hash.Sum(nil), epoch, nil
}
type logWrapper struct {

View file

@ -44,7 +44,7 @@ func TestGoroutinesDontLeakInPutAndHash(t *testing.T) {
expErr := errors.New("some error")
tc.testFrostFS.SetObjectPutError(tc.obj, expErr)
_, _, _, _, err = tc.layer.objectPutAndHash(tc.ctx, prm, tc.bktInfo)
_, _, _, _, _, err = tc.layer.objectPutAndHash(tc.ctx, prm, tc.bktInfo)
require.ErrorIs(t, err, expErr)
require.Empty(t, payload.Len(), "body must be read out otherwise goroutines can leak in wrapReader")
}

View file

@ -125,7 +125,7 @@ func (n *Layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, obj
return oid.ID{}, err
}
_, id, _, _, err := n.objectPutAndHash(ctx, prm, bktInfo)
_, id, _, _, _, err := n.objectPutAndHash(ctx, prm, bktInfo)
return id, err
}

2
go.mod
View file

@ -37,6 +37,8 @@ require (
google.golang.org/protobuf v1.33.0
)
replace git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240531132048-ebd8fcd1685f => git.frostfs.info/mbiryukova/frostfs-sdk-go v0.0.0-20240712115130-0858160c8824
require (
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 // indirect
git.frostfs.info/TrueCloudLab/hrw v1.2.1 // indirect

4
go.sum
View file

@ -44,8 +44,6 @@ git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSV
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0/go.mod h1:RUIKZATQLJ+TaYQa60X2fTDwfuhMfm8Ar60bQ5fr+vU=
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 h1:aGQ6QaAnTerQ5Dq5b2/f9DUQtSqPkZZ/bkMx/HKuLCo=
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6/go.mod h1:W8Nn08/l6aQ7UlIbpF7FsQou7TVpcRD1ZT1KG4TrFhE=
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240531132048-ebd8fcd1685f h1:vBLC1OSGMSn7lRJv/p1of0veifuBdZdztVrF9Vn+UFk=
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240531132048-ebd8fcd1685f/go.mod h1:4AObM67VUqkXQJlODTFThFnuMGEuK8h9DrAXHDZqvCU=
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240611102930-ac965e8d176a h1:Bk1fB4cQASPKgAVGCdlBOEp5ohZfDxqK6fZM8eP+Emo=
@ -56,6 +54,8 @@ git.frostfs.info/TrueCloudLab/tzhash v1.8.0 h1:UFMnUIk0Zh17m8rjGHJMqku2hCgaXDqjq
git.frostfs.info/TrueCloudLab/tzhash v1.8.0/go.mod h1:dhY+oy274hV8wGvGL4MwwMpdL3GYvaX1a8GQZQHvlF8=
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02 h1:HeY8n27VyPRQe49l/fzyVMkWEB2fsLJYKp64pwA7tz4=
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02/go.mod h1:rQFJJdEOV7KbbMtQYR2lNfiZk+ONRDJSbMCTWxKt8Fw=
git.frostfs.info/mbiryukova/frostfs-sdk-go v0.0.0-20240712115130-0858160c8824 h1:dw1wsBdC8ZwA+rrBK/G3YNN3zULvWOFWyMXLtCcoj7s=
git.frostfs.info/mbiryukova/frostfs-sdk-go v0.0.0-20240712115130-0858160c8824/go.mod h1:4AObM67VUqkXQJlODTFThFnuMGEuK8h9DrAXHDZqvCU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=

View file

@ -122,12 +122,14 @@ func (x *AuthmateFrostFS) CreateObject(ctx context.Context, prm tokens.PrmObject
attributes = append(attributes, [2]string{attr.Key(), attr.Value()})
}
return x.frostFS.CreateObject(ctx, layer.PrmObjectCreate{
objID, _, err := x.frostFS.CreateObject(ctx, layer.PrmObjectCreate{
Container: prm.Container,
Filepath: prm.Filepath,
Attributes: attributes,
Payload: bytes.NewReader(prm.Payload),
})
return objID, err
}
func (x *AuthmateFrostFS) getCredVersions(ctx context.Context, addr oid.Address) (*crdt.ObjectVersions, error) {

View file

@ -15,6 +15,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
@ -51,7 +52,7 @@ func NewFrostFS(p *pool.Pool, key *keys.PrivateKey) *FrostFS {
}
}
// TimeToEpoch implements frostfs.FrostFS interface method.
// TimeToEpoch implements layer.FrostFS interface method.
func (x *FrostFS) TimeToEpoch(ctx context.Context, now, futureTime time.Time) (uint64, uint64, error) {
dur := futureTime.Sub(now)
if dur < 0 {
@ -87,7 +88,7 @@ func (x *FrostFS) TimeToEpoch(ctx context.Context, now, futureTime time.Time) (u
return curr, epoch, nil
}
// Container implements frostfs.FrostFS interface method.
// Container implements layer.FrostFS interface method.
func (x *FrostFS) Container(ctx context.Context, layerPrm layer.PrmContainer) (*container.Container, error) {
prm := pool.PrmContainerGet{
ContainerID: layerPrm.ContainerID,
@ -102,7 +103,7 @@ func (x *FrostFS) Container(ctx context.Context, layerPrm layer.PrmContainer) (*
return &res, nil
}
// CreateContainer implements frostfs.FrostFS interface method.
// CreateContainer implements layer.FrostFS interface method.
func (x *FrostFS) CreateContainer(ctx context.Context, prm layer.PrmContainerCreate) (*layer.ContainerCreateResult, error) {
var cnr container.Container
cnr.Init()
@ -150,7 +151,7 @@ func (x *FrostFS) CreateContainer(ctx context.Context, prm layer.PrmContainerCre
}, handleObjectError("save container via connection pool", err)
}
// UserContainers implements frostfs.FrostFS interface method.
// UserContainers implements layer.FrostFS interface method.
func (x *FrostFS) UserContainers(ctx context.Context, layerPrm layer.PrmUserContainers) ([]cid.ID, error) {
prm := pool.PrmContainerList{
OwnerID: layerPrm.UserID,
@ -161,7 +162,7 @@ func (x *FrostFS) UserContainers(ctx context.Context, layerPrm layer.PrmUserCont
return r, handleObjectError("list user containers via connection pool", err)
}
// DeleteContainer implements frostfs.FrostFS interface method.
// DeleteContainer implements layer.FrostFS interface method.
func (x *FrostFS) DeleteContainer(ctx context.Context, id cid.ID, token *session.Container) error {
prm := pool.PrmContainerDelete{ContainerID: id, Session: token, WaitParams: &x.await}
@ -169,8 +170,8 @@ func (x *FrostFS) DeleteContainer(ctx context.Context, id cid.ID, token *session
return handleObjectError("delete container via connection pool", err)
}
// CreateObject implements frostfs.FrostFS interface method.
func (x *FrostFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oid.ID, error) {
// CreateObject implements layer.FrostFS interface method.
func (x *FrostFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oid.ID, uint64, error) {
attrNum := len(prm.Attributes) + 1 // + creation time
if prm.Filepath != "" {
@ -237,8 +238,8 @@ func (x *FrostFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (
prmPut.UseKey(prm.PrivateKey)
}
idObj, err := x.pool.PutObject(ctx, prmPut)
return idObj, handleObjectError("save object via connection pool", err)
idObj, epoch, err := x.pool.PutObject(ctx, prmPut)
return idObj, epoch, handleObjectError("save object via connection pool", err)
}
// wraps io.ReadCloser and transforms Read errors related to access violation
@ -255,7 +256,7 @@ func (x payloadReader) Read(p []byte) (int, error) {
return n, handleObjectError("read payload", err)
}
// ReadObject implements frostfs.FrostFS interface method.
// ReadObject implements layer.FrostFS interface method.
func (x *FrostFS) ReadObject(ctx context.Context, prm layer.PrmObjectRead) (*layer.ObjectPart, error) {
var addr oid.Address
addr.SetContainer(prm.Container)
@ -340,7 +341,7 @@ func (x *FrostFS) ReadObject(ctx context.Context, prm layer.PrmObjectRead) (*lay
}, nil
}
// DeleteObject implements frostfs.FrostFS interface method.
// DeleteObject implements layer.FrostFS interface method.
func (x *FrostFS) DeleteObject(ctx context.Context, prm layer.PrmObjectDelete) error {
var addr oid.Address
addr.SetContainer(prm.Container)
@ -359,7 +360,7 @@ func (x *FrostFS) DeleteObject(ctx context.Context, prm layer.PrmObjectDelete) e
return handleObjectError("mark object removal via connection pool", err)
}
// SearchObjects implements frostfs.FrostFS interface method.
// SearchObjects implements layer.FrostFS interface method.
func (x *FrostFS) SearchObjects(ctx context.Context, prm layer.PrmObjectSearch) ([]oid.ID, error) {
filters := object.NewSearchFilters()
filters.AddRootFilter()
@ -396,6 +397,16 @@ func (x *FrostFS) SearchObjects(ctx context.Context, prm layer.PrmObjectSearch)
return buf, handleObjectError("read object list", err)
}
// NetworkInfo implements layer.FrostFS interface method.
func (x *FrostFS) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
ni, err := x.pool.NetworkInfo(ctx)
if err != nil {
return ni, handleObjectError("get network info via connection pool", err)
}
return ni, nil
}
// ResolverFrostFS represents virtual connection to the FrostFS network.
// It implements resolver.FrostFS.
type ResolverFrostFS struct {

View file

@ -94,6 +94,7 @@ const (
etagKV = "ETag"
md5KV = "MD5"
finishedKV = "Finished"
createdEpochKV = "CreatedEpoch"
// keys for lock.
isLockKV = "IsLock"
@ -234,6 +235,14 @@ func newNodeVersionFromTreeNode(log *zap.Logger, filePath string, treeNode *tree
}
}
if createdEpoch, ok := treeNode.Get(createdEpochKV); ok {
if epoch, err := strconv.ParseUint(createdEpoch, 10, 64); err != nil {
log.Warn(logs.InvalidTreeKV, zap.Uint64(createdEpochKV, epoch), zap.Error(err))
} else {
version.CreatedEpoch = epoch
}
}
return version
}
@ -272,6 +281,14 @@ func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *tr
}
}
if createdEpoch, ok := treeNode.Get(createdEpochKV); ok {
if epoch, err := strconv.ParseUint(createdEpoch, 10, 64); err != nil {
log.Warn(logs.InvalidTreeKV, zap.Uint64(createdEpochKV, epoch), zap.Error(err))
} else {
multipartInfo.CreatedEpoch = epoch
}
}
return multipartInfo, nil
}
@ -303,6 +320,12 @@ func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo,
} else {
multipartInfo.Finished = isFinished
}
case createdEpochKV:
if epoch, err := strconv.ParseUint(string(kv.GetValue()), 10, 64); err != nil {
log.Warn(logs.InvalidTreeKV, zap.Uint64(createdEpochKV, epoch), zap.Error(err))
} else {
multipartInfo.CreatedEpoch = epoch
}
default:
multipartInfo.Meta[kv.GetKey()] = string(kv.GetValue())
}
@ -580,7 +603,7 @@ func (c *Tree) GetVersions(ctx context.Context, bktInfo *data.BucketInfo, filepa
}
func (c *Tree) GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) {
meta := []string{oidKV, isCombinedKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV}
meta := []string{oidKV, isCombinedKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV, createdEpochKV}
path := pathFromName(objectName)
p := &GetNodesParams{
@ -1331,10 +1354,11 @@ func (c *Tree) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.Bucket
func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID string, version *data.NodeVersion) (uint64, error) {
path := pathFromName(version.FilePath)
meta := map[string]string{
oidKV: version.OID.EncodeToString(),
FileNameKey: path[len(path)-1],
ownerKV: version.Owner.EncodeToString(),
createdKV: strconv.FormatInt(version.Created.UTC().UnixMilli(), 10),
oidKV: version.OID.EncodeToString(),
FileNameKey: path[len(path)-1],
ownerKV: version.Owner.EncodeToString(),
createdKV: strconv.FormatInt(version.Created.UTC().UnixMilli(), 10),
createdEpochKV: strconv.FormatUint(version.CreatedEpoch, 10),
}
if version.Size > 0 {
@ -1388,7 +1412,7 @@ func (c *Tree) clearOutdatedVersionInfo(ctx context.Context, bktInfo *data.Bucke
}
func (c *Tree) getVersions(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string, onlyUnversioned bool) ([]*data.NodeVersion, error) {
keysToReturn := []string{oidKV, isCombinedKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV}
keysToReturn := []string{oidKV, isCombinedKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV, createdEpochKV}
path := pathFromName(filepath)
p := &GetNodesParams{
BktInfo: bktInfo,
@ -1446,6 +1470,7 @@ func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]str
if info.Finished {
info.Meta[finishedKV] = strconv.FormatBool(info.Finished)
}
info.Meta[createdEpochKV] = strconv.FormatUint(info.CreatedEpoch, 10)
return info.Meta
}