forked from TrueCloudLab/frostfs-node
[#1523] local_object_storage: Unify parameters for the Delete
operation
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
36c88f0dc8
commit
d75d030a90
6 changed files with 44 additions and 49 deletions
|
@ -261,12 +261,12 @@ func (b *Blobovniczas) Get(prm common.GetPrm) (res common.GetRes, err error) {
|
||||||
//
|
//
|
||||||
// If blobocvnicza ID is specified, only this blobovnicza is processed.
|
// If blobocvnicza ID is specified, only this blobovnicza is processed.
|
||||||
// Otherwise, all Blobovniczas are processed descending weight.
|
// Otherwise, all Blobovniczas are processed descending weight.
|
||||||
func (b *Blobovniczas) Delete(prm DeleteSmallPrm) (res DeleteSmallRes, err error) {
|
func (b *Blobovniczas) Delete(prm common.DeletePrm) (res common.DeleteRes, err error) {
|
||||||
var bPrm blobovnicza.DeletePrm
|
var bPrm blobovnicza.DeletePrm
|
||||||
bPrm.SetAddress(prm.addr)
|
bPrm.SetAddress(prm.Address)
|
||||||
|
|
||||||
if prm.blobovniczaID != nil {
|
if prm.BlobovniczaID != nil {
|
||||||
blz, err := b.openBlobovnicza(prm.blobovniczaID.String())
|
blz, err := b.openBlobovnicza(prm.BlobovniczaID.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
@ -277,7 +277,7 @@ func (b *Blobovniczas) Delete(prm DeleteSmallPrm) (res DeleteSmallRes, err error
|
||||||
activeCache := make(map[string]struct{})
|
activeCache := make(map[string]struct{})
|
||||||
objectFound := false
|
objectFound := false
|
||||||
|
|
||||||
err = b.iterateSortedLeaves(&prm.addr, func(p string) (bool, error) {
|
err = b.iterateSortedLeaves(&prm.Address, func(p string) (bool, error) {
|
||||||
dirPath := filepath.Dir(p)
|
dirPath := filepath.Dir(p)
|
||||||
|
|
||||||
// don't process active blobovnicza of the level twice
|
// don't process active blobovnicza of the level twice
|
||||||
|
@ -307,7 +307,7 @@ func (b *Blobovniczas) Delete(prm DeleteSmallPrm) (res DeleteSmallRes, err error
|
||||||
// not found in any blobovnicza
|
// not found in any blobovnicza
|
||||||
var errNotFound apistatus.ObjectNotFound
|
var errNotFound apistatus.ObjectNotFound
|
||||||
|
|
||||||
return DeleteSmallRes{}, errNotFound
|
return common.DeleteRes{}, errNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
|
@ -370,7 +370,7 @@ func (b *Blobovniczas) GetRange(prm common.GetRangePrm) (res common.GetRangeRes,
|
||||||
// tries to delete object from particular blobovnicza.
|
// tries to delete object from particular blobovnicza.
|
||||||
//
|
//
|
||||||
// returns no error if object was removed from some blobovnicza of the same level.
|
// returns no error if object was removed from some blobovnicza of the same level.
|
||||||
func (b *Blobovniczas) deleteObjectFromLevel(prm blobovnicza.DeletePrm, blzPath string, tryActive bool, dp DeleteSmallPrm) (DeleteSmallRes, error) {
|
func (b *Blobovniczas) deleteObjectFromLevel(prm blobovnicza.DeletePrm, blzPath string, tryActive bool, dp common.DeletePrm) (common.DeleteRes, error) {
|
||||||
lvlPath := filepath.Dir(blzPath)
|
lvlPath := filepath.Dir(blzPath)
|
||||||
|
|
||||||
// try to remove from blobovnicza if it is opened
|
// try to remove from blobovnicza if it is opened
|
||||||
|
@ -416,13 +416,13 @@ func (b *Blobovniczas) deleteObjectFromLevel(prm blobovnicza.DeletePrm, blzPath
|
||||||
b.log.Debug("index is too big", zap.String("path", blzPath))
|
b.log.Debug("index is too big", zap.String("path", blzPath))
|
||||||
var errNotFound apistatus.ObjectNotFound
|
var errNotFound apistatus.ObjectNotFound
|
||||||
|
|
||||||
return DeleteSmallRes{}, errNotFound
|
return common.DeleteRes{}, errNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
// open blobovnicza (cached inside)
|
// open blobovnicza (cached inside)
|
||||||
blz, err := b.openBlobovnicza(blzPath)
|
blz, err := b.openBlobovnicza(blzPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return DeleteSmallRes{}, err
|
return common.DeleteRes{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return b.deleteObject(blz, prm, dp)
|
return b.deleteObject(blz, prm, dp)
|
||||||
|
@ -563,20 +563,20 @@ func (b *Blobovniczas) getRangeFromLevel(prm common.GetRangePrm, blzPath string,
|
||||||
return b.getObjectRange(blz, prm)
|
return b.getObjectRange(blz, prm)
|
||||||
}
|
}
|
||||||
|
|
||||||
// removes object from blobovnicza and returns DeleteSmallRes.
|
// removes object from blobovnicza and returns common.DeleteRes.
|
||||||
func (b *Blobovniczas) deleteObject(blz *blobovnicza.Blobovnicza, prm blobovnicza.DeletePrm, dp DeleteSmallPrm) (DeleteSmallRes, error) {
|
func (b *Blobovniczas) deleteObject(blz *blobovnicza.Blobovnicza, prm blobovnicza.DeletePrm, dp common.DeletePrm) (common.DeleteRes, error) {
|
||||||
_, err := blz.Delete(prm)
|
_, err := blz.Delete(prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return DeleteSmallRes{}, err
|
return common.DeleteRes{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
storagelog.Write(b.log,
|
storagelog.Write(b.log,
|
||||||
storagelog.AddressField(dp.addr),
|
storagelog.AddressField(dp.Address),
|
||||||
storagelog.OpField("Blobovniczas DELETE"),
|
storagelog.OpField("Blobovniczas DELETE"),
|
||||||
zap.Stringer("blobovnicza ID", dp.blobovniczaID),
|
zap.Stringer("blobovnicza ID", dp.BlobovniczaID),
|
||||||
)
|
)
|
||||||
|
|
||||||
return DeleteSmallRes{}, nil
|
return common.DeleteRes{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// reads object from blobovnicza and returns GetSmallRes.
|
// reads object from blobovnicza and returns GetSmallRes.
|
||||||
|
|
|
@ -116,11 +116,11 @@ func TestBlobovniczas(t *testing.T) {
|
||||||
require.Equal(t, payload[off:off+ln], rngRes.Data)
|
require.Equal(t, payload[off:off+ln], rngRes.Data)
|
||||||
}
|
}
|
||||||
|
|
||||||
var dPrm DeleteSmallPrm
|
var dPrm common.DeletePrm
|
||||||
var gPrm common.GetPrm
|
var gPrm common.GetPrm
|
||||||
|
|
||||||
for i := range addrList {
|
for i := range addrList {
|
||||||
dPrm.SetAddress(addrList[i])
|
dPrm.Address = addrList[i]
|
||||||
|
|
||||||
_, err := b.Delete(dPrm)
|
_, err := b.Delete(dPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -1,10 +0,0 @@
|
||||||
package blobovniczatree
|
|
||||||
|
|
||||||
// DeleteSmallPrm groups the parameters of DeleteSmall operation.
|
|
||||||
type DeleteSmallPrm struct {
|
|
||||||
address
|
|
||||||
rwBlobovniczaID
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeleteSmallRes groups the resulting values of DeleteSmall operation.
|
|
||||||
type DeleteSmallRes struct{}
|
|
15
pkg/local_object_storage/blobstor/common/delete.go
Normal file
15
pkg/local_object_storage/blobstor/common/delete.go
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
package common
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
||||||
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
|
)
|
||||||
|
|
||||||
|
// DeletePrm groups the parameters of Delete operation.
|
||||||
|
type DeletePrm struct {
|
||||||
|
Address oid.Address
|
||||||
|
BlobovniczaID *blobovnicza.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteRes groups the resulting values of Delete operation.
|
||||||
|
type DeleteRes struct{}
|
|
@ -3,28 +3,20 @@ package blobstor
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
|
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
|
||||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DeleteBigPrm groups the parameters of DeleteBig operation.
|
|
||||||
type DeleteBigPrm struct {
|
|
||||||
address
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeleteBigRes groups the resulting values of DeleteBig operation.
|
|
||||||
type DeleteBigRes struct{}
|
|
||||||
|
|
||||||
// DeleteBig removes an object from shallow dir of BLOB storage.
|
// DeleteBig removes an object from shallow dir of BLOB storage.
|
||||||
//
|
//
|
||||||
// Returns any error encountered that did not allow
|
// Returns any error encountered that did not allow
|
||||||
// to completely remove the object.
|
// to completely remove the object.
|
||||||
//
|
//
|
||||||
// Returns an error of type apistatus.ObjectNotFound if there is no object to delete.
|
// Returns an error of type apistatus.ObjectNotFound if there is no object to delete.
|
||||||
func (b *BlobStor) DeleteBig(prm DeleteBigPrm) (DeleteBigRes, error) {
|
func (b *BlobStor) DeleteBig(prm common.DeletePrm) (common.DeleteRes, error) {
|
||||||
err := b.fsTree.Delete(prm.addr)
|
err := b.fsTree.Delete(prm.Address)
|
||||||
if errors.Is(err, fstree.ErrFileNotFound) {
|
if errors.Is(err, fstree.ErrFileNotFound) {
|
||||||
var errNotFound apistatus.ObjectNotFound
|
var errNotFound apistatus.ObjectNotFound
|
||||||
|
|
||||||
|
@ -32,10 +24,10 @@ func (b *BlobStor) DeleteBig(prm DeleteBigPrm) (DeleteBigRes, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
storagelog.Write(b.log, storagelog.AddressField(prm.addr), storagelog.OpField("fstree DELETE"))
|
storagelog.Write(b.log, storagelog.AddressField(prm.Address), storagelog.OpField("fstree DELETE"))
|
||||||
}
|
}
|
||||||
|
|
||||||
return DeleteBigRes{}, err
|
return common.DeleteRes{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteSmall removes an object from blobovnicza of BLOB storage.
|
// DeleteSmall removes an object from blobovnicza of BLOB storage.
|
||||||
|
@ -47,6 +39,6 @@ func (b *BlobStor) DeleteBig(prm DeleteBigPrm) (DeleteBigRes, error) {
|
||||||
// to completely remove the object.
|
// to completely remove the object.
|
||||||
//
|
//
|
||||||
// Returns an error of type apistatus.ObjectNotFound if there is no object to delete.
|
// Returns an error of type apistatus.ObjectNotFound if there is no object to delete.
|
||||||
func (b *BlobStor) DeleteSmall(prm blobovniczatree.DeleteSmallPrm) (blobovniczatree.DeleteSmallRes, error) {
|
func (b *BlobStor) DeleteSmall(prm common.DeletePrm) (common.DeleteRes, error) {
|
||||||
return b.blobovniczas.Delete(prm)
|
return b.blobovniczas.Delete(prm)
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,8 +2,7 @@ package shard
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
|
||||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
|
@ -36,8 +35,6 @@ func (s *Shard) Delete(prm DeletePrm) (DeleteRes, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
ln := len(prm.addr)
|
ln := len(prm.addr)
|
||||||
var delSmallPrm blobovniczatree.DeleteSmallPrm
|
|
||||||
var delBigPrm blobstor.DeleteBigPrm
|
|
||||||
|
|
||||||
smalls := make(map[oid.Address]*blobovnicza.ID, ln)
|
smalls := make(map[oid.Address]*blobovnicza.ID, ln)
|
||||||
|
|
||||||
|
@ -76,8 +73,9 @@ func (s *Shard) Delete(prm DeletePrm) (DeleteRes, error) {
|
||||||
|
|
||||||
for i := range prm.addr { // delete small object
|
for i := range prm.addr { // delete small object
|
||||||
if id, ok := smalls[prm.addr[i]]; ok {
|
if id, ok := smalls[prm.addr[i]]; ok {
|
||||||
delSmallPrm.SetAddress(prm.addr[i])
|
var delSmallPrm common.DeletePrm
|
||||||
delSmallPrm.SetBlobovniczaID(id)
|
delSmallPrm.Address = prm.addr[i]
|
||||||
|
delSmallPrm.BlobovniczaID = id
|
||||||
|
|
||||||
_, err = s.blobStor.DeleteSmall(delSmallPrm)
|
_, err = s.blobStor.DeleteSmall(delSmallPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -90,8 +88,8 @@ func (s *Shard) Delete(prm DeletePrm) (DeleteRes, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// delete big object
|
// delete big object
|
||||||
|
var delBigPrm common.DeletePrm
|
||||||
delBigPrm.SetAddress(prm.addr[i])
|
delBigPrm.Address = prm.addr[i]
|
||||||
|
|
||||||
_, err = s.blobStor.DeleteBig(delBigPrm)
|
_, err = s.blobStor.DeleteBig(delBigPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in a new issue