[#205] netmap: Add EC statement to placement policy

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
Evgenii Stratonikov 2024-02-22 22:29:25 +03:00 committed by Evgenii Stratonikov
parent d33b54d280
commit 70e9e40c7f
13 changed files with 809 additions and 406 deletions

View file

@ -34,8 +34,17 @@ type PlacementPolicy struct {
func (p *PlacementPolicy) readFromV2(m netmap.PlacementPolicy, checkFieldPresence bool) error {
p.replicas = m.GetReplicas()
if checkFieldPresence && len(p.replicas) == 0 {
return errors.New("missing replicas")
if checkFieldPresence {
if len(p.replicas) == 0 {
return errors.New("missing replicas")
}
if len(p.replicas) != 1 {
for i := range p.replicas {
if p.replicas[i].GetECDataCount() != 0 || p.replicas[i].GetECParityCount() != 0 {
return errors.New("erasure code group must be used exclusively")
}
}
}
}
p.backupFactor = m.GetContainerBackupFactor()
@ -393,10 +402,14 @@ func (p PlacementPolicy) WriteStringTo(w io.StringWriter) (err error) {
c := p.replicas[i].GetCount()
s := p.replicas[i].GetSelector()
if s != "" {
_, err = w.WriteString(fmt.Sprintf("%sREP %d IN %s", delim, c, s))
} else {
if c != 0 {
_, err = w.WriteString(fmt.Sprintf("%sREP %d", delim, c))
} else {
ecx, ecy := p.replicas[i].GetECDataCount(), p.replicas[i].GetECParityCount()
_, err = w.WriteString(fmt.Sprintf("%sEC %d.%d", delim, ecx, ecy))
}
if s != "" {
_, err = w.WriteString(fmt.Sprintf(" IN %s", s))
}
if err != nil {
@ -630,6 +643,8 @@ var (
"make sure to pair REP and SELECT clauses: \"REP .. IN X\" + \"SELECT ... AS X\"")
// errRedundantSelector is returned for errors found by filters policy validator.
errRedundantFilter = errors.New("policy: found redundant filter")
// errECFewSelectors is returned when EC keyword is used without UNIQUE keyword.
errECFewSelectors = errors.New("policy: too few nodes to select")
)
type policyVisitor struct {
@ -657,11 +672,18 @@ func (p *policyVisitor) VisitPolicy(ctx *parser.PolicyContext) any {
pl.unique = ctx.UNIQUE() != nil
repStmts := ctx.AllRepStmt()
pl.replicas = make([]netmap.Replica, 0, len(repStmts))
for _, r := range repStmts {
res, ok := r.Accept(p).(*netmap.Replica)
stmts := ctx.GetChildren()
for _, r := range stmts {
var res *netmap.Replica
var ok bool
switch r := r.(type) {
case parser.IRepStmtContext:
res, ok = r.Accept(p).(*netmap.Replica)
case parser.IEcStmtContext:
res, ok = r.Accept(p).(*netmap.Replica)
default:
continue
}
if !ok {
return nil
}
@ -758,6 +780,28 @@ func (p *policyVisitor) VisitRepStmt(ctx *parser.RepStmtContext) any {
return rs
}
// VisitRepStmt implements parser.QueryVisitor interface.
func (p *policyVisitor) VisitEcStmt(ctx *parser.EcStmtContext) any {
dataCount, err := strconv.ParseUint(ctx.GetData().GetText(), 10, 32)
if err != nil {
return p.reportError(errInvalidNumber)
}
parityCount, err := strconv.ParseUint(ctx.GetParity().GetText(), 10, 32)
if err != nil {
return p.reportError(errInvalidNumber)
}
rs := new(netmap.Replica)
rs.SetECDataCount(uint32(dataCount))
rs.SetECParityCount(uint32(parityCount))
if sel := ctx.GetSelector(); sel != nil {
rs.SetSelector(sel.GetText())
}
return rs
}
// VisitSelectStmt implements parser.QueryVisitor interface.
func (p *policyVisitor) VisitSelectStmt(ctx *parser.SelectStmtContext) any {
res, err := strconv.ParseUint(ctx.GetCount().GetText(), 10, 32)
@ -910,6 +954,14 @@ func validatePolicy(p PlacementPolicy) error {
if seenSelectors[selName] == nil {
return fmt.Errorf("%w: '%s'", errUnknownSelector, selName)
}
dataCount := p.replicas[i].GetECDataCount()
parityCount := p.replicas[i].GetECParityCount()
if dataCount != 0 || parityCount != 0 {
if c := seenSelectors[selName].GetCount(); c < dataCount+parityCount {
return fmt.Errorf("%w: %d < %d + %d", errECFewSelectors, c, dataCount, parityCount)
}
}
}
}