Compare commits: master...feature/35 (16 commits)

Commits: fc32876347, e4a891877c, 4148590668, 493cafc62a, 3711976dfc, c3f5045842, ab65063d6d, c60029d3b0, 0beb7ccf5c, 0fe5e34fb0, bcf3f0f517, 79d59e4ed2, 364b4ac572, f7679a8168, 2dc2fe8780, 21412ef24a
30 changed files with 328 additions and 430 deletions
@@ -4,8 +4,13 @@ Changelog for FrostFS Node

## [Unreleased]

### Added
- Support copies number parameter in `frostfs-cli object put` (#351)

### Changed
### Fixed
- Copy number was not used for `PUT` requests (#284)
- Tree service panic in its internal client cache (#323)

### Removed
### Updated
### Updating from v0.36.0

@@ -84,7 +89,7 @@ You need to change configuration environment variables to `FROSTFS_*` if you use
New config field `object.delete.tombstone_lifetime` allows to set tombstone lifetime
more appropriate for a specific deployment.

Use `__SYSTEM__` prefix for system attributes instead of `__NEOFS__`
Use `__SYSTEM__` prefix for system attributes instead of `__NEOFS__`
(existed objects with old attributes will be treated as before, but for new objects new attributes will be used).

## Older versions
@@ -329,6 +329,8 @@ func CreateSession(prm CreateSessionPrm) (res CreateSessionRes, err error) {

type PutObjectPrm struct {
	commonObjectPrm

	copyNum uint32

	hdr *object.Object

	rdr io.Reader

@@ -352,6 +354,11 @@ func (x *PutObjectPrm) SetHeaderCallback(f func(*object.Object)) {
	x.headerCallback = f
}

// SetCopiesNumber sets number of object copies that is enough to consider put successful.
func (x *PutObjectPrm) SetCopiesNumber(copiesNumbers uint32) {
	x.copyNum = copiesNumbers
}

// PutObjectRes groups the resulting values of PutObject operation.
type PutObjectRes struct {
	id oid.ID

@@ -381,6 +388,7 @@ func PutObject(prm PutObjectPrm) (*PutObjectRes, error) {
	}

	putPrm.WithXHeaders(prm.xHeaders...)
	putPrm.SetCopiesNumber(prm.copyNum)

	wrt, err := prm.cli.ObjectPutInit(context.Background(), putPrm)
	if err != nil {
@@ -25,6 +25,7 @@ import (

const (
	noProgressFlag   = "no-progress"
	notificationFlag = "notify"
	copiesNumberFlag = "copies-number"
)

var putExpiredOn uint64

@@ -56,6 +57,8 @@ func initObjectPutCmd() {

	flags.String(notificationFlag, "", "Object notification in the form of *epoch*:*topic*; '-' topic means using default")
	flags.Bool(binaryFlag, false, "Deserialize object structure from given file.")

	flags.Uint32(copiesNumberFlag, 0, "Number of copies of the object to store within the RPC call")
}

func putObject(cmd *cobra.Command, _ []string) {

@@ -116,6 +119,12 @@ func putObject(cmd *cobra.Command, _ []string) {
		}
	}

	copyNum, err := cmd.Flags().GetUint32(copiesNumberFlag)
	commonCmd.ExitOnErr(cmd, "can't parse object copies numbers information: %w", err)
	if copyNum > 0 {
		prm.SetCopiesNumber(copyNum)
	}

	res, err := internalclient.PutObject(prm)
	if p != nil {
		p.Finish()
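A hedged usage note, not part of the diff itself: with this change the flag should combine with the existing `object put` arguments, for example

	frostfs-cli object put --copies-number 3 ...

with the remaining required flags elided here. When the flag is omitted, `copyNum` stays at its zero default, the `if copyNum > 0` branch is skipped, and placement keeps its previous behaviour.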
@@ -24,6 +24,61 @@ type valueWithTime[V any] struct {
	e error
}

type locker struct {
	mtx     *sync.Mutex
	waiters int // not protected by mtx, must used outer mutex to update concurrently
}

type keyLocker[K comparable] struct {
	lockers    map[K]*locker
	lockersMtx *sync.Mutex
}

func newKeyLocker[K comparable]() *keyLocker[K] {
	return &keyLocker[K]{
		lockers:    make(map[K]*locker),
		lockersMtx: &sync.Mutex{},
	}
}

func (l *keyLocker[K]) LockKey(key K) {
	l.lockersMtx.Lock()

	if locker, found := l.lockers[key]; found {
		locker.waiters++
		l.lockersMtx.Unlock()

		locker.mtx.Lock()
		return
	}

	locker := &locker{
		mtx:     &sync.Mutex{},
		waiters: 1,
	}
	locker.mtx.Lock()

	l.lockers[key] = locker
	l.lockersMtx.Unlock()
}

func (l *keyLocker[K]) UnlockKey(key K) {
	l.lockersMtx.Lock()
	defer l.lockersMtx.Unlock()

	locker, found := l.lockers[key]
	if !found {
		return
	}

	if locker.waiters == 1 {
		delete(l.lockers, key)
	}
	locker.waiters--

	locker.mtx.Unlock()
}

// entity that provides TTL cache interface.
type ttlNetCache[K comparable, V any] struct {
	ttl time.Duration

@@ -33,6 +88,8 @@ type ttlNetCache[K comparable, V any] struct {
	cache *lru.Cache[K, *valueWithTime[V]]

	netRdr netValueReader[K, V]

	keyLocker *keyLocker[K]
}

// complicates netValueReader with TTL caching mechanism.

@@ -41,10 +98,11 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
	fatalOnErr(err)

	return &ttlNetCache[K, V]{
		ttl:    ttl,
		sz:     sz,
		cache:  cache,
		netRdr: netRdr,
		ttl:       ttl,
		sz:        sz,
		cache:     cache,
		netRdr:    netRdr,
		keyLocker: newKeyLocker[K](),
	}
}

@@ -55,22 +113,33 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
// returned value should not be modified.
func (c *ttlNetCache[K, V]) get(key K) (V, error) {
	val, ok := c.cache.Peek(key)
	if ok {
		if time.Since(val.t) < c.ttl {
			return val.v, val.e
		}
	if ok && time.Since(val.t) < c.ttl {
		return val.v, val.e
	}

	c.cache.Remove(key)
	c.keyLocker.LockKey(key)
	defer c.keyLocker.UnlockKey(key)

	val, ok = c.cache.Peek(key)
	if ok && time.Since(val.t) < c.ttl {
		return val.v, val.e
	}

	v, err := c.netRdr(key)

	c.set(key, v, err)
	c.cache.Add(key, &valueWithTime[V]{
		v: v,
		t: time.Now(),
		e: err,
	})

	return v, err
}

func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
	c.keyLocker.LockKey(k)
	defer c.keyLocker.UnlockKey(k)

	c.cache.Add(k, &valueWithTime[V]{
		v: v,
		t: time.Now(),

@@ -79,6 +148,9 @@ func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
}

func (c *ttlNetCache[K, V]) remove(key K) {
	c.keyLocker.LockKey(key)
	defer c.keyLocker.UnlockKey(key)

	c.cache.Remove(key)
}
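For readers unfamiliar with the pattern above: LockKey serialises callers of the same key while leaving different keys independent, and get re-checks the cache after taking the key lock so that, out of several concurrent readers of one key, only the first actually calls netRdr. A minimal self-contained sketch of that double-checked, per-key-lock idea, not the package's actual types, with assumed names and one simplification (unlike keyLocker above it never frees per-key mutexes):

	package main

	import (
		"fmt"
		"sync"
	)

	type entry struct {
		v   string
		err error
	}

	type netCache struct {
		mu    sync.Mutex             // guards data and locks
		data  map[string]entry       // cached values, analogous to the LRU above
		locks map[string]*sync.Mutex // one mutex per key
		fetch func(string) (string, error)
	}

	func (c *netCache) keyLock(k string) *sync.Mutex {
		c.mu.Lock()
		defer c.mu.Unlock()
		if m, ok := c.locks[k]; ok {
			return m
		}
		m := &sync.Mutex{}
		c.locks[k] = m
		return m
	}

	func (c *netCache) get(k string) (string, error) {
		c.mu.Lock()
		if e, ok := c.data[k]; ok { // fast path: value already cached
			c.mu.Unlock()
			return e.v, e.err
		}
		c.mu.Unlock()

		kl := c.keyLock(k)
		kl.Lock()
		defer kl.Unlock()

		// Double check under the per-key lock: another goroutine may have
		// fetched and stored the value while we were waiting.
		c.mu.Lock()
		if e, ok := c.data[k]; ok {
			c.mu.Unlock()
			return e.v, e.err
		}
		c.mu.Unlock()

		v, err := c.fetch(k) // only one goroutine per key reaches the network
		c.mu.Lock()
		c.data[k] = entry{v: v, err: err}
		c.mu.Unlock()
		return v, err
	}

	func main() {
		c := &netCache{
			data:  map[string]entry{},
			locks: map[string]*sync.Mutex{},
			fetch: func(k string) (string, error) { return "value-for-" + k, nil },
		}
		v, err := c.get("epoch")
		fmt.Println(v, err)
	}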
cmd/frostfs-node/cache_test.go (new file, 32 lines)

@@ -0,0 +1,32 @@
package main

import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"golang.org/x/sync/errgroup"
)

func TestKeyLocker(t *testing.T) {
	taken := false
	eg, _ := errgroup.WithContext(context.Background())
	keyLocker := newKeyLocker[int]()
	for i := 0; i < 100; i++ {
		eg.Go(func() error {
			keyLocker.LockKey(0)
			defer keyLocker.UnlockKey(0)

			require.False(t, taken)
			taken = true
			require.True(t, taken)
			time.Sleep(10 * time.Millisecond)
			taken = false
			require.False(t, taken)

			return nil
		})
	}
	require.NoError(t, eg.Wait())
}
@@ -130,9 +130,10 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
		// TODO: use owner directly from the event after neofs-contract#256 will become resolved
		// but don't forget about the profit of reading the new container and caching it:
		// creation success are most commonly tracked by polling GET op.
		cnr, err := cachedContainerStorage.Get(ev.ID)
		cnr, err := cnrSrc.Get(ev.ID)
		if err == nil {
			cachedContainerLister.update(cnr.Value.Owner(), ev.ID, true)
			cachedContainerStorage.set(ev.ID, cnr, nil)
		} else {
			// unlike removal, we expect successful receive of the container
			// after successful creation, so logging can be useful
@@ -187,4 +187,4 @@ FROSTFS_STORAGE_SHARD_1_GC_REMOVER_SLEEP_INTERVAL=5m

FROSTFS_TRACING_ENABLED=true
FROSTFS_TRACING_ENDPOINT="localhost"
FROSTFS_TRACING_EXPORTER="otlp_grpc"
FROSTFS_TRACING_EXPORTER="otlp_grpc"
@@ -219,4 +219,3 @@ tracing:
  enabled: true
  exporter: "otlp_grpc"
  endpoint: "localhost"
go.mod

@@ -3,7 +3,7 @@ module git.frostfs.info/TrueCloudLab/frostfs-node

go 1.18

require (
	git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.0
	git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230516125015-c3f61e7c8595
	git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb
	git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230316081442-bec77f280a85
	git.frostfs.info/TrueCloudLab/hrw v1.2.0
go.sum (binary file not shown)
@@ -213,6 +213,14 @@ func (e *StorageEngine) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
	return err == nil, err
}

func (e *StorageEngine) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
	index, lst, err := e.getTreeShard(cid, treeID)
	if err != nil {
		return 0, nil
	}
	return lst[index].TreeHeight(cid, treeID)
}

// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
func (e *StorageEngine) TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error {
	index, lst, err := e.getTreeShard(cid, treeID)
@@ -174,6 +174,32 @@ func (t *boltForest) TreeMove(d CIDDescriptor, treeID string, m *Move) (*Move, e
	})
}

func (t *boltForest) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	if t.mode.NoMetabase() {
		return 0, ErrDegradedMode
	}

	var height uint64
	var retErr error
	err := t.db.View(func(tx *bbolt.Tx) error {
		treeRoot := tx.Bucket(bucketName(cid, treeID))
		if treeRoot != nil {
			k, _ := treeRoot.Bucket(logBucket).Cursor().Last()
			height = binary.BigEndian.Uint64(k)
		} else {
			retErr = ErrTreeNotFound
		}
		return nil
	})
	if err == nil {
		err = retErr
	}
	return height, err
}

// TreeExists implements the Forest interface.
func (t *boltForest) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
	t.modeMtx.RLock()
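A small aside on why Cursor().Last() yields the height: the code above decodes log-bucket keys as big-endian uint64 operation times, and big-endian byte strings of equal length compare in the same order as the numbers they encode, so the last key in the bucket is the highest applied time. A self-contained illustration (not repo code):

	package main

	import (
		"bytes"
		"encoding/binary"
		"fmt"
	)

	// key encodes a tree-log height the way a big-endian uint64 key would be stored.
	func key(h uint64) []byte {
		b := make([]byte, 8)
		binary.BigEndian.PutUint64(b, h)
		return b
	}

	func main() {
		// Byte-wise order matches numeric order, so a bbolt cursor's Last()
		// lands on the largest height.
		fmt.Println(bytes.Compare(key(11), key(200)) < 0) // true
		// Decoding the last key recovers the height, as TreeHeight does above.
		fmt.Println(binary.BigEndian.Uint64(key(42))) // 42
	}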
@@ -216,6 +216,15 @@ func (f *memoryForest) TreeList(cid cid.ID) ([]string, error) {
	return res, nil
}

func (f *memoryForest) TreeHeight(cid cid.ID, treeID string) (uint64, error) {
	fullID := cid.EncodeToString() + "/" + treeID
	tree, ok := f.treeMap[fullID]
	if !ok {
		return 0, ErrTreeNotFound
	}
	return tree.operations[len(tree.operations)-1].Time, nil
}

// TreeExists implements the pilorama.Forest interface.
func (f *memoryForest) TreeExists(cid cid.ID, treeID string) (bool, error) {
	fullID := cid.EncodeToString() + "/" + treeID
@@ -527,10 +527,19 @@ func testForestTreeExists(t *testing.T, constructor func(t testing.TB, opts ...O
		checkExists(t, false, cid, treeID)
	})

	require.NoError(t, s.TreeApply(cid, treeID, &Move{Parent: 0, Child: 1}, false))
	require.NoError(t, s.TreeApply(cid, treeID, &Move{Meta: Meta{Time: 11}, Parent: 0, Child: 1}, false))
	checkExists(t, true, cid, treeID)

	height, err := s.TreeHeight(cid, treeID)
	require.NoError(t, err)
	require.EqualValues(t, 11, height)

	checkExists(t, false, cidtest.ID(), treeID) // different CID, same tree
	checkExists(t, false, cid, "another tree") // same CID, different tree

	_, err = s.TreeHeight(cidtest.ID(), treeID)
	require.ErrorIs(t, err, ErrTreeNotFound)

	checkExists(t, false, cid, "another tree") // same CID, different tree

	t.Run("can be removed", func(t *testing.T) {
		require.NoError(t, s.TreeDrop(cid, treeID))
@@ -48,6 +48,8 @@ type Forest interface {
	TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error
	// TreeLastSyncHeight returns last log height synchronized with _all_ container nodes.
	TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error)
	// TreeHeight returns current tree height.
	TreeHeight(cid cidSDK.ID, treeID string) (uint64, error)
}

type ForestStorage interface {
@@ -155,6 +155,13 @@ func (s *Shard) TreeList(cid cidSDK.ID) ([]string, error) {
	return s.pilorama.TreeList(cid)
}

func (s *Shard) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
	if s.pilorama == nil {
		return 0, ErrPiloramaDisabled
	}
	return s.pilorama.TreeHeight(cid, treeID)
}

// TreeExists implements the pilorama.Forest interface.
func (s *Shard) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
	if s.pilorama == nil {
@@ -32,11 +32,11 @@ const (

func (c *cache) runFlushLoop() {
	for i := 0; i < c.workersCount; i++ {
		c.wg.Add(1)
		go c.flushWorker(i)
		go c.workerFlushSmall()
	}

	c.wg.Add(1)
	go c.flushBigObjects()
	go c.workerFlushBig()

	c.wg.Add(1)
	go func() {

@@ -48,7 +48,7 @@ func (c *cache) runFlushLoop() {
		for {
			select {
			case <-tt.C:
				c.flushDB()
				c.flushSmallObjects()
				tt.Reset(defaultFlushInterval)
			case <-c.closeCh:
				return

@@ -57,7 +57,7 @@ func (c *cache) runFlushLoop() {
	}()
}

func (c *cache) flushDB() {
func (c *cache) flushSmallObjects() {
	var lastKey []byte
	var m []objectInfo
	for {

@@ -70,7 +70,7 @@ func (c *cache) flushDB() {
		m = m[:0]

		c.modeMtx.RLock()
		if c.readOnly() || !c.initialized.Load() {
		if c.readOnly() {
			c.modeMtx.RUnlock()
			time.Sleep(time.Second)
			continue

@@ -109,10 +109,6 @@ func (c *cache) flushDB() {

		var count int
		for i := range m {
			if c.flushed.Contains(m[i].addr) {
				continue
			}

			obj := object.New()
			if err := obj.Unmarshal(m[i].data); err != nil {
				continue

@@ -140,7 +136,7 @@ func (c *cache) flushDB() {
	}
}

func (c *cache) flushBigObjects() {
func (c *cache) workerFlushBig() {
	defer c.wg.Done()

	tick := time.NewTicker(defaultFlushInterval * 10)

@@ -151,9 +147,6 @@ func (c *cache) flushBigObjects() {
			if c.readOnly() {
				c.modeMtx.RUnlock()
				break
			} else if !c.initialized.Load() {
				c.modeMtx.RUnlock()
				continue
			}

			_ = c.flushFSTree(true)

@@ -181,10 +174,6 @@ func (c *cache) flushFSTree(ignoreErrors bool) error {
	prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
		sAddr := addr.EncodeToString()

		if _, ok := c.store.flushed.Peek(sAddr); ok {
			return nil
		}

		data, err := f()
		if err != nil {
			c.reportFlushError("can't read a file", sAddr, err)

@@ -212,9 +201,7 @@ func (c *cache) flushFSTree(ignoreErrors bool) error {
			return err
		}

		// mark object as flushed
		c.flushed.Add(sAddr, false)

		c.deleteFromDisk([]string{sAddr})
		return nil
	}

@@ -222,8 +209,8 @@ func (c *cache) flushFSTree(ignoreErrors bool) error {
	return err
}

// flushWorker writes objects to the main storage.
func (c *cache) flushWorker(_ int) {
// workerFlushSmall writes small objects to the main storage.
func (c *cache) workerFlushSmall() {
	defer c.wg.Done()

	var obj *object.Object

@@ -236,9 +223,12 @@ func (c *cache) flushWorker(_ int) {
		}

		err := c.flushObject(obj, nil)
		if err == nil {
			c.flushed.Add(objectCore.AddressOf(obj).EncodeToString(), true)
		if err != nil {
			// Error is handled in flushObject.
			continue
		}

		c.deleteFromDB([]string{objectCore.AddressOf(obj).EncodeToString()})
	}
}

@@ -294,10 +284,6 @@ func (c *cache) flush(ignoreErrors bool) error {
		cs := b.Cursor()
		for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() {
			sa := string(k)
			if _, ok := c.flushed.Peek(sa); ok {
				continue
			}

			if err := addr.DecodeString(sa); err != nil {
				c.reportFlushError("can't decode object address from the DB", sa, err)
				if ignoreErrors {
@@ -5,7 +5,6 @@ import (
	"os"
	"path/filepath"
	"testing"
	"time"

	objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"

@@ -15,7 +14,6 @@ import (
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
	checksumtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum/test"
	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
	cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"

@@ -109,22 +107,9 @@ func TestFlush(t *testing.T) {
		require.NoError(t, bs.SetMode(mode.ReadWrite))
		require.NoError(t, mb.SetMode(mode.ReadWrite))

		wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
		wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)

		require.NoError(t, wc.Flush(false))

		for i := 0; i < 2; i++ {
			var mPrm meta.GetPrm
			mPrm.SetAddress(objects[i].addr)
			_, err := mb.Get(mPrm)
			require.Error(t, err)

			_, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
			require.Error(t, err)
		}

		check(t, mb, bs, objects[2:])
		check(t, mb, bs, objects)
	})

	t.Run("flush on moving to degraded mode", func(t *testing.T) {

@@ -138,23 +123,9 @@ func TestFlush(t *testing.T) {
		require.NoError(t, wc.SetMode(mode.ReadOnly))
		require.NoError(t, bs.SetMode(mode.ReadWrite))
		require.NoError(t, mb.SetMode(mode.ReadWrite))

		wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
		wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)

		require.NoError(t, wc.SetMode(mode.Degraded))

		for i := 0; i < 2; i++ {
			var mPrm meta.GetPrm
			mPrm.SetAddress(objects[i].addr)
			_, err := mb.Get(mPrm)
			require.Error(t, err)

			_, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
			require.Error(t, err)
		}

		check(t, mb, bs, objects[2:])
		check(t, mb, bs, objects)
	})

	t.Run("ignore errors", func(t *testing.T) {

@@ -223,67 +194,6 @@ func TestFlush(t *testing.T) {
			})
		})
	})

	t.Run("on init", func(t *testing.T) {
		wc, bs, mb := newCache(t)
		objects := []objectPair{
			// removed
			putObject(t, wc, 1),
			putObject(t, wc, smallSize+1),
			// not found
			putObject(t, wc, 1),
			putObject(t, wc, smallSize+1),
			// ok
			putObject(t, wc, 1),
			putObject(t, wc, smallSize+1),
		}

		require.NoError(t, wc.Close())
		require.NoError(t, bs.SetMode(mode.ReadWrite))
		require.NoError(t, mb.SetMode(mode.ReadWrite))

		for i := range objects {
			var prm meta.PutPrm
			prm.SetObject(objects[i].obj)
			_, err := mb.Put(prm)
			require.NoError(t, err)
		}

		var inhumePrm meta.InhumePrm
		inhumePrm.SetAddresses(objects[0].addr, objects[1].addr)
		inhumePrm.SetTombstoneAddress(oidtest.Address())
		_, err := mb.Inhume(inhumePrm)
		require.NoError(t, err)

		var deletePrm meta.DeletePrm
		deletePrm.SetAddresses(objects[2].addr, objects[3].addr)
		_, err = mb.Delete(deletePrm)
		require.NoError(t, err)

		require.NoError(t, bs.SetMode(mode.ReadOnly))
		require.NoError(t, mb.SetMode(mode.ReadOnly))

		// Open in read-only: no error, nothing is removed.
		require.NoError(t, wc.Open(true))
		initWC(t, wc)
		for i := range objects {
			_, err := wc.Get(context.Background(), objects[i].addr)
			require.NoError(t, err, i)
		}
		require.NoError(t, wc.Close())

		// Open in read-write: no error, something is removed.
		require.NoError(t, wc.Open(false))
		initWC(t, wc)
		for i := range objects {
			_, err := wc.Get(context.Background(), objects[i].addr)
			if i < 2 {
				require.ErrorAs(t, err, new(apistatus.ObjectNotFound), i)
			} else {
				require.NoError(t, err, i)
			}
		}
	})
}

func putObject(t *testing.T, c Cache, size int) objectPair {

@@ -321,11 +231,6 @@ func newObject(t *testing.T, size int) (*object.Object, []byte) {

func initWC(t *testing.T, wc Cache) {
	require.NoError(t, wc.Init())

	require.Eventually(t, func() bool {
		rawWc := wc.(*cache)
		return rawWc.initialized.Load()
	}, 100*time.Second, 1*time.Millisecond)
}

type dummyEpoch struct{}
@@ -30,7 +30,6 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
	value, err := Get(c.db, []byte(saddr))
	if err == nil {
		obj := objectSDK.New()
		c.flushed.Get(saddr)
		return obj, obj.Unmarshal(value)
	}

@@ -39,7 +38,6 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
		return nil, logicerr.Wrap(apistatus.ObjectNotFound{})
	}

	c.flushed.Get(saddr)
	return res.Object, nil
}
@@ -1,183 +0,0 @@
package writecache

import (
	"context"
	"errors"
	"sync"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
	storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"go.etcd.io/bbolt"
	"go.uber.org/zap"
)

func (c *cache) initFlushMarks() {
	var localWG sync.WaitGroup

	localWG.Add(1)
	go func() {
		defer localWG.Done()

		c.fsTreeFlushMarkUpdate()
	}()

	localWG.Add(1)
	go func() {
		defer localWG.Done()

		c.dbFlushMarkUpdate()
	}()

	c.initWG.Add(1)
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		defer c.initWG.Done()

		localWG.Wait()

		select {
		case <-c.stopInitCh:
			return
		case <-c.closeCh:
			return
		default:
		}

		c.initialized.Store(true)
	}()
}

var errStopIter = errors.New("stop iteration")

func (c *cache) fsTreeFlushMarkUpdate() {
	c.log.Info("filling flush marks for objects in FSTree")

	var prm common.IteratePrm
	prm.LazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error {
		select {
		case <-c.closeCh:
			return errStopIter
		case <-c.stopInitCh:
			return errStopIter
		default:
		}

		flushed, needRemove := c.flushStatus(addr)
		if flushed {
			c.store.flushed.Add(addr.EncodeToString(), true)
			if needRemove {
				var prm common.DeletePrm
				prm.Address = addr

				_, err := c.fsTree.Delete(prm)
				if err == nil {
					storagelog.Write(c.log,
						storagelog.AddressField(addr),
						storagelog.StorageTypeField(wcStorageType),
						storagelog.OpField("fstree DELETE"),
					)
				}
			}
		}
		return nil
	}
	_, _ = c.fsTree.Iterate(prm)
	c.log.Info("finished updating FSTree flush marks")
}

func (c *cache) dbFlushMarkUpdate() {
	c.log.Info("filling flush marks for objects in database")

	var m []string
	var indices []int
	var lastKey []byte
	var batchSize = flushBatchSize
	for {
		select {
		case <-c.closeCh:
			return
		case <-c.stopInitCh:
			return
		default:
		}

		m = m[:0]
		indices = indices[:0]

		// We put objects in batches of fixed size to not interfere with main put cycle a lot.
		_ = c.db.View(func(tx *bbolt.Tx) error {
			b := tx.Bucket(defaultBucket)
			cs := b.Cursor()
			for k, _ := cs.Seek(lastKey); k != nil && len(m) < batchSize; k, _ = cs.Next() {
				m = append(m, string(k))
			}
			return nil
		})

		var addr oid.Address
		for i := range m {
			if err := addr.DecodeString(m[i]); err != nil {
				continue
			}

			flushed, needRemove := c.flushStatus(addr)
			if flushed {
				c.store.flushed.Add(addr.EncodeToString(), true)
				if needRemove {
					indices = append(indices, i)
				}
			}
		}

		if len(m) == 0 {
			break
		}

		err := c.db.Batch(func(tx *bbolt.Tx) error {
			b := tx.Bucket(defaultBucket)
			for _, j := range indices {
				if err := b.Delete([]byte(m[j])); err != nil {
					return err
				}
			}
			return nil
		})
		if err == nil {
			for _, j := range indices {
				storagelog.Write(c.log,
					zap.String("address", m[j]),
					storagelog.StorageTypeField(wcStorageType),
					storagelog.OpField("db DELETE"),
				)
			}
		}
		lastKey = append([]byte(m[len(m)-1]), 0)
	}

	c.log.Info("finished updating flush marks")
}

// flushStatus returns info about the object state in the main storage.
// First return value is true iff object exists.
// Second return value is true iff object can be safely removed.
func (c *cache) flushStatus(addr oid.Address) (bool, bool) {
	var existsPrm meta.ExistsPrm
	existsPrm.SetAddress(addr)

	_, err := c.metabase.Exists(existsPrm)
	if err != nil {
		needRemove := errors.Is(err, meta.ErrObjectIsExpired) || errors.As(err, new(apistatus.ObjectAlreadyRemoved))
		return needRemove, needRemove
	}

	var prm meta.StorageIDPrm
	prm.SetAddress(addr)

	mRes, _ := c.metabase.StorageID(prm)
	res, err := c.blobstor.Exists(context.TODO(), common.ExistsPrm{Address: addr, StorageID: mRes.StorageID()})
	return err == nil && res.Exists, false
}
@@ -41,9 +41,6 @@ func (c *cache) Iterate(prm IterationPrm) error {
	err := c.db.View(func(tx *bbolt.Tx) error {
		b := tx.Bucket(defaultBucket)
		return b.ForEach(func(k, data []byte) error {
			if _, ok := c.flushed.Peek(string(k)); ok {
				return nil
			}
			return prm.handler(data)
		})
	})

@@ -54,9 +51,6 @@ func (c *cache) Iterate(prm IterationPrm) error {
	var fsPrm common.IteratePrm
	fsPrm.IgnoreErrors = prm.ignoreErrors
	fsPrm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
		if _, ok := c.flushed.Peek(addr.EncodeToString()); ok {
			return nil
		}
		data, err := f()
		if err != nil {
			if prm.ignoreErrors {
@@ -36,19 +36,6 @@ func (c *cache) setMode(m mode.Mode) error {
		}
	}

	if !c.initialized.Load() {
		close(c.stopInitCh)

		c.initWG.Wait()
		c.stopInitCh = make(chan struct{})

		defer func() {
			if err == nil && !turnOffMeta {
				c.initFlushMarks()
			}
		}()
	}

	if c.db != nil {
		if err = c.db.Close(); err != nil {
			return fmt.Errorf("can't close write-cache database: %w", err)
@@ -26,8 +26,6 @@ func (c *cache) Put(prm common.PutPrm) (common.PutRes, error) {
	defer c.modeMtx.RUnlock()
	if c.readOnly() {
		return common.PutRes{}, ErrReadOnly
	} else if !c.initialized.Load() {
		return common.PutRes{}, ErrNotInitialized
	}

	sz := uint64(len(prm.RawData))

@@ -67,7 +65,7 @@ func (c *cache) putSmall(obj objectInfo) error {
		)
		c.objCounters.IncDB()
	}
	return nil
	return err
}

// putBig writes object to FSTree and pushes it to the flush workers queue.
@@ -11,8 +11,6 @@ import (
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	lru "github.com/hashicorp/golang-lru/v2"
	"github.com/hashicorp/golang-lru/v2/simplelru"
	"go.etcd.io/bbolt"
	"go.uber.org/zap"
)

@@ -20,19 +18,7 @@ import (
// store represents persistent storage with in-memory LRU cache
// for flushed items on top of it.
type store struct {
	maxFlushedMarksCount int
	maxRemoveBatchSize   int

	// flushed contains addresses of objects that were already flushed to the main storage.
	// We use LRU cache instead of map here to facilitate removing of unused object in favour of
	// frequently read ones.
	// MUST NOT be used inside bolt db transaction because it's eviction handler
	// removes untracked items from the database.
	flushed simplelru.LRUCache[string, bool]
	db      *bbolt.DB

	dbKeysToRemove []string
	fsKeysToRemove []string
	db *bbolt.DB
}

const dbName = "small.bolt"

@@ -71,35 +57,9 @@ func (c *cache) openStore(readOnly bool) error {
		return fmt.Errorf("could not open FSTree: %w", err)
	}

	// Write-cache can be opened multiple times during `SetMode`.
	// flushed map must not be re-created in this case.
	if c.flushed == nil {
		c.flushed, _ = lru.NewWithEvict[string, bool](c.maxFlushedMarksCount, c.removeFlushed)
	}

	c.initialized.Store(false)

	return nil
}

// removeFlushed removes an object from the writecache.
// To minimize interference with the client operations, the actual removal
// is done in batches.
// It is not thread-safe and is used only as an evict callback to LRU cache.
func (c *cache) removeFlushed(key string, value bool) {
	fromDatabase := value
	if fromDatabase {
		c.dbKeysToRemove = append(c.dbKeysToRemove, key)
	} else {
		c.fsKeysToRemove = append(c.fsKeysToRemove, key)
	}

	if len(c.dbKeysToRemove)+len(c.fsKeysToRemove) >= c.maxRemoveBatchSize {
		c.dbKeysToRemove = c.deleteFromDB(c.dbKeysToRemove)
		c.fsKeysToRemove = c.deleteFromDisk(c.fsKeysToRemove)
	}
}

func (c *cache) deleteFromDB(keys []string) []string {
	if len(keys) == 0 {
		return keys
@@ -12,7 +12,6 @@ import (
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"go.etcd.io/bbolt"
	"go.uber.org/atomic"
	"go.uber.org/zap"
)

@@ -51,11 +50,8 @@ type cache struct {
	// mtx protects statistics, counters and compressFlags.
	mtx sync.RWMutex

	mode        mode.Mode
	initialized atomic.Bool
	stopInitCh  chan struct{}  // used to sync initWG initialisation routines and _only_ them
	initWG      sync.WaitGroup // for initialisation routines only
	modeMtx     sync.RWMutex
	mode    mode.Mode
	modeMtx sync.RWMutex

	// compressFlags maps address of a big object to boolean value indicating
	// whether object should be compressed.

@@ -95,9 +91,8 @@ var (
// New creates new writecache instance.
func New(opts ...Option) Cache {
	c := &cache{
		flushCh:    make(chan *object.Object),
		mode:       mode.ReadWrite,
		stopInitCh: make(chan struct{}),
		flushCh: make(chan *object.Object),
		mode:    mode.ReadWrite,

		compressFlags: make(map[string]struct{}),
		options: options{

@@ -116,12 +111,6 @@ func New(opts ...Option) Cache {
		opts[i](&c.options)
	}

	// Make the LRU cache contain which take approximately 3/4 of the maximum space.
	// Assume small and big objects are stored in 50-50 proportion.
	c.maxFlushedMarksCount = int(c.maxCacheSize/c.maxObjectSize+c.maxCacheSize/c.smallObjectSize) / 2 * 3 / 4
	// Trigger the removal when the cache is 7/8 full, so that new items can still arrive.
	c.maxRemoveBatchSize = c.maxFlushedMarksCount / 8

	return c
}

@@ -152,31 +141,27 @@ func (c *cache) Open(readOnly bool) error {

// Init runs necessary services.
func (c *cache) Init() error {
	c.initFlushMarks()
	c.runFlushLoop()
	return nil
}

// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
func (c *cache) Close() error {
	// We cannot lock mutex for the whole operation duration
	// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
	c.modeMtx.Lock()
	defer c.modeMtx.Unlock()

	// Finish all in-progress operations.
	if err := c.setMode(mode.ReadOnly); err != nil {
		return err
	}

	if c.closeCh != nil {
		close(c.closeCh)
	}
	c.mode = mode.DegradedReadOnly // prevent new operations from being processed
	c.modeMtx.Unlock()

	c.wg.Wait()
	if c.closeCh != nil {
		c.closeCh = nil
	}

	c.initialized.Store(false)
	c.modeMtx.Lock()
	defer c.modeMtx.Unlock()

	c.closeCh = nil
	var err error
	if c.db != nil {
		err = c.db.Close()
@@ -40,6 +40,14 @@ func (p *PutInitPrm) WithObject(v *object.Object) *PutInitPrm {
	return p
}

func (p *PutInitPrm) WithCopyNumber(v uint32) *PutInitPrm {
	if p != nil {
		p.traverseOpts = append(p.traverseOpts, placement.SuccessAfter(v))
	}

	return p
}

func (p *PutInitPrm) WithRelay(f func(client.NodeInfo, client.MultiAddressClient) error) *PutInitPrm {
	if p != nil {
		p.relay = f
@@ -24,7 +24,8 @@ func (s *streamer) toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.Put
			object.NewFromV2(oV2),
		).
		WithRelay(s.relayRequest).
		WithCommonPrm(commonPrm), nil
		WithCommonPrm(commonPrm).
		WithCopyNumber(part.GetCopiesNumber()), nil
}

func toChunkPrm(req *objectV2.PutObjectPartChunk) *putsvc.PutChunkPrm {
@@ -27,7 +27,7 @@ type cacheItem struct {
}

const (
	defaultClientCacheSize      = 10
	defaultClientCacheSize      = 32
	defaultClientConnectTimeout = time.Second * 2
	defaultReconnectInterval    = time.Second * 15
)

@@ -36,7 +36,9 @@ var errRecentlyFailed = errors.New("client has recently failed")

func (c *clientCache) init() {
	l, _ := simplelru.NewLRU[string, cacheItem](defaultClientCacheSize, func(_ string, value cacheItem) {
		_ = value.cc.Close()
		if conn := value.cc; conn != nil {
			_ = conn.Close()
		}
	})
	c.LRU = *l
}
@@ -13,6 +13,7 @@ import (
	cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	"github.com/panjf2000/ants/v2"
	"go.uber.org/atomic"
	"go.uber.org/zap"
)

@@ -31,6 +32,8 @@ type Service struct {
	syncChan chan struct{}
	syncPool *ants.Pool

	initialSyncDone atomic.Bool

	// cnrMap contains existing (used) container IDs.
	cnrMap map[cidSDK.ID]struct{}
	// cnrMapMtx protects cnrMap

@@ -89,6 +92,10 @@ func (s *Service) Shutdown() {
}

func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
	if !s.initialSyncDone.Load() {
		return nil, ErrAlreadySyncing
	}

	b := req.GetBody()

	var cid cidSDK.ID

@@ -137,6 +144,10 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
}

func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
	if !s.initialSyncDone.Load() {
		return nil, ErrAlreadySyncing
	}

	b := req.GetBody()

	var cid cidSDK.ID

@@ -197,6 +208,10 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
}

func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
	if !s.initialSyncDone.Load() {
		return nil, ErrAlreadySyncing
	}

	b := req.GetBody()

	var cid cidSDK.ID

@@ -246,6 +261,10 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
// Move applies client operation to the specified tree and pushes in queue
// for replication on other nodes.
func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
	if !s.initialSyncDone.Load() {
		return nil, ErrAlreadySyncing
	}

	b := req.GetBody()

	var cid cidSDK.ID

@@ -294,6 +313,10 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
}

func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
	if !s.initialSyncDone.Load() {
		return nil, ErrAlreadySyncing
	}

	b := req.GetBody()

	var cid cidSDK.ID

@@ -370,6 +393,10 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
}

func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
	if !s.initialSyncDone.Load() {
		return ErrAlreadySyncing
	}

	b := req.GetBody()

	var cid cidSDK.ID

@@ -499,6 +526,10 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
}

func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
	if !s.initialSyncDone.Load() {
		return ErrAlreadySyncing
	}

	b := req.GetBody()

	var cid cidSDK.ID

@@ -531,9 +562,13 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
	}

	h := b.GetHeight()
	lastHeight, err := s.forest.TreeHeight(cid, b.GetTreeId())
	if err != nil {
		return err
	}
	for {
		lm, err := s.forest.TreeGetOpLog(cid, b.GetTreeId(), h)
		if err != nil || lm.Time == 0 {
		if err != nil || lm.Time == 0 || lastHeight < lm.Time {
			return err
		}

@@ -555,6 +590,10 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
}

func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
	if !s.initialSyncDone.Load() {
		return nil, ErrAlreadySyncing
	}

	var cid cidSDK.ID

	err := cid.Decode(req.GetBody().GetContainerId())

@@ -638,5 +677,9 @@ func (s *Service) getContainerInfo(cid cidSDK.ID, pub []byte) ([]netmapSDK.NodeI
}

func (s *Service) Healthcheck(context.Context, *HealthcheckRequest) (*HealthcheckResponse, error) {
	if !s.initialSyncDone.Load() {
		return nil, ErrAlreadySyncing
	}

	return new(HealthcheckResponse), nil
}
@@ -203,6 +203,11 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
	rawCID := make([]byte, sha256.Size)
	cid.Encode(rawCID)

	errG, ctx := errgroup.WithContext(ctx)
	errG.SetLimit(1024)

	var heightMtx sync.Mutex

	for {
		newHeight := height
		req := &GetOpLogRequest{

@@ -213,11 +218,13 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
			},
		}
		if err := SignMessage(req, s.key); err != nil {
			_ = errG.Wait()
			return newHeight, err
		}

		c, err := treeClient.GetOpLog(ctx, req)
		if err != nil {
			_ = errG.Wait()
			return newHeight, fmt.Errorf("can't initialize client: %w", err)
		}

@@ -229,21 +236,40 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
				Child:  lm.ChildId,
			}
			if err := m.Meta.FromBytes(lm.Meta); err != nil {
				_ = errG.Wait()
				return newHeight, err
			}
			if err := s.forest.TreeApply(cid, treeID, m, true); err != nil {
				return newHeight, err
			}
			if m.Time > newHeight {
				newHeight = m.Time + 1
			} else {
				newHeight++
			}
			errG.Go(func() error {
				err := s.forest.TreeApply(cid, treeID, m, true)
				heightMtx.Lock()
				defer heightMtx.Unlock()
				if err != nil {
					if newHeight > height {
						height = newHeight
					}
					return err
				}
				if m.Time > newHeight {
					newHeight = m.Time + 1
				} else {
					newHeight++
				}
				return nil
			})
		}

		applyErr := errG.Wait()
		if err == nil {
			err = applyErr
		}

		heightMtx.Lock()
		if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
			heightMtx.Unlock()
			return newHeight, err
		}
		height = newHeight
		heightMtx.Unlock()
	}
}

@@ -288,7 +314,7 @@ func (s *Service) syncLoop(ctx context.Context) {
			cnrs, err := s.cfg.cnrSource.List()
			if err != nil {
				s.log.Error("could not fetch containers", zap.Error(err))
				continue
				break
			}

			newMap, cnrsToSync := s.containersToSync(cnrs)

@@ -299,6 +325,7 @@ func (s *Service) syncLoop(ctx context.Context) {

			s.log.Debug("trees have been synchronized")
		}
		s.initialSyncDone.Store(true)
	}
}
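The synchronization change above swaps a strictly sequential TreeApply loop for a bounded fan-out. A self-contained sketch of the same errgroup pattern, with assumed names and placeholder work rather than the service code:

	package main

	import (
		"context"
		"fmt"

		"golang.org/x/sync/errgroup"
	)

	// applyOp stands in for s.forest.TreeApply in the diff above.
	func applyOp(_ context.Context, op int) error {
		return nil
	}

	func main() {
		eg, ctx := errgroup.WithContext(context.Background())
		eg.SetLimit(1024) // at most 1024 operations are applied concurrently, as in the diff

		for op := 0; op < 10000; op++ {
			op := op // capture the loop variable for the goroutine (pre-Go 1.22 semantics)
			eg.Go(func() error {
				return applyOp(ctx, op)
			})
		}

		// Wait returns the first non-nil error from any goroutine, mirroring
		// how applyErr is folded into err in synchronizeSingle above.
		if err := eg.Wait(); err != nil {
			fmt.Println("sync failed:", err)
		}
	}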