[#20] Add logs for processing container and deleting objects
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
5ffed0b96f
commit
4f250eba42
2 changed files with 16 additions and 1 deletions
|
@ -122,6 +122,7 @@ LOOP:
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Executor) worker(ctx context.Context, job Job) error {
|
func (e *Executor) worker(ctx context.Context, job Job) error {
|
||||||
|
e.log.Debug(logs.ProcessingJob, zap.String("user", job.PrivateKey.Address()), zap.String("cid", job.ContainerID.EncodeToString()))
|
||||||
ctx = addBearerToContext(ctx, job.Bearer)
|
ctx = addBearerToContext(ctx, job.Bearer)
|
||||||
|
|
||||||
var userID user.ID
|
var userID user.ID
|
||||||
|
@ -277,7 +278,7 @@ func (e *Executor) expireObjects(ctx context.Context, ni *netmap.NetworkInfo, ru
|
||||||
|
|
||||||
objectStream, err := e.tree.InitVersionsByPrefixStream(ctx, bktInfo, "", false)
|
objectStream, err := e.tree.InitVersionsByPrefixStream(ctx, bktInfo, "", false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("list multiparts: %w", err)
|
return fmt.Errorf("list versions: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -577,6 +578,8 @@ func (e *Executor) deleteObject(ctx context.Context, version *data.NodeVersion,
|
||||||
var addr oid.Address
|
var addr oid.Address
|
||||||
addr.SetContainer(bktInfo.CID)
|
addr.SetContainer(bktInfo.CID)
|
||||||
addr.SetObject(version.OID)
|
addr.SetObject(version.OID)
|
||||||
|
|
||||||
|
e.log.Debug(logs.DeleteObject, zap.String("address", addr.EncodeToString()), zap.String("filepath", version.FilePath))
|
||||||
if err := e.frostfs.DeleteObject(ctx, addr); err != nil && !isNotFound(err) {
|
if err := e.frostfs.DeleteObject(ctx, addr); err != nil && !isNotFound(err) {
|
||||||
e.log.Warn(logs.DeleteObjectVersionFromStorage, zap.String("key", version.FilePath),
|
e.log.Warn(logs.DeleteObjectVersionFromStorage, zap.String("key", version.FilePath),
|
||||||
zap.String("address", addr.EncodeToString()), zap.Error(err))
|
zap.String("address", addr.EncodeToString()), zap.Error(err))
|
||||||
|
@ -594,6 +597,9 @@ func (e *Executor) addDeleteMarker(ctx context.Context, version *data.NodeVersio
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
e.log.Debug(logs.AddDeleteMarker, zap.String("cid", bktInfo.CID.EncodeToString()), zap.String("oid", randOID.EncodeToString()),
|
||||||
|
zap.String("filepath", version.FilePath))
|
||||||
|
|
||||||
now := nowTime()
|
now := nowTime()
|
||||||
newVersion := &data.NodeVersion{
|
newVersion := &data.NodeVersion{
|
||||||
BaseNodeVersion: data.BaseNodeVersion{
|
BaseNodeVersion: data.BaseNodeVersion{
|
||||||
|
@ -614,6 +620,8 @@ func (e *Executor) addDeleteMarker(ctx context.Context, version *data.NodeVersio
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Executor) removeVersion(ctx context.Context, version *data.NodeVersion, bktInfo *data.BucketInfo) {
|
func (e *Executor) removeVersion(ctx context.Context, version *data.NodeVersion, bktInfo *data.BucketInfo) {
|
||||||
|
e.log.Debug(logs.RemoveVersion, zap.String("cid", bktInfo.CID.EncodeToString()), zap.String("oid", version.OID.EncodeToString()),
|
||||||
|
zap.String("filepath", version.FilePath), zap.Uint64("node_id", version.ID))
|
||||||
if err := e.tree.RemoveVersion(ctx, bktInfo, version.ID); err != nil {
|
if err := e.tree.RemoveVersion(ctx, bktInfo, version.ID); err != nil {
|
||||||
e.log.Warn(logs.DeleteObjectVersionFromTree, zap.String("key", version.FilePath),
|
e.log.Warn(logs.DeleteObjectVersionFromTree, zap.String("key", version.FilePath),
|
||||||
zap.Uint64("id", version.ID), zap.Error(err))
|
zap.Uint64("id", version.ID), zap.Error(err))
|
||||||
|
@ -687,6 +695,8 @@ func versionCreationEpoch(version *data.NodeVersion, ni *netmap.NetworkInfo) (ui
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Executor) abortMultipart(ctx context.Context, bktInfo *data.BucketInfo, multipart *data.MultipartInfo) error {
|
func (e *Executor) abortMultipart(ctx context.Context, bktInfo *data.BucketInfo, multipart *data.MultipartInfo) error {
|
||||||
|
e.log.Debug(logs.AbortMultipart, zap.String("cid", bktInfo.CID.EncodeToString()), zap.String("key", multipart.Key),
|
||||||
|
zap.String("upload_id", multipart.UploadID), zap.Uint64("node_id", multipart.ID))
|
||||||
parts, err := e.tree.GetParts(ctx, bktInfo, multipart.ID)
|
parts, err := e.tree.GetParts(ctx, bktInfo, multipart.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("get parts: %w", err)
|
return fmt.Errorf("get parts: %w", err)
|
||||||
|
@ -696,6 +706,7 @@ func (e *Executor) abortMultipart(ctx context.Context, bktInfo *data.BucketInfo,
|
||||||
addr.SetContainer(bktInfo.CID)
|
addr.SetContainer(bktInfo.CID)
|
||||||
for _, part := range parts {
|
for _, part := range parts {
|
||||||
addr.SetObject(part.OID)
|
addr.SetObject(part.OID)
|
||||||
|
e.log.Debug(logs.DeleteObject, zap.String("address", addr.EncodeToString()), zap.Int("part", part.Number))
|
||||||
if err = e.frostfs.DeleteObject(ctx, addr); err != nil {
|
if err = e.frostfs.DeleteObject(ctx, addr); err != nil {
|
||||||
return fmt.Errorf("delete object '%s': %w", addr.EncodeToString(), err)
|
return fmt.Errorf("delete object '%s': %w", addr.EncodeToString(), err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -64,4 +64,8 @@ const (
|
||||||
DeleteObjectVersionFromTree = "delete object version from tree"
|
DeleteObjectVersionFromTree = "delete object version from tree"
|
||||||
EpochMismatched = "epoch mismatched"
|
EpochMismatched = "epoch mismatched"
|
||||||
UnknownCredentialSource = "unknown credential source to use"
|
UnknownCredentialSource = "unknown credential source to use"
|
||||||
|
AbortMultipart = "abort multipart"
|
||||||
|
DeleteObject = "delete object"
|
||||||
|
RemoveVersion = "remove version"
|
||||||
|
ProcessingJob = "processing job"
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in a new issue