[#1598] golangci: Enable unparam linter
All checks were successful
DCO action / DCO (pull_request) Successful in 35s
Vulncheck / Vulncheck (pull_request) Successful in 1m6s
Build / Build Components (pull_request) Successful in 1m45s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m44s
Tests and linters / Staticcheck (pull_request) Successful in 1m59s
Tests and linters / gopls check (pull_request) Successful in 2m19s
Tests and linters / Run gofumpt (pull_request) Successful in 2m35s
Tests and linters / Lint (pull_request) Successful in 2m56s
Tests and linters / Tests with -race (pull_request) Successful in 3m48s
Tests and linters / Tests (pull_request) Successful in 3m58s
All checks were successful
DCO action / DCO (pull_request) Successful in 35s
Vulncheck / Vulncheck (pull_request) Successful in 1m6s
Build / Build Components (pull_request) Successful in 1m45s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m44s
Tests and linters / Staticcheck (pull_request) Successful in 1m59s
Tests and linters / gopls check (pull_request) Successful in 2m19s
Tests and linters / Run gofumpt (pull_request) Successful in 2m35s
Tests and linters / Lint (pull_request) Successful in 2m56s
Tests and linters / Tests with -race (pull_request) Successful in 3m48s
Tests and linters / Tests (pull_request) Successful in 3m58s
To drop unnecessary parameters and return values. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
4d5ae59a52
commit
fb928616cc
26 changed files with 123 additions and 155 deletions
|
@ -90,5 +90,6 @@ linters:
|
||||||
- intrange
|
- intrange
|
||||||
- tenv
|
- tenv
|
||||||
- unconvert
|
- unconvert
|
||||||
|
- unparam
|
||||||
disable-all: true
|
disable-all: true
|
||||||
fast: false
|
fast: false
|
||||||
|
|
|
@ -253,7 +253,7 @@ func frostfsidListNamespaces(cmd *cobra.Command, _ []string) {
|
||||||
reader := frostfsidrpclient.NewReader(inv, hash)
|
reader := frostfsidrpclient.NewReader(inv, hash)
|
||||||
sessionID, it, err := reader.ListNamespaces()
|
sessionID, it, err := reader.ListNamespaces()
|
||||||
commonCmd.ExitOnErr(cmd, "can't get namespace: %w", err)
|
commonCmd.ExitOnErr(cmd, "can't get namespace: %w", err)
|
||||||
items, err := readIterator(inv, &it, iteratorBatchSize, sessionID)
|
items, err := readIterator(inv, &it, sessionID)
|
||||||
commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)
|
commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)
|
||||||
|
|
||||||
namespaces, err := frostfsidclient.ParseNamespaces(items)
|
namespaces, err := frostfsidclient.ParseNamespaces(items)
|
||||||
|
@ -305,7 +305,7 @@ func frostfsidListSubjects(cmd *cobra.Command, _ []string) {
|
||||||
sessionID, it, err := reader.ListNamespaceSubjects(ns)
|
sessionID, it, err := reader.ListNamespaceSubjects(ns)
|
||||||
commonCmd.ExitOnErr(cmd, "can't get namespace: %w", err)
|
commonCmd.ExitOnErr(cmd, "can't get namespace: %w", err)
|
||||||
|
|
||||||
subAddresses, err := frostfsidclient.UnwrapArrayOfUint160(readIterator(inv, &it, iteratorBatchSize, sessionID))
|
subAddresses, err := frostfsidclient.UnwrapArrayOfUint160(readIterator(inv, &it, sessionID))
|
||||||
commonCmd.ExitOnErr(cmd, "can't unwrap: %w", err)
|
commonCmd.ExitOnErr(cmd, "can't unwrap: %w", err)
|
||||||
|
|
||||||
sort.Slice(subAddresses, func(i, j int) bool { return subAddresses[i].Less(subAddresses[j]) })
|
sort.Slice(subAddresses, func(i, j int) bool { return subAddresses[i].Less(subAddresses[j]) })
|
||||||
|
@ -319,7 +319,7 @@ func frostfsidListSubjects(cmd *cobra.Command, _ []string) {
|
||||||
sessionID, it, err := reader.ListSubjects()
|
sessionID, it, err := reader.ListSubjects()
|
||||||
commonCmd.ExitOnErr(cmd, "can't get subject: %w", err)
|
commonCmd.ExitOnErr(cmd, "can't get subject: %w", err)
|
||||||
|
|
||||||
items, err := readIterator(inv, &it, iteratorBatchSize, sessionID)
|
items, err := readIterator(inv, &it, sessionID)
|
||||||
commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)
|
commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)
|
||||||
|
|
||||||
subj, err := frostfsidclient.ParseSubject(items)
|
subj, err := frostfsidclient.ParseSubject(items)
|
||||||
|
@ -365,7 +365,7 @@ func frostfsidListGroups(cmd *cobra.Command, _ []string) {
|
||||||
sessionID, it, err := reader.ListGroups(ns)
|
sessionID, it, err := reader.ListGroups(ns)
|
||||||
commonCmd.ExitOnErr(cmd, "can't get namespace: %w", err)
|
commonCmd.ExitOnErr(cmd, "can't get namespace: %w", err)
|
||||||
|
|
||||||
items, err := readIterator(inv, &it, iteratorBatchSize, sessionID)
|
items, err := readIterator(inv, &it, sessionID)
|
||||||
commonCmd.ExitOnErr(cmd, "can't list groups: %w", err)
|
commonCmd.ExitOnErr(cmd, "can't list groups: %w", err)
|
||||||
groups, err := frostfsidclient.ParseGroups(items)
|
groups, err := frostfsidclient.ParseGroups(items)
|
||||||
commonCmd.ExitOnErr(cmd, "can't parse groups: %w", err)
|
commonCmd.ExitOnErr(cmd, "can't parse groups: %w", err)
|
||||||
|
@ -415,7 +415,7 @@ func frostfsidListGroupSubjects(cmd *cobra.Command, _ []string) {
|
||||||
sessionID, it, err := reader.ListGroupSubjects(ns, big.NewInt(groupID))
|
sessionID, it, err := reader.ListGroupSubjects(ns, big.NewInt(groupID))
|
||||||
commonCmd.ExitOnErr(cmd, "can't list groups: %w", err)
|
commonCmd.ExitOnErr(cmd, "can't list groups: %w", err)
|
||||||
|
|
||||||
items, err := readIterator(inv, &it, iteratorBatchSize, sessionID)
|
items, err := readIterator(inv, &it, sessionID)
|
||||||
commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)
|
commonCmd.ExitOnErr(cmd, "can't read iterator: %w", err)
|
||||||
|
|
||||||
subjects, err := frostfsidclient.UnwrapArrayOfUint160(items, err)
|
subjects, err := frostfsidclient.UnwrapArrayOfUint160(items, err)
|
||||||
|
@ -492,17 +492,17 @@ func (f *frostfsidClient) sendWaitRes() (*state.AppExecResult, error) {
|
||||||
return f.roCli.Wait(f.wCtx.SentTxs[0].Hash, f.wCtx.SentTxs[0].Vub, nil)
|
return f.roCli.Wait(f.wCtx.SentTxs[0].Hash, f.wCtx.SentTxs[0].Vub, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func readIterator(inv *invoker.Invoker, iter *result.Iterator, batchSize int, sessionID uuid.UUID) ([]stackitem.Item, error) {
|
func readIterator(inv *invoker.Invoker, iter *result.Iterator, sessionID uuid.UUID) ([]stackitem.Item, error) {
|
||||||
var shouldStop bool
|
var shouldStop bool
|
||||||
res := make([]stackitem.Item, 0)
|
res := make([]stackitem.Item, 0)
|
||||||
for !shouldStop {
|
for !shouldStop {
|
||||||
items, err := inv.TraverseIterator(sessionID, iter, batchSize)
|
items, err := inv.TraverseIterator(sessionID, iter, iteratorBatchSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
res = append(res, items...)
|
res = append(res, items...)
|
||||||
shouldStop = len(items) < batchSize
|
shouldStop = len(items) < iteratorBatchSize
|
||||||
}
|
}
|
||||||
|
|
||||||
return res, nil
|
return res, nil
|
||||||
|
|
|
@ -23,11 +23,11 @@ type policyPlaygroundREPL struct {
|
||||||
nodes map[string]netmap.NodeInfo
|
nodes map[string]netmap.NodeInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPolicyPlaygroundREPL(cmd *cobra.Command) (*policyPlaygroundREPL, error) {
|
func newPolicyPlaygroundREPL(cmd *cobra.Command) *policyPlaygroundREPL {
|
||||||
return &policyPlaygroundREPL{
|
return &policyPlaygroundREPL{
|
||||||
cmd: cmd,
|
cmd: cmd,
|
||||||
nodes: map[string]netmap.NodeInfo{},
|
nodes: map[string]netmap.NodeInfo{},
|
||||||
}, nil
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repl *policyPlaygroundREPL) handleLs(args []string) error {
|
func (repl *policyPlaygroundREPL) handleLs(args []string) error {
|
||||||
|
@ -246,8 +246,7 @@ var policyPlaygroundCmd = &cobra.Command{
|
||||||
Long: `A REPL for testing placement policies.
|
Long: `A REPL for testing placement policies.
|
||||||
If a wallet and endpoint is provided, the initial netmap data will be loaded from the snapshot of the node. Otherwise, an empty playground is created.`,
|
If a wallet and endpoint is provided, the initial netmap data will be loaded from the snapshot of the node. Otherwise, an empty playground is created.`,
|
||||||
Run: func(cmd *cobra.Command, _ []string) {
|
Run: func(cmd *cobra.Command, _ []string) {
|
||||||
repl, err := newPolicyPlaygroundREPL(cmd)
|
repl := newPolicyPlaygroundREPL(cmd)
|
||||||
commonCmd.ExitOnErr(cmd, "could not create policy playground: %w", err)
|
|
||||||
commonCmd.ExitOnErr(cmd, "policy playground failed: %w", repl.run())
|
commonCmd.ExitOnErr(cmd, "policy playground failed: %w", repl.run())
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
@ -124,10 +124,7 @@ func (v *BucketsView) loadNodeChildren(
|
||||||
path := parentBucket.Path
|
path := parentBucket.Path
|
||||||
parser := parentBucket.NextParser
|
parser := parentBucket.NextParser
|
||||||
|
|
||||||
buffer, err := LoadBuckets(ctx, v.ui.db, path, v.ui.loadBufferSize)
|
buffer := LoadBuckets(ctx, v.ui.db, path, v.ui.loadBufferSize)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for item := range buffer {
|
for item := range buffer {
|
||||||
if item.err != nil {
|
if item.err != nil {
|
||||||
|
@ -135,6 +132,7 @@ func (v *BucketsView) loadNodeChildren(
|
||||||
}
|
}
|
||||||
bucket := item.val
|
bucket := item.val
|
||||||
|
|
||||||
|
var err error
|
||||||
bucket.Entry, bucket.NextParser, err = parser(bucket.Name, nil)
|
bucket.Entry, bucket.NextParser, err = parser(bucket.Name, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -180,10 +178,7 @@ func (v *BucketsView) bucketSatisfiesFilter(
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
// Check the current bucket's nested buckets if exist
|
// Check the current bucket's nested buckets if exist
|
||||||
bucketsBuffer, err := LoadBuckets(ctx, v.ui.db, bucket.Path, v.ui.loadBufferSize)
|
bucketsBuffer := LoadBuckets(ctx, v.ui.db, bucket.Path, v.ui.loadBufferSize)
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
|
|
||||||
for item := range bucketsBuffer {
|
for item := range bucketsBuffer {
|
||||||
if item.err != nil {
|
if item.err != nil {
|
||||||
|
@ -191,6 +186,7 @@ func (v *BucketsView) bucketSatisfiesFilter(
|
||||||
}
|
}
|
||||||
b := item.val
|
b := item.val
|
||||||
|
|
||||||
|
var err error
|
||||||
b.Entry, b.NextParser, err = bucket.NextParser(b.Name, nil)
|
b.Entry, b.NextParser, err = bucket.NextParser(b.Name, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
|
@ -206,10 +202,7 @@ func (v *BucketsView) bucketSatisfiesFilter(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check the current bucket's nested records if exist
|
// Check the current bucket's nested records if exist
|
||||||
recordsBuffer, err := LoadRecords(ctx, v.ui.db, bucket.Path, v.ui.loadBufferSize)
|
recordsBuffer := LoadRecords(ctx, v.ui.db, bucket.Path, v.ui.loadBufferSize)
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
|
|
||||||
for item := range recordsBuffer {
|
for item := range recordsBuffer {
|
||||||
if item.err != nil {
|
if item.err != nil {
|
||||||
|
@ -217,6 +210,7 @@ func (v *BucketsView) bucketSatisfiesFilter(
|
||||||
}
|
}
|
||||||
r := item.val
|
r := item.val
|
||||||
|
|
||||||
|
var err error
|
||||||
r.Entry, _, err = bucket.NextParser(r.Key, r.Value)
|
r.Entry, _, err = bucket.NextParser(r.Key, r.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
|
|
|
@ -35,7 +35,7 @@ func resolvePath(tx *bbolt.Tx, path [][]byte) (*bbolt.Bucket, error) {
|
||||||
func load[T any](
|
func load[T any](
|
||||||
ctx context.Context, db *bbolt.DB, path [][]byte, bufferSize int,
|
ctx context.Context, db *bbolt.DB, path [][]byte, bufferSize int,
|
||||||
filter func(key, value []byte) bool, transform func(key, value []byte) T,
|
filter func(key, value []byte) bool, transform func(key, value []byte) T,
|
||||||
) (<-chan Item[T], error) {
|
) <-chan Item[T] {
|
||||||
buffer := make(chan Item[T], bufferSize)
|
buffer := make(chan Item[T], bufferSize)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -77,13 +77,13 @@ func load[T any](
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return buffer, nil
|
return buffer
|
||||||
}
|
}
|
||||||
|
|
||||||
func LoadBuckets(
|
func LoadBuckets(
|
||||||
ctx context.Context, db *bbolt.DB, path [][]byte, bufferSize int,
|
ctx context.Context, db *bbolt.DB, path [][]byte, bufferSize int,
|
||||||
) (<-chan Item[*Bucket], error) {
|
) <-chan Item[*Bucket] {
|
||||||
buffer, err := load(
|
buffer := load(
|
||||||
ctx, db, path, bufferSize,
|
ctx, db, path, bufferSize,
|
||||||
func(_, value []byte) bool {
|
func(_, value []byte) bool {
|
||||||
return value == nil
|
return value == nil
|
||||||
|
@ -98,17 +98,14 @@ func LoadBuckets(
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("can't start iterating bucket: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return buffer, nil
|
return buffer
|
||||||
}
|
}
|
||||||
|
|
||||||
func LoadRecords(
|
func LoadRecords(
|
||||||
ctx context.Context, db *bbolt.DB, path [][]byte, bufferSize int,
|
ctx context.Context, db *bbolt.DB, path [][]byte, bufferSize int,
|
||||||
) (<-chan Item[*Record], error) {
|
) <-chan Item[*Record] {
|
||||||
buffer, err := load(
|
buffer := load(
|
||||||
ctx, db, path, bufferSize,
|
ctx, db, path, bufferSize,
|
||||||
func(_, value []byte) bool {
|
func(_, value []byte) bool {
|
||||||
return value != nil
|
return value != nil
|
||||||
|
@ -124,11 +121,8 @@ func LoadRecords(
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("can't start iterating bucket: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return buffer, nil
|
return buffer
|
||||||
}
|
}
|
||||||
|
|
||||||
// HasBuckets checks if a bucket has nested buckets. It relies on assumption
|
// HasBuckets checks if a bucket has nested buckets. It relies on assumption
|
||||||
|
@ -137,24 +131,21 @@ func HasBuckets(ctx context.Context, db *bbolt.DB, path [][]byte) (bool, error)
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
buffer, err := load(
|
buffer := load(
|
||||||
ctx, db, path, 1,
|
ctx, db, path, 1,
|
||||||
nil,
|
nil,
|
||||||
func(_, value []byte) []byte { return value },
|
func(_, value []byte) []byte { return value },
|
||||||
)
|
)
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
|
|
||||||
x, ok := <-buffer
|
x, ok := <-buffer
|
||||||
if !ok {
|
if !ok {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
if x.err != nil {
|
if x.err != nil {
|
||||||
return false, err
|
return false, x.err
|
||||||
}
|
}
|
||||||
if x.val != nil {
|
if x.val != nil {
|
||||||
return false, err
|
return false, nil
|
||||||
}
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,10 +62,7 @@ func (v *RecordsView) Mount(ctx context.Context) error {
|
||||||
|
|
||||||
ctx, v.onUnmount = context.WithCancel(ctx)
|
ctx, v.onUnmount = context.WithCancel(ctx)
|
||||||
|
|
||||||
tempBuffer, err := LoadRecords(ctx, v.ui.db, v.bucket.Path, v.ui.loadBufferSize)
|
tempBuffer := LoadRecords(ctx, v.ui.db, v.bucket.Path, v.ui.loadBufferSize)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
v.buffer = make(chan *Record, v.ui.loadBufferSize)
|
v.buffer = make(chan *Record, v.ui.loadBufferSize)
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -73,11 +70,12 @@ func (v *RecordsView) Mount(ctx context.Context) error {
|
||||||
|
|
||||||
for item := range tempBuffer {
|
for item := range tempBuffer {
|
||||||
if item.err != nil {
|
if item.err != nil {
|
||||||
v.ui.stopOnError(err)
|
v.ui.stopOnError(item.err)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
record := item.val
|
record := item.val
|
||||||
|
|
||||||
|
var err error
|
||||||
record.Entry, _, err = v.bucket.NextParser(record.Key, record.Value)
|
record.Entry, _, err = v.bucket.NextParser(record.Key, record.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
v.ui.stopOnError(err)
|
v.ui.stopOnError(err)
|
||||||
|
|
|
@ -698,8 +698,7 @@ func initCfg(appCfg *config.Config) *cfg {
|
||||||
|
|
||||||
netState.metrics = c.metricsCollector
|
netState.metrics = c.metricsCollector
|
||||||
|
|
||||||
logPrm, err := c.loggerPrm()
|
logPrm := c.loggerPrm()
|
||||||
fatalOnErr(err)
|
|
||||||
logPrm.SamplingHook = c.metricsCollector.LogMetrics().GetSamplingHook()
|
logPrm.SamplingHook = c.metricsCollector.LogMetrics().GetSamplingHook()
|
||||||
log, err := logger.NewLogger(logPrm)
|
log, err := logger.NewLogger(logPrm)
|
||||||
fatalOnErr(err)
|
fatalOnErr(err)
|
||||||
|
@ -1059,7 +1058,7 @@ func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID
|
||||||
return sh
|
return sh
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cfg) loggerPrm() (*logger.Prm, error) {
|
func (c *cfg) loggerPrm() *logger.Prm {
|
||||||
// check if it has been inited before
|
// check if it has been inited before
|
||||||
if c.dynamicConfiguration.logger == nil {
|
if c.dynamicConfiguration.logger == nil {
|
||||||
c.dynamicConfiguration.logger = new(logger.Prm)
|
c.dynamicConfiguration.logger = new(logger.Prm)
|
||||||
|
@ -1078,7 +1077,7 @@ func (c *cfg) loggerPrm() (*logger.Prm, error) {
|
||||||
}
|
}
|
||||||
c.dynamicConfiguration.logger.PrependTimestamp = c.LoggerCfg.timestamp
|
c.dynamicConfiguration.logger.PrependTimestamp = c.LoggerCfg.timestamp
|
||||||
|
|
||||||
return c.dynamicConfiguration.logger, nil
|
return c.dynamicConfiguration.logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cfg) LocalAddress() network.AddressGroup {
|
func (c *cfg) LocalAddress() network.AddressGroup {
|
||||||
|
@ -1334,11 +1333,7 @@ func (c *cfg) reloadConfig(ctx context.Context) {
|
||||||
|
|
||||||
// Logger
|
// Logger
|
||||||
|
|
||||||
logPrm, err := c.loggerPrm()
|
logPrm := c.loggerPrm()
|
||||||
if err != nil {
|
|
||||||
c.log.Error(ctx, logs.FrostFSNodeLoggerConfigurationPreparation, zap.Error(err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
components := c.getComponents(ctx, logPrm)
|
components := c.getComponents(ctx, logPrm)
|
||||||
|
|
||||||
|
|
|
@ -146,7 +146,6 @@ const (
|
||||||
ClientCantGetBlockchainHeight = "can't get blockchain height"
|
ClientCantGetBlockchainHeight = "can't get blockchain height"
|
||||||
ClientCantGetBlockchainHeight243 = "can't get blockchain height"
|
ClientCantGetBlockchainHeight243 = "can't get blockchain height"
|
||||||
EventCouldNotSubmitHandlerToWorkerPool = "could not Submit handler to worker pool"
|
EventCouldNotSubmitHandlerToWorkerPool = "could not Submit handler to worker pool"
|
||||||
EventCouldNotStartListenToEvents = "could not start listen to events"
|
|
||||||
EventStopEventListenerByError = "stop event listener by error"
|
EventStopEventListenerByError = "stop event listener by error"
|
||||||
EventStopEventListenerByContext = "stop event listener by context"
|
EventStopEventListenerByContext = "stop event listener by context"
|
||||||
EventStopEventListenerByNotificationChannel = "stop event listener by notification channel"
|
EventStopEventListenerByNotificationChannel = "stop event listener by notification channel"
|
||||||
|
@ -384,7 +383,6 @@ const (
|
||||||
FrostFSNodeShutdownSkip = "node is already shutting down, skipped shutdown"
|
FrostFSNodeShutdownSkip = "node is already shutting down, skipped shutdown"
|
||||||
FrostFSNodeShutdownWhenNotReady = "node is going to shut down when subsystems are still initializing"
|
FrostFSNodeShutdownWhenNotReady = "node is going to shut down when subsystems are still initializing"
|
||||||
FrostFSNodeConfigurationReading = "configuration reading"
|
FrostFSNodeConfigurationReading = "configuration reading"
|
||||||
FrostFSNodeLoggerConfigurationPreparation = "logger configuration preparation"
|
|
||||||
FrostFSNodeTracingConfigationUpdated = "tracing configation updated"
|
FrostFSNodeTracingConfigationUpdated = "tracing configation updated"
|
||||||
FrostFSNodeStorageEngineConfigurationUpdate = "storage engine configuration update"
|
FrostFSNodeStorageEngineConfigurationUpdate = "storage engine configuration update"
|
||||||
FrostFSNodePoolConfigurationUpdate = "adjust pool configuration"
|
FrostFSNodePoolConfigurationUpdate = "adjust pool configuration"
|
||||||
|
|
|
@ -38,10 +38,7 @@ import (
|
||||||
func (s *Server) initNetmapProcessor(ctx context.Context, cfg *viper.Viper,
|
func (s *Server) initNetmapProcessor(ctx context.Context, cfg *viper.Viper,
|
||||||
alphaSync event.Handler,
|
alphaSync event.Handler,
|
||||||
) error {
|
) error {
|
||||||
locodeValidator, err := s.newLocodeValidator(cfg)
|
locodeValidator := s.newLocodeValidator(cfg)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
netSettings := (*networkSettings)(s.netmapClient)
|
netSettings := (*networkSettings)(s.netmapClient)
|
||||||
|
|
||||||
|
@ -51,6 +48,7 @@ func (s *Server) initNetmapProcessor(ctx context.Context, cfg *viper.Viper,
|
||||||
poolSize := cfg.GetInt("workers.netmap")
|
poolSize := cfg.GetInt("workers.netmap")
|
||||||
s.log.Debug(ctx, logs.NetmapNetmapWorkerPool, zap.Int("size", poolSize))
|
s.log.Debug(ctx, logs.NetmapNetmapWorkerPool, zap.Int("size", poolSize))
|
||||||
|
|
||||||
|
var err error
|
||||||
s.netmapProcessor, err = netmap.New(&netmap.Params{
|
s.netmapProcessor, err = netmap.New(&netmap.Params{
|
||||||
Log: s.log,
|
Log: s.log,
|
||||||
Metrics: s.irMetrics,
|
Metrics: s.irMetrics,
|
||||||
|
|
|
@ -9,7 +9,7 @@ import (
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *Server) newLocodeValidator(cfg *viper.Viper) (netmap.NodeValidator, error) {
|
func (s *Server) newLocodeValidator(cfg *viper.Viper) netmap.NodeValidator {
|
||||||
locodeDB := locodebolt.New(locodebolt.Prm{
|
locodeDB := locodebolt.New(locodebolt.Prm{
|
||||||
Path: cfg.GetString("locode.db.path"),
|
Path: cfg.GetString("locode.db.path"),
|
||||||
},
|
},
|
||||||
|
@ -21,7 +21,7 @@ func (s *Server) newLocodeValidator(cfg *viper.Viper) (netmap.NodeValidator, err
|
||||||
|
|
||||||
return irlocode.New(irlocode.Prm{
|
return irlocode.New(irlocode.Prm{
|
||||||
DB: (*locodeBoltDBWrapper)(locodeDB),
|
DB: (*locodeBoltDBWrapper)(locodeDB),
|
||||||
}), nil
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
type locodeBoltEntryWrapper struct {
|
type locodeBoltEntryWrapper struct {
|
||||||
|
|
|
@ -19,7 +19,10 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
var errObjectIsDeleteProtected = errors.New("object is delete protected")
|
var (
|
||||||
|
errObjectIsDeleteProtected = errors.New("object is delete protected")
|
||||||
|
deleteRes = common.DeleteRes{}
|
||||||
|
)
|
||||||
|
|
||||||
// Delete deletes object from blobovnicza tree.
|
// Delete deletes object from blobovnicza tree.
|
||||||
//
|
//
|
||||||
|
@ -43,17 +46,17 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
if b.readOnly {
|
if b.readOnly {
|
||||||
return common.DeleteRes{}, common.ErrReadOnly
|
return deleteRes, common.ErrReadOnly
|
||||||
}
|
}
|
||||||
|
|
||||||
if b.rebuildGuard.TryRLock() {
|
if b.rebuildGuard.TryRLock() {
|
||||||
defer b.rebuildGuard.RUnlock()
|
defer b.rebuildGuard.RUnlock()
|
||||||
} else {
|
} else {
|
||||||
return common.DeleteRes{}, errRebuildInProgress
|
return deleteRes, errRebuildInProgress
|
||||||
}
|
}
|
||||||
|
|
||||||
if b.deleteProtectedObjects.Contains(prm.Address) {
|
if b.deleteProtectedObjects.Contains(prm.Address) {
|
||||||
return common.DeleteRes{}, errObjectIsDeleteProtected
|
return deleteRes, errObjectIsDeleteProtected
|
||||||
}
|
}
|
||||||
|
|
||||||
var bPrm blobovnicza.DeletePrm
|
var bPrm blobovnicza.DeletePrm
|
||||||
|
@ -98,7 +101,7 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
|
||||||
|
|
||||||
if err == nil && !objectFound {
|
if err == nil && !objectFound {
|
||||||
// not found in any blobovnicza
|
// not found in any blobovnicza
|
||||||
return common.DeleteRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
return deleteRes, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||||
}
|
}
|
||||||
success = err == nil
|
success = err == nil
|
||||||
|
|
||||||
|
@ -112,7 +115,7 @@ func (b *Blobovniczas) deleteObjectFromLevel(ctx context.Context, prm blobovnicz
|
||||||
shBlz := b.getBlobovnicza(ctx, blzPath)
|
shBlz := b.getBlobovnicza(ctx, blzPath)
|
||||||
blz, err := shBlz.Open(ctx)
|
blz, err := shBlz.Open(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return common.DeleteRes{}, err
|
return deleteRes, err
|
||||||
}
|
}
|
||||||
defer shBlz.Close(ctx)
|
defer shBlz.Close(ctx)
|
||||||
|
|
||||||
|
@ -122,5 +125,5 @@ func (b *Blobovniczas) deleteObjectFromLevel(ctx context.Context, prm blobovnicz
|
||||||
// removes object from blobovnicza and returns common.DeleteRes.
|
// removes object from blobovnicza and returns common.DeleteRes.
|
||||||
func (b *Blobovniczas) deleteObject(ctx context.Context, blz *blobovnicza.Blobovnicza, prm blobovnicza.DeletePrm) (common.DeleteRes, error) {
|
func (b *Blobovniczas) deleteObject(ctx context.Context, blz *blobovnicza.Blobovnicza, prm blobovnicza.DeletePrm) (common.DeleteRes, error) {
|
||||||
_, err := blz.Delete(ctx, prm)
|
_, err := blz.Delete(ctx, prm)
|
||||||
return common.DeleteRes{}, err
|
return deleteRes, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -249,6 +249,12 @@ func (b *Blobovniczas) iterateSortedDBPaths(ctx context.Context, addr oid.Addres
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) {
|
func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return false, ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
sysPath := filepath.Join(b.rootPath, path)
|
sysPath := filepath.Join(b.rootPath, path)
|
||||||
entries, err := os.ReadDir(sysPath)
|
entries, err := os.ReadDir(sysPath)
|
||||||
if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode
|
if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode
|
||||||
|
|
|
@ -48,8 +48,8 @@ func (e *StorageEngine) ContainerSize(ctx context.Context, prm ContainerSizePrm)
|
||||||
defer elapsed("ContainerSize", e.metrics.AddMethodDuration)()
|
defer elapsed("ContainerSize", e.metrics.AddMethodDuration)()
|
||||||
|
|
||||||
err = e.execIfNotBlocked(func() error {
|
err = e.execIfNotBlocked(func() error {
|
||||||
res, err = e.containerSize(ctx, prm)
|
res = e.containerSize(ctx, prm)
|
||||||
return err
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
return
|
return
|
||||||
|
@ -69,7 +69,7 @@ func ContainerSize(ctx context.Context, e *StorageEngine, id cid.ID) (uint64, er
|
||||||
return res.Size(), nil
|
return res.Size(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) containerSize(ctx context.Context, prm ContainerSizePrm) (res ContainerSizeRes, err error) {
|
func (e *StorageEngine) containerSize(ctx context.Context, prm ContainerSizePrm) (res ContainerSizeRes) {
|
||||||
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
|
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
|
||||||
var csPrm shard.ContainerSizePrm
|
var csPrm shard.ContainerSizePrm
|
||||||
csPrm.SetContainerID(prm.cnr)
|
csPrm.SetContainerID(prm.cnr)
|
||||||
|
@ -96,8 +96,8 @@ func (e *StorageEngine) ListContainers(ctx context.Context, _ ListContainersPrm)
|
||||||
defer elapsed("ListContainers", e.metrics.AddMethodDuration)()
|
defer elapsed("ListContainers", e.metrics.AddMethodDuration)()
|
||||||
|
|
||||||
err = e.execIfNotBlocked(func() error {
|
err = e.execIfNotBlocked(func() error {
|
||||||
res, err = e.listContainers(ctx)
|
res = e.listContainers(ctx)
|
||||||
return err
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
return
|
return
|
||||||
|
@ -115,7 +115,7 @@ func ListContainers(ctx context.Context, e *StorageEngine) ([]cid.ID, error) {
|
||||||
return res.Containers(), nil
|
return res.Containers(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) listContainers(ctx context.Context) (ListContainersRes, error) {
|
func (e *StorageEngine) listContainers(ctx context.Context) ListContainersRes {
|
||||||
uniqueIDs := make(map[string]cid.ID)
|
uniqueIDs := make(map[string]cid.ID)
|
||||||
|
|
||||||
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
|
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
|
||||||
|
@ -142,5 +142,5 @@ func (e *StorageEngine) listContainers(ctx context.Context) (ListContainersRes,
|
||||||
|
|
||||||
return ListContainersRes{
|
return ListContainersRes{
|
||||||
containers: result,
|
containers: result,
|
||||||
}, nil
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,8 @@ type DeletePrm struct {
|
||||||
// DeleteRes groups the resulting values of Delete operation.
|
// DeleteRes groups the resulting values of Delete operation.
|
||||||
type DeleteRes struct{}
|
type DeleteRes struct{}
|
||||||
|
|
||||||
|
var deleteRes = DeleteRes{}
|
||||||
|
|
||||||
// WithAddress is a Delete option to set the addresses of the objects to delete.
|
// WithAddress is a Delete option to set the addresses of the objects to delete.
|
||||||
//
|
//
|
||||||
// Option is required.
|
// Option is required.
|
||||||
|
@ -126,14 +128,14 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
|
||||||
})
|
})
|
||||||
|
|
||||||
if locked.is {
|
if locked.is {
|
||||||
return DeleteRes{}, new(apistatus.ObjectLocked)
|
return deleteRes, new(apistatus.ObjectLocked)
|
||||||
}
|
}
|
||||||
|
|
||||||
if splitInfo != nil {
|
if splitInfo != nil {
|
||||||
e.deleteChildren(ctx, prm.addr, prm.forceRemoval, splitInfo.SplitID())
|
e.deleteChildren(ctx, prm.addr, prm.forceRemoval, splitInfo.SplitID())
|
||||||
}
|
}
|
||||||
|
|
||||||
return DeleteRes{}, nil
|
return deleteRes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) deleteChildren(ctx context.Context, addr oid.Address, force bool, splitID *objectSDK.SplitID) {
|
func (e *StorageEngine) deleteChildren(ctx context.Context, addr oid.Address, force bool, splitID *objectSDK.SplitID) {
|
||||||
|
|
|
@ -30,6 +30,8 @@ type InhumePrm struct {
|
||||||
// InhumeRes encapsulates results of inhume operation.
|
// InhumeRes encapsulates results of inhume operation.
|
||||||
type InhumeRes struct{}
|
type InhumeRes struct{}
|
||||||
|
|
||||||
|
var inhumeRes = InhumeRes{}
|
||||||
|
|
||||||
// WithTarget sets a list of objects that should be inhumed and tombstone address
|
// WithTarget sets a list of objects that should be inhumed and tombstone address
|
||||||
// as the reason for inhume operation.
|
// as the reason for inhume operation.
|
||||||
//
|
//
|
||||||
|
@ -83,7 +85,7 @@ func (e *StorageEngine) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRe
|
||||||
func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
|
func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
|
||||||
addrsPerShard, err := e.groupObjectsByShard(ctx, prm.addrs, !prm.forceRemoval)
|
addrsPerShard, err := e.groupObjectsByShard(ctx, prm.addrs, !prm.forceRemoval)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return InhumeRes{}, err
|
return inhumeRes, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var shPrm shard.InhumePrm
|
var shPrm shard.InhumePrm
|
||||||
|
@ -107,7 +109,7 @@ func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, e
|
||||||
zap.String("shard_id", shardID),
|
zap.String("shard_id", shardID),
|
||||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
|
||||||
)
|
)
|
||||||
return InhumeRes{}, errInhumeFailure
|
return inhumeRes, errInhumeFailure
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := sh.Inhume(ctx, shPrm); err != nil {
|
if _, err := sh.Inhume(ctx, shPrm); err != nil {
|
||||||
|
@ -119,11 +121,11 @@ func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, e
|
||||||
default:
|
default:
|
||||||
e.reportShardError(ctx, sh, "couldn't inhume object in shard", err)
|
e.reportShardError(ctx, sh, "couldn't inhume object in shard", err)
|
||||||
}
|
}
|
||||||
return InhumeRes{}, err
|
return inhumeRes, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return InhumeRes{}, nil
|
return inhumeRes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// groupObjectsByShard groups objects based on the shard(s) they are stored on.
|
// groupObjectsByShard groups objects based on the shard(s) they are stored on.
|
||||||
|
|
|
@ -54,19 +54,17 @@ func (e *StorageEngine) Select(ctx context.Context, prm SelectPrm) (res SelectRe
|
||||||
defer elapsed("Select", e.metrics.AddMethodDuration)()
|
defer elapsed("Select", e.metrics.AddMethodDuration)()
|
||||||
|
|
||||||
err = e.execIfNotBlocked(func() error {
|
err = e.execIfNotBlocked(func() error {
|
||||||
res, err = e._select(ctx, prm)
|
res = e._select(ctx, prm)
|
||||||
return err
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) (SelectRes, error) {
|
func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) SelectRes {
|
||||||
addrList := make([]oid.Address, 0)
|
addrList := make([]oid.Address, 0)
|
||||||
uniqueMap := make(map[string]struct{})
|
uniqueMap := make(map[string]struct{})
|
||||||
|
|
||||||
var outError error
|
|
||||||
|
|
||||||
var shPrm shard.SelectPrm
|
var shPrm shard.SelectPrm
|
||||||
shPrm.SetContainerID(prm.cnr, prm.indexedContainer)
|
shPrm.SetContainerID(prm.cnr, prm.indexedContainer)
|
||||||
shPrm.SetFilters(prm.filters)
|
shPrm.SetFilters(prm.filters)
|
||||||
|
@ -90,7 +88,7 @@ func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) (SelectRes,
|
||||||
|
|
||||||
return SelectRes{
|
return SelectRes{
|
||||||
addrList: addrList,
|
addrList: addrList,
|
||||||
}, outError
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// List returns `limit` available physically storage object addresses in engine.
|
// List returns `limit` available physically storage object addresses in engine.
|
||||||
|
@ -100,14 +98,14 @@ func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) (SelectRes,
|
||||||
func (e *StorageEngine) List(ctx context.Context, limit uint64) (res SelectRes, err error) {
|
func (e *StorageEngine) List(ctx context.Context, limit uint64) (res SelectRes, err error) {
|
||||||
defer elapsed("List", e.metrics.AddMethodDuration)()
|
defer elapsed("List", e.metrics.AddMethodDuration)()
|
||||||
err = e.execIfNotBlocked(func() error {
|
err = e.execIfNotBlocked(func() error {
|
||||||
res, err = e.list(ctx, limit)
|
res = e.list(ctx, limit)
|
||||||
return err
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) list(ctx context.Context, limit uint64) (SelectRes, error) {
|
func (e *StorageEngine) list(ctx context.Context, limit uint64) SelectRes {
|
||||||
addrList := make([]oid.Address, 0, limit)
|
addrList := make([]oid.Address, 0, limit)
|
||||||
uniqueMap := make(map[string]struct{})
|
uniqueMap := make(map[string]struct{})
|
||||||
ln := uint64(0)
|
ln := uint64(0)
|
||||||
|
@ -136,7 +134,7 @@ func (e *StorageEngine) list(ctx context.Context, limit uint64) (SelectRes, erro
|
||||||
|
|
||||||
return SelectRes{
|
return SelectRes{
|
||||||
addrList: addrList,
|
addrList: addrList,
|
||||||
}, nil
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Select selects objects from local storage using provided filters.
|
// Select selects objects from local storage using provided filters.
|
||||||
|
|
|
@ -56,7 +56,7 @@ func (db *DB) containers(tx *bbolt.Tx) ([]cid.ID, error) {
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) ContainerSize(id cid.ID) (size uint64, err error) {
|
func (db *DB) ContainerSize(id cid.ID) (uint64, error) {
|
||||||
db.modeMtx.RLock()
|
db.modeMtx.RLock()
|
||||||
defer db.modeMtx.RUnlock()
|
defer db.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
@ -64,21 +64,22 @@ func (db *DB) ContainerSize(id cid.ID) (size uint64, err error) {
|
||||||
return 0, ErrDegradedMode
|
return 0, ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
var size uint64
|
||||||
size, err = db.containerSize(tx, id)
|
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
|
size = db.containerSize(tx, id)
|
||||||
|
|
||||||
return err
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
return size, metaerr.Wrap(err)
|
return size, metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) containerSize(tx *bbolt.Tx, id cid.ID) (uint64, error) {
|
func (db *DB) containerSize(tx *bbolt.Tx, id cid.ID) uint64 {
|
||||||
containerVolume := tx.Bucket(containerVolumeBucketName)
|
containerVolume := tx.Bucket(containerVolumeBucketName)
|
||||||
key := make([]byte, cidSize)
|
key := make([]byte, cidSize)
|
||||||
id.Encode(key)
|
id.Encode(key)
|
||||||
|
|
||||||
return parseContainerSize(containerVolume.Get(key)), nil
|
return parseContainerSize(containerVolume.Get(key))
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseContainerID(dst *cid.ID, name []byte, ignore map[string]struct{}) bool {
|
func parseContainerID(dst *cid.ID, name []byte, ignore map[string]struct{}) bool {
|
||||||
|
|
|
@ -251,13 +251,13 @@ func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
|
||||||
return db.incContainerObjectCounter(tx, cnrID, isUserObject)
|
return db.incContainerObjectCounter(tx, cnrID, isUserObject)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool) error {
|
func (db *DB) decShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64) error {
|
||||||
b := tx.Bucket(shardInfoBucket)
|
b := tx.Bucket(shardInfoBucket)
|
||||||
if b == nil {
|
if b == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return db.updateShardObjectCounterBucket(b, typ, delta, inc)
|
return db.updateShardObjectCounterBucket(b, typ, delta, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*DB) updateShardObjectCounterBucket(b *bbolt.Bucket, typ objectType, delta uint64, inc bool) error {
|
func (*DB) updateShardObjectCounterBucket(b *bbolt.Bucket, typ objectType, delta uint64, inc bool) error {
|
||||||
|
|
|
@ -161,21 +161,21 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error)
|
||||||
|
|
||||||
func (db *DB) updateCountersDelete(tx *bbolt.Tx, res DeleteRes) error {
|
func (db *DB) updateCountersDelete(tx *bbolt.Tx, res DeleteRes) error {
|
||||||
if res.phyCount > 0 {
|
if res.phyCount > 0 {
|
||||||
err := db.updateShardObjectCounter(tx, phy, res.phyCount, false)
|
err := db.decShardObjectCounter(tx, phy, res.phyCount)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("decrease phy object counter: %w", err)
|
return fmt.Errorf("decrease phy object counter: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if res.logicCount > 0 {
|
if res.logicCount > 0 {
|
||||||
err := db.updateShardObjectCounter(tx, logical, res.logicCount, false)
|
err := db.decShardObjectCounter(tx, logical, res.logicCount)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("decrease logical object counter: %w", err)
|
return fmt.Errorf("decrease logical object counter: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if res.userCount > 0 {
|
if res.userCount > 0 {
|
||||||
err := db.updateShardObjectCounter(tx, user, res.userCount, false)
|
err := db.decShardObjectCounter(tx, user, res.userCount)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("decrease user object counter: %w", err)
|
return fmt.Errorf("decrease user object counter: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -342,10 +342,10 @@ func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *I
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
|
func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
|
||||||
if err := db.updateShardObjectCounter(tx, logical, res.LogicInhumed(), false); err != nil {
|
if err := db.decShardObjectCounter(tx, logical, res.LogicInhumed()); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := db.updateShardObjectCounter(tx, user, res.UserInhumed(), false); err != nil {
|
if err := db.decShardObjectCounter(tx, user, res.UserInhumed()); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -35,7 +35,7 @@ func (r StorageIDRes) StorageID() []byte {
|
||||||
|
|
||||||
// StorageID returns storage descriptor for objects from the blobstor.
|
// StorageID returns storage descriptor for objects from the blobstor.
|
||||||
// It is put together with the object can makes get/delete operation faster.
|
// It is put together with the object can makes get/delete operation faster.
|
||||||
func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes, err error) {
|
func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (StorageIDRes, error) {
|
||||||
var (
|
var (
|
||||||
startedAt = time.Now()
|
startedAt = time.Now()
|
||||||
success = false
|
success = false
|
||||||
|
@ -53,32 +53,32 @@ func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes
|
||||||
db.modeMtx.RLock()
|
db.modeMtx.RLock()
|
||||||
defer db.modeMtx.RUnlock()
|
defer db.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
var res StorageIDRes
|
||||||
if db.mode.NoMetabase() {
|
if db.mode.NoMetabase() {
|
||||||
return res, ErrDegradedMode
|
return res, ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
res.id, err = db.storageID(tx, prm.addr)
|
res.id = db.storageID(tx, prm.addr)
|
||||||
|
return nil
|
||||||
return err
|
|
||||||
})
|
})
|
||||||
success = err == nil
|
success = err == nil
|
||||||
return res, metaerr.Wrap(err)
|
return res, metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) storageID(tx *bbolt.Tx, addr oid.Address) ([]byte, error) {
|
func (db *DB) storageID(tx *bbolt.Tx, addr oid.Address) []byte {
|
||||||
key := make([]byte, bucketKeySize)
|
key := make([]byte, bucketKeySize)
|
||||||
smallBucket := tx.Bucket(smallBucketName(addr.Container(), key))
|
smallBucket := tx.Bucket(smallBucketName(addr.Container(), key))
|
||||||
if smallBucket == nil {
|
if smallBucket == nil {
|
||||||
return nil, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
storageID := smallBucket.Get(objectKey(addr.Object(), key))
|
storageID := smallBucket.Get(objectKey(addr.Object(), key))
|
||||||
if storageID == nil {
|
if storageID == nil {
|
||||||
return nil, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return bytes.Clone(storageID), nil
|
return bytes.Clone(storageID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateStorageIDPrm groups the parameters of UpdateStorageID operation.
|
// UpdateStorageIDPrm groups the parameters of UpdateStorageID operation.
|
||||||
|
|
|
@ -419,10 +419,7 @@ func (t *boltForest) addByPathInternal(d CIDDescriptor, attr string, treeID stri
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
i, node, err := t.getPathPrefix(bTree, attr, path)
|
i, node := t.getPathPrefix(bTree, attr, path)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
ts := t.getLatestTimestamp(bLog, d.Position, d.Size)
|
ts := t.getLatestTimestamp(bLog, d.Position, d.Size)
|
||||||
lm = make([]Move, len(path)-i+1)
|
lm = make([]Move, len(path)-i+1)
|
||||||
|
@ -980,10 +977,7 @@ func (t *boltForest) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID st
|
||||||
|
|
||||||
b := treeRoot.Bucket(dataBucket)
|
b := treeRoot.Bucket(dataBucket)
|
||||||
|
|
||||||
i, curNodes, err := t.getPathPrefixMultiTraversal(b, attr, path[:len(path)-1])
|
i, curNodes := t.getPathPrefixMultiTraversal(b, attr, path[:len(path)-1])
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if i < len(path)-1 {
|
if i < len(path)-1 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -1526,7 +1520,7 @@ func (t *boltForest) TreeListTrees(ctx context.Context, prm TreeListTreesPrm) (*
|
||||||
return &res, nil
|
return &res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *boltForest) getPathPrefixMultiTraversal(bTree *bbolt.Bucket, attr string, path []string) (int, []Node, error) {
|
func (t *boltForest) getPathPrefixMultiTraversal(bTree *bbolt.Bucket, attr string, path []string) (int, []Node) {
|
||||||
c := bTree.Cursor()
|
c := bTree.Cursor()
|
||||||
|
|
||||||
var curNodes []Node
|
var curNodes []Node
|
||||||
|
@ -1549,14 +1543,14 @@ func (t *boltForest) getPathPrefixMultiTraversal(bTree *bbolt.Bucket, attr strin
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(nextNodes) == 0 {
|
if len(nextNodes) == 0 {
|
||||||
return i, curNodes, nil
|
return i, curNodes
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return len(path), nextNodes, nil
|
return len(path), nextNodes
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *boltForest) getPathPrefix(bTree *bbolt.Bucket, attr string, path []string) (int, Node, error) {
|
func (t *boltForest) getPathPrefix(bTree *bbolt.Bucket, attr string, path []string) (int, Node) {
|
||||||
c := bTree.Cursor()
|
c := bTree.Cursor()
|
||||||
|
|
||||||
var curNode Node
|
var curNode Node
|
||||||
|
@ -1576,10 +1570,10 @@ loop:
|
||||||
childKey, value = c.Next()
|
childKey, value = c.Next()
|
||||||
}
|
}
|
||||||
|
|
||||||
return i, curNode, nil
|
return i, curNode
|
||||||
}
|
}
|
||||||
|
|
||||||
return len(path), curNode, nil
|
return len(path), curNode
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *boltForest) moveFromBytes(m *Move, data []byte) error {
|
func (t *boltForest) moveFromBytes(m *Move, data []byte) error {
|
||||||
|
|
|
@ -94,7 +94,8 @@ func (c *cache) Open(_ context.Context, mod mode.Mode) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return metaerr.Wrap(err)
|
return metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
return metaerr.Wrap(c.initCounters())
|
c.initCounters()
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Init runs necessary services.
|
// Init runs necessary services.
|
||||||
|
|
|
@ -19,7 +19,6 @@ func (c *cache) hasEnoughSpace(objectSize uint64) bool {
|
||||||
return c.maxCacheSize >= size+objectSize
|
return c.maxCacheSize >= size+objectSize
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) initCounters() error {
|
func (c *cache) initCounters() {
|
||||||
c.estimateCacheSize()
|
c.estimateCacheSize()
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -134,11 +134,8 @@ func (l *listener) Listen(ctx context.Context) {
|
||||||
l.startOnce.Do(func() {
|
l.startOnce.Do(func() {
|
||||||
l.wg.Add(1)
|
l.wg.Add(1)
|
||||||
defer l.wg.Done()
|
defer l.wg.Done()
|
||||||
if err := l.listen(ctx, nil); err != nil {
|
|
||||||
l.log.Error(ctx, logs.EventCouldNotStartListenToEvents,
|
l.listen(ctx, nil)
|
||||||
zap.Error(err),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -152,23 +149,17 @@ func (l *listener) ListenWithError(ctx context.Context, intError chan<- error) {
|
||||||
l.startOnce.Do(func() {
|
l.startOnce.Do(func() {
|
||||||
l.wg.Add(1)
|
l.wg.Add(1)
|
||||||
defer l.wg.Done()
|
defer l.wg.Done()
|
||||||
if err := l.listen(ctx, intError); err != nil {
|
|
||||||
l.log.Error(ctx, logs.EventCouldNotStartListenToEvents,
|
l.listen(ctx, intError)
|
||||||
zap.Error(err),
|
|
||||||
)
|
|
||||||
l.sendError(ctx, intError, err)
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *listener) listen(ctx context.Context, intError chan<- error) error {
|
func (l *listener) listen(ctx context.Context, intError chan<- error) {
|
||||||
subErrCh := make(chan error)
|
subErrCh := make(chan error)
|
||||||
|
|
||||||
go l.subscribe(subErrCh)
|
go l.subscribe(subErrCh)
|
||||||
|
|
||||||
l.listenLoop(ctx, intError, subErrCh)
|
l.listenLoop(ctx, intError, subErrCh)
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *listener) subscribe(errCh chan error) {
|
func (l *listener) subscribe(errCh chan error) {
|
||||||
|
|
|
@ -120,10 +120,7 @@ func NewTraverser(opts ...Option) (*Traverser, error) {
|
||||||
}
|
}
|
||||||
rem = []int{-1, -1}
|
rem = []int{-1, -1}
|
||||||
|
|
||||||
sortedVector, err := sortVector(cfg, unsortedVector)
|
sortedVector := sortVector(cfg, unsortedVector)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
ns = [][]netmap.NodeInfo{sortedVector, regularVector}
|
ns = [][]netmap.NodeInfo{sortedVector, regularVector}
|
||||||
} else if cfg.flatSuccess != nil {
|
} else if cfg.flatSuccess != nil {
|
||||||
ns = flatNodes(ns)
|
ns = flatNodes(ns)
|
||||||
|
@ -188,7 +185,7 @@ type nodeMetrics struct {
|
||||||
metrics []int
|
metrics []int
|
||||||
}
|
}
|
||||||
|
|
||||||
func sortVector(cfg *cfg, unsortedVector []netmap.NodeInfo) ([]netmap.NodeInfo, error) {
|
func sortVector(cfg *cfg, unsortedVector []netmap.NodeInfo) []netmap.NodeInfo {
|
||||||
nm := make([]nodeMetrics, len(unsortedVector))
|
nm := make([]nodeMetrics, len(unsortedVector))
|
||||||
node := cfg.nodeState.LocalNodeInfo()
|
node := cfg.nodeState.LocalNodeInfo()
|
||||||
|
|
||||||
|
@ -209,7 +206,7 @@ func sortVector(cfg *cfg, unsortedVector []netmap.NodeInfo) ([]netmap.NodeInfo,
|
||||||
for i := range unsortedVector {
|
for i := range unsortedVector {
|
||||||
sortedVector[i] = unsortedVector[nm[i].index]
|
sortedVector[i] = unsortedVector[nm[i].index]
|
||||||
}
|
}
|
||||||
return sortedVector, nil
|
return sortedVector
|
||||||
}
|
}
|
||||||
|
|
||||||
// Node is a descriptor of storage node with information required for intra-container communication.
|
// Node is a descriptor of storage node with information required for intra-container communication.
|
||||||
|
|
Loading…
Reference in a new issue