Compare commits

...

16 commits

Author SHA1 Message Date
fc32876347 [#351] cli: Support copies number parameter in object put
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-05-18 11:34:00 +03:00
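The new flag maps to a "success after N copies" placement option (see the WithCopyNumber hunk further down). A schematic sketch of that semantics with hypothetical names, not the node's actual API:

package main

import "fmt"

// putWithCopies stores an object on candidate nodes and reports success as
// soon as `copies` replicas are written, instead of waiting for the whole
// placement vector.
func putWithCopies(nodes []string, store func(node string) error, copies int) error {
	stored := 0
	for _, n := range nodes {
		if store(n) == nil {
			stored++
			if stored >= copies {
				return nil // enough replicas, finish early
			}
		}
	}
	return fmt.Errorf("stored %d of %d required copies", stored, copies)
}

func main() {
	err := putWithCopies([]string{"n1", "n2", "n3"},
		func(string) error { return nil }, 2)
	fmt.Println(err) // <nil>: two copies were enough
}
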
e4a891877c [#351] Fix end of files
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-05-18 11:34:00 +03:00
4148590668 [#365] go.mod: Update api-go
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-05-18 09:51:07 +03:00
493cafc62a [#355] Increase tree svc client cache size to test hypotheses
Signed-off-by: Alejandro Lopez <a.lopez@yadro.com>
2023-05-17 15:17:35 +03:00
3711976dfc [#314] writecache: remove objects right after they are flushed
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-05-16 14:16:25 +03:00
Pavel Karpy
c3f5045842 [#314] wc: Do not lose small objects on disk errors
Do return an error if an object could not be stored on WC's disk.

Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
2023-05-15 16:27:44 +03:00
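A schematic sketch of the change, with illustrative names: the small-object path now propagates the disk-write error instead of reporting success unconditionally:

package main

import (
	"errors"
	"fmt"
)

// putSmall mimics the fixed code path: side effects (counters, storage log)
// happen only on success, and the write error is propagated instead of being
// replaced with nil.
func putSmall(write func() error, onSuccess func()) error {
	err := write()
	if err == nil {
		onSuccess()
	}
	return err // previously: return nil
}

func main() {
	err := putSmall(func() error { return errors.New("disk full") }, func() {})
	fmt.Println(err) // the caller now sees the disk error
}
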
Pavel Karpy
ab65063d6d [#314] wc: Simplify background workers naming
Also, drop an unused arg.

Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
2023-05-15 16:27:42 +03:00
Pavel Karpy
c60029d3b0 [#323] node: Fix tree svc panic
If a connection has not been established earlier, `nil` is stored in the LRU
cache. Cache eviction tries to close every connection (even a `nil` one) and
panics, but does not crash the app because we are using pools.
That ugly bug also leads to a deadlock where `Unlock` is not called via a
`defer` func (and that is how I found it).

Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
2023-05-04 20:04:30 +03:00
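A minimal sketch of the nil-safe eviction pattern, using the hashicorp simplelru package the client cache is built on; the connection type and cache size are simplified stand-ins:

package main

import (
	"fmt"

	"github.com/hashicorp/golang-lru/v2/simplelru"
)

// conn stands in for a cached client connection; it is nil when the dial
// failed but an entry was still placed into the cache.
type conn struct{ addr string }

func (c *conn) Close() { fmt.Println("closed", c.addr) }

func main() {
	// The eviction callback must tolerate nil connections, otherwise evicting
	// such an entry panics exactly as described in the commit above.
	cache, _ := simplelru.NewLRU[string, *conn](2, func(_ string, c *conn) {
		if c != nil {
			c.Close()
		}
	})

	cache.Add("a", &conn{addr: "a"})
	cache.Add("b", nil)              // failed dial cached as nil
	cache.Add("c", &conn{addr: "c"}) // evicts and closes "a"
	cache.Purge()                    // evicts "b" and "c"; no panic on the nil entry
}
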
Pavel Karpy
0beb7ccf5c [#284] node: Use copy_number on server side
Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
2023-04-26 10:57:34 +03:00
0fe5e34fb0 [#231] node: Fix race condition in TTL cache
Use key locker to lock by key.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-04-25 16:50:27 +03:00
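A condensed sketch of the lock-by-key, double-checked read pattern that removes the race: check the cache, lock the key, re-check, then fetch, so concurrent readers of one key make a single remote call. The locker below is simplified (entries are never freed), unlike the keyLocker in the actual change:

package main

import (
	"fmt"
	"sync"
)

// keyMutexes hands out one mutex per key.
type keyMutexes[K comparable] struct {
	mu sync.Mutex
	m  map[K]*sync.Mutex
}

func (l *keyMutexes[K]) forKey(k K) *sync.Mutex {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.m == nil {
		l.m = map[K]*sync.Mutex{}
	}
	if _, ok := l.m[k]; !ok {
		l.m[k] = &sync.Mutex{}
	}
	return l.m[k]
}

func main() {
	var (
		locks keyMutexes[string]
		cache sync.Map // key -> value, stands in for the TTL'd LRU
		calls int
		wg    sync.WaitGroup
	)
	fetch := func(key string) string { calls++; return "value-for-" + key }

	get := func(key string) string {
		if v, ok := cache.Load(key); ok { // fast path
			return v.(string)
		}
		kl := locks.forKey(key)
		kl.Lock()
		defer kl.Unlock()
		if v, ok := cache.Load(key); ok { // re-check under the key lock
			return v.(string)
		}
		v := fetch(key) // only one goroutine per key reaches the network
		cache.Store(key, v)
		return v
	}

	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); _ = get("cnr-1") }()
	}
	wg.Wait()
	fmt.Println("network calls:", calls) // 1
}
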
bcf3f0f517 [#231] node: Invalidate container cache on PutSuccess event
For example: frostfs-cli creates a container and then polls it with
GetContainer requests. These requests go through the container cache,
so a not-found error gets stored in the container cache.
As a result, the container cache can still hold that not-found error when the
PutSuccess event is received.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-04-20 17:34:00 +03:00
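A toy sketch of the negative-caching issue and the fix, with purely illustrative names: on the creation event, the handler re-reads the container and overwrites the cached not-found entry:

package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("container not found")

// entry mirrors a cache that also stores negative (not-found) results.
type entry struct {
	val string
	err error
}

func main() {
	cache := map[string]entry{}

	// Polling GetContainer before the container exists caches the error.
	cache["cnr-1"] = entry{err: errNotFound}

	// On PutSuccess, read the container from the source and overwrite the
	// stale not-found entry, so the next GetContainer hits a valid value.
	readFromSource := func(id string) (string, error) { return "container " + id, nil }
	if v, err := readFromSource("cnr-1"); err == nil {
		cache["cnr-1"] = entry{val: v}
	}

	fmt.Println(cache["cnr-1"].val, cache["cnr-1"].err) // container cnr-1 <nil>
}
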
79d59e4ed2 [#266] services/tree: Do not accept requests until initial sync is finished
`Apply` is deliberately left out -- we don't want to miss anything new.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-04-18 13:30:45 +03:00
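A minimal sketch of the gate, assuming a hypothetical service type; the real change guards every handler except Apply with an initialSyncDone flag (the diff uses go.uber.org/atomic, the standard library Bool is used here for a self-contained example):

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errNotReady = errors.New("tree service is still performing initial sync")

// service sketches the gate: every RPC handler checks a flag that the sync
// loop sets once the first full synchronization completes.
type service struct {
	initialSyncDone atomic.Bool
}

func (s *service) Add() error {
	if !s.initialSyncDone.Load() {
		return errNotReady
	}
	// ... handle the request ...
	return nil
}

func main() {
	s := &service{}
	fmt.Println(s.Add()) // rejected: initial sync not finished

	s.initialSyncDone.Store(true) // done by the sync loop in the real code
	fmt.Println(s.Add())          // <nil>: requests are served again
}
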
364b4ac572 [#266] services/tree: Batch operations on the service level
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-04-18 13:30:45 +03:00
f7679a8168 [#266] services/tree: Return operation log up to some height
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-04-18 13:30:45 +03:00
2dc2fe8780 [#266] pilorama: Allow to get current tree height
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-04-18 13:30:45 +03:00
21412ef24a [#263] node: Up api-go version
Fix panic in tracing.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-04-18 11:45:42 +03:00
30 changed files with 328 additions and 430 deletions

View file

@ -4,8 +4,13 @@ Changelog for FrostFS Node
## [Unreleased]
### Added
- Support copies number parameter in `frostfs-cli object put` (#351)
### Changed
### Fixed
- Copy number was not used for `PUT` requests (#284)
- Tree service panic in its internal client cache (#323)
### Removed
### Updated
### Updating from v0.36.0
@ -84,7 +89,7 @@ You need to change configuration environment variables to `FROSTFS_*` if you use
New config field `object.delete.tombstone_lifetime` allows to set tombstone lifetime
more appropriate for a specific deployment.
Use `__SYSTEM__` prefix for system attributes instead of `__NEOFS__`
Use `__SYSTEM__` prefix for system attributes instead of `__NEOFS__`
(existed objects with old attributes will be treated as before, but for new objects new attributes will be used).
## Older versions

View file

@ -329,6 +329,8 @@ func CreateSession(prm CreateSessionPrm) (res CreateSessionRes, err error) {
type PutObjectPrm struct {
commonObjectPrm
copyNum uint32
hdr *object.Object
rdr io.Reader
@ -352,6 +354,11 @@ func (x *PutObjectPrm) SetHeaderCallback(f func(*object.Object)) {
x.headerCallback = f
}
// SetCopiesNumber sets number of object copies that is enough to consider put successful.
func (x *PutObjectPrm) SetCopiesNumber(copiesNumbers uint32) {
x.copyNum = copiesNumbers
}
// PutObjectRes groups the resulting values of PutObject operation.
type PutObjectRes struct {
id oid.ID
@ -381,6 +388,7 @@ func PutObject(prm PutObjectPrm) (*PutObjectRes, error) {
}
putPrm.WithXHeaders(prm.xHeaders...)
putPrm.SetCopiesNumber(prm.copyNum)
wrt, err := prm.cli.ObjectPutInit(context.Background(), putPrm)
if err != nil {

View file

@ -25,6 +25,7 @@ import (
const (
noProgressFlag = "no-progress"
notificationFlag = "notify"
copiesNumberFlag = "copies-number"
)
var putExpiredOn uint64
@ -56,6 +57,8 @@ func initObjectPutCmd() {
flags.String(notificationFlag, "", "Object notification in the form of *epoch*:*topic*; '-' topic means using default")
flags.Bool(binaryFlag, false, "Deserialize object structure from given file.")
flags.Uint32(copiesNumberFlag, 0, "Number of copies of the object to store within the RPC call")
}
func putObject(cmd *cobra.Command, _ []string) {
@ -116,6 +119,12 @@ func putObject(cmd *cobra.Command, _ []string) {
}
}
copyNum, err := cmd.Flags().GetUint32(copiesNumberFlag)
commonCmd.ExitOnErr(cmd, "can't parse object copies numbers information: %w", err)
if copyNum > 0 {
prm.SetCopiesNumber(copyNum)
}
res, err := internalclient.PutObject(prm)
if p != nil {
p.Finish()

View file

@ -24,6 +24,61 @@ type valueWithTime[V any] struct {
e error
}
type locker struct {
mtx *sync.Mutex
waiters int // not protected by mtx, must used outer mutex to update concurrently
}
type keyLocker[K comparable] struct {
lockers map[K]*locker
lockersMtx *sync.Mutex
}
func newKeyLocker[K comparable]() *keyLocker[K] {
return &keyLocker[K]{
lockers: make(map[K]*locker),
lockersMtx: &sync.Mutex{},
}
}
func (l *keyLocker[K]) LockKey(key K) {
l.lockersMtx.Lock()
if locker, found := l.lockers[key]; found {
locker.waiters++
l.lockersMtx.Unlock()
locker.mtx.Lock()
return
}
locker := &locker{
mtx: &sync.Mutex{},
waiters: 1,
}
locker.mtx.Lock()
l.lockers[key] = locker
l.lockersMtx.Unlock()
}
func (l *keyLocker[K]) UnlockKey(key K) {
l.lockersMtx.Lock()
defer l.lockersMtx.Unlock()
locker, found := l.lockers[key]
if !found {
return
}
if locker.waiters == 1 {
delete(l.lockers, key)
}
locker.waiters--
locker.mtx.Unlock()
}
// entity that provides TTL cache interface.
type ttlNetCache[K comparable, V any] struct {
ttl time.Duration
@ -33,6 +88,8 @@ type ttlNetCache[K comparable, V any] struct {
cache *lru.Cache[K, *valueWithTime[V]]
netRdr netValueReader[K, V]
keyLocker *keyLocker[K]
}
// complicates netValueReader with TTL caching mechanism.
@ -41,10 +98,11 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
fatalOnErr(err)
return &ttlNetCache[K, V]{
ttl: ttl,
sz: sz,
cache: cache,
netRdr: netRdr,
ttl: ttl,
sz: sz,
cache: cache,
netRdr: netRdr,
keyLocker: newKeyLocker[K](),
}
}
@ -55,22 +113,33 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
// returned value should not be modified.
func (c *ttlNetCache[K, V]) get(key K) (V, error) {
val, ok := c.cache.Peek(key)
if ok {
if time.Since(val.t) < c.ttl {
return val.v, val.e
}
if ok && time.Since(val.t) < c.ttl {
return val.v, val.e
}
c.cache.Remove(key)
c.keyLocker.LockKey(key)
defer c.keyLocker.UnlockKey(key)
val, ok = c.cache.Peek(key)
if ok && time.Since(val.t) < c.ttl {
return val.v, val.e
}
v, err := c.netRdr(key)
c.set(key, v, err)
c.cache.Add(key, &valueWithTime[V]{
v: v,
t: time.Now(),
e: err,
})
return v, err
}
func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
c.keyLocker.LockKey(k)
defer c.keyLocker.UnlockKey(k)
c.cache.Add(k, &valueWithTime[V]{
v: v,
t: time.Now(),
@ -79,6 +148,9 @@ func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
}
func (c *ttlNetCache[K, V]) remove(key K) {
c.keyLocker.LockKey(key)
defer c.keyLocker.UnlockKey(key)
c.cache.Remove(key)
}

View file

@ -0,0 +1,32 @@
package main
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
func TestKeyLocker(t *testing.T) {
taken := false
eg, _ := errgroup.WithContext(context.Background())
keyLocker := newKeyLocker[int]()
for i := 0; i < 100; i++ {
eg.Go(func() error {
keyLocker.LockKey(0)
defer keyLocker.UnlockKey(0)
require.False(t, taken)
taken = true
require.True(t, taken)
time.Sleep(10 * time.Millisecond)
taken = false
require.False(t, taken)
return nil
})
}
require.NoError(t, eg.Wait())
}

View file

@ -130,9 +130,10 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
// TODO: use owner directly from the event after neofs-contract#256 will become resolved
// but don't forget about the profit of reading the new container and caching it:
// creation success are most commonly tracked by polling GET op.
cnr, err := cachedContainerStorage.Get(ev.ID)
cnr, err := cnrSrc.Get(ev.ID)
if err == nil {
cachedContainerLister.update(cnr.Value.Owner(), ev.ID, true)
cachedContainerStorage.set(ev.ID, cnr, nil)
} else {
// unlike removal, we expect successful receive of the container
// after successful creation, so logging can be useful

View file

@ -187,4 +187,4 @@ FROSTFS_STORAGE_SHARD_1_GC_REMOVER_SLEEP_INTERVAL=5m
FROSTFS_TRACING_ENABLED=true
FROSTFS_TRACING_ENDPOINT="localhost"
FROSTFS_TRACING_EXPORTER="otlp_grpc"
FROSTFS_TRACING_EXPORTER="otlp_grpc"

View file

@ -219,4 +219,3 @@ tracing:
enabled: true
exporter: "otlp_grpc"
endpoint: "localhost"

go.mod
View file

@ -3,7 +3,7 @@ module git.frostfs.info/TrueCloudLab/frostfs-node
go 1.18
require (
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.0
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230516125015-c3f61e7c8595
git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230316081442-bec77f280a85
git.frostfs.info/TrueCloudLab/hrw v1.2.0

BIN
go.sum

Binary file not shown.

View file

@ -213,6 +213,14 @@ func (e *StorageEngine) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
return err == nil, err
}
func (e *StorageEngine) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
index, lst, err := e.getTreeShard(cid, treeID)
if err != nil {
return 0, nil
}
return lst[index].TreeHeight(cid, treeID)
}
// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
func (e *StorageEngine) TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error {
index, lst, err := e.getTreeShard(cid, treeID)

View file

@ -174,6 +174,32 @@ func (t *boltForest) TreeMove(d CIDDescriptor, treeID string, m *Move) (*Move, e
})
}
func (t *boltForest) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return 0, ErrDegradedMode
}
var height uint64
var retErr error
err := t.db.View(func(tx *bbolt.Tx) error {
treeRoot := tx.Bucket(bucketName(cid, treeID))
if treeRoot != nil {
k, _ := treeRoot.Bucket(logBucket).Cursor().Last()
height = binary.BigEndian.Uint64(k)
} else {
retErr = ErrTreeNotFound
}
return nil
})
if err == nil {
err = retErr
}
return height, err
}
// TreeExists implements the Forest interface.
func (t *boltForest) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
t.modeMtx.RLock()

View file

@ -216,6 +216,15 @@ func (f *memoryForest) TreeList(cid cid.ID) ([]string, error) {
return res, nil
}
func (f *memoryForest) TreeHeight(cid cid.ID, treeID string) (uint64, error) {
fullID := cid.EncodeToString() + "/" + treeID
tree, ok := f.treeMap[fullID]
if !ok {
return 0, ErrTreeNotFound
}
return tree.operations[len(tree.operations)-1].Time, nil
}
// TreeExists implements the pilorama.Forest interface.
func (f *memoryForest) TreeExists(cid cid.ID, treeID string) (bool, error) {
fullID := cid.EncodeToString() + "/" + treeID

View file

@ -527,10 +527,19 @@ func testForestTreeExists(t *testing.T, constructor func(t testing.TB, opts ...O
checkExists(t, false, cid, treeID)
})
require.NoError(t, s.TreeApply(cid, treeID, &Move{Parent: 0, Child: 1}, false))
require.NoError(t, s.TreeApply(cid, treeID, &Move{Meta: Meta{Time: 11}, Parent: 0, Child: 1}, false))
checkExists(t, true, cid, treeID)
height, err := s.TreeHeight(cid, treeID)
require.NoError(t, err)
require.EqualValues(t, 11, height)
checkExists(t, false, cidtest.ID(), treeID) // different CID, same tree
checkExists(t, false, cid, "another tree") // same CID, different tree
_, err = s.TreeHeight(cidtest.ID(), treeID)
require.ErrorIs(t, err, ErrTreeNotFound)
checkExists(t, false, cid, "another tree") // same CID, different tree
t.Run("can be removed", func(t *testing.T) {
require.NoError(t, s.TreeDrop(cid, treeID))

View file

@ -48,6 +48,8 @@ type Forest interface {
TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error
// TreeLastSyncHeight returns last log height synchronized with _all_ container nodes.
TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error)
// TreeHeight returns current tree height.
TreeHeight(cid cidSDK.ID, treeID string) (uint64, error)
}
type ForestStorage interface {

View file

@ -155,6 +155,13 @@ func (s *Shard) TreeList(cid cidSDK.ID) ([]string, error) {
return s.pilorama.TreeList(cid)
}
func (s *Shard) TreeHeight(cid cidSDK.ID, treeID string) (uint64, error) {
if s.pilorama == nil {
return 0, ErrPiloramaDisabled
}
return s.pilorama.TreeHeight(cid, treeID)
}
// TreeExists implements the pilorama.Forest interface.
func (s *Shard) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
if s.pilorama == nil {

View file

@ -32,11 +32,11 @@ const (
func (c *cache) runFlushLoop() {
for i := 0; i < c.workersCount; i++ {
c.wg.Add(1)
go c.flushWorker(i)
go c.workerFlushSmall()
}
c.wg.Add(1)
go c.flushBigObjects()
go c.workerFlushBig()
c.wg.Add(1)
go func() {
@ -48,7 +48,7 @@ func (c *cache) runFlushLoop() {
for {
select {
case <-tt.C:
c.flushDB()
c.flushSmallObjects()
tt.Reset(defaultFlushInterval)
case <-c.closeCh:
return
@ -57,7 +57,7 @@ func (c *cache) runFlushLoop() {
}()
}
func (c *cache) flushDB() {
func (c *cache) flushSmallObjects() {
var lastKey []byte
var m []objectInfo
for {
@ -70,7 +70,7 @@ func (c *cache) flushDB() {
m = m[:0]
c.modeMtx.RLock()
if c.readOnly() || !c.initialized.Load() {
if c.readOnly() {
c.modeMtx.RUnlock()
time.Sleep(time.Second)
continue
@ -109,10 +109,6 @@ func (c *cache) flushDB() {
var count int
for i := range m {
if c.flushed.Contains(m[i].addr) {
continue
}
obj := object.New()
if err := obj.Unmarshal(m[i].data); err != nil {
continue
@ -140,7 +136,7 @@ func (c *cache) flushDB() {
}
}
func (c *cache) flushBigObjects() {
func (c *cache) workerFlushBig() {
defer c.wg.Done()
tick := time.NewTicker(defaultFlushInterval * 10)
@ -151,9 +147,6 @@ func (c *cache) flushBigObjects() {
if c.readOnly() {
c.modeMtx.RUnlock()
break
} else if !c.initialized.Load() {
c.modeMtx.RUnlock()
continue
}
_ = c.flushFSTree(true)
@ -181,10 +174,6 @@ func (c *cache) flushFSTree(ignoreErrors bool) error {
prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
sAddr := addr.EncodeToString()
if _, ok := c.store.flushed.Peek(sAddr); ok {
return nil
}
data, err := f()
if err != nil {
c.reportFlushError("can't read a file", sAddr, err)
@ -212,9 +201,7 @@ func (c *cache) flushFSTree(ignoreErrors bool) error {
return err
}
// mark object as flushed
c.flushed.Add(sAddr, false)
c.deleteFromDisk([]string{sAddr})
return nil
}
@ -222,8 +209,8 @@ func (c *cache) flushFSTree(ignoreErrors bool) error {
return err
}
// flushWorker writes objects to the main storage.
func (c *cache) flushWorker(_ int) {
// workerFlushSmall writes small objects to the main storage.
func (c *cache) workerFlushSmall() {
defer c.wg.Done()
var obj *object.Object
@ -236,9 +223,12 @@ func (c *cache) flushWorker(_ int) {
}
err := c.flushObject(obj, nil)
if err == nil {
c.flushed.Add(objectCore.AddressOf(obj).EncodeToString(), true)
if err != nil {
// Error is handled in flushObject.
continue
}
c.deleteFromDB([]string{objectCore.AddressOf(obj).EncodeToString()})
}
}
@ -294,10 +284,6 @@ func (c *cache) flush(ignoreErrors bool) error {
cs := b.Cursor()
for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() {
sa := string(k)
if _, ok := c.flushed.Peek(sa); ok {
continue
}
if err := addr.DecodeString(sa); err != nil {
c.reportFlushError("can't decode object address from the DB", sa, err)
if ignoreErrors {

View file

@ -5,7 +5,6 @@ import (
"os"
"path/filepath"
"testing"
"time"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
@ -15,7 +14,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
checksumtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum/test"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -109,22 +107,9 @@ func TestFlush(t *testing.T) {
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
require.NoError(t, wc.Flush(false))
for i := 0; i < 2; i++ {
var mPrm meta.GetPrm
mPrm.SetAddress(objects[i].addr)
_, err := mb.Get(mPrm)
require.Error(t, err)
_, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
require.Error(t, err)
}
check(t, mb, bs, objects[2:])
check(t, mb, bs, objects)
})
t.Run("flush on moving to degraded mode", func(t *testing.T) {
@ -138,23 +123,9 @@ func TestFlush(t *testing.T) {
require.NoError(t, wc.SetMode(mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
require.NoError(t, wc.SetMode(mode.Degraded))
for i := 0; i < 2; i++ {
var mPrm meta.GetPrm
mPrm.SetAddress(objects[i].addr)
_, err := mb.Get(mPrm)
require.Error(t, err)
_, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
require.Error(t, err)
}
check(t, mb, bs, objects[2:])
check(t, mb, bs, objects)
})
t.Run("ignore errors", func(t *testing.T) {
@ -223,67 +194,6 @@ func TestFlush(t *testing.T) {
})
})
})
t.Run("on init", func(t *testing.T) {
wc, bs, mb := newCache(t)
objects := []objectPair{
// removed
putObject(t, wc, 1),
putObject(t, wc, smallSize+1),
// not found
putObject(t, wc, 1),
putObject(t, wc, smallSize+1),
// ok
putObject(t, wc, 1),
putObject(t, wc, smallSize+1),
}
require.NoError(t, wc.Close())
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
for i := range objects {
var prm meta.PutPrm
prm.SetObject(objects[i].obj)
_, err := mb.Put(prm)
require.NoError(t, err)
}
var inhumePrm meta.InhumePrm
inhumePrm.SetAddresses(objects[0].addr, objects[1].addr)
inhumePrm.SetTombstoneAddress(oidtest.Address())
_, err := mb.Inhume(inhumePrm)
require.NoError(t, err)
var deletePrm meta.DeletePrm
deletePrm.SetAddresses(objects[2].addr, objects[3].addr)
_, err = mb.Delete(deletePrm)
require.NoError(t, err)
require.NoError(t, bs.SetMode(mode.ReadOnly))
require.NoError(t, mb.SetMode(mode.ReadOnly))
// Open in read-only: no error, nothing is removed.
require.NoError(t, wc.Open(true))
initWC(t, wc)
for i := range objects {
_, err := wc.Get(context.Background(), objects[i].addr)
require.NoError(t, err, i)
}
require.NoError(t, wc.Close())
// Open in read-write: no error, something is removed.
require.NoError(t, wc.Open(false))
initWC(t, wc)
for i := range objects {
_, err := wc.Get(context.Background(), objects[i].addr)
if i < 2 {
require.ErrorAs(t, err, new(apistatus.ObjectNotFound), i)
} else {
require.NoError(t, err, i)
}
}
})
}
func putObject(t *testing.T, c Cache, size int) objectPair {
@ -321,11 +231,6 @@ func newObject(t *testing.T, size int) (*object.Object, []byte) {
func initWC(t *testing.T, wc Cache) {
require.NoError(t, wc.Init())
require.Eventually(t, func() bool {
rawWc := wc.(*cache)
return rawWc.initialized.Load()
}, 100*time.Second, 1*time.Millisecond)
}
type dummyEpoch struct{}

View file

@ -30,7 +30,6 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
value, err := Get(c.db, []byte(saddr))
if err == nil {
obj := objectSDK.New()
c.flushed.Get(saddr)
return obj, obj.Unmarshal(value)
}
@ -39,7 +38,6 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
return nil, logicerr.Wrap(apistatus.ObjectNotFound{})
}
c.flushed.Get(saddr)
return res.Object, nil
}

View file

@ -1,183 +0,0 @@
package writecache
import (
"context"
"errors"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
func (c *cache) initFlushMarks() {
var localWG sync.WaitGroup
localWG.Add(1)
go func() {
defer localWG.Done()
c.fsTreeFlushMarkUpdate()
}()
localWG.Add(1)
go func() {
defer localWG.Done()
c.dbFlushMarkUpdate()
}()
c.initWG.Add(1)
c.wg.Add(1)
go func() {
defer c.wg.Done()
defer c.initWG.Done()
localWG.Wait()
select {
case <-c.stopInitCh:
return
case <-c.closeCh:
return
default:
}
c.initialized.Store(true)
}()
}
var errStopIter = errors.New("stop iteration")
func (c *cache) fsTreeFlushMarkUpdate() {
c.log.Info("filling flush marks for objects in FSTree")
var prm common.IteratePrm
prm.LazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error {
select {
case <-c.closeCh:
return errStopIter
case <-c.stopInitCh:
return errStopIter
default:
}
flushed, needRemove := c.flushStatus(addr)
if flushed {
c.store.flushed.Add(addr.EncodeToString(), true)
if needRemove {
var prm common.DeletePrm
prm.Address = addr
_, err := c.fsTree.Delete(prm)
if err == nil {
storagelog.Write(c.log,
storagelog.AddressField(addr),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("fstree DELETE"),
)
}
}
}
return nil
}
_, _ = c.fsTree.Iterate(prm)
c.log.Info("finished updating FSTree flush marks")
}
func (c *cache) dbFlushMarkUpdate() {
c.log.Info("filling flush marks for objects in database")
var m []string
var indices []int
var lastKey []byte
var batchSize = flushBatchSize
for {
select {
case <-c.closeCh:
return
case <-c.stopInitCh:
return
default:
}
m = m[:0]
indices = indices[:0]
// We put objects in batches of fixed size to not interfere with main put cycle a lot.
_ = c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
cs := b.Cursor()
for k, _ := cs.Seek(lastKey); k != nil && len(m) < batchSize; k, _ = cs.Next() {
m = append(m, string(k))
}
return nil
})
var addr oid.Address
for i := range m {
if err := addr.DecodeString(m[i]); err != nil {
continue
}
flushed, needRemove := c.flushStatus(addr)
if flushed {
c.store.flushed.Add(addr.EncodeToString(), true)
if needRemove {
indices = append(indices, i)
}
}
}
if len(m) == 0 {
break
}
err := c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
for _, j := range indices {
if err := b.Delete([]byte(m[j])); err != nil {
return err
}
}
return nil
})
if err == nil {
for _, j := range indices {
storagelog.Write(c.log,
zap.String("address", m[j]),
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db DELETE"),
)
}
}
lastKey = append([]byte(m[len(m)-1]), 0)
}
c.log.Info("finished updating flush marks")
}
// flushStatus returns info about the object state in the main storage.
// First return value is true iff object exists.
// Second return value is true iff object can be safely removed.
func (c *cache) flushStatus(addr oid.Address) (bool, bool) {
var existsPrm meta.ExistsPrm
existsPrm.SetAddress(addr)
_, err := c.metabase.Exists(existsPrm)
if err != nil {
needRemove := errors.Is(err, meta.ErrObjectIsExpired) || errors.As(err, new(apistatus.ObjectAlreadyRemoved))
return needRemove, needRemove
}
var prm meta.StorageIDPrm
prm.SetAddress(addr)
mRes, _ := c.metabase.StorageID(prm)
res, err := c.blobstor.Exists(context.TODO(), common.ExistsPrm{Address: addr, StorageID: mRes.StorageID()})
return err == nil && res.Exists, false
}

View file

@ -41,9 +41,6 @@ func (c *cache) Iterate(prm IterationPrm) error {
err := c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
return b.ForEach(func(k, data []byte) error {
if _, ok := c.flushed.Peek(string(k)); ok {
return nil
}
return prm.handler(data)
})
})
@ -54,9 +51,6 @@ func (c *cache) Iterate(prm IterationPrm) error {
var fsPrm common.IteratePrm
fsPrm.IgnoreErrors = prm.ignoreErrors
fsPrm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
if _, ok := c.flushed.Peek(addr.EncodeToString()); ok {
return nil
}
data, err := f()
if err != nil {
if prm.ignoreErrors {

View file

@ -36,19 +36,6 @@ func (c *cache) setMode(m mode.Mode) error {
}
}
if !c.initialized.Load() {
close(c.stopInitCh)
c.initWG.Wait()
c.stopInitCh = make(chan struct{})
defer func() {
if err == nil && !turnOffMeta {
c.initFlushMarks()
}
}()
}
if c.db != nil {
if err = c.db.Close(); err != nil {
return fmt.Errorf("can't close write-cache database: %w", err)

View file

@ -26,8 +26,6 @@ func (c *cache) Put(prm common.PutPrm) (common.PutRes, error) {
defer c.modeMtx.RUnlock()
if c.readOnly() {
return common.PutRes{}, ErrReadOnly
} else if !c.initialized.Load() {
return common.PutRes{}, ErrNotInitialized
}
sz := uint64(len(prm.RawData))
@ -67,7 +65,7 @@ func (c *cache) putSmall(obj objectInfo) error {
)
c.objCounters.IncDB()
}
return nil
return err
}
// putBig writes object to FSTree and pushes it to the flush workers queue.

View file

@ -11,8 +11,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/hashicorp/golang-lru/v2/simplelru"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
@ -20,19 +18,7 @@ import (
// store represents persistent storage with in-memory LRU cache
// for flushed items on top of it.
type store struct {
maxFlushedMarksCount int
maxRemoveBatchSize int
// flushed contains addresses of objects that were already flushed to the main storage.
// We use LRU cache instead of map here to facilitate removing of unused object in favour of
// frequently read ones.
// MUST NOT be used inside bolt db transaction because it's eviction handler
// removes untracked items from the database.
flushed simplelru.LRUCache[string, bool]
db *bbolt.DB
dbKeysToRemove []string
fsKeysToRemove []string
db *bbolt.DB
}
const dbName = "small.bolt"
@ -71,35 +57,9 @@ func (c *cache) openStore(readOnly bool) error {
return fmt.Errorf("could not open FSTree: %w", err)
}
// Write-cache can be opened multiple times during `SetMode`.
// flushed map must not be re-created in this case.
if c.flushed == nil {
c.flushed, _ = lru.NewWithEvict[string, bool](c.maxFlushedMarksCount, c.removeFlushed)
}
c.initialized.Store(false)
return nil
}
// removeFlushed removes an object from the writecache.
// To minimize interference with the client operations, the actual removal
// is done in batches.
// It is not thread-safe and is used only as an evict callback to LRU cache.
func (c *cache) removeFlushed(key string, value bool) {
fromDatabase := value
if fromDatabase {
c.dbKeysToRemove = append(c.dbKeysToRemove, key)
} else {
c.fsKeysToRemove = append(c.fsKeysToRemove, key)
}
if len(c.dbKeysToRemove)+len(c.fsKeysToRemove) >= c.maxRemoveBatchSize {
c.dbKeysToRemove = c.deleteFromDB(c.dbKeysToRemove)
c.fsKeysToRemove = c.deleteFromDisk(c.fsKeysToRemove)
}
}
func (c *cache) deleteFromDB(keys []string) []string {
if len(keys) == 0 {
return keys

View file

@ -12,7 +12,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.uber.org/atomic"
"go.uber.org/zap"
)
@ -51,11 +50,8 @@ type cache struct {
// mtx protects statistics, counters and compressFlags.
mtx sync.RWMutex
mode mode.Mode
initialized atomic.Bool
stopInitCh chan struct{} // used to sync initWG initialisation routines and _only_ them
initWG sync.WaitGroup // for initialisation routines only
modeMtx sync.RWMutex
mode mode.Mode
modeMtx sync.RWMutex
// compressFlags maps address of a big object to boolean value indicating
// whether object should be compressed.
@ -95,9 +91,8 @@ var (
// New creates new writecache instance.
func New(opts ...Option) Cache {
c := &cache{
flushCh: make(chan *object.Object),
mode: mode.ReadWrite,
stopInitCh: make(chan struct{}),
flushCh: make(chan *object.Object),
mode: mode.ReadWrite,
compressFlags: make(map[string]struct{}),
options: options{
@ -116,12 +111,6 @@ func New(opts ...Option) Cache {
opts[i](&c.options)
}
// Make the LRU cache contain which take approximately 3/4 of the maximum space.
// Assume small and big objects are stored in 50-50 proportion.
c.maxFlushedMarksCount = int(c.maxCacheSize/c.maxObjectSize+c.maxCacheSize/c.smallObjectSize) / 2 * 3 / 4
// Trigger the removal when the cache is 7/8 full, so that new items can still arrive.
c.maxRemoveBatchSize = c.maxFlushedMarksCount / 8
return c
}
@ -152,31 +141,27 @@ func (c *cache) Open(readOnly bool) error {
// Init runs necessary services.
func (c *cache) Init() error {
c.initFlushMarks()
c.runFlushLoop()
return nil
}
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
func (c *cache) Close() error {
// We cannot lock mutex for the whole operation duration
// because it is taken by some background workers, so `wg.Wait()` is done without modeMtx.
c.modeMtx.Lock()
defer c.modeMtx.Unlock()
// Finish all in-progress operations.
if err := c.setMode(mode.ReadOnly); err != nil {
return err
}
if c.closeCh != nil {
close(c.closeCh)
}
c.mode = mode.DegradedReadOnly // prevent new operations from being processed
c.modeMtx.Unlock()
c.wg.Wait()
if c.closeCh != nil {
c.closeCh = nil
}
c.initialized.Store(false)
c.modeMtx.Lock()
defer c.modeMtx.Unlock()
c.closeCh = nil
var err error
if c.db != nil {
err = c.db.Close()

View file

@ -40,6 +40,14 @@ func (p *PutInitPrm) WithObject(v *object.Object) *PutInitPrm {
return p
}
func (p *PutInitPrm) WithCopyNumber(v uint32) *PutInitPrm {
if p != nil {
p.traverseOpts = append(p.traverseOpts, placement.SuccessAfter(v))
}
return p
}
func (p *PutInitPrm) WithRelay(f func(client.NodeInfo, client.MultiAddressClient) error) *PutInitPrm {
if p != nil {
p.relay = f

View file

@ -24,7 +24,8 @@ func (s *streamer) toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.Put
object.NewFromV2(oV2),
).
WithRelay(s.relayRequest).
WithCommonPrm(commonPrm), nil
WithCommonPrm(commonPrm).
WithCopyNumber(part.GetCopiesNumber()), nil
}
func toChunkPrm(req *objectV2.PutObjectPartChunk) *putsvc.PutChunkPrm {

View file

@ -27,7 +27,7 @@ type cacheItem struct {
}
const (
defaultClientCacheSize = 10
defaultClientCacheSize = 32
defaultClientConnectTimeout = time.Second * 2
defaultReconnectInterval = time.Second * 15
)
@ -36,7 +36,9 @@ var errRecentlyFailed = errors.New("client has recently failed")
func (c *clientCache) init() {
l, _ := simplelru.NewLRU[string, cacheItem](defaultClientCacheSize, func(_ string, value cacheItem) {
_ = value.cc.Close()
if conn := value.cc; conn != nil {
_ = conn.Close()
}
})
c.LRU = *l
}

View file

@ -13,6 +13,7 @@ import (
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"github.com/panjf2000/ants/v2"
"go.uber.org/atomic"
"go.uber.org/zap"
)
@ -31,6 +32,8 @@ type Service struct {
syncChan chan struct{}
syncPool *ants.Pool
initialSyncDone atomic.Bool
// cnrMap contains existing (used) container IDs.
cnrMap map[cidSDK.ID]struct{}
// cnrMapMtx protects cnrMap
@ -89,6 +92,10 @@ func (s *Service) Shutdown() {
}
func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
b := req.GetBody()
var cid cidSDK.ID
@ -137,6 +144,10 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
}
func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
b := req.GetBody()
var cid cidSDK.ID
@ -197,6 +208,10 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
}
func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
b := req.GetBody()
var cid cidSDK.ID
@ -246,6 +261,10 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
// Move applies client operation to the specified tree and pushes in queue
// for replication on other nodes.
func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
b := req.GetBody()
var cid cidSDK.ID
@ -294,6 +313,10 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
}
func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
b := req.GetBody()
var cid cidSDK.ID
@ -370,6 +393,10 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
}
func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
if !s.initialSyncDone.Load() {
return ErrAlreadySyncing
}
b := req.GetBody()
var cid cidSDK.ID
@ -499,6 +526,10 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
}
func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
if !s.initialSyncDone.Load() {
return ErrAlreadySyncing
}
b := req.GetBody()
var cid cidSDK.ID
@ -531,9 +562,13 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
}
h := b.GetHeight()
lastHeight, err := s.forest.TreeHeight(cid, b.GetTreeId())
if err != nil {
return err
}
for {
lm, err := s.forest.TreeGetOpLog(cid, b.GetTreeId(), h)
if err != nil || lm.Time == 0 {
if err != nil || lm.Time == 0 || lastHeight < lm.Time {
return err
}
@ -555,6 +590,10 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
}
func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
var cid cidSDK.ID
err := cid.Decode(req.GetBody().GetContainerId())
@ -638,5 +677,9 @@ func (s *Service) getContainerInfo(cid cidSDK.ID, pub []byte) ([]netmapSDK.NodeI
}
func (s *Service) Healthcheck(context.Context, *HealthcheckRequest) (*HealthcheckResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
return new(HealthcheckResponse), nil
}

View file

@ -203,6 +203,11 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
rawCID := make([]byte, sha256.Size)
cid.Encode(rawCID)
errG, ctx := errgroup.WithContext(ctx)
errG.SetLimit(1024)
var heightMtx sync.Mutex
for {
newHeight := height
req := &GetOpLogRequest{
@ -213,11 +218,13 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
},
}
if err := SignMessage(req, s.key); err != nil {
_ = errG.Wait()
return newHeight, err
}
c, err := treeClient.GetOpLog(ctx, req)
if err != nil {
_ = errG.Wait()
return newHeight, fmt.Errorf("can't initialize client: %w", err)
}
@ -229,21 +236,40 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
Child: lm.ChildId,
}
if err := m.Meta.FromBytes(lm.Meta); err != nil {
_ = errG.Wait()
return newHeight, err
}
if err := s.forest.TreeApply(cid, treeID, m, true); err != nil {
return newHeight, err
}
if m.Time > newHeight {
newHeight = m.Time + 1
} else {
newHeight++
}
errG.Go(func() error {
err := s.forest.TreeApply(cid, treeID, m, true)
heightMtx.Lock()
defer heightMtx.Unlock()
if err != nil {
if newHeight > height {
height = newHeight
}
return err
}
if m.Time > newHeight {
newHeight = m.Time + 1
} else {
newHeight++
}
return nil
})
}
applyErr := errG.Wait()
if err == nil {
err = applyErr
}
heightMtx.Lock()
if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
heightMtx.Unlock()
return newHeight, err
}
height = newHeight
heightMtx.Unlock()
}
}
@ -288,7 +314,7 @@ func (s *Service) syncLoop(ctx context.Context) {
cnrs, err := s.cfg.cnrSource.List()
if err != nil {
s.log.Error("could not fetch containers", zap.Error(err))
continue
break
}
newMap, cnrsToSync := s.containersToSync(cnrs)
@ -299,6 +325,7 @@ func (s *Service) syncLoop(ctx context.Context) {
s.log.Debug("trees have been synchronized")
}
s.initialSyncDone.Store(true)
}
}