Compare commits

...

20 commits

Author SHA1 Message Date
f2437f7ae9 [#734] shard: Fix Delete method
Flushing data from the writecache to the main storage concurrently with
deletion could leave an object only partially deleted.
As a solution, deletion is allowed only when the object is in the main
storage, because the object will be removed from the writecache by the
flush goroutine.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-10-16 17:00:18 +03:00
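A minimal sketch of the rule described above, using illustrative stand-in types rather than the real frostfs-node shard API: the object is deleted only after the metabase confirms it has a storage ID in the main storage, and the writecache copy is left for the flush goroutine.

package sketch

import (
	"context"
	"errors"
	"fmt"
)

// Illustrative stand-ins for the shard's components; these are NOT the
// real frostfs-node interfaces, only enough structure to show the rule.
type metabase interface {
	StorageID(addr string) ([]byte, error) // where the object lives in the blobstor
	Delete(ctx context.Context, addr string) error
}

type blobstor interface {
	Delete(ctx context.Context, addr string, storageID []byte) error
}

var errNotFlushedYet = errors.New("object has not been flushed to the main storage yet")

// deleteIfInStorage removes the object only when the metabase reports a
// storage ID for it, i.e. when the flush has already moved it out of the
// writecache; otherwise the deletion is refused and can be retried later.
func deleteIfInStorage(ctx context.Context, m metabase, b blobstor, addr string) error {
	id, err := m.StorageID(addr)
	if err != nil {
		return fmt.Errorf("%w: %v", errNotFlushedYet, err)
	}
	if err := b.Delete(ctx, addr, id); err != nil {
		return err
	}
	return m.Delete(ctx, addr)
}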
f26233b47a [#734] metabase: Include UpdateStorageID in metrics and traces
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-10-16 17:00:18 +03:00
7e0c5a55de [#734] writecache: Fix flush
Now UpdateStorageID doesn't return an error in case of a logical error.
If the object is in the graveyard or has a GC mark, its storage ID
still has to be updated.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-10-16 17:00:17 +03:00
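A minimal sketch of the new behaviour, with an illustrative logicalError type standing in for logicerr.Logical: a logical existence error (graveyard entry, GC mark) no longer blocks the storage-ID update.

package sketch

import "errors"

// logicalError is an illustrative stand-in for the logicerr.Logical type
// used by the metabase; it marks errors such as "object is in the
// graveyard" or "object has a GC mark".
type logicalError struct{ msg string }

func (e logicalError) Error() string { return e.msg }

// shouldUpdateStorageID mirrors the decision shown in the UpdateStorageID
// diff below: update when the object exists, and also when the existence
// check failed only with a logical error, because the storage ID still
// has to be recorded for the flushed data.
func shouldUpdateStorageID(exists bool, existsErr error) bool {
	if existsErr == nil {
		return exists
	}
	return errors.As(existsErr, new(logicalError))
}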
d5c10612f4 [#735] policer: Register metrics
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-10-12 09:31:36 +03:00
3a997d1207 [#680] metrics: Initialize log metrics together with services
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-10-11 17:08:03 +03:00
bf082348d4 [#680] metrics: Add step export-metrics in Makefile
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-10-11 17:08:03 +03:00
994f48f8bb [#680] metrics: Export log and morph with script
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-10-11 17:08:03 +03:00
aca11d7474 [#735] policer: Allow to provide metrics from the outside
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-10-11 15:14:13 +03:00
5e229dc248 [#701] metrics: add metric to evaluate policer performance
Add a processed objects counter to policerMetrics and
a policer field to NodeMetrics.

Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2023-10-09 19:02:08 +00:00
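A minimal sketch of a processed-objects counter for the policer, assuming a plain Prometheus counter; the metric name is an assumption, and the real code registers the counter through the node's internal metrics package and NodeMetrics.

package sketch

import "github.com/prometheus/client_golang/prometheus"

// policerMetrics is an illustrative version of the structure mentioned in
// the commit message; the metric name below is an assumption.
type policerMetrics struct {
	processedObjects prometheus.Counter
}

func newPolicerMetrics() *policerMetrics {
	return &policerMetrics{
		processedObjects: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: "frostfs_node",
			Subsystem: "policer",
			Name:      "processed_objects_total",
			Help:      "Number of objects processed by the policer.",
		}),
	}
}

// IncProcessedObjects is called once per object the policer has handled.
func (m *policerMetrics) IncProcessedObjects() { m.processedObjects.Inc() }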
4caa934eea [#729] containersvc: Remove load announcement
IR code was removed in 8879c6ea.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-10-09 19:01:13 +00:00
d07afd803c [#726] writecache: Fix small object flush for Badger
Do not marshal object twice.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-10-06 11:32:50 +03:00
997ac7cd8d [#726] writecache: Fix small object flush for BBolt
Do not marshal object twice.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-10-06 11:32:44 +03:00
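A minimal sketch of the "marshal once" idea behind the two flush fixes above; the helper and callbacks are illustrative, not the actual writecache code.

package sketch

// marshaler is a stand-in for anything that can serialize itself, such as
// an object from the SDK used by the writecache.
type marshaler interface {
	Marshal() ([]byte, error)
}

// flushOnce serializes the object a single time and reuses the resulting
// byte slice for every step of the flush, instead of marshaling the
// object again for the storage write.
func flushOnce(obj marshaler, writeToStorage func(data []byte) error, updateMeta func(size uint64) error) error {
	data, err := obj.Marshal()
	if err != nil {
		return err
	}
	if err := writeToStorage(data); err != nil {
		return err
	}
	return updateMeta(uint64(len(data)))
}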
bd5bf8b1a9 [#721] netmap: Drop already bootstrapped check
Because of this check, under certain conditions,
the node could be removed from the network map,
although the node was functioning normally.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-10-05 11:47:06 +03:00
f3278d76a9 [#721] netmap: Send bootstrap at each epoch tick
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-10-05 11:46:56 +03:00
627b302745 [#709] node: Put in log info about listening endpoints
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-10-02 13:31:10 +00:00
a0a35ffbec [#702] node: Update SDK version
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-09-29 18:41:48 +03:00
c1e4130020 [#146] node: Add trace_id to logs
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2023-09-27 11:05:27 +03:00
b8c3c2486d [#333] Sort containers by ID
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2023-09-25 11:30:36 +03:00
c14c9a023c [#333] Sort objects by ID in SearchObjects
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2023-09-25 11:30:36 +03:00
d9b93b12c1 [#333] Sort shards by shard_ID in cli output
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2023-09-25 10:04:29 +03:00
80 changed files with 436 additions and 2024 deletions

View file

@ -70,6 +70,12 @@ dep:
CGO_ENABLED=0 \
go mod tidy -v && echo OK
# Build export-metrics
export-metrics: dep
@printf "⇒ Build export-metrics\n"
CGO_ENABLED=0 \
go build -v -trimpath -o bin/export-metrics ./scripts/export-metrics
# Regenerate proto files:
protoc:
@GOPRIVATE=github.com/TrueCloudLab go mod vendor

View file

@ -6,6 +6,8 @@ import (
"errors"
"fmt"
"io"
"sort"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
@ -69,6 +71,16 @@ func ListContainers(ctx context.Context, prm ListContainersPrm) (res ListContain
return
}
// SortedIDList returns sorted list of identifiers of user's containers.
func (x ListContainersRes) SortedIDList() []cid.ID {
list := x.cliRes.Containers()
sort.Slice(list, func(i, j int) bool {
lhs, rhs := list[i].EncodeToString(), list[j].EncodeToString()
return strings.Compare(lhs, rhs) < 0
})
return list
}
// PutContainerPrm groups parameters of PutContainer operation.
type PutContainerPrm struct {
Client *client.Client
@ -727,6 +739,11 @@ func SearchObjects(ctx context.Context, prm SearchObjectsPrm) (*SearchObjectsRes
return nil, fmt.Errorf("read object list: %w", err)
}
sort.Slice(list, func(i, j int) bool {
lhs, rhs := list[i].EncodeToString(), list[j].EncodeToString()
return strings.Compare(lhs, rhs) < 0
})
return &SearchObjectsRes{
ids: list,
}, nil

View file

@ -56,7 +56,7 @@ var listContainersCmd = &cobra.Command{
Client: cli,
}
containerIDs := res.IDList()
containerIDs := res.SortedIDList()
for _, cnrID := range containerIDs {
if flagVarListName == "" && !flagVarListPrintAttr {
cmd.Println(cnrID.String())

View file

@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"sort"
"strings"
rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
@ -49,11 +50,14 @@ func listShards(cmd *cobra.Command, _ []string) {
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
shards := resp.GetBody().GetShards()
sortShardsByID(shards)
isJSON, _ := cmd.Flags().GetBool(commonflags.JSON)
if isJSON {
prettyPrintShardsJSON(cmd, resp.GetBody().GetShards())
prettyPrintShardsJSON(cmd, shards)
} else {
prettyPrintShards(cmd, resp.GetBody().GetShards())
prettyPrintShards(cmd, shards)
}
}
@ -115,3 +119,9 @@ func shardModeToString(m control.ShardMode) string {
return "unknown"
}
func sortShardsByID(ii []*control.ShardInfo) {
sort.Slice(ii, func(i, j int) bool {
return bytes.Compare(ii[i].Shard_ID, ii[j].Shard_ID) < 0
})
}

View file

@ -1,7 +1,9 @@
package control
import (
"bytes"
"fmt"
"sort"
"strings"
rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
@ -167,5 +169,9 @@ func getShardIDList(cmd *cobra.Command) [][]byte {
res = append(res, raw)
}
sort.Slice(res, func(i, j int) bool {
return bytes.Compare(res[i], res[j]) < 0
})
return res
}

View file

@ -10,6 +10,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/misc"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring"
irMetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"github.com/spf13/viper"
"go.uber.org/zap"
@ -61,12 +62,13 @@ func main() {
cfg, err = newConfig()
exitErr(err)
logPrm.MetricsNamespace = "frostfs_ir"
metrics := irMetrics.NewInnerRingMetrics()
err = logPrm.SetLevelString(
cfg.GetString("logger.level"),
)
exitErr(err)
logPrm.SamplingHook = metrics.LogMetrics().GetSamplingHook()
log, err = logger.NewLogger(logPrm)
exitErr(err)
@ -78,7 +80,7 @@ func main() {
metricsCmp = newMetricsComponent()
metricsCmp.init()
innerRing, err = innerring.New(ctx, log, cfg, intErr)
innerRing, err = innerring.New(ctx, log, cfg, intErr, metrics)
exitErr(err)
pprofCmp.start()

View file

@ -345,8 +345,7 @@ type internals struct {
apiVersion version.Version
healthStatus *atomic.Int32
// is node under maintenance
isMaintenance atomic.Bool
alreadyBootstraped bool
isMaintenance atomic.Bool
}
// starts node's maintenance.
@ -555,22 +554,21 @@ func initCfg(appCfg *config.Config) *cfg {
key := nodeconfig.Key(appCfg)
relayOnly := nodeconfig.Relay(appCfg)
netState := newNetworkState()
netState.metrics = c.metricsCollector
c.shared = initShared(appCfg, key, netState, relayOnly)
logPrm, err := c.loggerPrm()
fatalOnErr(err)
logPrm.MetricsNamespace = "frostfs_node"
logPrm.SamplingHook = c.metricsCollector.LogMetrics().GetSamplingHook()
log, err := logger.NewLogger(logPrm)
fatalOnErr(err)
c.internals = initInternals(appCfg, log)
relayOnly := nodeconfig.Relay(appCfg)
netState := newNetworkState()
c.shared = initShared(appCfg, key, netState, relayOnly)
c.cfgAccounting = cfgAccounting{
scriptHash: contractsconfig.Balance(appCfg),
}
@ -587,9 +585,6 @@ func initCfg(appCfg *config.Config) *cfg {
user.IDFromKey(&c.ownerIDFromKey, key.PrivateKey.PublicKey)
c.metricsCollector = metrics.NewNodeMetrics()
netState.metrics = c.metricsCollector
c.onShutdown(c.clientCache.CloseAll) // clean up connections
c.onShutdown(c.bgClientCache.CloseAll) // clean up connections
c.onShutdown(c.putClientCache.CloseAll) // clean up connections
@ -631,14 +626,15 @@ func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkSt
}
return shared{
key: key,
binPublicKey: key.PublicKey().Bytes(),
localAddr: netAddr,
respSvc: response.NewService(netState),
clientCache: cache.NewSDKClientCache(cacheOpts),
bgClientCache: cache.NewSDKClientCache(cacheOpts),
putClientCache: cache.NewSDKClientCache(cacheOpts),
persistate: persistate,
key: key,
binPublicKey: key.PublicKey().Bytes(),
localAddr: netAddr,
respSvc: response.NewService(netState),
clientCache: cache.NewSDKClientCache(cacheOpts),
bgClientCache: cache.NewSDKClientCache(cacheOpts),
putClientCache: cache.NewSDKClientCache(cacheOpts),
persistate: persistate,
metricsCollector: metrics.NewNodeMetrics(),
}
}

View file

@ -3,44 +3,22 @@ package main
import (
"bytes"
"context"
"crypto/ecdsa"
"crypto/sha256"
"errors"
"fmt"
"strconv"
containerV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container"
containerTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/container/grpc"
containerService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container"
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
loadroute "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/route"
placementrouter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/route/placement"
loadstorage "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/storage"
containerMorph "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/morph"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
apiClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"go.uber.org/zap"
)
const (
startEstimationNotifyEvent = "StartEstimation"
stopEstimationNotifyEvent = "StopEstimation"
)
func initContainerService(ctx context.Context, c *cfg) {
func initContainerService(_ context.Context, c *cfg) {
// container wrapper that tries to invoke notary
// requests if chain is configured so
wrap, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, cntClient.TryNotary())
@ -52,44 +30,10 @@ func initContainerService(ctx context.Context, c *cfg) {
cnrRdr, cnrWrt := configureEACLAndContainerSources(c, wrap, cnrSrc)
loadAccumulator := loadstorage.New(loadstorage.Prm{})
loadPlacementBuilder := &loadPlacementBuilder{
log: c.log,
nmSrc: c.netMapSource,
cnrSrc: cnrSrc,
}
routeBuilder := placementrouter.New(placementrouter.Prm{
PlacementBuilder: loadPlacementBuilder,
})
loadRouter := loadroute.New(
loadroute.Prm{
LocalServerInfo: c,
RemoteWriterProvider: &remoteLoadAnnounceProvider{
key: &c.key.PrivateKey,
netmapKeys: c,
clientCache: c.bgClientCache,
deadEndProvider: loadcontroller.SimpleWriterProvider(loadAccumulator),
},
Builder: routeBuilder,
},
loadroute.WithLogger(c.log),
)
setLoadController(ctx, c, loadRouter, loadAccumulator)
server := containerTransportGRPC.New(
containerService.NewSignService(
&c.key.PrivateKey,
&usedSpaceService{
Server: containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc),
loadWriterProvider: loadRouter,
loadPlacementBuilder: loadPlacementBuilder,
routeBuilder: routeBuilder,
cfg: c,
},
containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc),
),
)
@ -178,50 +122,6 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
return cnrRdr, cnrWrt
}
func setLoadController(ctx context.Context, c *cfg, loadRouter *loadroute.Router, loadAccumulator *loadstorage.Storage) {
pubKey := c.key.PublicKey().Bytes()
// container wrapper that always sends non-notary
// requests
wrapperNoNotary, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0)
fatalOnErr(err)
resultWriter := &morphLoadWriter{
log: c.log,
cnrMorphClient: wrapperNoNotary,
key: pubKey,
}
localMetrics := &localStorageLoad{
log: c.log,
engine: c.cfgObject.cfgLocalStorage.localStorage,
}
ctrl := loadcontroller.New(
loadcontroller.Prm{
LocalMetrics: loadcontroller.SimpleIteratorProvider(localMetrics),
AnnouncementAccumulator: loadcontroller.SimpleIteratorProvider(loadAccumulator),
LocalAnnouncementTarget: loadRouter,
ResultReceiver: loadcontroller.SimpleWriterProvider(resultWriter),
},
loadcontroller.WithLogger(c.log),
)
setContainerNotificationParser(c, startEstimationNotifyEvent, containerEvent.ParseStartEstimation)
addContainerAsyncNotificationHandler(c, startEstimationNotifyEvent, func(ev event.Event) {
ctrl.Start(ctx, loadcontroller.StartPrm{
Epoch: ev.(containerEvent.StartEstimation).Epoch(),
})
})
setContainerNotificationParser(c, stopEstimationNotifyEvent, containerEvent.ParseStopEstimation)
addContainerAsyncNotificationHandler(c, stopEstimationNotifyEvent, func(ev event.Event) {
ctrl.Stop(ctx, loadcontroller.StopPrm{
Epoch: ev.(containerEvent.StopEstimation).Epoch(),
})
})
}
// addContainerNotificationHandler adds handler that will be executed synchronously.
func addContainerNotificationHandler(c *cfg, sTyp string, h event.Handler) {
typ := event.TypeFromString(sTyp)
@ -284,219 +184,6 @@ func setContainerNotificationParser(c *cfg, sTyp string, p event.NotificationPar
c.cfgContainer.parsers[typ] = p
}
type morphLoadWriter struct {
log *logger.Logger
cnrMorphClient *cntClient.Client
key []byte
}
func (w *morphLoadWriter) Put(a containerSDK.SizeEstimation) error {
w.log.Debug(logs.FrostFSNodeSaveUsedSpaceAnnouncementInContract,
zap.Uint64("epoch", a.Epoch()),
zap.Stringer("cid", a.Container()),
zap.Uint64("size", a.Value()),
)
prm := cntClient.AnnounceLoadPrm{}
prm.SetAnnouncement(a)
prm.SetReporter(w.key)
return w.cnrMorphClient.AnnounceLoad(prm)
}
func (*morphLoadWriter) Close(context.Context) error {
return nil
}
type nopLoadWriter struct{}
func (nopLoadWriter) Put(containerSDK.SizeEstimation) error {
return nil
}
func (nopLoadWriter) Close(context.Context) error {
return nil
}
type remoteLoadAnnounceProvider struct {
key *ecdsa.PrivateKey
netmapKeys netmapCore.AnnouncedKeys
clientCache interface {
Get(client.NodeInfo) (client.MultiAddressClient, error)
}
deadEndProvider loadcontroller.WriterProvider
}
func (r *remoteLoadAnnounceProvider) InitRemote(srv loadcontroller.ServerInfo) (loadcontroller.WriterProvider, error) {
if srv == nil {
return r.deadEndProvider, nil
}
if r.netmapKeys.IsLocalKey(srv.PublicKey()) {
// if local => return no-op writer
return loadcontroller.SimpleWriterProvider(new(nopLoadWriter)), nil
}
var info client.NodeInfo
err := client.NodeInfoFromRawNetmapElement(&info, srv)
if err != nil {
return nil, fmt.Errorf("parse client node info: %w", err)
}
c, err := r.clientCache.Get(info)
if err != nil {
return nil, fmt.Errorf("could not initialize API client: %w", err)
}
return &remoteLoadAnnounceWriterProvider{
client: c,
}, nil
}
type remoteLoadAnnounceWriterProvider struct {
client client.Client
}
func (p *remoteLoadAnnounceWriterProvider) InitWriter([]loadcontroller.ServerInfo) (loadcontroller.Writer, error) {
return &remoteLoadAnnounceWriter{
client: p.client,
}, nil
}
type remoteLoadAnnounceWriter struct {
client client.Client
buf []containerSDK.SizeEstimation
}
func (r *remoteLoadAnnounceWriter) Put(a containerSDK.SizeEstimation) error {
r.buf = append(r.buf, a)
return nil
}
func (r *remoteLoadAnnounceWriter) Close(ctx context.Context) error {
cliPrm := apiClient.PrmAnnounceSpace{
Announcements: r.buf,
}
_, err := r.client.ContainerAnnounceUsedSpace(ctx, cliPrm)
return err
}
type loadPlacementBuilder struct {
log *logger.Logger
nmSrc netmapCore.Source
cnrSrc containerCore.Source
}
func (l *loadPlacementBuilder) BuildPlacement(epoch uint64, cnr cid.ID) ([][]netmap.NodeInfo, error) {
cnrNodes, nm, err := l.buildPlacement(epoch, cnr)
if err != nil {
return nil, err
}
const pivotPrefix = "load_announcement_"
pivot := []byte(
pivotPrefix + strconv.FormatUint(epoch, 10),
)
placement, err := nm.PlacementVectors(cnrNodes, pivot)
if err != nil {
return nil, fmt.Errorf("could not build placement vectors: %w", err)
}
return placement, nil
}
func (l *loadPlacementBuilder) buildPlacement(epoch uint64, idCnr cid.ID) ([][]netmap.NodeInfo, *netmap.NetMap, error) {
cnr, err := l.cnrSrc.Get(idCnr)
if err != nil {
return nil, nil, err
}
nm, err := l.nmSrc.GetNetMapByEpoch(epoch)
if err != nil {
return nil, nil, fmt.Errorf("could not get network map: %w", err)
}
binCnr := make([]byte, sha256.Size)
idCnr.Encode(binCnr)
cnrNodes, err := nm.ContainerNodes(cnr.Value.PlacementPolicy(), binCnr)
if err != nil {
return nil, nil, fmt.Errorf("could not build container nodes: %w", err)
}
return cnrNodes, nm, nil
}
type localStorageLoad struct {
log *logger.Logger
engine *engine.StorageEngine
}
func (d *localStorageLoad) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) error {
idList, err := engine.ListContainers(context.TODO(), d.engine)
if err != nil {
return fmt.Errorf("list containers on engine failure: %w", err)
}
for i := range idList {
sz, err := engine.ContainerSize(d.engine, idList[i])
if err != nil {
d.log.Debug(logs.FrostFSNodeFailedToCalculateContainerSizeInStorageEngine,
zap.Stringer("cid", idList[i]),
zap.String("error", err.Error()),
)
continue
}
d.log.Debug(logs.FrostFSNodeContainerSizeInStorageEngineCalculatedSuccessfully,
zap.Uint64("size", sz),
zap.Stringer("cid", idList[i]),
)
var a containerSDK.SizeEstimation
a.SetContainer(idList[i])
a.SetValue(sz)
if f != nil && !f(a) {
continue
}
if err := h(a); err != nil {
return err
}
}
return nil
}
type usedSpaceService struct {
containerService.Server
loadWriterProvider loadcontroller.WriterProvider
loadPlacementBuilder *loadPlacementBuilder
routeBuilder loadroute.Builder
cfg *cfg
}
func (c *cfg) PublicKey() []byte {
return nodeKeyFromNetmap(c)
}
@ -517,125 +204,6 @@ func (c *cfg) ExternalAddresses() []string {
return c.cfgNodeInfo.localInfo.ExternalAddresses()
}
func (c *usedSpaceService) PublicKey() []byte {
return nodeKeyFromNetmap(c.cfg)
}
func (c *usedSpaceService) IterateAddresses(f func(string) bool) {
c.cfg.iterateNetworkAddresses(f)
}
func (c *usedSpaceService) NumberOfAddresses() int {
return c.cfg.addressNum()
}
func (c *usedSpaceService) ExternalAddresses() []string {
return c.cfg.ExternalAddresses()
}
func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *containerV2.AnnounceUsedSpaceRequest) (*containerV2.AnnounceUsedSpaceResponse, error) {
var passedRoute []loadcontroller.ServerInfo
for hdr := req.GetVerificationHeader(); hdr != nil; hdr = hdr.GetOrigin() {
passedRoute = append(passedRoute, &containerOnlyKeyRemoteServerInfo{
key: hdr.GetBodySignature().GetKey(),
})
}
for left, right := 0, len(passedRoute)-1; left < right; left, right = left+1, right-1 {
passedRoute[left], passedRoute[right] = passedRoute[right], passedRoute[left]
}
passedRoute = append(passedRoute, c)
w, err := c.loadWriterProvider.InitWriter(passedRoute)
if err != nil {
return nil, fmt.Errorf("could not initialize container's used space writer: %w", err)
}
var est containerSDK.SizeEstimation
for _, aV2 := range req.GetBody().GetAnnouncements() {
err = est.ReadFromV2(aV2)
if err != nil {
return nil, fmt.Errorf("invalid size announcement: %w", err)
}
if err := c.processLoadValue(ctx, est, passedRoute, w); err != nil {
return nil, err
}
}
respBody := new(containerV2.AnnounceUsedSpaceResponseBody)
resp := new(containerV2.AnnounceUsedSpaceResponse)
resp.SetBody(respBody)
c.cfg.respSvc.SetMeta(resp)
return resp, nil
}
var errNodeOutsideContainer = errors.New("node outside the container")
type containerOnlyKeyRemoteServerInfo struct {
key []byte
}
func (i *containerOnlyKeyRemoteServerInfo) PublicKey() []byte {
return i.key
}
func (*containerOnlyKeyRemoteServerInfo) IterateAddresses(func(string) bool) {
}
func (*containerOnlyKeyRemoteServerInfo) NumberOfAddresses() int {
return 0
}
func (*containerOnlyKeyRemoteServerInfo) ExternalAddresses() []string {
return nil
}
func (l *loadPlacementBuilder) isNodeFromContainerKey(epoch uint64, cnr cid.ID, key []byte) (bool, error) {
cnrNodes, _, err := l.buildPlacement(epoch, cnr)
if err != nil {
return false, err
}
for i := range cnrNodes {
for j := range cnrNodes[i] {
if bytes.Equal(cnrNodes[i][j].PublicKey(), key) {
return true, nil
}
}
}
return false, nil
}
func (c *usedSpaceService) processLoadValue(_ context.Context, a containerSDK.SizeEstimation,
route []loadcontroller.ServerInfo, w loadcontroller.Writer) error {
fromCnr, err := c.loadPlacementBuilder.isNodeFromContainerKey(a.Epoch(), a.Container(), route[0].PublicKey())
if err != nil {
return fmt.Errorf("could not verify that the sender belongs to the container: %w", err)
} else if !fromCnr {
return errNodeOutsideContainer
}
err = loadroute.CheckRoute(c.routeBuilder, a, route)
if err != nil {
return fmt.Errorf("wrong route of container's used space value: %w", err)
}
err = w.Put(a)
if err != nil {
return fmt.Errorf("could not write container's used space value: %w", err)
}
return nil
}
// implements interface required by container service provided by morph executor.
type morphContainerReader struct {
eacl containerCore.EACLSource

View file

@ -14,6 +14,8 @@ import (
"google.golang.org/grpc"
)
const serviceNameControl = "control"
type treeSynchronizer struct {
treeSvc *tree.Service
}
@ -66,7 +68,10 @@ func initControlService(c *cfg) {
control.RegisterControlServiceServer(c.cfgControlService.server, ctlSvc)
c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) {
runAndLog(ctx, c, "control", false, func(context.Context, *cfg) {
runAndLog(ctx, c, serviceNameControl, false, func(context.Context, *cfg) {
c.log.Info(logs.FrostFSNodeStartListeningEndpoint,
zap.String("service", serviceNameControl),
zap.String("endpoint", endpoint))
fatalOnErr(c.cfgControlService.server.Serve(lis))
})
}))

View file

@ -110,7 +110,8 @@ func serveGRPC(c *cfg) {
c.wg.Done()
}()
c.log.Info(logs.FrostFSNodeStartListeningGRPCEndpoint,
c.log.Info(logs.FrostFSNodeStartListeningEndpoint,
zap.String("service", "gRPC"),
zap.Stringer("endpoint", lis.Addr()),
)

View file

@ -6,7 +6,9 @@ import (
"net/http"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
httputil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/http"
"go.uber.org/zap"
)
type httpComponent struct {
@ -42,6 +44,9 @@ func (cmp *httpComponent) init(c *cfg) {
cmp.name,
func(ctx context.Context) {
runAndLog(ctx, c, cmp.name, false, func(context.Context, *cfg) {
c.log.Info(logs.FrostFSNodeStartListeningEndpoint,
zap.String("service", cmp.name),
zap.String("endpoint", cmp.address))
fatalOnErr(srv.Serve())
})
},

View file

@ -9,7 +9,6 @@ import (
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
@ -42,13 +41,13 @@ func initMorphComponents(ctx context.Context, c *cfg) {
c.key,
client.WithDialTimeout(morphconfig.DialTimeout(c.appCfg)),
client.WithLogger(c.log),
client.WithMetrics(metrics.NewMorphClientMetrics()),
client.WithMetrics(c.metricsCollector.MorphClientMetrics()),
client.WithEndpoints(addresses...),
client.WithConnLostCallback(func() {
c.internalErr <- errors.New("morph connection has been lost")
}),
client.WithSwitchInterval(morphconfig.SwitchInterval(c.appCfg)),
client.WithMorphCacheMetrics(metrics.NewNodeMorphCacheMetrics()),
client.WithMorphCacheMetrics(c.metricsCollector.MorphCacheMetrics()),
)
if err != nil {
c.log.Info(logs.FrostFSNodeFailedToCreateNeoRPCClient,

View file

@ -179,15 +179,8 @@ func addNewEpochNotificationHandlers(c *cfg) {
return
}
n := ev.(netmapEvent.NewEpoch).EpochNumber()
const reBootstrapInterval = 2
if (n-c.cfgNetmap.startEpoch)%reBootstrapInterval == 0 {
err := c.bootstrap()
if err != nil {
c.log.Warn(logs.FrostFSNodeCantSendRebootstrapTx, zap.Error(err))
}
if err := c.bootstrap(); err != nil {
c.log.Warn(logs.FrostFSNodeCantSendRebootstrapTx, zap.Error(err))
}
})
@ -227,10 +220,6 @@ func bootstrapNode(c *cfg) {
c.log.Info(logs.FrostFSNodeNodeIsUnderMaintenanceSkipInitialBootstrap)
return
}
if c.alreadyBootstraped {
c.log.Info(logs.NetmapNodeAlreadyInCandidateListOnlineSkipInitialBootstrap)
return
}
err := c.bootstrap()
fatalOnErrDetails("bootstrap error", err)
}
@ -263,7 +252,7 @@ func initNetmapState(c *cfg) {
fatalOnErrDetails("could not initialize current epoch number", err)
var ni *netmapSDK.NodeInfo
ni, c.alreadyBootstraped, err = c.netmapInitLocalNodeState(epoch)
ni, err = c.netmapInitLocalNodeState(epoch)
fatalOnErrDetails("could not init network state", err)
stateWord := nodeState(ni)
@ -282,13 +271,6 @@ func initNetmapState(c *cfg) {
c.handleLocalNodeInfo(ni)
}
func sameNodeInfo(a, b *netmapSDK.NodeInfo) bool {
// Suboptimal, but we do this once on the node startup.
rawA := a.Marshal()
rawB := b.Marshal()
return bytes.Equal(rawA, rawB)
}
func nodeState(ni *netmapSDK.NodeInfo) string {
if ni != nil {
switch {
@ -303,29 +285,27 @@ func nodeState(ni *netmapSDK.NodeInfo) string {
return "undefined"
}
func (c *cfg) netmapInitLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, bool, error) {
func (c *cfg) netmapInitLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, error) {
nmNodes, err := c.cfgNetmap.wrapper.GetCandidates()
if err != nil {
return nil, false, err
return nil, err
}
var candidate *netmapSDK.NodeInfo
alreadyBootstraped := false
for i := range nmNodes {
if bytes.Equal(nmNodes[i].PublicKey(), c.binPublicKey) {
candidate = &nmNodes[i]
alreadyBootstraped = candidate.IsOnline() && sameNodeInfo(&c.cfgNodeInfo.localInfo, candidate)
break
}
}
node, err := c.netmapLocalNodeState(epoch)
if err != nil {
return nil, false, err
return nil, err
}
if candidate == nil {
return node, false, nil
return node, nil
}
nmState := nodeState(node)
@ -337,7 +317,7 @@ func (c *cfg) netmapInitLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, bool,
zap.String("netmap", nmState),
zap.String("candidate", candidateState))
}
return candidate, alreadyBootstraped, nil
return candidate, nil
}
func (c *cfg) netmapLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, error) {

View file

@ -167,5 +167,7 @@ func connectNats(ctx context.Context, c *cfg) {
err := c.cfgNotifications.nw.w.Connect(ctx, endpoint)
if err != nil {
panic(fmt.Sprintf("could not connect to a nats endpoint %s: %v", endpoint, err))
} else {
c.log.Info(logs.NatsConnectedToEndpoint, zap.String("endpoint", endpoint))
}
}

View file

@ -260,6 +260,7 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
}),
policer.WithMaxCapacity(c.cfgObject.pool.replicatorPoolSize),
policer.WithPool(c.cfgObject.pool.replication),
policer.WithMetrics(c.metricsCollector.PolicerMetrics()),
)
c.workers = append(c.workers, worker{

2
go.mod
View file

@ -6,7 +6,7 @@ require (
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.0
git.frostfs.info/TrueCloudLab/frostfs-contract v0.18.0
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230911122224-ac8fc6d4400c
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230928142024-84b9d29fc98c
git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
github.com/cheggaaa/pb v1.0.29

BIN
go.sum

Binary file not shown.

View file

@ -123,6 +123,7 @@ const (
NatsNatsConnectionWasLost = "nats: connection was lost"
NatsNatsReconnectedToTheServer = "nats: reconnected to the server"
NatsNatsClosingConnectionAsTheContextIsDone = "nats: closing connection as the context is done"
NatsConnectedToEndpoint = "nats: successfully connected to endpoint"
ControllerStartingToAnnounceTheValuesOfTheMetrics = "starting to announce the values of the metrics"
ControllerCouldNotInitializeIteratorOverLocallyCollectedMetrics = "could not initialize iterator over locally collected metrics"
ControllerCouldNotInitializeAnnouncementAccumulator = "could not initialize announcement accumulator"
@ -413,10 +414,10 @@ const (
FrostFSIRCouldntCreateRPCClientForEndpoint = "could not create RPC client for endpoint"
FrostFSIRCreatedRPCClientForEndpoint = "created RPC client for endpoint"
FrostFSIRReloadExtraWallets = "reload extra wallets"
FrostFSNodeStartListeningEndpoint = "start listening endpoint"
FrostFSNodeCouldNotReadCertificateFromFile = "could not read certificate from file"
FrostFSNodeCantListenGRPCEndpoint = "can't listen gRPC endpoint"
FrostFSNodeStopListeningGRPCEndpoint = "stop listening gRPC endpoint"
FrostFSNodeStartListeningGRPCEndpoint = "start listening gRPC endpoint"
FrostFSNodeStoppingGRPCServer = "stopping gRPC server..."
FrostFSNodeGRPCCannotShutdownGracefullyForcingStop = "gRPC cannot shutdown gracefully, forcing stop"
FrostFSNodeGRPCServerStoppedSuccessfully = "gRPC server stopped successfully"

View file

@ -325,11 +325,12 @@ func (s *Server) registerStarter(f func() error) {
}
// New creates an instance of the inner ring server structure.
func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan<- error) (*Server, error) {
func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan<- error,
metrics *metrics.InnerRingServiceMetrics) (*Server, error) {
var err error
server := &Server{
log: log,
irMetrics: metrics.NewInnerRingMetrics(),
irMetrics: metrics,
}
server.setHealthStatus(control.HealthStatus_HEALTH_STATUS_UNDEFINED)

View file

@ -4,6 +4,7 @@ import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -73,6 +74,7 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err
b.log.Debug(logs.BlobovniczaObjectWasRemovedFromBucket,
zap.String("binary size", stringifyByteSize(dataSize)),
zap.String("range", stringifyBounds(sizeLowerBound, sizeUpperBound)),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
)
b.itemDeleted(sizeUpperBound)
}

View file

@ -9,6 +9,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@ -69,6 +70,7 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
b.log.Debug(logs.BlobovniczatreeCouldNotRemoveObjectFromLevel,
zap.String("level", p),
zap.String("error", err.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
)
}
}

View file

@ -8,6 +8,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"go.opentelemetry.io/otel/attribute"
@ -56,7 +57,8 @@ func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common
if !client.IsErrObjectNotFound(err) {
b.log.Debug(logs.BlobovniczatreeCouldNotGetObjectFromLevel,
zap.String("level", p),
zap.String("error", err.Error()))
zap.String("error", err.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
}

View file

@ -10,6 +10,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@ -69,6 +70,7 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
b.log.Debug(logs.BlobovniczatreeCouldNotGetObjectFromLevel,
zap.String("level", p),
zap.String("error", err.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
)
}
}

View file

@ -11,6 +11,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@ -71,7 +72,7 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
b.log.Debug(logs.BlobovniczatreeCouldNotGetObjectFromLevel,
zap.String("level", p),
zap.String("error", err.Error()),
)
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
if outOfBounds {
return true, err

View file

@ -8,6 +8,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@ -81,14 +82,16 @@ func (i *putIterator) iterate(ctx context.Context, lvlPath string) (bool, error)
i.B.reportError(logs.BlobovniczatreeCouldNotGetActiveBlobovnicza, err)
} else {
i.B.log.Debug(logs.BlobovniczatreeCouldNotGetActiveBlobovnicza,
zap.String("error", err.Error()))
zap.String("error", err.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
return false, nil
}
if active == nil {
i.B.log.Debug(logs.BlobovniczatreeBlobovniczaOverflowed, zap.String("level", lvlPath))
i.B.log.Debug(logs.BlobovniczatreeBlobovniczaOverflowed, zap.String("level", lvlPath),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return false, nil
}
defer active.Close()
@ -102,7 +105,8 @@ func (i *putIterator) iterate(ctx context.Context, lvlPath string) (bool, error)
} else {
i.B.log.Debug(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza,
zap.String("path", active.Path()),
zap.String("error", err.Error()))
zap.String("error", err.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
return false, nil

View file

@ -7,6 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@ -74,7 +75,8 @@ func (b *BlobStor) Exists(ctx context.Context, prm common.ExistsPrm) (common.Exi
for _, err := range errors[:len(errors)-1] {
b.log.Warn(logs.BlobstorErrorOccurredDuringObjectExistenceChecking,
zap.Stringer("address", prm.Address),
zap.String("error", err.Error()))
zap.String("error", err.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
return common.ExistsRes{}, errors[len(errors)-1]

View file

@ -6,6 +6,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@ -150,7 +151,8 @@ func (e *StorageEngine) deleteChildren(ctx context.Context, addr oid.Address, fo
if err != nil {
e.log.Warn(logs.EngineErrorDuringSearchingForObjectChildren,
zap.Stringer("addr", addr),
zap.String("error", err.Error()))
zap.String("error", err.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return false
}
@ -161,7 +163,8 @@ func (e *StorageEngine) deleteChildren(ctx context.Context, addr oid.Address, fo
if err != nil {
e.log.Debug(logs.EngineCouldNotInhumeObjectInShard,
zap.Stringer("addr", addr),
zap.String("err", err.Error()))
zap.String("err", err.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
continue
}
}

View file

@ -11,6 +11,7 @@ import (
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -200,11 +201,13 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
e.evacuateLimiter.Complete(err)
}()
e.log.Info(logs.EngineStartedShardsEvacuation, zap.Strings("shard_ids", shardIDs), evacuationOperationLogField)
e.log.Info(logs.EngineStartedShardsEvacuation, zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
err = e.getTotalObjectsCount(ctx, shardsToEvacuate, res)
if err != nil {
e.log.Error(logs.EngineShardsEvacuationFailedToCount, zap.Strings("shard_ids", shardIDs), zap.Error(err), evacuationOperationLogField)
e.log.Error(logs.EngineShardsEvacuationFailedToCount, zap.Strings("shard_ids", shardIDs), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err
}
@ -266,7 +269,8 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) {
break
}
e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField)
e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err
}
@ -342,7 +346,8 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
res.failed.Add(1)
continue
}
e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField)
e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err
}
@ -363,7 +368,8 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
err = prm.handler(ctx, addr, getRes.Object())
if err != nil {
e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField)
e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err
}
res.evacuated.Add(1)
@ -392,7 +398,8 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
zap.Stringer("from", sh.ID()),
zap.Stringer("to", shards[j].ID()),
zap.Stringer("addr", addr),
evacuationOperationLogField)
evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
return true, nil
}

View file

@ -8,6 +8,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@ -105,7 +106,8 @@ func (e *StorageEngine) get(ctx context.Context, prm GetPrm) (GetRes, error) {
e.log.Warn(logs.ShardMetaInfoPresentButObjectNotFound,
zap.Stringer("shard_id", it.ShardWithMeta.ID()),
zap.String("error", it.MetaError.Error()),
zap.Stringer("address", prm.addr))
zap.Stringer("address", prm.addr),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
}

View file

@ -7,6 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@ -93,7 +94,8 @@ func (e *StorageEngine) inhume(ctx context.Context, prm InhumePrm) (InhumeRes, e
if err != nil {
e.log.Warn(logs.EngineRemovingAnObjectWithoutFullLockingCheck,
zap.Error(err),
zap.Stringer("addr", prm.addrs[i]))
zap.Stringer("addr", prm.addrs[i]),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
} else if locked {
return InhumeRes{}, new(apistatus.ObjectLocked)
}
@ -201,7 +203,8 @@ func (e *StorageEngine) IsLocked(ctx context.Context, addr oid.Address) (bool, e
e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) {
locked, err = h.Shard.IsLocked(ctx, addr)
if err != nil {
e.reportShardError(h, "can't check object's lockers", err, zap.Stringer("addr", addr))
e.reportShardError(h, "can't check object's lockers", err, zap.Stringer("addr", addr),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
outErr = err
return false
}

View file

@ -9,6 +9,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -128,6 +129,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
e.log.Warn(logs.EngineCouldNotMarkObjectForShardRelocation,
zap.Stringer("shard", sh.ID()),
zap.String("error", err.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
)
}
}
@ -144,7 +146,8 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
errors.Is(err, common.ErrReadOnly) || errors.Is(err, common.ErrNoSpace) {
e.log.Warn(logs.EngineCouldNotPutObjectToShard,
zap.Stringer("shard_id", sh.ID()),
zap.String("error", err.Error()))
zap.String("error", err.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return
}

View file

@ -9,6 +9,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@ -118,7 +119,8 @@ func (e *StorageEngine) getRange(ctx context.Context, prm RngPrm) (RngRes, error
e.log.Warn(logs.ShardMetaInfoPresentButObjectNotFound,
zap.Stringer("shard_id", it.ShardWithMeta.ID()),
zap.String("error", it.MetaError.Error()),
zap.Stringer("address", prm.addr))
zap.Stringer("address", prm.addr),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
}

View file

@ -7,6 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"go.opentelemetry.io/otel/attribute"
@ -38,7 +39,8 @@ func (e *StorageEngine) TreeMove(ctx context.Context, d pilorama.CIDDescriptor,
if !errors.Is(err, shard.ErrReadOnlyMode) && err != shard.ErrPiloramaDisabled {
e.reportShardError(lst[index], "can't perform `TreeMove`", err,
zap.Stringer("cid", d.CID),
zap.String("tree", treeID))
zap.String("tree", treeID),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
return nil, err
@ -71,7 +73,8 @@ func (e *StorageEngine) TreeAddByPath(ctx context.Context, d pilorama.CIDDescrip
if !errors.Is(err, shard.ErrReadOnlyMode) && err != shard.ErrPiloramaDisabled {
e.reportShardError(lst[index], "can't perform `TreeAddByPath`", err,
zap.Stringer("cid", d.CID),
zap.String("tree", treeID))
zap.String("tree", treeID),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
return nil, err
}
@ -99,7 +102,8 @@ func (e *StorageEngine) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID str
if !errors.Is(err, shard.ErrReadOnlyMode) && err != shard.ErrPiloramaDisabled {
e.reportShardError(lst[index], "can't perform `TreeApply`", err,
zap.Stringer("cid", cnr),
zap.String("tree", treeID))
zap.String("tree", treeID),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
return err
}
@ -130,7 +134,8 @@ func (e *StorageEngine) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID
if !errors.Is(err, pilorama.ErrTreeNotFound) {
e.reportShardError(sh, "can't perform `TreeGetByPath`", err,
zap.Stringer("cid", cid),
zap.String("tree", treeID))
zap.String("tree", treeID),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
continue
}
@ -162,7 +167,8 @@ func (e *StorageEngine) TreeGetMeta(ctx context.Context, cid cidSDK.ID, treeID s
if !errors.Is(err, pilorama.ErrTreeNotFound) {
e.reportShardError(sh, "can't perform `TreeGetMeta`", err,
zap.Stringer("cid", cid),
zap.String("tree", treeID))
zap.String("tree", treeID),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
continue
}
@ -193,7 +199,8 @@ func (e *StorageEngine) TreeGetChildren(ctx context.Context, cid cidSDK.ID, tree
if !errors.Is(err, pilorama.ErrTreeNotFound) {
e.reportShardError(sh, "can't perform `TreeGetChildren`", err,
zap.Stringer("cid", cid),
zap.String("tree", treeID))
zap.String("tree", treeID),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
continue
}
@ -224,7 +231,8 @@ func (e *StorageEngine) TreeGetOpLog(ctx context.Context, cid cidSDK.ID, treeID
if !errors.Is(err, pilorama.ErrTreeNotFound) {
e.reportShardError(sh, "can't perform `TreeGetOpLog`", err,
zap.Stringer("cid", cid),
zap.String("tree", treeID))
zap.String("tree", treeID),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
continue
}
@ -253,7 +261,8 @@ func (e *StorageEngine) TreeDrop(ctx context.Context, cid cidSDK.ID, treeID stri
if !errors.Is(err, pilorama.ErrTreeNotFound) && !errors.Is(err, shard.ErrReadOnlyMode) {
e.reportShardError(sh, "can't perform `TreeDrop`", err,
zap.Stringer("cid", cid),
zap.String("tree", treeID))
zap.String("tree", treeID),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
continue
}
@ -281,7 +290,8 @@ func (e *StorageEngine) TreeList(ctx context.Context, cid cidSDK.ID) ([]string,
}
e.reportShardError(sh, "can't perform `TreeList`", err,
zap.Stringer("cid", cid))
zap.Stringer("cid", cid),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
// returns as much info about
// trees as possible
@ -347,7 +357,8 @@ func (e *StorageEngine) TreeUpdateLastSyncHeight(ctx context.Context, cid cidSDK
if err != nil && !errors.Is(err, shard.ErrReadOnlyMode) && err != shard.ErrPiloramaDisabled {
e.reportShardError(lst[index], "can't update tree synchronization height", err,
zap.Stringer("cid", cid),
zap.String("tree", treeID))
zap.String("tree", treeID),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
return err
}
@ -373,7 +384,8 @@ func (e *StorageEngine) TreeLastSyncHeight(ctx context.Context, cid cidSDK.ID, t
if !errors.Is(err, pilorama.ErrTreeNotFound) {
e.reportShardError(sh, "can't read tree synchronization height", err,
zap.Stringer("cid", cid),
zap.String("tree", treeID))
zap.String("tree", treeID),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
continue
}

View file

@ -3,8 +3,10 @@ package meta
import (
"context"
"errors"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/nspcc-dev/neo-go/pkg/util/slice"
@ -93,7 +95,22 @@ func (p *UpdateStorageIDPrm) SetStorageID(id []byte) {
}
// UpdateStorageID updates storage descriptor for objects from the blobstor.
func (db *DB) UpdateStorageID(prm UpdateStorageIDPrm) (res UpdateStorageIDRes, err error) {
func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res UpdateStorageIDRes, err error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("UpdateStorageID", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.UpdateStorageID",
trace.WithAttributes(
attribute.String("address", prm.addr.EncodeToString()),
attribute.String("storage_id", string(prm.id)),
))
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
@ -107,12 +124,14 @@ func (db *DB) UpdateStorageID(prm UpdateStorageIDPrm) (res UpdateStorageIDRes, e
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
exists, err := db.exists(tx, prm.addr, currEpoch)
if err == nil && exists || errors.Is(err, ErrObjectIsExpired) {
if err == nil && exists {
err = updateStorageID(tx, prm.addr, prm.id)
} else if errors.As(err, new(logicerr.Logical)) {
err = updateStorageID(tx, prm.addr, prm.id)
}
return err
})
success = err == nil
return res, metaerr.Wrap(err)
}

View file

@ -18,6 +18,7 @@ func TestDB_StorageID(t *testing.T) {
raw1 := testutil.GenerateObject()
raw2 := testutil.GenerateObject()
deleted := testutil.GenerateObject()
storageID := []byte{1, 2, 3, 4}
@ -34,6 +35,15 @@ func TestDB_StorageID(t *testing.T) {
err = putBig(db, raw2)
require.NoError(t, err)
// put object with storageID and delete it
err = metaPut(db, deleted, storageID)
require.NoError(t, err)
cnrID, ok := deleted.ContainerID()
require.True(t, ok)
ts := testutil.GenerateObjectWithCID(cnrID)
require.NoError(t, metaInhume(db, object.AddressOf(deleted), object.AddressOf(ts)))
// check StorageID for object without storageID
fetchedStorageID, err = metaStorageID(db, object.AddressOf(raw2))
require.NoError(t, err)
@ -44,12 +54,23 @@ func TestDB_StorageID(t *testing.T) {
require.NoError(t, err)
require.Equal(t, storageID, fetchedStorageID)
// check StorageID for deleted object with storageID
fetchedStorageID, err = metaStorageID(db, object.AddressOf(deleted))
require.NoError(t, err)
require.Equal(t, storageID, fetchedStorageID)
t.Run("update", func(t *testing.T) {
storageID := []byte{1, 2, 3, 4, 5}
require.NoError(t, metaUpdateStorageID(db, object.AddressOf(raw2), storageID))
require.NoError(t, metaUpdateStorageID(db, object.AddressOf(deleted), storageID))
fetchedStorageID, err = metaStorageID(db, object.AddressOf(raw2))
require.NoError(t, err)
require.Equal(t, storageID, fetchedStorageID)
fetchedStorageID, err = metaStorageID(db, object.AddressOf(deleted))
require.NoError(t, err)
require.Equal(t, storageID, fetchedStorageID)
})
}
@ -58,7 +79,7 @@ func metaUpdateStorageID(db *meta.DB, addr oid.Address, id []byte) error {
sidPrm.SetAddress(addr)
sidPrm.SetStorageID(id)
_, err := db.UpdateStorageID(sidPrm)
_, err := db.UpdateStorageID(context.Background(), sidPrm)
return err
}

View file

@ -2,14 +2,12 @@ package shard
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@ -33,8 +31,7 @@ func (p *DeletePrm) SetAddresses(addr ...oid.Address) {
p.addr = append(p.addr, addr...)
}
// Delete removes data from the shard's writeCache, metaBase and
// blobStor.
// Delete removes data from the shard's metaBase and blobStor.
func (s *Shard) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Delete",
trace.WithAttributes(
@ -46,10 +43,10 @@ func (s *Shard) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
s.m.RLock()
defer s.m.RUnlock()
return s.delete(ctx, prm)
return s.delete(ctx, prm, false)
}
func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
func (s *Shard) delete(ctx context.Context, prm DeletePrm, skipFailed bool) (DeleteRes, error) {
if s.info.Mode.ReadOnly() {
return DeleteRes{}, ErrReadOnlyMode
} else if s.info.Mode.NoMetabase() {
@ -64,12 +61,18 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
default:
}
s.deleteObjectFromWriteCacheSafe(ctx, addr)
s.deleteFromBlobstorSafe(ctx, addr)
if err := s.deleteFromBlobstor(ctx, addr); err != nil {
if skipFailed {
continue
}
return result, err
}
if err := s.deleteFromMetabase(ctx, addr); err != nil {
return result, err // stop on metabase error ?
if skipFailed {
continue
}
return result, err
}
result.deleted++
}
@ -77,16 +80,7 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
return result, nil
}
func (s *Shard) deleteObjectFromWriteCacheSafe(ctx context.Context, addr oid.Address) {
if s.hasWriteCache() {
err := s.writeCache.Delete(ctx, addr)
if err != nil && !client.IsErrObjectNotFound(err) && !errors.Is(err, writecache.ErrReadOnly) {
s.log.Warn(logs.ShardCantDeleteObjectFromWriteCache, zap.Error(err))
}
}
}
func (s *Shard) deleteFromBlobstorSafe(ctx context.Context, addr oid.Address) {
func (s *Shard) deleteFromBlobstor(ctx context.Context, addr oid.Address) error {
var sPrm meta.StorageIDPrm
sPrm.SetAddress(addr)
@ -94,7 +88,9 @@ func (s *Shard) deleteFromBlobstorSafe(ctx context.Context, addr oid.Address) {
if err != nil {
s.log.Debug(logs.StorageIDRetrievalFailure,
zap.Stringer("object", addr),
zap.String("error", err.Error()))
zap.String("error", err.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err
}
storageID := res.StorageID()
@ -106,8 +102,10 @@ func (s *Shard) deleteFromBlobstorSafe(ctx context.Context, addr oid.Address) {
if err != nil {
s.log.Debug(logs.ObjectRemovalFailureBlobStor,
zap.Stringer("object_address", addr),
zap.String("error", err.Error()))
zap.String("error", err.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
return err
}
func (s *Shard) deleteFromMetabase(ctx context.Context, addr oid.Address) error {


@ -52,13 +52,18 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
_, err = testGet(t, sh, getPrm, hasWriteCache)
require.NoError(t, err)
_, err = sh.Delete(context.TODO(), delPrm)
require.NoError(t, err)
if hasWriteCache {
require.Eventually(t, func() bool {
_, err = sh.Delete(context.Background(), delPrm)
return err == nil
}, 30*time.Second, 100*time.Millisecond)
} else {
_, err = sh.Delete(context.Background(), delPrm)
require.NoError(t, err)
}
require.Eventually(t, func() bool {
_, err = sh.Get(context.Background(), getPrm)
return client.IsErrObjectNotFound(err)
}, time.Second, 50*time.Millisecond)
_, err = sh.Get(context.Background(), getPrm)
require.True(t, client.IsErrObjectNotFound(err))
})
t.Run("small object", func(t *testing.T) {
@ -78,12 +83,17 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
_, err = sh.Get(context.Background(), getPrm)
require.NoError(t, err)
_, err = sh.Delete(context.Background(), delPrm)
require.NoError(t, err)
if hasWriteCache {
require.Eventually(t, func() bool {
_, err = sh.Delete(context.Background(), delPrm)
return err == nil
}, 10*time.Second, 100*time.Millisecond)
} else {
_, err = sh.Delete(context.Background(), delPrm)
require.NoError(t, err)
}
require.Eventually(t, func() bool {
_, err = sh.Get(context.Background(), getPrm)
return client.IsErrObjectNotFound(err)
}, time.Second, 50*time.Millisecond)
_, err = sh.Get(context.Background(), getPrm)
require.True(t, client.IsErrObjectNotFound(err))
})
}


@ -297,7 +297,7 @@ func (s *Shard) removeGarbage(pctx context.Context) (result gcRunResult) {
deletePrm.SetAddresses(buf...)
// delete accumulated objects
res, err := s.delete(ctx, deletePrm)
res, err := s.delete(ctx, deletePrm, true)
result.deleted = res.deleted
result.failedToDelete = uint64(len(buf)) - res.deleted


@ -10,6 +10,7 @@ import (
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@ -144,12 +145,14 @@ func (s *Shard) fetchObjectData(ctx context.Context, addr oid.Address, skipMeta
if client.IsErrObjectNotFound(err) {
s.log.Debug(logs.ShardObjectIsMissingInWritecache,
zap.Stringer("addr", addr),
zap.Bool("skip_meta", skipMeta))
zap.Bool("skip_meta", skipMeta),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
} else {
s.log.Error(logs.ShardFailedToFetchObjectFromWritecache,
zap.Error(err),
zap.Stringer("addr", addr),
zap.Bool("skip_meta", skipMeta))
zap.Bool("skip_meta", skipMeta),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
}
if skipMeta || mErr != nil {


@ -7,6 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
@ -110,6 +111,7 @@ func (s *Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
s.log.Debug(logs.ShardCouldNotMarkObjectToDeleteInMetabase,
zap.String("error", err.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
)
s.m.RUnlock()


@ -7,6 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -99,7 +100,8 @@ func (s *Shard) List(ctx context.Context) (res SelectRes, err error) {
if err != nil {
s.log.Debug(logs.ShardCantSelectAllObjects,
zap.Stringer("cid", lst[i]),
zap.String("error", err.Error()))
zap.String("error", err.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
continue
}


@ -5,6 +5,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
@ -53,6 +54,7 @@ func (s *Shard) ToMoveIt(ctx context.Context, prm ToMoveItPrm) (ToMoveItRes, err
if err != nil {
s.log.Debug(logs.ShardCouldNotMarkObjectForShardRelocationInMetabase,
zap.String("error", err.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
)
}


@ -91,7 +91,7 @@ func benchmarkPutPrepare(b *testing.B, cache writecache.Cache) {
type testMetabase struct{}
func (testMetabase) UpdateStorageID(meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error) {
func (testMetabase) UpdateStorageID(context.Context, meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error) {
return meta.UpdateStorageIDRes{}, nil
}


@ -51,7 +51,7 @@ type MainStorage interface {
// Metabase is the interface of the metabase used by Cache implementations.
type Metabase interface {
UpdateStorageID(meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error)
UpdateStorageID(context.Context, meta.UpdateStorageIDPrm) (meta.UpdateStorageIDRes, error)
}
var (


@ -21,7 +21,7 @@ type cache struct {
modeMtx sync.RWMutex
// flushCh is a channel with objects to flush.
flushCh chan *objectSDK.Object
flushCh chan objectInfo
// scheduled4Flush contains objects scheduled for flush via flushCh
// helps to avoid multiple flushing of one object
scheduled4Flush map[oid.Address]struct{}
@ -52,7 +52,7 @@ const (
// New creates new writecache instance.
func New(opts ...Option) writecache.Cache {
c := &cache{
flushCh: make(chan *objectSDK.Object),
flushCh: make(chan objectInfo),
mode: mode.ReadWrite,
scheduled4Flush: map[oid.Address]struct{}{},


@ -85,7 +85,11 @@ func (c *collector) Send(buf *z.Buffer) error {
c.cache.scheduled4FlushMtx.Unlock()
c.scheduled++
select {
case c.cache.flushCh <- obj:
case c.cache.flushCh <- objectInfo{
addr: addr,
data: val,
obj: obj,
}:
case <-c.cache.closeCh:
c.cancel()
return nil
@ -175,22 +179,21 @@ func (c *cache) reportFlushError(msg string, addr string, err error) {
func (c *cache) workerFlushSmall() {
defer c.wg.Done()
var obj *objectSDK.Object
var objInfo objectInfo
for {
// Give priority to direct put.
select {
case obj = <-c.flushCh:
case objInfo = <-c.flushCh:
case <-c.closeCh:
return
}
addr := objectCore.AddressOf(obj)
err := c.flushObject(context.TODO(), obj, nil, writecache.StorageTypeDB)
err := c.flushObject(context.TODO(), objInfo.obj, objInfo.data, writecache.StorageTypeDB)
if err == nil {
c.deleteFromDB([]internalKey{addr2key(addr)})
c.deleteFromDB([]internalKey{addr2key(objInfo.addr)})
}
c.scheduled4FlushMtx.Lock()
delete(c.scheduled4Flush, addr)
delete(c.scheduled4Flush, objInfo.addr)
c.scheduled4FlushMtx.Unlock()
}
}
@ -223,7 +226,7 @@ func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []b
updPrm.SetAddress(addr)
updPrm.SetStorageID(res.StorageID)
_, err = c.metabase.UpdateStorageID(updPrm)
_, err = c.metabase.UpdateStorageID(ctx, updPrm)
if err != nil {
c.reportFlushError(logs.FrostFSNodeCantUpdateObjectStorageID,
addr.EncodeToString(), err)

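The write-cache diffs above replace the *objectSDK.Object flush channel with objectInfo values, so the already-marshaled payload travels with the object and is not encoded a second time during flush. The struct definition itself is outside this diff; the shape below is only inferred from how its fields are used in the Badger implementation (the BBolt package keeps its own objectInfo with a string-encoded address):

type objectInfo struct {
    addr oid.Address       // used to clear scheduled4Flush and to delete the DB record
    data []byte            // raw marshaled object taken from the cache, passed to flushObject
    obj  *objectSDK.Object // decoded object, so flush does not marshal it again
}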

@ -29,7 +29,7 @@ type cache struct {
compressFlags map[string]struct{}
// flushCh is a channel with objects to flush.
flushCh chan *objectSDK.Object
flushCh chan objectInfo
// closeCh is close channel, protected by modeMtx.
closeCh chan struct{}
// wg is a wait group for flush workers.
@ -62,7 +62,7 @@ var (
// New creates new writecache instance.
func New(opts ...Option) writecache.Cache {
c := &cache{
flushCh: make(chan *objectSDK.Object),
flushCh: make(chan objectInfo),
mode: mode.ReadWrite,
compressFlags: make(map[string]struct{}),


@ -79,7 +79,6 @@ func (c *cache) runFlushLoop() {
func (c *cache) flushSmallObjects() {
var lastKey []byte
var m []objectInfo
for {
select {
case <-c.closeCh:
@ -87,7 +86,7 @@ func (c *cache) flushSmallObjects() {
default:
}
m = m[:0]
var m []objectInfo
c.modeMtx.RLock()
if c.readOnly() {
@ -133,10 +132,11 @@ func (c *cache) flushSmallObjects() {
if err := obj.Unmarshal(m[i].data); err != nil {
continue
}
m[i].obj = obj
count++
select {
case c.flushCh <- obj:
case c.flushCh <- m[i]:
case <-c.closeCh:
c.modeMtx.RUnlock()
return
@ -231,22 +231,22 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
func (c *cache) workerFlushSmall() {
defer c.wg.Done()
var obj *objectSDK.Object
var objInfo objectInfo
for {
// Give priority to direct put.
select {
case obj = <-c.flushCh:
case objInfo = <-c.flushCh:
case <-c.closeCh:
return
}
err := c.flushObject(context.TODO(), obj, nil, writecache.StorageTypeDB)
err := c.flushObject(context.TODO(), objInfo.obj, objInfo.data, writecache.StorageTypeDB)
if err != nil {
// Error is handled in flushObject.
continue
}
c.deleteFromDB(objectCore.AddressOf(obj).EncodeToString())
c.deleteFromDB(objInfo.addr)
}
}
@ -278,7 +278,7 @@ func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []b
updPrm.SetAddress(addr)
updPrm.SetStorageID(res.StorageID)
_, err = c.metabase.UpdateStorageID(updPrm)
_, err = c.metabase.UpdateStorageID(ctx, updPrm)
if err != nil {
c.reportFlushError(logs.FSTreeCantUpdateID,
addr.EncodeToString(), err)


@ -20,6 +20,7 @@ const (
treeServiceSubsystem = "treeservice"
writeCacheSubsystem = "writecache"
grpcServerSubsystem = "grpc_server"
policerSubsystem = "policer"
successLabel = "success"
shardIDLabel = "shard_id"


@ -4,6 +4,7 @@ import (
"strconv"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-observability/metrics"
"github.com/prometheus/client_golang/prometheus"
)
@ -14,6 +15,7 @@ type InnerRingServiceMetrics struct {
health prometheus.Gauge
eventDuration *prometheus.HistogramVec
morphCacheMetrics *morphCacheMetrics
logMetrics logger.LogMetrics
}
// NewInnerRingMetrics returns new instance of metrics collectors for inner ring.
@ -44,6 +46,7 @@ func NewInnerRingMetrics() *InnerRingServiceMetrics {
health: health,
eventDuration: eventDuration,
morphCacheMetrics: newMorphCacheMetrics(innerRingNamespace),
logMetrics: logger.NewLogMetrics(innerRingNamespace),
}
}
@ -67,3 +70,7 @@ func (m *InnerRingServiceMetrics) AddEvent(d time.Duration, typ string, success
func (m *InnerRingServiceMetrics) MorphCacheMetrics() MorphCacheMetrics {
return m.morphCacheMetrics
}
func (m *InnerRingServiceMetrics) LogMetrics() logger.LogMetrics {
return m.logMetrics
}


@ -4,7 +4,6 @@ import (
"strconv"
"time"
morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-observability/metrics"
"github.com/prometheus/client_golang/prometheus"
)
@ -16,7 +15,7 @@ type morphClientMetrics struct {
invokeDuration *prometheus.HistogramVec
}
func NewMorphClientMetrics() morphmetrics.Register {
func newMorphClientMetrics() *morphClientMetrics {
return &morphClientMetrics{
switchCount: metrics.NewCounter(prometheus.CounterOpts{
Namespace: namespace,


@ -18,10 +18,6 @@ type morphCacheMetrics struct {
var _ MorphCacheMetrics = (*morphCacheMetrics)(nil)
func NewNodeMorphCacheMetrics() MorphCacheMetrics {
return newMorphCacheMetrics(namespace)
}
func newMorphCacheMetrics(ns string) *morphCacheMetrics {
return &morphCacheMetrics{
methodDuration: metrics.NewHistogramVec(prometheus.HistogramOpts{


@ -1,6 +1,8 @@
package metrics
import (
morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-observability/metrics"
"github.com/prometheus/client_golang/prometheus"
)
@ -18,6 +20,10 @@ type NodeMetrics struct {
metabase *metabaseMetrics
pilorama *piloramaMetrics
grpc *grpcServerMetrics
policer *policerMetrics
morphClient *morphClientMetrics
morphCache *morphCacheMetrics
log logger.LogMetrics
}
func NewNodeMetrics() *NodeMetrics {
@ -39,6 +45,10 @@ func NewNodeMetrics() *NodeMetrics {
metabase: newMetabaseMetrics(),
pilorama: newPiloramaMetrics(),
grpc: newGrpcServerMetrics(),
policer: newPolicerMetrics(),
morphClient: newMorphClientMetrics(),
morphCache: newMorphCacheMetrics(namespace),
log: logger.NewLogMetrics(namespace),
}
}
@ -90,3 +100,19 @@ func (m *NodeMetrics) PiloramaMetrics() PiloramaMetrics {
func (m *NodeMetrics) GrpcServerMetrics() GrpcServerMetrics {
return m.grpc
}
func (m *NodeMetrics) PolicerMetrics() PolicerMetrics {
return m.policer
}
func (m *NodeMetrics) MorphClientMetrics() morphmetrics.Register {
return m.morphClient
}
func (m *NodeMetrics) MorphCacheMetrics() MorphCacheMetrics {
return m.morphCache
}
func (m *NodeMetrics) LogMetrics() logger.LogMetrics {
return m.log
}

pkg/metrics/policer.go Normal file

@ -0,0 +1,29 @@
package metrics
import (
"git.frostfs.info/TrueCloudLab/frostfs-observability/metrics"
"github.com/prometheus/client_golang/prometheus"
)
type PolicerMetrics interface {
IncProcessedObjects()
}
type policerMetrics struct {
processedObjectsCounter prometheus.Counter
}
func newPolicerMetrics() *policerMetrics {
return &policerMetrics{
processedObjectsCounter: metrics.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: policerSubsystem,
Name: "processed_objects_total",
Help: "Total number of objects processed by policer",
}),
}
}
func (m *policerMetrics) IncProcessedObjects() {
m.processedObjectsCounter.Inc()
}


@ -1,307 +0,0 @@
package loadcontroller
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"go.uber.org/zap"
)
// StartPrm groups the required parameters of the Controller.Start method.
type StartPrm struct {
// Epoch number by which you want to select
// the values of the used space of containers.
Epoch uint64
}
type commonContext struct {
epoch uint64
ctrl *Controller
log *logger.Logger
}
type announcer struct {
commonContext
}
// Start starts the processing of container.SizeEstimation values.
//
// Single Start operation overtakes all data from LocalMetrics to
// LocalAnnouncementTarget (Controller's parameters).
// No filter by epoch is used for the iterator, since it is expected
// that the source of metrics does not track the change of epochs.
//
// Each call acquires an announcement context for an Epoch parameter.
// At the very end of the operation, the context is released.
func (c *Controller) Start(ctx context.Context, prm StartPrm) {
var announcer *announcer
// acquire announcement
ctx, announcer = c.acquireAnnouncement(ctx, prm)
if announcer == nil {
return
}
// finally stop and free the announcement
defer announcer.freeAnnouncement()
// announce local values
announcer.announce(ctx)
}
func (c *announcer) announce(ctx context.Context) {
c.log.Debug(logs.ControllerStartingToAnnounceTheValuesOfTheMetrics)
var (
metricsIterator Iterator
err error
)
// initialize iterator over locally collected metrics
metricsIterator, err = c.ctrl.prm.LocalMetrics.InitIterator()
if err != nil {
c.log.Debug(logs.ControllerCouldNotInitializeIteratorOverLocallyCollectedMetrics,
zap.String("error", err.Error()),
)
return
}
// initialize target of local announcements
targetWriter, err := c.ctrl.prm.LocalAnnouncementTarget.InitWriter(nil)
if err != nil {
c.log.Debug(logs.ControllerCouldNotInitializeAnnouncementAccumulator,
zap.String("error", err.Error()),
)
return
}
// iterate over all collected metrics and write them to the target
err = metricsIterator.Iterate(
func(container.SizeEstimation) bool {
return true // local metrics don't know about epochs
},
func(a container.SizeEstimation) error {
a.SetEpoch(c.epoch) // set epoch explicitly
return targetWriter.Put(a)
},
)
if err != nil {
c.log.Debug(logs.ControllerIteratorOverLocallyCollectedMetricsAborted,
zap.String("error", err.Error()),
)
return
}
// finish writing
err = targetWriter.Close(ctx)
if err != nil {
c.log.Debug(logs.ControllerCouldNotFinishWritingLocalAnnouncements,
zap.String("error", err.Error()),
)
return
}
c.log.Debug(logs.ControllerTrustAnnouncementSuccessfullyFinished)
}
func (c *Controller) acquireAnnouncement(ctx context.Context, prm StartPrm) (context.Context, *announcer) {
started := true
c.announceMtx.Lock()
{
if cancel := c.mAnnounceCtx[prm.Epoch]; cancel == nil {
ctx, cancel = context.WithCancel(ctx)
c.mAnnounceCtx[prm.Epoch] = cancel
started = false
}
}
c.announceMtx.Unlock()
log := &logger.Logger{Logger: c.opts.log.With(
zap.Uint64("epoch", prm.Epoch),
)}
if started {
log.Debug(logs.ControllerAnnouncementIsAlreadyStarted)
return ctx, nil
}
return ctx, &announcer{
commonContext: commonContext{
epoch: prm.Epoch,
ctrl: c,
log: log,
},
}
}
func (c *commonContext) freeAnnouncement() {
var stopped bool
c.ctrl.announceMtx.Lock()
{
var cancel context.CancelFunc
cancel, stopped = c.ctrl.mAnnounceCtx[c.epoch]
if stopped {
cancel()
delete(c.ctrl.mAnnounceCtx, c.epoch)
}
}
c.ctrl.announceMtx.Unlock()
if stopped {
c.log.Debug(logs.ControllerAnnouncementSuccessfullyInterrupted)
} else {
c.log.Debug(logs.ControllerAnnouncementIsNotStartedOrAlreadyInterrupted)
}
}
// StopPrm groups the required parameters of the Controller.Stop method.
type StopPrm struct {
// Epoch number the analysis of the values of which must be interrupted.
Epoch uint64
}
type reporter struct {
commonContext
}
// Stop interrupts the processing of container.SizeEstimation values.
//
// Single Stop operation releases an announcement context and overtakes
// all data from AnnouncementAccumulator to ResultReceiver (Controller's
// parameters). Only values for the specified Epoch parameter are processed.
//
// Each call acquires a report context for an Epoch parameter.
// At the very end of the operation, the context is released.
func (c *Controller) Stop(ctx context.Context, prm StopPrm) {
var reporter *reporter
ctx, reporter = c.acquireReport(ctx, prm)
if reporter == nil {
return
}
// finally stop and free reporting
defer reporter.freeReport()
// interrupt announcement
reporter.freeAnnouncement()
// report the estimations
reporter.report(ctx)
}
func (c *Controller) acquireReport(ctx context.Context, prm StopPrm) (context.Context, *reporter) {
started := true
c.reportMtx.Lock()
{
if cancel := c.mReportCtx[prm.Epoch]; cancel == nil {
ctx, cancel = context.WithCancel(ctx)
c.mReportCtx[prm.Epoch] = cancel
started = false
}
}
c.reportMtx.Unlock()
log := &logger.Logger{Logger: c.opts.log.With(
zap.Uint64("epoch", prm.Epoch),
)}
if started {
log.Debug(logs.ControllerReportIsAlreadyStarted)
return ctx, nil
}
return ctx, &reporter{
commonContext: commonContext{
epoch: prm.Epoch,
ctrl: c,
log: log,
},
}
}
func (c *commonContext) freeReport() {
var stopped bool
c.ctrl.reportMtx.Lock()
{
var cancel context.CancelFunc
cancel, stopped = c.ctrl.mReportCtx[c.epoch]
if stopped {
cancel()
delete(c.ctrl.mReportCtx, c.epoch)
}
}
c.ctrl.reportMtx.Unlock()
if stopped {
c.log.Debug(logs.ControllerAnnouncementSuccessfullyInterrupted)
} else {
c.log.Debug(logs.ControllerAnnouncementIsNotStartedOrAlreadyInterrupted)
}
}
func (c *reporter) report(ctx context.Context) {
var (
localIterator Iterator
err error
)
// initialize iterator over locally accumulated announcements
localIterator, err = c.ctrl.prm.AnnouncementAccumulator.InitIterator()
if err != nil {
c.log.Debug(logs.ControllerCouldNotInitializeIteratorOverLocallyAccumulatedAnnouncements,
zap.String("error", err.Error()),
)
return
}
// initialize final destination of load estimations
resultWriter, err := c.ctrl.prm.ResultReceiver.InitWriter(nil)
if err != nil {
c.log.Debug(logs.ControllerCouldNotInitializeResultTarget,
zap.String("error", err.Error()),
)
return
}
// iterate over all accumulated announcements and write them to the target
err = localIterator.Iterate(
usedSpaceFilterEpochEQ(c.epoch),
resultWriter.Put,
)
if err != nil {
c.log.Debug(logs.ControllerIteratorOverLocalAnnouncementsAborted,
zap.String("error", err.Error()),
)
return
}
// finish writing
err = resultWriter.Close(ctx)
if err != nil {
c.log.Debug(logs.ControllerCouldNotFinishWritingLoadEstimations,
zap.String("error", err.Error()),
)
}
}


@ -1,192 +0,0 @@
package loadcontroller_test
import (
"context"
"math/rand"
"sync"
"testing"
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"github.com/stretchr/testify/require"
)
type testAnnouncementStorage struct {
w loadcontroller.Writer
i loadcontroller.Iterator
mtx sync.RWMutex
m map[uint64][]container.SizeEstimation
}
func newTestStorage() *testAnnouncementStorage {
return &testAnnouncementStorage{
m: make(map[uint64][]container.SizeEstimation),
}
}
func (s *testAnnouncementStorage) InitIterator() (loadcontroller.Iterator, error) {
if s.i != nil {
return s.i, nil
}
return s, nil
}
func (s *testAnnouncementStorage) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) error {
s.mtx.RLock()
defer s.mtx.RUnlock()
for _, v := range s.m {
for _, a := range v {
if f(a) {
if err := h(a); err != nil {
return err
}
}
}
}
return nil
}
func (s *testAnnouncementStorage) InitWriter([]loadcontroller.ServerInfo) (loadcontroller.Writer, error) {
if s.w != nil {
return s.w, nil
}
return s, nil
}
func (s *testAnnouncementStorage) Put(v container.SizeEstimation) error {
s.mtx.Lock()
s.m[v.Epoch()] = append(s.m[v.Epoch()], v)
s.mtx.Unlock()
return nil
}
func (s *testAnnouncementStorage) Close(context.Context) error {
return nil
}
func randAnnouncement() (a container.SizeEstimation) {
a.SetContainer(cidtest.ID())
a.SetValue(rand.Uint64())
return
}
func TestSimpleScenario(t *testing.T) {
// create storage to write final estimations
resultStorage := newTestStorage()
// create storages to accumulate announcements
accumulatingStorageN2 := newTestStorage()
// create storage of local metrics
localStorageN1 := newTestStorage()
localStorageN2 := newTestStorage()
// create 2 controllers: 1st writes announcements to 2nd, 2nd directly to final destination
ctrlN1 := loadcontroller.New(loadcontroller.Prm{
LocalMetrics: localStorageN1,
AnnouncementAccumulator: newTestStorage(),
LocalAnnouncementTarget: &testAnnouncementStorage{
w: accumulatingStorageN2,
},
ResultReceiver: resultStorage,
})
ctrlN2 := loadcontroller.New(loadcontroller.Prm{
LocalMetrics: localStorageN2,
AnnouncementAccumulator: accumulatingStorageN2,
LocalAnnouncementTarget: &testAnnouncementStorage{
w: resultStorage,
},
ResultReceiver: resultStorage,
})
const processEpoch uint64 = 10
const goodNum = 4
// create 2 random values for processing epoch and 1 for some different
announces := make([]container.SizeEstimation, 0, goodNum)
for i := 0; i < goodNum; i++ {
a := randAnnouncement()
a.SetEpoch(processEpoch)
announces = append(announces, a)
}
// store one half of "good" announcements to 1st metrics storage, another - to 2nd
// and "bad" to both
for i := 0; i < goodNum/2; i++ {
require.NoError(t, localStorageN1.Put(announces[i]))
}
for i := goodNum / 2; i < goodNum; i++ {
require.NoError(t, localStorageN2.Put(announces[i]))
}
wg := new(sync.WaitGroup)
wg.Add(2)
startPrm := loadcontroller.StartPrm{
Epoch: processEpoch,
}
// start both controllers
go func() {
ctrlN1.Start(context.Background(), startPrm)
wg.Done()
}()
go func() {
ctrlN2.Start(context.Background(), startPrm)
wg.Done()
}()
wg.Wait()
wg.Add(2)
stopPrm := loadcontroller.StopPrm{
Epoch: processEpoch,
}
// stop both controllers
go func() {
ctrlN1.Stop(context.Background(), stopPrm)
wg.Done()
}()
go func() {
ctrlN2.Stop(context.Background(), stopPrm)
wg.Done()
}()
wg.Wait()
// result target should contain all "good" announcements and should not contain the "bad" one
var res []container.SizeEstimation
err := resultStorage.Iterate(
func(a container.SizeEstimation) bool {
return true
},
func(a container.SizeEstimation) error {
res = append(res, a)
return nil
},
)
require.NoError(t, err)
for i := range announces {
require.Contains(t, res, announces[i])
}
}


@ -1,94 +0,0 @@
package loadcontroller
import (
"context"
"fmt"
"sync"
)
// Prm groups the required parameters of the Controller's constructor.
//
// All values must comply with the requirements imposed on them.
// Passing incorrect parameter values will result in constructor
// failure (error or panic depending on the implementation).
type Prm struct {
// Iterator over the used space values of the containers
// collected by the node locally.
LocalMetrics IteratorProvider
// Place of recording the local values of
// the used space of containers.
LocalAnnouncementTarget WriterProvider
// Iterator over the summarized used space scores
// from the various network participants.
AnnouncementAccumulator IteratorProvider
// Place of recording the final estimates of
// the used space of containers.
ResultReceiver WriterProvider
}
// Controller represents main handler for starting
// and interrupting container volume estimation.
//
// It binds the interfaces of the local value stores
// to the target storage points. Controller is abstracted
// from the internal storage device and the network location
// of the connecting components. At its core, it is a
// high-level start-stop trigger for calculations.
//
// For correct operation, the controller must be created
// using the constructor (New) based on the required parameters
// and optional components. After successful creation,
// the constructor is immediately ready to work through
// API of external control of calculations and data transfer.
type Controller struct {
prm Prm
opts *options
announceMtx sync.Mutex
mAnnounceCtx map[uint64]context.CancelFunc
reportMtx sync.Mutex
mReportCtx map[uint64]context.CancelFunc
}
const invalidPrmValFmt = "invalid parameter %s (%T):%v"
func panicOnPrmValue(n string, v any) {
panic(fmt.Sprintf(invalidPrmValFmt, n, v, v))
}
// New creates a new instance of the Controller.
//
// Panics if at least one value of the parameters is invalid.
//
// The created Controller does not require additional
// initialization and is completely ready for work.
func New(prm Prm, opts ...Option) *Controller {
switch {
case prm.LocalMetrics == nil:
panicOnPrmValue("LocalMetrics", prm.LocalMetrics)
case prm.AnnouncementAccumulator == nil:
panicOnPrmValue("AnnouncementAccumulator", prm.AnnouncementAccumulator)
case prm.LocalAnnouncementTarget == nil:
panicOnPrmValue("LocalAnnouncementTarget", prm.LocalAnnouncementTarget)
case prm.ResultReceiver == nil:
panicOnPrmValue("ResultReceiver", prm.ResultReceiver)
}
o := defaultOpts()
for _, opt := range opts {
opt(o)
}
return &Controller{
prm: prm,
opts: o,
mAnnounceCtx: make(map[uint64]context.CancelFunc),
mReportCtx: make(map[uint64]context.CancelFunc),
}
}


@ -1,103 +0,0 @@
package loadcontroller
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
)
// UsedSpaceHandler describes the signature of the container.SizeEstimation
// value handling function.
//
// Termination of processing without failures is usually signaled
// with a zero error, while a specific value may describe the reason
// for failure.
type UsedSpaceHandler func(container.SizeEstimation) error
// UsedSpaceFilter describes the signature of the function for
// checking whether a value meets a certain criterion.
//
// Return of true means conformity, false - vice versa.
type UsedSpaceFilter func(container.SizeEstimation) bool
// Iterator is a group of methods provided by entity
// which can iterate over a group of container.SizeEstimation values.
type Iterator interface {
// Iterate must start an iterator over values that
// meet the filter criterion (returns true).
// For each such value should call a handler, the error
// of which should be directly returned from the method.
//
// Internal failures of the iterator are also signaled via
// an error. After a successful call to the last value
// handler, nil should be returned.
Iterate(UsedSpaceFilter, UsedSpaceHandler) error
}
// IteratorProvider is a group of methods provided
// by entity which generates iterators over
// container.SizeEstimation values.
type IteratorProvider interface {
// InitIterator should return an initialized Iterator.
//
// Initialization problems are reported via error.
// If no error was returned, then the Iterator must not be nil.
//
// Implementations can have different logic for different
// contexts, so specific ones may document their own behavior.
InitIterator() (Iterator, error)
}
// Writer describes the interface for storing container.SizeEstimation values.
//
// This interface is provided by both local storage
// of values and remote (wrappers over the RPC).
type Writer interface {
// Put performs a write operation of container.SizeEstimation value
// and returns any error encountered.
//
// All values after the Close call must be flushed to the
// physical target. Implementations can cache values before
// Close operation.
//
// Put must not be called after Close.
Put(container.SizeEstimation) error
// Close exits with method-providing Writer.
//
// All cached values must be flushed before
// the Close's return.
//
// Methods must not be called after Close.
Close(ctx context.Context) error
}
// WriterProvider is a group of methods provided
// by entity which generates keepers of
// container.SizeEstimation values.
type WriterProvider interface {
// InitWriter should return an initialized Writer.
//
// Initialization problems are reported via error.
// If no error was returned, then the Writer must not be nil.
InitWriter(route []ServerInfo) (Writer, error)
}
// ServerInfo describes a set of
// characteristics of a point in a route.
type ServerInfo interface {
// PublicKey returns public key of the node
// from the route in a binary representation.
PublicKey() []byte
// Iterates over network addresses of the node
// in the route. Breaks iterating on true return
// of the handler.
IterateAddresses(func(string) bool)
// Returns number of server's network addresses.
NumberOfAddresses() int
// ExternalAddresses returns external node's addresses.
ExternalAddresses() []string
}


@ -1,28 +0,0 @@
package loadcontroller
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
// Option sets an optional parameter of Controller.
type Option func(*options)
type options struct {
log *logger.Logger
}
func defaultOpts() *options {
return &options{
log: &logger.Logger{Logger: zap.L()},
}
}
// WithLogger returns option to specify logging component.
func WithLogger(l *logger.Logger) Option {
return func(o *options) {
if l != nil {
o.log = l
}
}
}


@ -1,36 +0,0 @@
package loadcontroller
import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
)
func usedSpaceFilterEpochEQ(epoch uint64) UsedSpaceFilter {
return func(a container.SizeEstimation) bool {
return a.Epoch() == epoch
}
}
type storageWrapper struct {
w Writer
i Iterator
}
func (s storageWrapper) InitIterator() (Iterator, error) {
return s.i, nil
}
func (s storageWrapper) InitWriter([]ServerInfo) (Writer, error) {
return s.w, nil
}
func SimpleIteratorProvider(i Iterator) IteratorProvider {
return &storageWrapper{
i: i,
}
}
func SimpleWriterProvider(w Writer) WriterProvider {
return &storageWrapper{
w: w,
}
}


@ -1,145 +0,0 @@
package loadroute
import (
"context"
"encoding/hex"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"go.uber.org/zap"
)
// InitWriter initializes and returns Writer that sends each value to its next route point.
//
// If route is present, then it is taken into account,
// and the value will be sent to its continuation. Otherwise, the route will be laid
// from scratch and the value will be sent to its primary point.
//
// After building a list of remote points of the next leg of the route, the value is sent
// sequentially to all of them. If any transmissions (even all) fail, an error will not
// be returned.
//
// Close of the composed Writer calls Close method on each internal Writer generated in
// runtime and never returns an error.
//
// Always returns nil error.
func (r *Router) InitWriter(route []loadcontroller.ServerInfo) (loadcontroller.Writer, error) {
if len(route) == 0 {
route = []loadcontroller.ServerInfo{r.localSrvInfo}
}
return &loadWriter{
router: r,
route: route,
mRoute: make(map[routeKey]*valuesRoute),
mServers: make(map[string]loadcontroller.Writer),
}, nil
}
type routeKey struct {
epoch uint64
cid string
}
type valuesRoute struct {
route []loadcontroller.ServerInfo
values []container.SizeEstimation
}
type loadWriter struct {
router *Router
route []loadcontroller.ServerInfo
routeMtx sync.RWMutex
mRoute map[routeKey]*valuesRoute
mServers map[string]loadcontroller.Writer
}
func (w *loadWriter) Put(a container.SizeEstimation) error {
w.routeMtx.Lock()
defer w.routeMtx.Unlock()
key := routeKey{
epoch: a.Epoch(),
cid: a.Container().EncodeToString(),
}
routeValues, ok := w.mRoute[key]
if !ok {
route, err := w.router.routeBuilder.NextStage(a, w.route)
if err != nil {
return err
} else if len(route) == 0 {
route = []loadcontroller.ServerInfo{nil}
}
routeValues = &valuesRoute{
route: route,
values: []container.SizeEstimation{a},
}
w.mRoute[key] = routeValues
}
for _, remoteInfo := range routeValues.route {
var key string
if remoteInfo != nil {
key = hex.EncodeToString(remoteInfo.PublicKey())
}
remoteWriter, ok := w.mServers[key]
if !ok {
provider, err := w.router.remoteProvider.InitRemote(remoteInfo)
if err != nil {
w.router.log.Debug(logs.RouteCouldNotInitializeWriterProvider,
zap.String("error", err.Error()),
)
continue // best effort
}
remoteWriter, err = provider.InitWriter(w.route)
if err != nil {
w.router.log.Debug(logs.RouteCouldNotInitializeWriter,
zap.String("error", err.Error()),
)
continue // best effort
}
w.mServers[key] = remoteWriter
}
err := remoteWriter.Put(a)
if err != nil {
w.router.log.Debug(logs.RouteCouldNotPutTheValue,
zap.String("error", err.Error()),
)
}
// continue best effort
}
return nil
}
func (w *loadWriter) Close(ctx context.Context) error {
for key, wRemote := range w.mServers {
err := wRemote.Close(ctx)
if err != nil {
w.router.log.Debug(logs.RouteCouldNotCloseRemoteServerWriter,
zap.String("key", key),
zap.String("error", err.Error()),
)
}
}
return nil
}


@ -1,31 +0,0 @@
package loadroute
import (
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
)
// Builder groups methods to route values in the network.
type Builder interface {
// NextStage must return next group of route points for the value a
// based on the passed route.
//
// Empty passed list means being at the starting point of the route.
//
// Must return empty list and no error if the endpoint of the route is reached.
// If there are more than one point to go and the last passed point is included
// in that list (means that point is the last point in one of the route groups),
// returned route must contain nil point that should be interpreted as signal to,
// among sending to other route points, save the announcement in that point.
NextStage(a container.SizeEstimation, passed []loadcontroller.ServerInfo) ([]loadcontroller.ServerInfo, error)
}
// RemoteWriterProvider describes the component
// for sending values to a fixed route point.
type RemoteWriterProvider interface {
// InitRemote must return WriterProvider to the route point
// corresponding to info.
//
// Nil info matches the end of the route.
InitRemote(info loadcontroller.ServerInfo) (loadcontroller.WriterProvider, error)
}


@ -1,28 +0,0 @@
package loadroute
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
// Option sets an optional parameter of Router.
type Option func(*options)
type options struct {
log *logger.Logger
}
func defaultOpts() *options {
return &options{
log: &logger.Logger{Logger: zap.L()},
}
}
// WithLogger returns Option to specify logging component.
func WithLogger(l *logger.Logger) Option {
return func(o *options) {
if l != nil {
o.log = l
}
}
}


@ -1,49 +0,0 @@
package placementrouter
import "fmt"
// Prm groups the required parameters of the Builder's constructor.
//
// All values must comply with the requirements imposed on them.
// Passing incorrect parameter values will result in constructor
// failure (error or panic depending on the implementation).
type Prm struct {
// Calculator of the container members.
//
// Must not be nil.
PlacementBuilder PlacementBuilder
}
// Builder represents component that routes used container space
// values between nodes from the container.
//
// For correct operation, Builder must be created using
// the constructor (New) based on the required parameters
// and optional components. After successful creation,
// the Builder is immediately ready to work through API.
type Builder struct {
placementBuilder PlacementBuilder
}
const invalidPrmValFmt = "invalid parameter %s (%T):%v"
func panicOnPrmValue(n string, v any) {
panic(fmt.Sprintf(invalidPrmValFmt, n, v, v))
}
// New creates a new instance of the Builder.
//
// Panics if at least one value of the parameters is invalid.
//
// The created Builder does not require additional
// initialization and is completely ready for work.
func New(prm Prm) *Builder {
switch {
case prm.PlacementBuilder == nil:
panicOnPrmValue("PlacementBuilder", prm.PlacementBuilder)
}
return &Builder{
placementBuilder: prm.PlacementBuilder,
}
}


@ -1,47 +0,0 @@
package placementrouter
import (
"bytes"
"fmt"
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
)
// NextStage composes container nodes for the container and epoch from a,
// and returns the list of nodes with maximum weight (one from each vector).
//
// If passed route has more than one point, then endpoint of the route is reached.
//
// The traversed route is not checked, it is assumed to be correct.
func (b *Builder) NextStage(a container.SizeEstimation, passed []loadcontroller.ServerInfo) ([]loadcontroller.ServerInfo, error) {
if len(passed) > 1 {
return nil, nil
}
cnr := a.Container()
placement, err := b.placementBuilder.BuildPlacement(a.Epoch(), cnr)
if err != nil {
return nil, fmt.Errorf("could not build placement %s: %w", cnr, err)
}
res := make([]loadcontroller.ServerInfo, 0, len(placement))
for i := range placement {
if len(placement[i]) == 0 {
continue
}
if len(passed) == 1 && bytes.Equal(passed[0].PublicKey(), placement[i][0].PublicKey()) {
// add nil element so the announcement will be saved in local memory
res = append(res, nil)
} else {
// add element with remote node to send announcement to
res = append(res, netmapcore.Node(placement[i][0]))
}
}
return res, nil
}


@ -1,14 +0,0 @@
package placementrouter
import (
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
)
// PlacementBuilder describes interface of FrostFS placement calculator.
type PlacementBuilder interface {
// BuildPlacement must compose and sort (according to a specific algorithm)
// storage nodes from the container by its identifier using network map
// of particular epoch.
BuildPlacement(epoch uint64, cnr cid.ID) ([][]netmap.NodeInfo, error)
}


@ -1,87 +0,0 @@
package loadroute
import (
"fmt"
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
)
// Prm groups the required parameters of the Router's constructor.
//
// All values must comply with the requirements imposed on them.
// Passing incorrect parameter values will result in constructor
// failure (error or panic depending on the implementation).
type Prm struct {
// Characteristics of the local node's server.
//
// Must not be nil.
LocalServerInfo loadcontroller.ServerInfo
// Component for sending values to a fixed route point.
//
// Must not be nil.
RemoteWriterProvider RemoteWriterProvider
// Route planner.
//
// Must not be nil.
Builder Builder
}
// Router represents component responsible for routing
// used container space values over the network.
//
// For each fixed pair (container ID, epoch) there is a
// single value route on the network. Router provides the
// interface for writing values to the next point of the route.
//
// For correct operation, Router must be created using
// the constructor (New) based on the required parameters
// and optional components. After successful creation,
// the Router is immediately ready to work through API.
type Router struct {
log *logger.Logger
remoteProvider RemoteWriterProvider
routeBuilder Builder
localSrvInfo loadcontroller.ServerInfo
}
const invalidPrmValFmt = "invalid parameter %s (%T):%v"
func panicOnPrmValue(n string, v any) {
panic(fmt.Sprintf(invalidPrmValFmt, n, v, v))
}
// New creates a new instance of the Router.
//
// Panics if at least one value of the parameters is invalid.
//
// The created Router does not require additional
// initialization and is completely ready for work.
func New(prm Prm, opts ...Option) *Router {
switch {
case prm.RemoteWriterProvider == nil:
panicOnPrmValue("RemoteWriterProvider", prm.RemoteWriterProvider)
case prm.Builder == nil:
panicOnPrmValue("Builder", prm.Builder)
case prm.LocalServerInfo == nil:
panicOnPrmValue("LocalServerInfo", prm.LocalServerInfo)
}
o := defaultOpts()
for i := range opts {
opts[i](o)
}
return &Router{
log: o.log,
remoteProvider: prm.RemoteWriterProvider,
routeBuilder: prm.Builder,
localSrvInfo: prm.LocalServerInfo,
}
}


@ -1,49 +0,0 @@
package loadroute
import (
"bytes"
"errors"
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
)
var errWrongRoute = errors.New("wrong route")
// CheckRoute checks if the route is a route correctly constructed by the builder for value a.
//
// Returns nil if route is correct, otherwise an error clarifying the inconsistency.
func CheckRoute(builder Builder, a container.SizeEstimation, route []loadcontroller.ServerInfo) error {
for i := 1; i < len(route); i++ {
servers, err := builder.NextStage(a, route[:i])
if err != nil {
return err
} else if len(servers) == 0 {
break
}
found := false
for j := range servers {
if servers[j] == nil {
// nil route point means that
// (i-1)-th node in the route
// must, among other things,
// save the announcement to its
// local memory
continue
}
if bytes.Equal(servers[j].PublicKey(), route[i].PublicKey()) {
found = true
break
}
}
if !found {
return errWrongRoute
}
}
return nil
}


@ -1,151 +0,0 @@
package loadstorage
import (
"context"
"sort"
"sync"
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
)
type usedSpaceEstimations struct {
announcement container.SizeEstimation
sizes []uint64
}
type storageKey struct {
epoch uint64
cid string
}
// Storage represents in-memory storage of
// container.SizeEstimation values.
//
// The write operation has the usual behavior - to save
// the next number of used container space for a specific epoch.
// All values related to one key (epoch, container ID) are stored
// as a list.
//
// Storage also provides an iterator interface, into the handler
// of which the final score is passed, built on all values saved
// at the time of the call. Currently the only possible estimation
// formula is used - the average between 10th and 90th percentile.
//
// For correct operation, Storage must be created
// using the constructor (New) based on the required parameters
// and optional components. After successful creation,
// Storage is immediately ready to work through API.
type Storage struct {
mtx sync.RWMutex
mItems map[storageKey]*usedSpaceEstimations
}
// Prm groups the required parameters of the Storage's constructor.
//
// The component is not parameterizable at the moment.
type Prm struct{}
// New creates a new instance of the Storage.
//
// The created Storage does not require additional
// initialization and is completely ready for work.
func New(_ Prm) *Storage {
return &Storage{
mItems: make(map[storageKey]*usedSpaceEstimations),
}
}
// Put appends the next value of the occupied container space for the epoch
// to the list of already saved values.
//
// Always returns nil error.
func (s *Storage) Put(a container.SizeEstimation) error {
s.mtx.Lock()
{
key := storageKey{
epoch: a.Epoch(),
cid: a.Container().EncodeToString(),
}
estimations, ok := s.mItems[key]
if !ok {
estimations = &usedSpaceEstimations{
announcement: a,
sizes: make([]uint64, 0, 1),
}
s.mItems[key] = estimations
}
estimations.sizes = append(estimations.sizes, a.Value())
}
s.mtx.Unlock()
return nil
}
func (s *Storage) Close(context.Context) error {
return nil
}
// Iterate goes through all the lists with the key (container ID, epoch),
// calculates the final grade for all values, and passes it to the handler.
//
// Final grade is the average between 10th and 90th percentiles.
func (s *Storage) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) (err error) {
s.mtx.RLock()
{
for _, v := range s.mItems {
if f(v.announcement) {
// calculate estimation based on 90th percentile
v.announcement.SetValue(finalEstimation(v.sizes))
if err = h(v.announcement); err != nil {
break
}
}
}
}
s.mtx.RUnlock()
return
}
func finalEstimation(vals []uint64) uint64 {
sort.Slice(vals, func(i, j int) bool {
return vals[i] < vals[j]
})
const (
lowerRank = 10
upperRank = 90
)
if len(vals) >= lowerRank {
lowerInd := percentile(lowerRank, vals)
upperInd := percentile(upperRank, vals)
vals = vals[lowerInd:upperInd]
}
sum := uint64(0)
for i := range vals {
sum += vals[i]
}
return sum / uint64(len(vals))
}
func percentile(rank int, vals []uint64) int {
p := len(vals) * rank / 100
return p
}


@ -1,50 +0,0 @@
package loadstorage
import (
"math/rand"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"github.com/stretchr/testify/require"
)
func TestStorage(t *testing.T) {
const epoch uint64 = 13
var a container.SizeEstimation
a.SetContainer(cidtest.ID())
a.SetEpoch(epoch)
const opinionsNum = 100
s := New(Prm{})
opinions := make([]uint64, opinionsNum)
for i := range opinions {
opinions[i] = rand.Uint64()
a.SetValue(opinions[i])
require.NoError(t, s.Put(a))
}
iterCounter := 0
err := s.Iterate(
func(ai container.SizeEstimation) bool {
return ai.Epoch() == epoch
},
func(ai container.SizeEstimation) error {
iterCounter++
require.Equal(t, epoch, ai.Epoch())
require.Equal(t, a.Container(), ai.Container())
require.Equal(t, finalEstimation(opinions), ai.Value())
return nil
},
)
require.NoError(t, err)
require.Equal(t, 1, iterCounter)
}


@ -22,6 +22,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -242,8 +243,7 @@ func (s *Service) redirectPutSingleRequest(ctx context.Context,
info.AddressGroup().IterateAddresses(func(addr network.Address) (stop bool) {
ctx, span := tracing.StartSpanFromContext(ctx, "putService.redirectPutSingleRequest.IterateAddresses",
trace.WithAttributes(
attribute.String("address", addr.String()),
))
attribute.String("address", addr.String())))
defer span.End()
var err error
@ -257,6 +257,7 @@ func (s *Service) redirectPutSingleRequest(ctx context.Context,
zap.Stringer("address", addr),
zap.Stringer("object_id", objID),
zap.Stringer("container_id", cnrID),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
)
}


@ -0,0 +1,9 @@
package policer
type MetricsRegister interface {
IncProcessedObjects()
}
type noopMetrics struct{}
func (noopMetrics) IncProcessedObjects() {}


@ -69,6 +69,8 @@ type cfg struct {
batchSize, cacheSize uint32
rebalanceFreq, evictDuration, sleepDuration time.Duration
metrics MetricsRegister
}
func defaultCfg() *cfg {
@ -79,6 +81,7 @@ func defaultCfg() *cfg {
rebalanceFreq: 1 * time.Second,
sleepDuration: 1 * time.Second,
evictDuration: 30 * time.Second,
metrics: noopMetrics{},
}
}
@ -170,3 +173,10 @@ func WithPool(p *ants.Pool) Option {
c.taskPool = p
}
}
// WithMetrics returns option to set metrics.
func WithMetrics(m MetricsRegister) Option {
return func(c *cfg) {
c.metrics = m
}
}
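A minimal sketch of how a node could wire the new policer metrics in, assuming the policer's New constructor accepts functional options as the Option type above suggests; all other required options are omitted for brevity:

package main

import (
    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/policer"
)

func main() {
    m := metrics.NewNodeMetrics()

    // NodeMetrics.PolicerMetrics() exposes IncProcessedObjects, so it
    // satisfies policer.MetricsRegister and can be passed to WithMetrics.
    _ = policer.New(
        policer.WithMetrics(m.PolicerMetrics()),
    )
}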


@ -60,6 +60,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
}
p.cache.Add(addr.Address, time.Now())
p.objsInWork.remove(addr.Address)
p.metrics.IncProcessedObjects()
}
})
if err != nil {


@ -6,6 +6,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"go.opentelemetry.io/otel/attribute"
@ -44,7 +45,8 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
if err != nil {
p.log.Error(logs.ReplicatorCouldNotGetObjectFromLocalStorage,
zap.Stringer("object", task.Addr),
zap.Error(err))
zap.Error(err),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return
}
@ -63,6 +65,7 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
log := p.log.With(
zap.String("node", netmap.StringifyPublicKey(task.Nodes[i])),
zap.Stringer("object", task.Addr),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
)
callCtx, cancel := context.WithTimeout(ctx, p.putTimeout)


@ -6,6 +6,7 @@ import (
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"go.opentelemetry.io/otel/attribute"
@ -39,7 +40,9 @@ func (s *Service) forEachNode(ctx context.Context, cntNodes []netmapSDK.NodeInfo
return false
}
s.log.Debug(logs.TreeRedirectingTreeServiceQuery, zap.String("endpoint", endpoint))
s.log.Debug(logs.TreeRedirectingTreeServiceQuery, zap.String("endpoint", endpoint),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
called = true
stop = f(c)
return true


@ -10,6 +10,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
@ -107,12 +108,14 @@ func (s *Service) replicationWorker(ctx context.Context) {
if lastErr != nil {
if errors.Is(lastErr, errRecentlyFailed) {
s.log.Debug(logs.TreeDoNotSendUpdateToTheNode,
zap.String("last_error", lastErr.Error()))
zap.String("last_error", lastErr.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
} else {
s.log.Warn(logs.TreeFailedToSentUpdateToTheNode,
zap.String("last_error", lastErr.Error()),
zap.String("address", lastAddr),
zap.String("key", hex.EncodeToString(task.n.PublicKey())))
zap.String("key", hex.EncodeToString(task.n.PublicKey())),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
s.metrics.AddReplicateTaskDuration(time.Since(start), false)
} else {

pkg/tracing/trace.go Normal file

@ -0,0 +1,19 @@
package tracing
import (
"context"
"go.opentelemetry.io/otel/trace"
)
var emptyTraceID = [16]byte{}
// GetTraceID retrieves the trace ID from the provided context.
// It returns an empty string if no trace ID is found.
func GetTraceID(ctx context.Context) string {
span := trace.SpanFromContext(ctx)
if span == nil || span.SpanContext().TraceID() == emptyTraceID {
return ""
}
return span.SpanContext().TraceID().String()
}


@ -31,8 +31,8 @@ type Prm struct {
// support runtime rereading
level zapcore.Level
// MetricsNamespace is the namespace string used for log counter metrics
MetricsNamespace string
// SamplingHook hook for the zap.Logger
SamplingHook func(e zapcore.Entry, sd zapcore.SamplingDecision)
// do not support runtime rereading
}
@ -82,14 +82,12 @@ func NewLogger(prm *Prm) (*Logger, error) {
lvl := zap.NewAtomicLevelAt(prm.level)
m := newLogMetrics(prm.MetricsNamespace)
c := zap.NewProductionConfig()
c.Level = lvl
c.Encoding = "console"
c.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
c.Sampling.Hook = func(e zapcore.Entry, sd zapcore.SamplingDecision) {
m.Inc(e.Level, sd == zapcore.LogDropped)
if prm.SamplingHook != nil {
c.Sampling.Hook = prm.SamplingHook
}
lZap, err := c.Build(


@ -14,11 +14,16 @@ const (
logDroppedLabel = "dropped"
)
type LogMetrics interface {
Inc(level zapcore.Level, dropped bool)
GetSamplingHook() func(e zapcore.Entry, sd zapcore.SamplingDecision)
}
type logMetrics struct {
logCount *prometheus.CounterVec
}
func newLogMetrics(namespace string) *logMetrics {
func NewLogMetrics(namespace string) LogMetrics {
return &logMetrics{
logCount: metrics.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
@ -35,3 +40,9 @@ func (m *logMetrics) Inc(level zapcore.Level, dropped bool) {
logDroppedLabel: strconv.FormatBool(dropped),
}).Inc()
}
func (m *logMetrics) GetSamplingHook() func(zapcore.Entry, zapcore.SamplingDecision) {
return func(e zapcore.Entry, sd zapcore.SamplingDecision) {
m.Inc(e.Level, sd == zapcore.LogDropped)
}
}
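Taken together, the last two diffs move log-metric construction out of the logger: a service builds LogMetrics itself and hands the sampling hook to the logger through Prm. A minimal sketch of that wiring, assuming the zero-value Prm is otherwise usable; the namespace string is illustrative only:

package main

import (
    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
)

func main() {
    logMetrics := logger.NewLogMetrics("frostfs_node")

    prm := &logger.Prm{
        // Counts emitted and dropped entries through the metrics above.
        SamplingHook: logMetrics.GetSamplingHook(),
    }

    log, err := logger.NewLogger(prm)
    if err != nil {
        panic(err)
    }
    _ = log
}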