Enable unparam and unconvert linters #1598

Merged
dstepanov-yadro merged 3 commits from dstepanov-yadro/frostfs-node:feat/add_linters into master 2025-01-14 08:27:24 +00:00
42 changed files with 166 additions and 220 deletions

View file

@@ -89,5 +89,7 @@ linters:
     - protogetter
     - intrange
     - tenv
+    - unconvert
+    - unparam
   disable-all: true
   fast: false
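For readers unfamiliar with the two linters, here is a minimal hypothetical sketch (not code from this repository) of the patterns they report: unparam flags parameters or results that are unused or always carry the same value (for example, an error result that is always nil), and unconvert flags conversions of a value to a type it already has.

package main

import "fmt"

// unparam: the error result is always nil at every return site,
// so the linter suggests dropping it (the same pattern removed in this PR).
func double(x int) (int, error) {
	return x * 2, nil
}

// unconvert: v already has type int64, so the int64(v) conversion is redundant.
func halve(v int64) int64 {
	return int64(v) / 2
}

func main() {
	d, _ := double(21)
	fmt.Println(d, halve(84))
}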

View file

@@ -253,7 +253,7 @@ func frostfsidListNamespaces(cmd *cobra.Command, _ []string) {
     reader := frostfsidrpclient.NewReader(inv, hash)
     sessionID, it, err := reader.ListNamespaces()
     commonCmd.ExitOnErr(cmd, "can't get namespace: %w", err)
-    items, err := readIterator(inv, &it, iteratorBatchSize, sessionID)
+    items, err := readIterator(inv, &it, sessionID)
     commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)
     namespaces, err := frostfsidclient.ParseNamespaces(items)
@@ -305,7 +305,7 @@ func frostfsidListSubjects(cmd *cobra.Command, _ []string) {
     sessionID, it, err := reader.ListNamespaceSubjects(ns)
     commonCmd.ExitOnErr(cmd, "can't get namespace: %w", err)
-    subAddresses, err := frostfsidclient.UnwrapArrayOfUint160(readIterator(inv, &it, iteratorBatchSize, sessionID))
+    subAddresses, err := frostfsidclient.UnwrapArrayOfUint160(readIterator(inv, &it, sessionID))
     commonCmd.ExitOnErr(cmd, "can't unwrap: %w", err)
     sort.Slice(subAddresses, func(i, j int) bool { return subAddresses[i].Less(subAddresses[j]) })
@@ -319,7 +319,7 @@ func frostfsidListSubjects(cmd *cobra.Command, _ []string) {
     sessionID, it, err := reader.ListSubjects()
     commonCmd.ExitOnErr(cmd, "can't get subject: %w", err)
-    items, err := readIterator(inv, &it, iteratorBatchSize, sessionID)
+    items, err := readIterator(inv, &it, sessionID)
     commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)
     subj, err := frostfsidclient.ParseSubject(items)
@@ -365,7 +365,7 @@ func frostfsidListGroups(cmd *cobra.Command, _ []string) {
     sessionID, it, err := reader.ListGroups(ns)
     commonCmd.ExitOnErr(cmd, "can't get namespace: %w", err)
-    items, err := readIterator(inv, &it, iteratorBatchSize, sessionID)
+    items, err := readIterator(inv, &it, sessionID)
     commonCmd.ExitOnErr(cmd, "can't list groups: %w", err)
     groups, err := frostfsidclient.ParseGroups(items)
     commonCmd.ExitOnErr(cmd, "can't parse groups: %w", err)
@@ -415,7 +415,7 @@ func frostfsidListGroupSubjects(cmd *cobra.Command, _ []string) {
     sessionID, it, err := reader.ListGroupSubjects(ns, big.NewInt(groupID))
     commonCmd.ExitOnErr(cmd, "can't list groups: %w", err)
-    items, err := readIterator(inv, &it, iteratorBatchSize, sessionID)
+    items, err := readIterator(inv, &it, sessionID)
     commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)
     subjects, err := frostfsidclient.UnwrapArrayOfUint160(items, err)
@@ -492,17 +492,17 @@ func (f *frostfsidClient) sendWaitRes() (*state.AppExecResult, error) {
     return f.roCli.Wait(f.wCtx.SentTxs[0].Hash, f.wCtx.SentTxs[0].Vub, nil)
 }
-func readIterator(inv *invoker.Invoker, iter *result.Iterator, batchSize int, sessionID uuid.UUID) ([]stackitem.Item, error) {
+func readIterator(inv *invoker.Invoker, iter *result.Iterator, sessionID uuid.UUID) ([]stackitem.Item, error) {
     var shouldStop bool
     res := make([]stackitem.Item, 0)
     for !shouldStop {
-        items, err := inv.TraverseIterator(sessionID, iter, batchSize)
+        items, err := inv.TraverseIterator(sessionID, iter, iteratorBatchSize)
         if err != nil {
             return nil, err
         }
         res = append(res, items...)
-        shouldStop = len(items) < batchSize
+        shouldStop = len(items) < iteratorBatchSize
     }
     return res, nil
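The batchSize parameter above is the other pattern unparam reports: a parameter that receives the same value (here, the iteratorBatchSize constant) at every call site. A minimal hypothetical sketch (not this repository's code) of the warning and the usual fix, mirroring the readIterator change:

package main

import "fmt"

const pageSize = 100

// Before: every caller passes pageSize, so unparam reports that the
// limit parameter always receives the same value.
func query(offset, limit int) string {
	return fmt.Sprintf("LIMIT %d OFFSET %d", limit, offset)
}

// After: the constant is used directly and the parameter is dropped,
// just like readIterator now uses iteratorBatchSize internally.
func queryFixed(offset int) string {
	return fmt.Sprintf("LIMIT %d OFFSET %d", pageSize, offset)
}

func main() {
	fmt.Println(query(0, pageSize))
	fmt.Println(queryFixed(pageSize))
}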

View file

@@ -52,7 +52,7 @@ func genereateAPEOverride(cmd *cobra.Command, _ []string) {
     outputPath, _ := cmd.Flags().GetString(outputFlag)
     if outputPath != "" {
-        err := os.WriteFile(outputPath, []byte(overrideMarshalled), 0o644)
+        err := os.WriteFile(outputPath, overrideMarshalled, 0o644)
         commonCmd.ExitOnErr(cmd, "dump error: %w", err)
     } else {
         fmt.Print("\n")

View file

@@ -23,11 +23,11 @@ type policyPlaygroundREPL struct {
     nodes map[string]netmap.NodeInfo
 }
-func newPolicyPlaygroundREPL(cmd *cobra.Command) (*policyPlaygroundREPL, error) {
+func newPolicyPlaygroundREPL(cmd *cobra.Command) *policyPlaygroundREPL {
     return &policyPlaygroundREPL{
         cmd:   cmd,
         nodes: map[string]netmap.NodeInfo{},
-    }, nil
+    }
 }
 func (repl *policyPlaygroundREPL) handleLs(args []string) error {
@@ -246,8 +246,7 @@ var policyPlaygroundCmd = &cobra.Command{
     Long: `A REPL for testing placement policies.
 If a wallet and endpoint is provided, the initial netmap data will be loaded from the snapshot of the node. Otherwise, an empty playground is created.`,
     Run: func(cmd *cobra.Command, _ []string) {
-        repl, err := newPolicyPlaygroundREPL(cmd)
-        commonCmd.ExitOnErr(cmd, "could not create policy playground: %w", err)
+        repl := newPolicyPlaygroundREPL(cmd)
         commonCmd.ExitOnErr(cmd, "policy playground failed: %w", repl.run())
     },
 }

View file

@@ -124,10 +124,7 @@ func (v *BucketsView) loadNodeChildren(
     path := parentBucket.Path
     parser := parentBucket.NextParser
-    buffer, err := LoadBuckets(ctx, v.ui.db, path, v.ui.loadBufferSize)
-    if err != nil {
-        return err
-    }
+    buffer := LoadBuckets(ctx, v.ui.db, path, v.ui.loadBufferSize)
     for item := range buffer {
         if item.err != nil {
@@ -135,6 +132,7 @@ func (v *BucketsView) loadNodeChildren(
         }
         bucket := item.val
+        var err error
         bucket.Entry, bucket.NextParser, err = parser(bucket.Name, nil)
         if err != nil {
             return err
@@ -180,10 +178,7 @@ func (v *BucketsView) bucketSatisfiesFilter(
     defer cancel()
     // Check the current bucket's nested buckets if exist
-    bucketsBuffer, err := LoadBuckets(ctx, v.ui.db, bucket.Path, v.ui.loadBufferSize)
-    if err != nil {
-        return false, err
-    }
+    bucketsBuffer := LoadBuckets(ctx, v.ui.db, bucket.Path, v.ui.loadBufferSize)
     for item := range bucketsBuffer {
         if item.err != nil {
@@ -191,6 +186,7 @@ func (v *BucketsView) bucketSatisfiesFilter(
         }
         b := item.val
+        var err error
         b.Entry, b.NextParser, err = bucket.NextParser(b.Name, nil)
         if err != nil {
             return false, err
@@ -206,10 +202,7 @@ func (v *BucketsView) bucketSatisfiesFilter(
     }
     // Check the current bucket's nested records if exist
-    recordsBuffer, err := LoadRecords(ctx, v.ui.db, bucket.Path, v.ui.loadBufferSize)
-    if err != nil {
-        return false, err
-    }
+    recordsBuffer := LoadRecords(ctx, v.ui.db, bucket.Path, v.ui.loadBufferSize)
     for item := range recordsBuffer {
         if item.err != nil {
@@ -217,6 +210,7 @@ func (v *BucketsView) bucketSatisfiesFilter(
         }
         r := item.val
+        var err error
         r.Entry, _, err = bucket.NextParser(r.Key, r.Value)
         if err != nil {
             return false, err

View file

@@ -35,7 +35,7 @@ func resolvePath(tx *bbolt.Tx, path [][]byte) (*bbolt.Bucket, error) {
 func load[T any](
     ctx context.Context, db *bbolt.DB, path [][]byte, bufferSize int,
     filter func(key, value []byte) bool, transform func(key, value []byte) T,
-) (<-chan Item[T], error) {
+) <-chan Item[T] {
     buffer := make(chan Item[T], bufferSize)
     go func() {
@@ -77,13 +77,13 @@ func load[T any](
         }
     }()
-    return buffer, nil
+    return buffer
 }
 func LoadBuckets(
     ctx context.Context, db *bbolt.DB, path [][]byte, bufferSize int,
-) (<-chan Item[*Bucket], error) {
-    buffer, err := load(
+) <-chan Item[*Bucket] {
+    buffer := load(
         ctx, db, path, bufferSize,
         func(_, value []byte) bool {
             return value == nil
@@ -98,17 +98,14 @@ func LoadBuckets(
             }
         },
     )
-    if err != nil {
-        return nil, fmt.Errorf("can't start iterating bucket: %w", err)
-    }
-    return buffer, nil
+    return buffer
 }
 func LoadRecords(
     ctx context.Context, db *bbolt.DB, path [][]byte, bufferSize int,
-) (<-chan Item[*Record], error) {
-    buffer, err := load(
+) <-chan Item[*Record] {
+    buffer := load(
         ctx, db, path, bufferSize,
         func(_, value []byte) bool {
             return value != nil
@@ -124,11 +121,8 @@ func LoadRecords(
             }
         },
     )
-    if err != nil {
-        return nil, fmt.Errorf("can't start iterating bucket: %w", err)
-    }
-    return buffer, nil
+    return buffer
 }
 // HasBuckets checks if a bucket has nested buckets. It relies on assumption
@@ -137,24 +131,21 @@ func HasBuckets(ctx context.Context, db *bbolt.DB, path [][]byte) (bool, error)
     ctx, cancel := context.WithCancel(ctx)
     defer cancel()
-    buffer, err := load(
+    buffer := load(
         ctx, db, path, 1,
         nil,
         func(_, value []byte) []byte { return value },
     )
-    if err != nil {
-        return false, err
-    }
     x, ok := <-buffer
     if !ok {
         return false, nil
     }
     if x.err != nil {
-        return false, err
+        return false, x.err
     }
     if x.val != nil {
-        return false, err
+        return false, nil
     }
     return true, nil
 }

View file

@@ -62,10 +62,7 @@ func (v *RecordsView) Mount(ctx context.Context) error {
     ctx, v.onUnmount = context.WithCancel(ctx)
-    tempBuffer, err := LoadRecords(ctx, v.ui.db, v.bucket.Path, v.ui.loadBufferSize)
-    if err != nil {
-        return err
-    }
+    tempBuffer := LoadRecords(ctx, v.ui.db, v.bucket.Path, v.ui.loadBufferSize)
     v.buffer = make(chan *Record, v.ui.loadBufferSize)
     go func() {
@@ -73,11 +70,12 @@ func (v *RecordsView) Mount(ctx context.Context) error {
         for item := range tempBuffer {
             if item.err != nil {
-                v.ui.stopOnError(err)
+                v.ui.stopOnError(item.err)
                 break
             }
             record := item.val
+            var err error
             record.Entry, _, err = v.bucket.NextParser(record.Key, record.Value)
             if err != nil {
                 v.ui.stopOnError(err)

View file

@@ -698,8 +698,7 @@ func initCfg(appCfg *config.Config) *cfg {
     netState.metrics = c.metricsCollector
-    logPrm, err := c.loggerPrm()
-    fatalOnErr(err)
+    logPrm := c.loggerPrm()
     logPrm.SamplingHook = c.metricsCollector.LogMetrics().GetSamplingHook()
     log, err := logger.NewLogger(logPrm)
     fatalOnErr(err)
@@ -853,8 +852,8 @@ func initFrostfsID(appCfg *config.Config) cfgFrostfsID {
 }
 func initCfgGRPC() cfgGRPC {
     maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload
-    maxAddrAmount := uint64(maxChunkSize) / addressSize // each address is about 72 bytes
+    maxAddrAmount := maxChunkSize / addressSize // each address is about 72 bytes
     return cfgGRPC{
         maxChunkSize: maxChunkSize,
@@ -1059,7 +1058,7 @@ func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID
     return sh
 }
-func (c *cfg) loggerPrm() (*logger.Prm, error) {
+func (c *cfg) loggerPrm() *logger.Prm {
     // check if it has been inited before
     if c.dynamicConfiguration.logger == nil {
         c.dynamicConfiguration.logger = new(logger.Prm)
@@ -1078,7 +1077,7 @@ func (c *cfg) loggerPrm() (*logger.Prm, error) {
     }
     c.dynamicConfiguration.logger.PrependTimestamp = c.LoggerCfg.timestamp
-    return c.dynamicConfiguration.logger, nil
+    return c.dynamicConfiguration.logger
 }
 func (c *cfg) LocalAddress() network.AddressGroup {
@@ -1334,11 +1333,7 @@ func (c *cfg) reloadConfig(ctx context.Context) {
     // Logger
-    logPrm, err := c.loggerPrm()
-    if err != nil {
-        c.log.Error(ctx, logs.FrostFSNodeLoggerConfigurationPreparation, zap.Error(err))
-        return
-    }
+    logPrm := c.loggerPrm()
     components := c.getComponents(ctx, logPrm)

View file

@@ -198,7 +198,7 @@ func (l PersistentPolicyRulesConfig) Path() string {
 //
 // Returns PermDefault if the value is not a positive number.
 func (l PersistentPolicyRulesConfig) Perm() fs.FileMode {
-    p := config.UintSafe((*config.Config)(l.cfg), "perm")
+    p := config.UintSafe(l.cfg, "perm")
     if p == 0 {
         p = PermDefault
     }
@@ -210,7 +210,7 @@ func (l PersistentPolicyRulesConfig) Perm() fs.FileMode {
 //
 // Returns false if the value is not a boolean.
 func (l PersistentPolicyRulesConfig) NoSync() bool {
-    return config.BoolSafe((*config.Config)(l.cfg), "no_sync")
+    return config.BoolSafe(l.cfg, "no_sync")
 }
 // CompatibilityMode returns true if need to run node in compatibility with previous versions mode.

View file

@@ -86,7 +86,7 @@ func (s *networkState) setNodeInfo(ni *netmapSDK.NodeInfo) {
         }
     }
-    s.setControlNetmapStatus(control.NetmapStatus(ctrlNetSt))
+    s.setControlNetmapStatus(ctrlNetSt)
 }
 // sets the current node state to the given value. Subsequent cfg.bootstrap

View file

@@ -215,8 +215,7 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
     prm.MarkAsGarbage(addr)
     prm.WithForceRemoval()
-    _, err := ls.Inhume(ctx, prm)
-    return err
+    return ls.Inhume(ctx, prm)
 }
 remoteReader := objectService.NewRemoteReader(keyStorage, clientConstructor)
@@ -266,8 +265,7 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
     var inhumePrm engine.InhumePrm
     inhumePrm.MarkAsGarbage(addr)
-    _, err := ls.Inhume(ctx, inhumePrm)
-    if err != nil {
+    if err := ls.Inhume(ctx, inhumePrm); err != nil {
         c.log.Warn(ctx, logs.FrostFSNodeCouldNotInhumeMarkRedundantCopyAsGarbage,
             zap.Error(err),
         )
@@ -476,8 +474,7 @@ func (e engineWithoutNotifications) Delete(ctx context.Context, tombstone oid.Ad
     prm.WithTarget(tombstone, addrs...)
-    _, err := e.engine.Inhume(ctx, prm)
-    return err
+    return e.engine.Inhume(ctx, prm)
 }
 func (e engineWithoutNotifications) Lock(ctx context.Context, locker oid.Address, toLock []oid.ID) error {

View file

@@ -146,7 +146,6 @@ const (
     ClientCantGetBlockchainHeight = "can't get blockchain height"
     ClientCantGetBlockchainHeight243 = "can't get blockchain height"
     EventCouldNotSubmitHandlerToWorkerPool = "could not Submit handler to worker pool"
-    EventCouldNotStartListenToEvents = "could not start listen to events"
     EventStopEventListenerByError = "stop event listener by error"
     EventStopEventListenerByContext = "stop event listener by context"
     EventStopEventListenerByNotificationChannel = "stop event listener by notification channel"
@@ -384,7 +383,6 @@ const (
     FrostFSNodeShutdownSkip = "node is already shutting down, skipped shutdown"
     FrostFSNodeShutdownWhenNotReady = "node is going to shut down when subsystems are still initializing"
     FrostFSNodeConfigurationReading = "configuration reading"
-    FrostFSNodeLoggerConfigurationPreparation = "logger configuration preparation"
     FrostFSNodeTracingConfigationUpdated = "tracing configation updated"
     FrostFSNodeStorageEngineConfigurationUpdate = "storage engine configuration update"
     FrostFSNodePoolConfigurationUpdate = "adjust pool configuration"

View file

@@ -38,10 +38,7 @@ import (
 func (s *Server) initNetmapProcessor(ctx context.Context, cfg *viper.Viper,
     alphaSync event.Handler,
 ) error {
-    locodeValidator, err := s.newLocodeValidator(cfg)
-    if err != nil {
-        return err
-    }
+    locodeValidator := s.newLocodeValidator(cfg)
     netSettings := (*networkSettings)(s.netmapClient)
@@ -51,6 +48,7 @@ func (s *Server) initNetmapProcessor(ctx context.Context, cfg *viper.Viper,
     poolSize := cfg.GetInt("workers.netmap")
     s.log.Debug(ctx, logs.NetmapNetmapWorkerPool, zap.Int("size", poolSize))
+    var err error
     s.netmapProcessor, err = netmap.New(&netmap.Params{
         Log: s.log,
         Metrics: s.irMetrics,

View file

@@ -9,7 +9,7 @@ import (
     "github.com/spf13/viper"
 )
-func (s *Server) newLocodeValidator(cfg *viper.Viper) (netmap.NodeValidator, error) {
+func (s *Server) newLocodeValidator(cfg *viper.Viper) netmap.NodeValidator {
     locodeDB := locodebolt.New(locodebolt.Prm{
         Path: cfg.GetString("locode.db.path"),
     },
@@ -21,7 +21,7 @@ func (s *Server) newLocodeValidator(cfg *viper.Viper) (netmap.NodeValidator, err
     return irlocode.New(irlocode.Prm{
         DB: (*locodeBoltDBWrapper)(locodeDB),
-    }), nil
+    })
 }
 type locodeBoltEntryWrapper struct {

View file

@@ -19,7 +19,10 @@ import (
     "go.uber.org/zap"
 )
-var errObjectIsDeleteProtected = errors.New("object is delete protected")
+var (
+    errObjectIsDeleteProtected = errors.New("object is delete protected")
+    deleteRes                  = common.DeleteRes{}
+)
 // Delete deletes object from blobovnicza tree.
 //
@@ -43,17 +46,17 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
     defer span.End()
     if b.readOnly {
-        return common.DeleteRes{}, common.ErrReadOnly
+        return deleteRes, common.ErrReadOnly
     }
     if b.rebuildGuard.TryRLock() {
         defer b.rebuildGuard.RUnlock()
     } else {
-        return common.DeleteRes{}, errRebuildInProgress
+        return deleteRes, errRebuildInProgress
     }
     if b.deleteProtectedObjects.Contains(prm.Address) {
-        return common.DeleteRes{}, errObjectIsDeleteProtected
+        return deleteRes, errObjectIsDeleteProtected
     }
     var bPrm blobovnicza.DeletePrm
@@ -98,7 +101,7 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
     if err == nil && !objectFound {
         // not found in any blobovnicza
-        return common.DeleteRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
+        return deleteRes, logicerr.Wrap(new(apistatus.ObjectNotFound))
     }
     success = err == nil
@@ -112,7 +115,7 @@ func (b *Blobovniczas) deleteObjectFromLevel(ctx context.Context, prm blobovnicz
     shBlz := b.getBlobovnicza(ctx, blzPath)
     blz, err := shBlz.Open(ctx)
     if err != nil {
-        return common.DeleteRes{}, err
+        return deleteRes, err
     }
     defer shBlz.Close(ctx)
@@ -122,5 +125,5 @@ func (b *Blobovniczas) deleteObjectFromLevel(ctx context.Context, prm blobovnicz
 // removes object from blobovnicza and returns common.DeleteRes.
 func (b *Blobovniczas) deleteObject(ctx context.Context, blz *blobovnicza.Blobovnicza, prm blobovnicza.DeletePrm) (common.DeleteRes, error) {
     _, err := blz.Delete(ctx, prm)
-    return common.DeleteRes{}, err
+    return deleteRes, err
 }

View file

@@ -249,6 +249,12 @@ func (b *Blobovniczas) iterateSortedDBPaths(ctx context.Context, addr oid.Addres
 }
 func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) {
+    select {
+    case <-ctx.Done():
+        return false, ctx.Err()
+    default:
+    }
     sysPath := filepath.Join(b.rootPath, path)
     entries, err := os.ReadDir(sysPath)
     if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode

View file

@@ -136,6 +136,6 @@ func (w *genericWriter) removeWithCounter(p string, size uint64) error {
     if err := os.Remove(p); err != nil {
         return err
     }
-    w.fileCounter.Dec(uint64(size))
+    w.fileCounter.Dec(size)
     return nil
 }

View file

@@ -114,7 +114,7 @@ func (w *linuxWriter) removeFile(p string, size uint64) error {
         return logicerr.Wrap(new(apistatus.ObjectNotFound))
     }
     if err == nil {
-        w.fileCounter.Dec(uint64(size))
+        w.fileCounter.Dec(size)
     }
     return err
 }

View file

@@ -133,11 +133,11 @@ func (s *memstoreImpl) Iterate(_ context.Context, req common.IteratePrm) (common
     elem := common.IterationElement{
         ObjectData: v,
     }
-    if err := elem.Address.DecodeString(string(k)); err != nil {
+    if err := elem.Address.DecodeString(k); err != nil {
         if req.IgnoreErrors {
             continue
         }
-        return common.IterateRes{}, logicerr.Wrap(fmt.Errorf("(%T) decoding address string %q: %v", s, string(k), err))
+        return common.IterateRes{}, logicerr.Wrap(fmt.Errorf("(%T) decoding address string %q: %v", s, k, err))
     }
     var err error
     if elem.ObjectData, err = s.compression.Decompress(elem.ObjectData); err != nil {

View file

@@ -48,8 +48,8 @@ func (e *StorageEngine) ContainerSize(ctx context.Context, prm ContainerSizePrm)
     defer elapsed("ContainerSize", e.metrics.AddMethodDuration)()
     err = e.execIfNotBlocked(func() error {
-        res, err = e.containerSize(ctx, prm)
-        return err
+        res = e.containerSize(ctx, prm)
+        return nil
     })
     return
@@ -69,7 +69,7 @@ func ContainerSize(ctx context.Context, e *StorageEngine, id cid.ID) (uint64, er
     return res.Size(), nil
 }
-func (e *StorageEngine) containerSize(ctx context.Context, prm ContainerSizePrm) (res ContainerSizeRes, err error) {
+func (e *StorageEngine) containerSize(ctx context.Context, prm ContainerSizePrm) (res ContainerSizeRes) {
     e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
         var csPrm shard.ContainerSizePrm
         csPrm.SetContainerID(prm.cnr)
@@ -96,8 +96,8 @@ func (e *StorageEngine) ListContainers(ctx context.Context, _ ListContainersPrm)
     defer elapsed("ListContainers", e.metrics.AddMethodDuration)()
     err = e.execIfNotBlocked(func() error {
-        res, err = e.listContainers(ctx)
-        return err
+        res = e.listContainers(ctx)
+        return nil
     })
     return
@@ -115,7 +115,7 @@ func ListContainers(ctx context.Context, e *StorageEngine) ([]cid.ID, error) {
     return res.Containers(), nil
 }
-func (e *StorageEngine) listContainers(ctx context.Context) (ListContainersRes, error) {
+func (e *StorageEngine) listContainers(ctx context.Context) ListContainersRes {
     uniqueIDs := make(map[string]cid.ID)
     e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
@@ -142,5 +142,5 @@ func (e *StorageEngine) listContainers(ctx context.Context) (ListContainersRes,
     return ListContainersRes{
         containers: result,
-    }, nil
+    }
 }

View file

@@ -24,9 +24,6 @@ type DeletePrm struct {
     forceRemoval bool
 }
-// DeleteRes groups the resulting values of Delete operation.
-type DeleteRes struct{}
 // WithAddress is a Delete option to set the addresses of the objects to delete.
 //
 // Option is required.
@@ -51,7 +48,7 @@ func (p *DeletePrm) WithForceRemoval() {
 // NOTE: Marks any object to be deleted (despite any prohibitions
 // on operations with that object) if WithForceRemoval option has
 // been provided.
-func (e *StorageEngine) Delete(ctx context.Context, prm DeletePrm) (res DeleteRes, err error) {
+func (e *StorageEngine) Delete(ctx context.Context, prm DeletePrm) error {
     ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Delete",
         trace.WithAttributes(
             attribute.String("address", prm.addr.EncodeToString()),
@@ -60,15 +57,12 @@ func (e *StorageEngine) Delete(ctx context.Context, prm DeletePrm) (res DeleteRe
     defer span.End()
     defer elapsed("Delete", e.metrics.AddMethodDuration)()
-    err = e.execIfNotBlocked(func() error {
-        res, err = e.delete(ctx, prm)
-        return err
+    return e.execIfNotBlocked(func() error {
+        return e.delete(ctx, prm)
     })
-    return
 }
-func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
+func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) error {
     var locked struct {
         is bool
     }
@@ -126,14 +120,14 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
     })
     if locked.is {
-        return DeleteRes{}, new(apistatus.ObjectLocked)
+        return new(apistatus.ObjectLocked)
     }
     if splitInfo != nil {
         e.deleteChildren(ctx, prm.addr, prm.forceRemoval, splitInfo.SplitID())
     }
-    return DeleteRes{}, nil
+    return nil
 }
 func (e *StorageEngine) deleteChildren(ctx context.Context, addr oid.Address, force bool, splitID *objectSDK.SplitID) {

View file

@@ -70,8 +70,7 @@ func TestDeleteBigObject(t *testing.T) {
     deletePrm.WithForceRemoval()
     deletePrm.WithAddress(addrParent)
-    _, err := e.Delete(context.Background(), deletePrm)
-    require.NoError(t, err)
+    require.NoError(t, e.Delete(context.Background(), deletePrm))
     checkGetError[*apistatus.ObjectNotFound](t, e, addrParent, true)
     checkGetError[*apistatus.ObjectNotFound](t, e, addrLink, true)
@@ -141,8 +140,7 @@ func TestDeleteBigObjectWithoutGC(t *testing.T) {
     deletePrm.WithForceRemoval()
     deletePrm.WithAddress(addrParent)
-    _, err := e.Delete(context.Background(), deletePrm)
-    require.NoError(t, err)
+    require.NoError(t, e.Delete(context.Background(), deletePrm))
     checkGetError[*apistatus.ObjectNotFound](t, e, addrParent, true)
     checkGetError[*apistatus.ObjectNotFound](t, e, addrLink, true)
@@ -153,7 +151,7 @@ func TestDeleteBigObjectWithoutGC(t *testing.T) {
     // delete physical
     var delPrm shard.DeletePrm
     delPrm.SetAddresses(addrParent)
-    _, err = s1.Delete(context.Background(), delPrm)
+    _, err := s1.Delete(context.Background(), delPrm)
     require.NoError(t, err)
     delPrm.SetAddresses(addrLink)

View file

@@ -724,7 +724,7 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm)
     shards := make([]pooledShard, 0, len(e.shards))
     for id := range e.shards {
         shards = append(shards, pooledShard{
-            hashedShard: hashedShard(e.shards[id]),
+            hashedShard: e.shards[id],
             pool: e.shardPools[id],
         })
     }

View file

@@ -27,9 +27,6 @@ type InhumePrm struct {
     forceRemoval bool
 }
-// InhumeRes encapsulates results of inhume operation.
-type InhumeRes struct{}
 // WithTarget sets a list of objects that should be inhumed and tombstone address
 // as the reason for inhume operation.
 //
@@ -67,23 +64,20 @@ var errInhumeFailure = errors.New("inhume operation failed")
 // with that object) if WithForceRemoval option has been provided.
 //
 // Returns an error if executions are blocked (see BlockExecution).
-func (e *StorageEngine) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRes, err error) {
+func (e *StorageEngine) Inhume(ctx context.Context, prm InhumePrm) error {
     ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Inhume")
     defer span.End()
     defer elapsed("Inhume", e.metrics.AddMethodDuration)()
-    err = e.execIfNotBlocked(func() error {
-        res, err = e.inhume(ctx, prm)
-        return err
+    return e.execIfNotBlocked(func() error {
+        return e.inhume(ctx, prm)
     })
-    return
 }
-func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
+func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) error {
     addrsPerShard, err := e.groupObjectsByShard(ctx, prm.addrs, !prm.forceRemoval)
     if err != nil {
-        return InhumeRes{}, err
+        return err
     }
     var shPrm shard.InhumePrm
@@ -107,7 +101,7 @@ func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, e
             zap.String("shard_id", shardID),
             zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
         )
-        return InhumeRes{}, errInhumeFailure
+        return errInhumeFailure
     }
     if _, err := sh.Inhume(ctx, shPrm); err != nil {
@@ -119,11 +113,11 @@ func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, e
         default:
             e.reportShardError(ctx, sh, "couldn't inhume object in shard", err)
         }
-        return InhumeRes{}, err
+        return err
     }
 }
-    return InhumeRes{}, nil
+    return nil
 }
 // groupObjectsByShard groups objects based on the shard(s) they are stored on.

View file

@@ -55,7 +55,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
     var inhumePrm InhumePrm
     inhumePrm.WithTarget(tombstoneID, object.AddressOf(parent))
-    _, err = e.Inhume(context.Background(), inhumePrm)
+    err = e.Inhume(context.Background(), inhumePrm)
     require.NoError(t, err)
     addrs, err := Select(context.Background(), e, cnr, false, fs)
@@ -85,7 +85,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
     var inhumePrm InhumePrm
     inhumePrm.WithTarget(tombstoneID, object.AddressOf(parent))
-    _, err = e.Inhume(context.Background(), inhumePrm)
+    err = e.Inhume(context.Background(), inhumePrm)
     require.NoError(t, err)
     addrs, err := Select(context.Background(), e, cnr, false, fs)
@@ -128,7 +128,7 @@ func TestStorageEngine_ECInhume(t *testing.T) {
     var inhumePrm InhumePrm
     inhumePrm.WithTarget(tombstoneObjectAddress, parentObjectAddress)
-    _, err = e.Inhume(context.Background(), inhumePrm)
+    err = e.Inhume(context.Background(), inhumePrm)
     require.NoError(t, err)
     var alreadyRemoved *apistatus.ObjectAlreadyRemoved
@@ -173,7 +173,7 @@ func TestInhumeExpiredRegularObject(t *testing.T) {
     var prm InhumePrm
     prm.WithTarget(ts, object.AddressOf(obj))
-    _, err := engine.Inhume(context.Background(), prm)
+    err := engine.Inhume(context.Background(), prm)
     require.NoError(t, err)
 })
@@ -182,7 +182,7 @@ func TestInhumeExpiredRegularObject(t *testing.T) {
     var prm InhumePrm
     prm.MarkAsGarbage(object.AddressOf(obj))
-    _, err := engine.Inhume(context.Background(), prm)
+    err := engine.Inhume(context.Background(), prm)
     require.NoError(t, err)
 })
 }
@@ -237,7 +237,7 @@ func benchmarkInhumeMultipart(b *testing.B, numShards, numObjects int) {
     prm.WithTarget(ts, addrs...)
     b.StartTimer()
-    _, err := engine.Inhume(context.Background(), prm)
+    err := engine.Inhume(context.Background(), prm)
     require.NoError(b, err)
     b.StopTimer()
 }

View file

@@ -114,7 +114,7 @@ func TestLockUserScenario(t *testing.T) {
     inhumePrm.WithTarget(tombAddr, objAddr)
     var objLockedErr *apistatus.ObjectLocked
-    _, err = e.Inhume(context.Background(), inhumePrm)
+    err = e.Inhume(context.Background(), inhumePrm)
     require.ErrorAs(t, err, &objLockedErr)
     // 4.
@@ -127,7 +127,7 @@ func TestLockUserScenario(t *testing.T) {
     inhumePrm.WithTarget(tombForLockAddr, lockerAddr)
-    _, err = e.Inhume(context.Background(), inhumePrm)
+    err = e.Inhume(context.Background(), inhumePrm)
     require.ErrorIs(t, err, meta.ErrLockObjectRemoval)
     // 5.
@@ -136,7 +136,7 @@ func TestLockUserScenario(t *testing.T) {
     inhumePrm.WithTarget(tombAddr, objAddr)
     require.Eventually(t, func() bool {
-        _, err = e.Inhume(context.Background(), inhumePrm)
+        err = e.Inhume(context.Background(), inhumePrm)
         return err == nil
     }, 30*time.Second, time.Second)
 }
@@ -200,7 +200,7 @@ func TestLockExpiration(t *testing.T) {
     inhumePrm.WithTarget(tombAddr, objectcore.AddressOf(obj))
     var objLockedErr *apistatus.ObjectLocked
-    _, err = e.Inhume(context.Background(), inhumePrm)
+    err = e.Inhume(context.Background(), inhumePrm)
     require.ErrorAs(t, err, &objLockedErr)
     // 3.
@@ -212,7 +212,7 @@ func TestLockExpiration(t *testing.T) {
     inhumePrm.WithTarget(tombAddr, objectcore.AddressOf(obj))
     require.Eventually(t, func() bool {
-        _, err = e.Inhume(context.Background(), inhumePrm)
+        err = e.Inhume(context.Background(), inhumePrm)
         return err == nil
     }, 30*time.Second, time.Second)
 }
@@ -270,12 +270,12 @@ func TestLockForceRemoval(t *testing.T) {
     inhumePrm.MarkAsGarbage(objectcore.AddressOf(obj))
     var objLockedErr *apistatus.ObjectLocked
-    _, err = e.Inhume(context.Background(), inhumePrm)
+    err = e.Inhume(context.Background(), inhumePrm)
     require.ErrorAs(t, err, &objLockedErr)
     inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj))
-    _, err = e.Inhume(context.Background(), inhumePrm)
+    err = e.Inhume(context.Background(), inhumePrm)
     require.ErrorAs(t, err, &objLockedErr)
     // 4.
@@ -283,13 +283,12 @@ func TestLockForceRemoval(t *testing.T) {
     deletePrm.WithAddress(objectcore.AddressOf(lock))
     deletePrm.WithForceRemoval()
-    _, err = e.Delete(context.Background(), deletePrm)
-    require.NoError(t, err)
+    require.NoError(t, e.Delete(context.Background(), deletePrm))
     // 5.
     inhumePrm.MarkAsGarbage(objectcore.AddressOf(obj))
-    _, err = e.Inhume(context.Background(), inhumePrm)
+    err = e.Inhume(context.Background(), inhumePrm)
     require.NoError(t, err)
 }

View file

@@ -54,19 +54,17 @@ func (e *StorageEngine) Select(ctx context.Context, prm SelectPrm) (res SelectRe
     defer elapsed("Select", e.metrics.AddMethodDuration)()
     err = e.execIfNotBlocked(func() error {
-        res, err = e._select(ctx, prm)
-        return err
+        res = e._select(ctx, prm)
+        return nil
     })
     return
 }
-func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) (SelectRes, error) {
+func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) SelectRes {
     addrList := make([]oid.Address, 0)
     uniqueMap := make(map[string]struct{})
-    var outError error
     var shPrm shard.SelectPrm
     shPrm.SetContainerID(prm.cnr, prm.indexedContainer)
     shPrm.SetFilters(prm.filters)
@@ -90,7 +88,7 @@ func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) (SelectRes,
     return SelectRes{
         addrList: addrList,
-    }, outError
+    }
 }
 // List returns `limit` available physically storage object addresses in engine.
@@ -100,14 +98,14 @@ func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) (SelectRes,
 func (e *StorageEngine) List(ctx context.Context, limit uint64) (res SelectRes, err error) {
     defer elapsed("List", e.metrics.AddMethodDuration)()
     err = e.execIfNotBlocked(func() error {
-        res, err = e.list(ctx, limit)
-        return err
+        res = e.list(ctx, limit)
+        return nil
     })
     return
 }
-func (e *StorageEngine) list(ctx context.Context, limit uint64) (SelectRes, error) {
+func (e *StorageEngine) list(ctx context.Context, limit uint64) SelectRes {
     addrList := make([]oid.Address, 0, limit)
     uniqueMap := make(map[string]struct{})
     ln := uint64(0)
@@ -136,7 +134,7 @@ func (e *StorageEngine) list(ctx context.Context, limit uint64) (SelectRes, erro
     return SelectRes{
         addrList: addrList,
-    }, nil
+    }
 }
// Select selects objects from local storage using provided filters. // Select selects objects from local storage using provided filters.

View file

@@ -272,7 +272,7 @@ func (e *StorageEngine) sortShards(objAddr interface{ EncodeToString() string })
     h := hrw.StringHash(objAddr.EncodeToString())
     shards := make([]hashedShard, 0, len(e.shards))
     for _, sh := range e.shards {
-        shards = append(shards, hashedShard(sh))
+        shards = append(shards, sh)
     }
     hrw.SortHasherSliceByValue(shards, h)
     return shards
@@ -285,7 +285,7 @@ func (e *StorageEngine) unsortedShards() []hashedShard {
     shards := make([]hashedShard, 0, len(e.shards))
     for _, sh := range e.shards {
-        shards = append(shards, hashedShard(sh))
+        shards = append(shards, sh)
     }
     return shards

View file

@@ -56,7 +56,7 @@ func (db *DB) containers(tx *bbolt.Tx) ([]cid.ID, error) {
     return result, err
 }
-func (db *DB) ContainerSize(id cid.ID) (size uint64, err error) {
+func (db *DB) ContainerSize(id cid.ID) (uint64, error) {
     db.modeMtx.RLock()
     defer db.modeMtx.RUnlock()
@@ -64,21 +64,22 @@ func (db *DB) ContainerSize(id cid.ID) (size uint64, err error) {
         return 0, ErrDegradedMode
     }
-    err = db.boltDB.View(func(tx *bbolt.Tx) error {
-        size, err = db.containerSize(tx, id)
-        return err
+    var size uint64
+    err := db.boltDB.View(func(tx *bbolt.Tx) error {
+        size = db.containerSize(tx, id)
+        return nil
     })
     return size, metaerr.Wrap(err)
 }
-func (db *DB) containerSize(tx *bbolt.Tx, id cid.ID) (uint64, error) {
+func (db *DB) containerSize(tx *bbolt.Tx, id cid.ID) uint64 {
     containerVolume := tx.Bucket(containerVolumeBucketName)
     key := make([]byte, cidSize)
     id.Encode(key)
-    return parseContainerSize(containerVolume.Get(key)), nil
+    return parseContainerSize(containerVolume.Get(key))
 }
 func parseContainerID(dst *cid.ID, name []byte, ignore map[string]struct{}) bool {

View file

@@ -251,13 +251,13 @@ func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
     return db.incContainerObjectCounter(tx, cnrID, isUserObject)
 }
-func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool) error {
+func (db *DB) decShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64) error {
     b := tx.Bucket(shardInfoBucket)
     if b == nil {
         return nil
     }
-    return db.updateShardObjectCounterBucket(b, typ, delta, inc)
+    return db.updateShardObjectCounterBucket(b, typ, delta, false)
 }
 func (*DB) updateShardObjectCounterBucket(b *bbolt.Bucket, typ objectType, delta uint64, inc bool) error {

View file

@@ -161,21 +161,21 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error)
 func (db *DB) updateCountersDelete(tx *bbolt.Tx, res DeleteRes) error {
     if res.phyCount > 0 {
-        err := db.updateShardObjectCounter(tx, phy, res.phyCount, false)
+        err := db.decShardObjectCounter(tx, phy, res.phyCount)
         if err != nil {
             return fmt.Errorf("decrease phy object counter: %w", err)
         }
     }
     if res.logicCount > 0 {
-        err := db.updateShardObjectCounter(tx, logical, res.logicCount, false)
+        err := db.decShardObjectCounter(tx, logical, res.logicCount)
         if err != nil {
             return fmt.Errorf("decrease logical object counter: %w", err)
         }
     }
     if res.userCount > 0 {
-        err := db.updateShardObjectCounter(tx, user, res.userCount, false)
+        err := db.decShardObjectCounter(tx, user, res.userCount)
         if err != nil {
             return fmt.Errorf("decrease user object counter: %w", err)
         }

View file

@@ -342,10 +342,10 @@ func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *I
 }
 func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
-    if err := db.updateShardObjectCounter(tx, logical, res.LogicInhumed(), false); err != nil {
+    if err := db.decShardObjectCounter(tx, logical, res.LogicInhumed()); err != nil {
         return err
     }
-    if err := db.updateShardObjectCounter(tx, user, res.UserInhumed(), false); err != nil {
+    if err := db.decShardObjectCounter(tx, user, res.UserInhumed()); err != nil {
         return err
     }

View file

@@ -35,7 +35,7 @@ func (r StorageIDRes) StorageID() []byte {
 // StorageID returns storage descriptor for objects from the blobstor.
 // It is put together with the object can makes get/delete operation faster.
-func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes, err error) {
+func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (StorageIDRes, error) {
     var (
         startedAt = time.Now()
         success   = false
@@ -53,32 +53,32 @@ func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes
     db.modeMtx.RLock()
     defer db.modeMtx.RUnlock()
+    var res StorageIDRes
     if db.mode.NoMetabase() {
         return res, ErrDegradedMode
     }
-    err = db.boltDB.View(func(tx *bbolt.Tx) error {
-        res.id, err = db.storageID(tx, prm.addr)
-        return err
+    err := db.boltDB.View(func(tx *bbolt.Tx) error {
+        res.id = db.storageID(tx, prm.addr)
+        return nil
     })
     success = err == nil
     return res, metaerr.Wrap(err)
 }
-func (db *DB) storageID(tx *bbolt.Tx, addr oid.Address) ([]byte, error) {
+func (db *DB) storageID(tx *bbolt.Tx, addr oid.Address) []byte {
     key := make([]byte, bucketKeySize)
     smallBucket := tx.Bucket(smallBucketName(addr.Container(), key))
     if smallBucket == nil {
-        return nil, nil
+        return nil
     }
     storageID := smallBucket.Get(objectKey(addr.Object(), key))
     if storageID == nil {
-        return nil, nil
+        return nil
     }
-    return bytes.Clone(storageID), nil
+    return bytes.Clone(storageID)
 }
 // UpdateStorageIDPrm groups the parameters of UpdateStorageID operation.

View file

@@ -419,10 +419,7 @@ func (t *boltForest) addByPathInternal(d CIDDescriptor, attr string, treeID stri
         return err
     }
-    i, node, err := t.getPathPrefix(bTree, attr, path)
-    if err != nil {
-        return err
-    }
+    i, node := t.getPathPrefix(bTree, attr, path)
     ts := t.getLatestTimestamp(bLog, d.Position, d.Size)
     lm = make([]Move, len(path)-i+1)
@@ -980,10 +977,7 @@ func (t *boltForest) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID st
     b := treeRoot.Bucket(dataBucket)
-    i, curNodes, err := t.getPathPrefixMultiTraversal(b, attr, path[:len(path)-1])
-    if err != nil {
-        return err
-    }
+    i, curNodes := t.getPathPrefixMultiTraversal(b, attr, path[:len(path)-1])
     if i < len(path)-1 {
         return nil
     }
@@ -1526,7 +1520,7 @@ func (t *boltForest) TreeListTrees(ctx context.Context, prm TreeListTreesPrm) (*
     return &res, nil
 }
-func (t *boltForest) getPathPrefixMultiTraversal(bTree *bbolt.Bucket, attr string, path []string) (int, []Node, error) {
+func (t *boltForest) getPathPrefixMultiTraversal(bTree *bbolt.Bucket, attr string, path []string) (int, []Node) {
     c := bTree.Cursor()
     var curNodes []Node
@@ -1549,14 +1543,14 @@ func (t *boltForest) getPathPrefixMultiTraversal(bTree *bbolt.Bucket, attr strin
     }
     if len(nextNodes) == 0 {
-        return i, curNodes, nil
+        return i, curNodes
     }
 }
-    return len(path), nextNodes, nil
+    return len(path), nextNodes
 }
-func (t *boltForest) getPathPrefix(bTree *bbolt.Bucket, attr string, path []string) (int, Node, error) {
+func (t *boltForest) getPathPrefix(bTree *bbolt.Bucket, attr string, path []string) (int, Node) {
     c := bTree.Cursor()
     var curNode Node
@@ -1576,10 +1570,10 @@ loop:
         childKey, value = c.Next()
     }
-    return i, curNode, nil
+    return i, curNode
 }
-    return len(path), curNode, nil
+    return len(path), curNode
 }
 func (t *boltForest) moveFromBytes(m *Move, data []byte) error {

View file

@@ -38,7 +38,7 @@ func (s *Shard) handleMetabaseFailure(ctx context.Context, stage string, err err
     err = s.SetMode(ctx, mode.DegradedReadOnly)
     if err != nil {
-        return fmt.Errorf("switch to mode %s", mode.Mode(mode.DegradedReadOnly))
+        return fmt.Errorf("switch to mode %s", mode.DegradedReadOnly)
     }
     return nil
 }

View file

@@ -94,7 +94,8 @@ func (c *cache) Open(_ context.Context, mod mode.Mode) error {
     if err != nil {
         return metaerr.Wrap(err)
     }
-    return metaerr.Wrap(c.initCounters())
+    c.initCounters()
+    return nil
 }
 // Init runs necessary services.

View file

@@ -19,7 +19,6 @@ func (c *cache) hasEnoughSpace(objectSize uint64) bool {
     return c.maxCacheSize >= size+objectSize
 }
-func (c *cache) initCounters() error {
+func (c *cache) initCounters() {
     c.estimateCacheSize()
-    return nil
 }

View file

@@ -134,11 +134,8 @@ func (l *listener) Listen(ctx context.Context) {
     l.startOnce.Do(func() {
         l.wg.Add(1)
         defer l.wg.Done()
-        if err := l.listen(ctx, nil); err != nil {
-            l.log.Error(ctx, logs.EventCouldNotStartListenToEvents,
-                zap.Error(err),
-            )
-        }
+        l.listen(ctx, nil)
     })
 }
@@ -152,23 +149,17 @@ func (l *listener) ListenWithError(ctx context.Context, intError chan<- error) {
     l.startOnce.Do(func() {
         l.wg.Add(1)
         defer l.wg.Done()
-        if err := l.listen(ctx, intError); err != nil {
-            l.log.Error(ctx, logs.EventCouldNotStartListenToEvents,
-                zap.Error(err),
-            )
-            l.sendError(ctx, intError, err)
-        }
+        l.listen(ctx, intError)
     })
 }
-func (l *listener) listen(ctx context.Context, intError chan<- error) error {
+func (l *listener) listen(ctx context.Context, intError chan<- error) {
     subErrCh := make(chan error)
     go l.subscribe(subErrCh)
     l.listenLoop(ctx, intError, subErrCh)
-    return nil
 }
 func (l *listener) subscribe(errCh chan error) {

View file

@@ -41,7 +41,7 @@ func ParseNewEpoch(e *state.ContainedNotificationEvent) (event.Event, error) {
     }
     return NewEpoch{
-        Num: uint64(nee.Epoch.Uint64()),
+        Num: nee.Epoch.Uint64(),
         Hash: e.Container,
     }, nil
 }

View file

@@ -42,8 +42,7 @@ func (s *Server) DropObjects(ctx context.Context, req *control.DropObjectsReques
     prm.WithForceRemoval()
     prm.WithAddress(addrList[i])
-    _, err := s.s.Delete(ctx, prm)
-    if err != nil && firstErr == nil {
+    if err := s.s.Delete(ctx, prm); err != nil && firstErr == nil {
         firstErr = err
     }
 }

View file

@@ -59,7 +59,7 @@ func (s *streamer) Send(ctx context.Context, req *object.PutRequest) (err error)
     maxSz := s.stream.MaxSizeSrc.MaxObjectSize()
     s.sizes = &sizes{
-        payloadSz: uint64(v.GetHeader().GetPayloadLength()),
+        payloadSz: v.GetHeader().GetPayloadLength(),
     }
     // check payload size limit overflow

View file

@@ -120,10 +120,7 @@ func NewTraverser(opts ...Option) (*Traverser, error) {
     }
     rem = []int{-1, -1}
-    sortedVector, err := sortVector(cfg, unsortedVector)
-    if err != nil {
-        return nil, err
-    }
+    sortedVector := sortVector(cfg, unsortedVector)
     ns = [][]netmap.NodeInfo{sortedVector, regularVector}
 } else if cfg.flatSuccess != nil {
     ns = flatNodes(ns)
@@ -188,7 +185,7 @@ type nodeMetrics struct {
     metrics []int
 }
-func sortVector(cfg *cfg, unsortedVector []netmap.NodeInfo) ([]netmap.NodeInfo, error) {
+func sortVector(cfg *cfg, unsortedVector []netmap.NodeInfo) []netmap.NodeInfo {
     nm := make([]nodeMetrics, len(unsortedVector))
     node := cfg.nodeState.LocalNodeInfo()
@@ -209,7 +206,7 @@ func sortVector(cfg *cfg, unsortedVector []netmap.NodeInfo) ([]netmap.NodeInfo,
     for i := range unsortedVector {
         sortedVector[i] = unsortedVector[nm[i].index]
     }
-    return sortedVector, nil
+    return sortedVector
 }
 // Node is a descriptor of storage node with information required for intra-container communication.