forked from TrueCloudLab/frostfs-node
[#2165] services/tree: Parallelize synchronization
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
de9957e076
commit
c299b98afe
2 changed files with 30 additions and 6 deletions
|
@ -11,6 +11,7 @@ import (
|
||||||
"github.com/TrueCloudLab/frostfs-sdk-go/container/acl"
|
"github.com/TrueCloudLab/frostfs-sdk-go/container/acl"
|
||||||
cidSDK "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
|
cidSDK "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
netmapSDK "github.com/TrueCloudLab/frostfs-sdk-go/netmap"
|
netmapSDK "github.com/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
"github.com/panjf2000/ants/v2"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -26,6 +27,7 @@ type Service struct {
|
||||||
containerCache containerCache
|
containerCache containerCache
|
||||||
|
|
||||||
syncChan chan struct{}
|
syncChan chan struct{}
|
||||||
|
syncPool *ants.Pool
|
||||||
cnrMap map[cidSDK.ID]struct{}
|
cnrMap map[cidSDK.ID]struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,6 +56,7 @@ func New(opts ...Option) *Service {
|
||||||
s.containerCache.init(s.containerCacheSize)
|
s.containerCache.init(s.containerCacheSize)
|
||||||
s.cnrMap = make(map[cidSDK.ID]struct{})
|
s.cnrMap = make(map[cidSDK.ID]struct{})
|
||||||
s.syncChan = make(chan struct{})
|
s.syncChan = make(chan struct{})
|
||||||
|
s.syncPool, _ = ants.NewPool(defaultSyncWorkerCount)
|
||||||
|
|
||||||
return &s
|
return &s
|
||||||
}
|
}
|
||||||
|
@ -75,6 +78,7 @@ func (s *Service) Start(ctx context.Context) {
|
||||||
// Shutdown shutdowns the service.
|
// Shutdown shutdowns the service.
|
||||||
func (s *Service) Shutdown() {
|
func (s *Service) Shutdown() {
|
||||||
close(s.closeCh)
|
close(s.closeCh)
|
||||||
|
s.syncPool.Release()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
|
func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
|
||||||
|
|
|
@ -7,12 +7,14 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||||
"github.com/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
"github.com/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||||
"github.com/TrueCloudLab/frostfs-node/pkg/network"
|
"github.com/TrueCloudLab/frostfs-node/pkg/network"
|
||||||
cid "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
netmapSDK "github.com/TrueCloudLab/frostfs-sdk-go/netmap"
|
netmapSDK "github.com/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
"github.com/panjf2000/ants/v2"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/credentials/insecure"
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
|
@ -22,6 +24,8 @@ import (
|
||||||
// because the node is not included in the container.
|
// because the node is not included in the container.
|
||||||
var ErrNotInContainer = errors.New("node is not in container")
|
var ErrNotInContainer = errors.New("node is not in container")
|
||||||
|
|
||||||
|
const defaultSyncWorkerCount = 20
|
||||||
|
|
||||||
// SynchronizeAllTrees synchronizes all the trees of the container. It fetches
|
// SynchronizeAllTrees synchronizes all the trees of the container. It fetches
|
||||||
// tree IDs from the other container nodes. Returns ErrNotInContainer if the node
|
// tree IDs from the other container nodes. Returns ErrNotInContainer if the node
|
||||||
// is not included in the container.
|
// is not included in the container.
|
||||||
|
@ -271,17 +275,33 @@ func (s *Service) syncLoop(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// sync new containers
|
// sync new containers
|
||||||
|
var wg sync.WaitGroup
|
||||||
for _, cnr := range cnrsToSync {
|
for _, cnr := range cnrsToSync {
|
||||||
s.log.Debug("syncing container trees...", zap.Stringer("cid", cnr))
|
wg.Add(1)
|
||||||
|
cnr := cnr
|
||||||
|
err := s.syncPool.Submit(func() {
|
||||||
|
defer wg.Done()
|
||||||
|
s.log.Debug("syncing container trees...", zap.Stringer("cid", cnr))
|
||||||
|
|
||||||
err = s.SynchronizeAllTrees(ctx, cnr)
|
err := s.SynchronizeAllTrees(ctx, cnr)
|
||||||
|
if err != nil {
|
||||||
|
s.log.Error("could not sync trees", zap.Stringer("cid", cnr), zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.log.Debug("container trees have been synced", zap.Stringer("cid", cnr))
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Error("could not sync trees", zap.Stringer("cid", cnr), zap.Error(err))
|
wg.Done()
|
||||||
continue
|
s.log.Error("could not query trees for synchronization",
|
||||||
|
zap.Stringer("cid", cnr),
|
||||||
|
zap.Error(err))
|
||||||
|
if errors.Is(err, ants.ErrPoolClosed) {
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
s.log.Debug("container trees have been synced", zap.Stringer("cid", cnr))
|
|
||||||
}
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
// remove stored redundant trees
|
// remove stored redundant trees
|
||||||
for cnr := range s.cnrMap {
|
for cnr := range s.cnrMap {
|
||||||
|
|
Loading…
Reference in a new issue