package router import ( "context" "encoding/hex" "sync" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common" "go.uber.org/zap" ) // RouteInfo wraps epoch provider with additional passed // route data. It is only used inside Router and is // not passed in any external methods. type RouteInfo struct { common.EpochProvider passedRoute []common.ServerInfo } // NewRouteInfo wraps the main context of value passing with its traversal route and epoch. func NewRouteInfo(ep common.EpochProvider, passed []common.ServerInfo) *RouteInfo { return &RouteInfo{ EpochProvider: ep, passedRoute: passed, } } type trustWriter struct { router *Router routeInfo *RouteInfo routeMtx sync.RWMutex mServers map[string]common.Writer } // InitWriter initializes and returns Writer that sends each value to its next route point. // // If ep was created by NewRouteInfo, then the traversed route is taken into account, // and the value will be sent to its continuation. Otherwise, the route will be laid // from scratch and the value will be sent to its primary point. // // After building a list of remote points of the next leg of the route, the value is sent // sequentially to all of them. If any transmissions (even all) fail, an error will not // be returned. // // Close of the composed Writer calls Close method on each internal Writer generated in // runtime and never returns an error. // // Always returns nil error. func (r *Router) InitWriter(ep common.EpochProvider) (common.Writer, error) { var ( routeInfo *RouteInfo ok bool ) if routeInfo, ok = ep.(*RouteInfo); !ok { routeInfo = &RouteInfo{ EpochProvider: ep, passedRoute: []common.ServerInfo{r.localSrvInfo}, } } return &trustWriter{ router: r, routeInfo: routeInfo, mServers: make(map[string]common.Writer), }, nil } func (w *trustWriter) Write(ctx context.Context, t reputation.Trust) error { w.routeMtx.Lock() defer w.routeMtx.Unlock() route, err := w.router.routeBuilder.NextStage(w.routeInfo.Epoch(), t, w.routeInfo.passedRoute) if err != nil { return err } else if len(route) == 0 { route = []common.ServerInfo{nil} } for _, remoteInfo := range route { var key string if remoteInfo != nil { key = hex.EncodeToString(remoteInfo.PublicKey()) } remoteWriter, ok := w.mServers[key] if !ok { provider, err := w.router.remoteProvider.InitRemote(remoteInfo) if err != nil { w.router.log.Debug(logs.RouterCouldNotInitializeWriterProvider, zap.String("error", err.Error()), ) continue } // init writer with original context wrapped in routeContext remoteWriter, err = provider.InitWriter(w.routeInfo.EpochProvider) if err != nil { w.router.log.Debug(logs.RouterCouldNotInitializeWriter, zap.String("error", err.Error()), ) continue } w.mServers[key] = remoteWriter } err := remoteWriter.Write(ctx, t) if err != nil { w.router.log.Debug(logs.RouterCouldNotWriteTheValue, zap.String("error", err.Error()), ) } } return nil } func (w *trustWriter) Close(ctx context.Context) error { for key, wRemote := range w.mServers { err := wRemote.Close(ctx) if err != nil { w.router.log.Debug(logs.RouterCouldNotCloseRemoteServerWriter, zap.String("key", key), zap.String("error", err.Error()), ) } } return nil }