[#381] *: Move to sync/atomic

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
bugfix/364-fix-flush
Evgenii Stratonikov 2023-05-19 18:06:20 +03:00
parent ff570847a4
commit 4b768fd115
21 changed files with 77 additions and 64 deletions

View File

@ -4,6 +4,7 @@ import (
"crypto/ecdsa"
"fmt"
"strings"
"sync/atomic"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
@ -13,7 +14,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
clientSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"github.com/spf13/cobra"
"go.uber.org/atomic"
)
const (
@ -134,7 +134,7 @@ func waitEvacuateCompletion(cmd *cobra.Command, pk *ecdsa.PrivateKey, cli *clien
const statusPollingInterval = 1 * time.Second
const reportIntervalSeconds = 5
var resp *control.GetShardEvacuationStatusResponse
reportResponse := atomic.NewPointer(resp)
reportResponse := new(atomic.Pointer[control.GetShardEvacuationStatusResponse])
pollingCompleted := make(chan struct{})
progressReportCompleted := make(chan struct{})

View File

@ -11,7 +11,7 @@ import (
"path/filepath"
"strings"
"sync"
atomicstd "sync/atomic"
"sync/atomic"
"syscall"
"time"
@ -68,7 +68,6 @@ import (
neogoutil "github.com/nspcc-dev/neo-go/pkg/util"
"github.com/panjf2000/ants/v2"
"go.etcd.io/bbolt"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/grpc"
)
@ -374,7 +373,7 @@ type shared struct {
ownerIDFromKey user.ID // user ID calculated from key
// current network map
netMap atomicstd.Value // type netmap.NetMap
netMap atomic.Value // type netmap.NetMap
netMapSource netmapCore.Source
cnrClient *containerClient.Client
@ -582,6 +581,9 @@ func initCfg(appCfg *config.Config) *cfg {
}
func initInternals(appCfg *config.Config, log *logger.Logger) internals {
var healthStatus atomic.Int32
healthStatus.Store(int32(control.HealthStatus_HEALTH_STATUS_UNDEFINED))
return internals{
done: make(chan struct{}),
appCfg: appCfg,
@ -589,7 +591,7 @@ func initInternals(appCfg *config.Config, log *logger.Logger) internals {
log: log,
wg: new(sync.WaitGroup),
apiVersion: version.Current(),
healthStatus: atomic.NewInt32(int32(control.HealthStatus_HEALTH_STATUS_UNDEFINED)),
healthStatus: &healthStatus,
}
}
@ -627,12 +629,14 @@ func initNetmap(appCfg *config.Config, netState *networkState, relayOnly bool) c
netmapWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
fatalOnErr(err)
var reBootstrapTurnedOff atomic.Bool
reBootstrapTurnedOff.Store(relayOnly)
return cfgNetmap{
scriptHash: contractsconfig.Netmap(appCfg),
state: netState,
workerPool: netmapWorkerPool,
needBootstrap: !relayOnly,
reBoostrapTurnedOff: atomic.NewBool(relayOnly),
reBoostrapTurnedOff: &reBootstrapTurnedOff,
}
}

View File

@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"sync/atomic"
netmapGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -19,7 +20,6 @@ import (
netmapService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/netmap"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
"go.uber.org/atomic"
"go.uber.org/zap"
)
@ -27,7 +27,7 @@ import (
type networkState struct {
epoch *atomic.Uint64
controlNetStatus atomic.Value // control.NetmapStatus
controlNetStatus atomic.Int32 // control.NetmapStatus
nodeInfo atomic.Value // *netmapSDK.NodeInfo
@ -35,13 +35,11 @@ type networkState struct {
}
func newNetworkState() *networkState {
var nmStatus atomic.Value
nmStatus.Store(control.NetmapStatus_STATUS_UNDEFINED)
return &networkState{
epoch: atomic.NewUint64(0),
controlNetStatus: nmStatus,
ns := &networkState{
epoch: new(atomic.Uint64),
}
ns.controlNetStatus.Store(int32(control.NetmapStatus_STATUS_UNDEFINED))
return ns
}
func (s *networkState) CurrentEpoch() uint64 {
@ -91,11 +89,11 @@ func (s *networkState) setNodeInfo(ni *netmapSDK.NodeInfo) {
// calls will process this value to decide what status node should set in the
// network.
func (s *networkState) setControlNetmapStatus(st control.NetmapStatus) {
s.controlNetStatus.Store(st)
s.controlNetStatus.Store(int32(st))
}
func (s *networkState) controlNetmapStatus() (res control.NetmapStatus) {
return s.controlNetStatus.Load().(control.NetmapStatus)
return control.NetmapStatus(s.controlNetStatus.Load())
}
func (s *networkState) getNodeInfo() (res netmapSDK.NodeInfo, ok bool) {

2
go.mod
View File

@ -32,7 +32,6 @@ require (
go.etcd.io/bbolt v1.3.7
go.opentelemetry.io/otel v1.15.1
go.opentelemetry.io/otel/trace v1.15.1
go.uber.org/atomic v1.11.0
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc
golang.org/x/sync v0.2.0
@ -102,6 +101,7 @@ require (
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.15.1 // indirect
go.opentelemetry.io/otel/sdk v1.15.1 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/net v0.10.0 // indirect

View File

@ -2,12 +2,12 @@ package innerring
import (
"fmt"
"sync/atomic"
"testing"
"time"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)
func TestIndexerReturnsIndexes(t *testing.T) {
@ -209,7 +209,7 @@ type testCommiteeFetcher struct {
}
func (f *testCommiteeFetcher) Committee() (keys.PublicKeys, error) {
f.calls.Inc()
f.calls.Add(1)
return f.keys, f.err
}
@ -220,6 +220,6 @@ type testIRFetcher struct {
}
func (f *testIRFetcher) InnerRingKeys() (keys.PublicKeys, error) {
f.calls.Inc()
f.calls.Add(1)
return f.keys, f.err
}

View File

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config"
@ -29,7 +30,6 @@ import (
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/spf13/viper"
"go.uber.org/atomic"
"go.uber.org/zap"
)
@ -52,7 +52,7 @@ type (
epochDuration atomic.Uint64
statusIndex *innerRingIndexer
precision precision.Fixed8Converter
healthStatus atomic.Value
healthStatus atomic.Int32
balanceClient *balanceClient.Client
netmapClient *nmClient.Client
persistate *state.PersistentStorage

View File

@ -153,7 +153,7 @@ func (s *Server) ResetEpochTimer(h uint32) error {
}
func (s *Server) setHealthStatus(hs control.HealthStatus) {
s.healthStatus.Store(hs)
s.healthStatus.Store(int32(hs))
if s.metrics != nil {
s.metrics.SetHealth(int32(hs))
}
@ -161,7 +161,7 @@ func (s *Server) setHealthStatus(hs control.HealthStatus) {
// HealthStatus returns the current health status of the IR application.
func (s *Server) HealthStatus() control.HealthStatus {
return s.healthStatus.Load().(control.HealthStatus)
return control.HealthStatus(s.healthStatus.Load())
}
func initPersistentStateStorage(cfg *viper.Viper) (*state.PersistentStorage, error) {

View File

@ -3,11 +3,11 @@ package blobovnicza
import (
"io/fs"
"os"
"sync/atomic"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.etcd.io/bbolt"
"go.uber.org/atomic"
"go.uber.org/zap"
)

View File

@ -44,7 +44,7 @@ func (b *Blobovnicza) incSize(sz uint64) {
}
func (b *Blobovnicza) decSize(sz uint64) {
b.filled.Sub(sz)
b.filled.Add(^(sz - 1))
}
func (b *Blobovnicza) full() bool {

View File

@ -8,6 +8,7 @@ import (
"os"
"path/filepath"
"strconv"
"sync/atomic"
"testing"
"time"
@ -24,7 +25,6 @@ import (
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)

View File

@ -3,6 +3,7 @@ package engine
import (
"errors"
"sync"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
@ -10,7 +11,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/atomic"
"go.uber.org/zap"
)
@ -131,7 +131,7 @@ func (e *StorageEngine) reportShardErrorBackground(id string, msg string, err er
return
}
errCount := sh.errorCount.Inc()
errCount := sh.errorCount.Add(1)
e.reportShardErrorWithFlags(sh.Shard, errCount, false, msg, err)
}
@ -149,7 +149,7 @@ func (e *StorageEngine) reportShardError(
return
}
errCount := sh.errorCount.Inc()
errCount := sh.errorCount.Add(1)
e.reportShardErrorWithFlags(sh.Shard, errCount, true, msg, err, fields...)
}

View File

@ -5,6 +5,7 @@ import (
"fmt"
"os"
"path/filepath"
"sync/atomic"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
@ -22,7 +23,6 @@ import (
"git.frostfs.info/TrueCloudLab/hrw"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)
@ -98,7 +98,7 @@ func (te *testEngineWrapper) setInitializedShards(t testing.TB, shards ...*shard
te.engine.shards[s.ID().String()] = hashedShard{
shardWrapper: shardWrapper{
errorCount: atomic.NewUint32(0),
errorCount: new(atomic.Uint32),
Shard: s,
},
hash: hrw.Hash([]byte(s.ID().String())),

View File

@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -17,7 +18,6 @@ import (
"git.frostfs.info/TrueCloudLab/hrw"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"go.uber.org/zap"
)
@ -41,9 +41,9 @@ type EvacuateShardRes struct {
// NewEvacuateShardRes creates new EvacuateShardRes instance.
func NewEvacuateShardRes() *EvacuateShardRes {
return &EvacuateShardRes{
evacuated: atomic.NewUint64(0),
total: atomic.NewUint64(0),
failed: atomic.NewUint64(0),
evacuated: new(atomic.Uint64),
total: new(atomic.Uint64),
failed: new(atomic.Uint64),
}
}
@ -97,11 +97,17 @@ func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
if p == nil {
return nil
}
return &EvacuateShardRes{
evacuated: atomic.NewUint64(p.evacuated.Load()),
total: atomic.NewUint64(p.total.Load()),
failed: atomic.NewUint64(p.failed.Load()),
res := &EvacuateShardRes{
evacuated: new(atomic.Uint64),
total: new(atomic.Uint64),
failed: new(atomic.Uint64),
}
res.evacuated.Store(p.evacuated.Load())
res.total.Store(p.total.Load())
res.failed.Store(p.failed.Load())
return res
}
const defaultEvacuateBatchSize = 100
@ -323,7 +329,7 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
getRes, err := sh.Get(ctx, getPrm)
if err != nil {
if prm.ignoreErrors {
res.failed.Inc()
res.failed.Add(1)
continue
}
e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err))
@ -350,7 +356,7 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err))
return err
}
res.evacuated.Inc()
res.evacuated.Add(1)
}
return nil
}
@ -371,7 +377,7 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
putDone, exists := e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object)
if putDone || exists {
if putDone {
res.evacuated.Inc()
res.evacuated.Add(1)
e.log.Debug(logs.EngineObjectIsMovedToAnotherShard,
zap.Stringer("from", sh.ID()),
zap.Stringer("to", shards[j].ID()),

View File

@ -2,6 +2,7 @@ package engine
import (
"fmt"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
@ -11,7 +12,6 @@ import (
"git.frostfs.info/TrueCloudLab/hrw"
"github.com/google/uuid"
"github.com/panjf2000/ants/v2"
"go.uber.org/atomic"
"go.uber.org/zap"
)
@ -133,7 +133,7 @@ func (e *StorageEngine) addShard(sh *shard.Shard) error {
e.shards[strID] = hashedShard{
shardWrapper: shardWrapper{
errorCount: atomic.NewUint32(0),
errorCount: new(atomic.Uint32),
Shard: sh,
},
hash: hrw.Hash([]byte(strID)),

View File

@ -2,13 +2,13 @@ package testutil
import (
"encoding/binary"
"sync/atomic"
"testing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"golang.org/x/exp/rand"
)
@ -27,7 +27,7 @@ var _ AddressGenerator = &SeqAddrGenerator{}
func (g *SeqAddrGenerator) Next() oid.Address {
var id oid.ID
binary.LittleEndian.PutUint64(id[:], ((g.cnt.Inc()-1)%g.MaxID)+1)
binary.LittleEndian.PutUint64(id[:], ((g.cnt.Add(1)-1)%g.MaxID)+1)
var addr oid.Address
addr.SetContainer(cid.ID{})
addr.SetObject(id)
@ -69,7 +69,7 @@ func generateObjectWithOIDWithCIDWithSize(oid oid.ID, cid cid.ID, sz uint64) *ob
func (g *SeqObjGenerator) Next() *object.Object {
var id oid.ID
binary.LittleEndian.PutUint64(id[:], g.cnt.Inc())
binary.LittleEndian.PutUint64(id[:], g.cnt.Add(1))
return generateObjectWithOIDWithCIDWithSize(id, cid.ID{}, g.ObjSize)
}

View File

@ -4,6 +4,7 @@ import (
"context"
"runtime"
"strconv"
"sync/atomic"
"testing"
"time"
@ -15,7 +16,6 @@ import (
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)
func prepareObjects(t testing.TB, n int) []*objectSDK.Object {
@ -49,13 +49,15 @@ func BenchmarkPut(b *testing.B) {
// Ensure the benchmark is bound by CPU and not waiting batch-delay time.
b.SetParallelism(1)
index := atomic.NewInt64(-1)
var index atomic.Int64
index.Store(-1)
objs := prepareObjects(b, b.N)
b.ResetTimer()
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
if err := metaPut(db, objs[index.Inc()], nil); err != nil {
if err := metaPut(db, objs[index.Add(1)], nil); err != nil {
b.Fatal(err)
}
}
@ -65,12 +67,13 @@ func BenchmarkPut(b *testing.B) {
db := newDB(b,
meta.WithMaxBatchDelay(time.Millisecond*10),
meta.WithMaxBatchSize(1))
index := atomic.NewInt64(-1)
var index atomic.Int64
index.Store(-1)
objs := prepareObjects(b, b.N)
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
if err := metaPut(db, objs[index.Inc()], nil); err != nil {
if err := metaPut(db, objs[index.Add(1)], nil); err != nil {
b.Fatal(err)
}
}

View File

@ -6,6 +6,7 @@ import (
"math"
"os"
"path/filepath"
"sync/atomic"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
@ -27,7 +28,6 @@ import (
objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/test"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
"go.uber.org/atomic"
"go.uber.org/zap/zaptest"
)

View File

@ -4,6 +4,7 @@ import (
"context"
"os"
"path/filepath"
"sync/atomic"
"testing"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
@ -22,7 +23,6 @@ import (
versionSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
"go.uber.org/atomic"
"go.uber.org/zap/zaptest"
)
@ -132,7 +132,7 @@ func TestFlush(t *testing.T) {
testIgnoreErrors := func(t *testing.T, f func(*cache)) {
var errCount atomic.Uint32
wc, bs, mb := newCache(t, WithReportErrorFunc(func(message string, err error) {
errCount.Inc()
errCount.Add(1)
}))
objects := putObjects(t, wc)
f(wc.(*cache))

View File

@ -2,9 +2,10 @@ package writecache
import (
"fmt"
"math"
"sync/atomic"
"go.etcd.io/bbolt"
"go.uber.org/atomic"
)
func (c *cache) estimateCacheSize() uint64 {
@ -24,11 +25,11 @@ type counters struct {
}
func (x *counters) IncDB() {
x.cDB.Inc()
x.cDB.Add(1)
}
func (x *counters) DecDB() {
x.cDB.Dec()
x.cDB.Add(math.MaxUint64)
}
func (x *counters) DB() uint64 {
@ -36,11 +37,11 @@ func (x *counters) DB() uint64 {
}
func (x *counters) IncFS() {
x.cFS.Inc()
x.cFS.Add(1)
}
func (x *counters) DecFS() {
x.cFS.Dec()
x.cFS.Add(math.MaxUint64)
}
func (x *counters) FS() uint64 {

View File

@ -6,6 +6,7 @@ import (
"fmt"
"math/big"
"sync"
"sync/atomic"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -26,7 +27,6 @@ import (
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
"github.com/nspcc-dev/neo-go/pkg/vm/vmstate"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"go.uber.org/atomic"
"go.uber.org/zap"
)

View File

@ -1,8 +1,9 @@
package util
import (
"sync/atomic"
"github.com/panjf2000/ants/v2"
"go.uber.org/atomic"
)
// WorkerPool represents a tool to control