package innerring import ( "context" "crypto/ecdsa" "fmt" "time" clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" storagegroup2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/storagegroup" frostfsapiclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/internal/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/cache" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit/auditor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/storagegroup" "go.uber.org/zap" ) type ( ClientCache struct { log *logger.Logger cache interface { Get(clientcore.NodeInfo) (clientcore.Client, error) CloseAll() } key *ecdsa.PrivateKey sgTimeout, headTimeout, rangeTimeout time.Duration } clientCacheParams struct { Log *logger.Logger Key *ecdsa.PrivateKey AllowExternal bool SGTimeout, HeadTimeout, RangeTimeout time.Duration } ) func newClientCache(p *clientCacheParams) *ClientCache { return &ClientCache{ log: p.Log, cache: cache.NewSDKClientCache(cache.ClientCacheOpts{AllowExternal: p.AllowExternal, Key: p.Key}), key: p.Key, sgTimeout: p.SGTimeout, headTimeout: p.HeadTimeout, rangeTimeout: p.RangeTimeout, } } func (c *ClientCache) Get(info clientcore.NodeInfo) (clientcore.Client, error) { // Because cache is used by `ClientCache` exclusively, // client will always have valid key. return c.cache.Get(info) } // GetSG polls the container to get the object by id. // Returns storage groups structure from received object. // // Returns an error of type apistatus.ObjectNotFound if storage group is missing. func (c *ClientCache) GetSG(prm storagegroup2.GetSGPrm) (*storagegroup.StorageGroup, error) { var sgAddress oid.Address sgAddress.SetContainer(prm.CID) sgAddress.SetObject(prm.OID) return c.getSG(prm.Context, sgAddress, &prm.NetMap, prm.Container) } func (c *ClientCache) getSG(ctx context.Context, addr oid.Address, nm *netmap.NetMap, cn [][]netmap.NodeInfo) (*storagegroup.StorageGroup, error) { obj := addr.Object() nodes, err := placement.BuildObjectPlacement(nm, cn, &obj) if err != nil { return nil, fmt.Errorf("can't build object placement: %w", err) } var info clientcore.NodeInfo var getObjPrm frostfsapiclient.GetObjectPrm getObjPrm.SetAddress(addr) for _, node := range placement.FlattenNodes(nodes) { err := clientcore.NodeInfoFromRawNetmapElement(&info, netmapcore.Node(node)) if err != nil { return nil, fmt.Errorf("parse client node info: %w", err) } cli, err := c.getWrappedClient(info) if err != nil { c.log.Warn("can't setup remote connection", zap.String("error", err.Error())) continue } ctx, cancel := context.WithTimeout(ctx, c.sgTimeout) // NOTE: we use the function which does not verify object integrity (checksums, signature), // but it would be useful to do as part of a data audit. res, err := cli.GetObject(ctx, getObjPrm) cancel() if err != nil { c.log.Warn("can't get storage group object", zap.String("error", err.Error())) continue } var sg storagegroup.StorageGroup err = storagegroup.ReadFromObject(&sg, *res.Object()) if err != nil { return nil, fmt.Errorf("can't parse storage group from a object: %w", err) } return &sg, nil } var errNotFound apistatus.ObjectNotFound return nil, errNotFound } // GetHeader requests node from the container under audit to return object header by id. func (c *ClientCache) GetHeader(ctx context.Context, prm auditor.GetHeaderPrm) (*object.Object, error) { var objAddress oid.Address objAddress.SetContainer(prm.CID) objAddress.SetObject(prm.OID) var info clientcore.NodeInfo err := clientcore.NodeInfoFromRawNetmapElement(&info, netmapcore.Node(prm.Node)) if err != nil { return nil, fmt.Errorf("parse client node info: %w", err) } cli, err := c.getWrappedClient(info) if err != nil { return nil, fmt.Errorf("can't setup remote connection with %s: %w", info.AddressGroup(), err) } cctx, cancel := context.WithTimeout(ctx, c.headTimeout) var obj *object.Object if prm.NodeIsRelay { obj, err = frostfsapiclient.GetObjectHeaderFromContainer(cctx, cli, objAddress) } else { obj, err = frostfsapiclient.GetRawObjectHeaderLocally(cctx, cli, objAddress) } cancel() if err != nil { return nil, fmt.Errorf("object head error: %w", err) } return obj, nil } // GetRangeHash requests node from the container under audit to return Tillich-Zemor hash of the // payload range of the object with specified identifier. func (c *ClientCache) GetRangeHash(ctx context.Context, prm auditor.GetRangeHashPrm) ([]byte, error) { var objAddress oid.Address objAddress.SetContainer(prm.CID) objAddress.SetObject(prm.OID) var info clientcore.NodeInfo err := clientcore.NodeInfoFromRawNetmapElement(&info, netmapcore.Node(prm.Node)) if err != nil { return nil, fmt.Errorf("parse client node info: %w", err) } cli, err := c.getWrappedClient(info) if err != nil { return nil, fmt.Errorf("can't setup remote connection with %s: %w", info.AddressGroup(), err) } cctx, cancel := context.WithTimeout(ctx, c.rangeTimeout) h, err := frostfsapiclient.HashObjectRange(cctx, cli, objAddress, prm.Range) cancel() if err != nil { return nil, fmt.Errorf("object rangehash error: %w", err) } return h, nil } func (c *ClientCache) getWrappedClient(info clientcore.NodeInfo) (frostfsapiclient.Client, error) { // can be also cached var cInternal frostfsapiclient.Client cli, err := c.Get(info) if err != nil { return cInternal, fmt.Errorf("could not get API client from cache") } cInternal.WrapBasicClient(cli) cInternal.SetPrivateKey(c.key) return cInternal, nil } func (c ClientCache) ListSG(dst *storagegroup2.SearchSGDst, prm storagegroup2.SearchSGPrm) error { cli, err := c.getWrappedClient(prm.NodeInfo) if err != nil { return fmt.Errorf("could not get API client from cache") } var cliPrm frostfsapiclient.SearchSGPrm cliPrm.SetContainerID(prm.Container) res, err := cli.SearchSG(prm.Context, cliPrm) if err != nil { return err } dst.Objects = res.IDList() return nil }