[] innerring: Use Head with TTL in PoR

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2020-12-23 17:47:00 +03:00 committed by Alex Vanin
parent f9e81383ae
commit 10b548275a
4 changed files with 14 additions and 6 deletions
pkg
innerring
services/audit/auditor

View file

@ -114,13 +114,21 @@ func (c *ClientCache) GetSG(task *audit.Task, id *object.ID) (*storagegroup.Stor
} }
// GetHeader requests node from the container under audit to return object header by id. // GetHeader requests node from the container under audit to return object header by id.
func (c *ClientCache) GetHeader(task *audit.Task, node *netmap.Node, id *object.ID) (*object.Object, error) { func (c *ClientCache) GetHeader(task *audit.Task, node *netmap.Node, id *object.ID, relay bool) (*object.Object, error) {
raw := true
ttl := uint32(1)
if relay {
ttl = 10 // todo: instead of hardcode value we can set TTL based on container length
raw = false
}
objAddress := new(object.Address) objAddress := new(object.Address)
objAddress.SetContainerID(task.ContainerID()) objAddress.SetContainerID(task.ContainerID())
objAddress.SetObjectID(id) objAddress.SetObjectID(id)
headParams := new(client.ObjectHeaderParams) headParams := new(client.ObjectHeaderParams)
headParams.WithRawFlag(true) headParams.WithRawFlag(raw)
headParams.WithMainFields() headParams.WithMainFields()
headParams.WithAddress(objAddress) headParams.WithAddress(objAddress)
@ -135,7 +143,7 @@ func (c *ClientCache) GetHeader(task *audit.Task, node *netmap.Node, id *object.
} }
cctx, cancel := context.WithTimeout(task.AuditContext(), c.headTimeout) cctx, cancel := context.WithTimeout(task.AuditContext(), c.headTimeout)
head, err := cli.GetObjectHeader(cctx, headParams, client.WithTTL(1)) head, err := cli.GetObjectHeader(cctx, headParams, client.WithTTL(ttl))
cancel() cancel()

View file

@ -83,7 +83,7 @@ type ContainerCommunicator interface {
GetSG(*audit.Task, *object.ID) (*storagegroup.StorageGroup, error) GetSG(*audit.Task, *object.ID) (*storagegroup.StorageGroup, error)
// Must return object header from the container node. // Must return object header from the container node.
GetHeader(*audit.Task, *netmap.Node, *object.ID) (*object.Object, error) GetHeader(*audit.Task, *netmap.Node, *object.ID, bool) (*object.Object, error)
// Must return homomorphic Tillich-Zemor hash of payload range of the // Must return homomorphic Tillich-Zemor hash of payload range of the
// object stored in container node. // object stored in container node.

View file

@ -50,7 +50,7 @@ func (c *Context) processObjectPlacement(id *object.ID, nodes netmap.Nodes, repl
for i := 0; ok < replicas && i < len(nodes); i++ { for i := 0; ok < replicas && i < len(nodes); i++ {
// try to get object header from node // try to get object header from node
hdr, err := c.cnrCom.GetHeader(c.task, nodes[i], id) hdr, err := c.cnrCom.GetHeader(c.task, nodes[i], id, false)
if err != nil { if err != nil {
c.log.Debug("could not get object header from candidate", c.log.Debug("could not get object header from candidate",
zap.Stringer("id", id), zap.Stringer("id", id),

View file

@ -45,7 +45,7 @@ func (c *Context) checkStorageGroupPoR(ind int, sg *object.ID) {
} }
for _, node := range placement.FlattenNodes(objectPlacement) { for _, node := range placement.FlattenNodes(objectPlacement) {
hdr, err := c.cnrCom.GetHeader(c.task, node, members[i]) hdr, err := c.cnrCom.GetHeader(c.task, node, members[i], true)
if err != nil { if err != nil {
c.log.Debug("can't head object", c.log.Debug("can't head object",
zap.String("remote_node", node.Address()), zap.String("remote_node", node.Address()),