Compare commits
1 commit
master
...
feature/se
Author | SHA1 | Date | |
---|---|---|---|
6ba1adfa07 |
8 changed files with 554 additions and 169 deletions
|
@ -158,6 +158,57 @@ func (m NetMap) PlacementVectors(vectors [][]NodeInfo, pivot []byte) ([][]NodeIn
|
|||
return result, nil
|
||||
}
|
||||
|
||||
// SelectFilterNodes returns a two-dimensional list of nodes as a result of applying the
|
||||
// given SelectFilterExpr to the NetMap.
|
||||
// If the SelectFilterExpr contains only filters, the result contains a single row with the
|
||||
// result of the last filter application.
|
||||
// If the SelectFilterExpr contains only selectors, the result contains the selection rows
|
||||
// of the last select application.
|
||||
func (m NetMap) SelectFilterNodes(expr *SelectFilterExpr) ([][]NodeInfo, error) {
|
||||
p := PlacementPolicy{
|
||||
filters: expr.filters,
|
||||
}
|
||||
|
||||
if expr.selector != nil {
|
||||
p.selectors = append(p.selectors, *expr.selector)
|
||||
}
|
||||
|
||||
c := newContext(m)
|
||||
c.setCBF(expr.cbf)
|
||||
|
||||
if err := c.processFilters(p); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := c.processSelectors(p); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if expr.selector == nil {
|
||||
var ret []NodeInfo
|
||||
lastFilter := expr.filters[len(expr.filters)-1]
|
||||
for _, ni := range m.nodes {
|
||||
if c.match(c.processedFilters[lastFilter.GetName()], ni) {
|
||||
ret = append(ret, ni)
|
||||
}
|
||||
}
|
||||
return [][]NodeInfo{ret}, nil
|
||||
}
|
||||
|
||||
sel, err := c.getSelection(*c.processedSelectors[expr.selector.GetName()])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var ret [][]NodeInfo
|
||||
for i, ns := range sel {
|
||||
ret = append(ret, []NodeInfo{})
|
||||
for _, n := range ns {
|
||||
ret[i] = append(ret[i], n)
|
||||
}
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// ContainerNodes returns two-dimensional list of nodes as a result of applying
|
||||
// given PlacementPolicy to the NetMap. Each line of the list corresponds to a
|
||||
// replica descriptor. Line order corresponds to order of ReplicaDescriptor list
|
||||
|
|
|
@ -6,6 +6,8 @@ options {
|
|||
|
||||
policy: UNIQUE? repStmt+ cbfStmt? selectStmt* filterStmt* EOF;
|
||||
|
||||
selectFilterExpr: cbfStmt? selectStmt? filterStmt* EOF;
|
||||
|
||||
repStmt:
|
||||
REP Count = NUMBER1 // number of object replicas
|
||||
(IN Selector = ident)?; // optional selector name
|
||||
|
|
Binary file not shown.
|
@ -12,6 +12,10 @@ func (v *BaseQueryVisitor) VisitPolicy(ctx *PolicyContext) interface{} {
|
|||
return v.VisitChildren(ctx)
|
||||
}
|
||||
|
||||
func (v *BaseQueryVisitor) VisitSelectFilterExpr(ctx *SelectFilterExprContext) interface{} {
|
||||
return v.VisitChildren(ctx)
|
||||
}
|
||||
|
||||
func (v *BaseQueryVisitor) VisitRepStmt(ctx *RepStmtContext) interface{} {
|
||||
return v.VisitChildren(ctx)
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -11,6 +11,9 @@ type QueryVisitor interface {
|
|||
// Visit a parse tree produced by Query#policy.
|
||||
VisitPolicy(ctx *PolicyContext) interface{}
|
||||
|
||||
// Visit a parse tree produced by Query#selectFilterExpr.
|
||||
VisitSelectFilterExpr(ctx *SelectFilterExprContext) interface{}
|
||||
|
||||
// Visit a parse tree produced by Query#repStmt.
|
||||
VisitRepStmt(ctx *RepStmtContext) interface{}
|
||||
|
||||
|
|
|
@ -560,6 +560,44 @@ func (p *PlacementPolicy) DecodeString(s string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// SelectFilterExpr is an expression containing only selectors and filters.
|
||||
// It's useful to evaluate their effect before being used in a policy.
|
||||
type SelectFilterExpr struct {
|
||||
cbf uint32
|
||||
selector *netmap.Selector
|
||||
filters []netmap.Filter
|
||||
}
|
||||
|
||||
// DecodeString decodes a string into a SelectFilterExpr.
|
||||
// Returns an error if s is malformed.
|
||||
func DecodeSelectFilterString(s string) (*SelectFilterExpr, error) {
|
||||
var v policyVisitor
|
||||
|
||||
input := antlr.NewInputStream(s)
|
||||
lexer := parser.NewQueryLexer(input)
|
||||
lexer.RemoveErrorListeners()
|
||||
lexer.AddErrorListener(&v)
|
||||
stream := antlr.NewCommonTokenStream(lexer, 0)
|
||||
|
||||
pp := parser.NewQuery(stream)
|
||||
pp.BuildParseTrees = true
|
||||
|
||||
pp.RemoveErrorListeners()
|
||||
pp.AddErrorListener(&v)
|
||||
sfExpr := pp.SelectFilterExpr().Accept(&v)
|
||||
|
||||
if len(v.errors) != 0 {
|
||||
return nil, v.errors[0]
|
||||
}
|
||||
|
||||
parsed, ok := sfExpr.(*SelectFilterExpr)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unexpected parsed instance type %T", sfExpr)
|
||||
}
|
||||
|
||||
return parsed, nil
|
||||
}
|
||||
|
||||
var (
|
||||
// errUnknownFilter is returned when a value of FROM in a query is unknown.
|
||||
errUnknownFilter = errors.New("filter not found")
|
||||
|
@ -636,6 +674,39 @@ func (p *policyVisitor) VisitPolicy(ctx *parser.PolicyContext) any {
|
|||
return pl
|
||||
}
|
||||
|
||||
func (p *policyVisitor) VisitSelectFilterExpr(ctx *parser.SelectFilterExprContext) any {
|
||||
if len(p.errors) != 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
sfExpr := new(SelectFilterExpr)
|
||||
|
||||
if cbfStmt := ctx.CbfStmt(); cbfStmt != nil {
|
||||
cbf, ok := cbfStmt.(*parser.CbfStmtContext).Accept(p).(uint32)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
sfExpr.cbf = cbf
|
||||
}
|
||||
|
||||
if selStmt := ctx.SelectStmt(); selStmt != nil {
|
||||
sel, ok := selStmt.Accept(p).(*netmap.Selector)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
sfExpr.selector = sel
|
||||
}
|
||||
|
||||
filtStmts := ctx.AllFilterStmt()
|
||||
sfExpr.filters = make([]netmap.Filter, 0, len(filtStmts))
|
||||
|
||||
for _, f := range filtStmts {
|
||||
sfExpr.filters = append(sfExpr.filters, *f.Accept(p).(*netmap.Filter))
|
||||
}
|
||||
|
||||
return sfExpr
|
||||
}
|
||||
|
||||
func (p *policyVisitor) VisitCbfStmt(ctx *parser.CbfStmtContext) any {
|
||||
cbf, err := strconv.ParseUint(ctx.GetBackupFactor().GetText(), 10, 32)
|
||||
if err != nil {
|
||||
|
|
|
@ -78,3 +78,28 @@ func TestPlacementPolicyEncoding(t *testing.T) {
|
|||
require.Equal(t, v, v2)
|
||||
})
|
||||
}
|
||||
|
||||
func TestDecodeSelectFilterExpr(t *testing.T) {
|
||||
for _, s := range []string{
|
||||
"SELECT 1 FROM *",
|
||||
"FILTER Color EQ 'Red' AS RedNode",
|
||||
`
|
||||
FILTER Color EQ 'Red' AS RedNode
|
||||
FILTER @RedNode AND Shape EQ 'Cirle' AS RedCircleNode
|
||||
`,
|
||||
`
|
||||
SELECT 1 FROM RedCircleNode
|
||||
FILTER Color EQ 'Red' AS RedNode
|
||||
FILTER @RedNode AND Shape EQ 'Cirle' AS RedCircleNode
|
||||
`,
|
||||
`
|
||||
CBF 1
|
||||
SELECT 1 FROM RedCircleNode
|
||||
FILTER Color EQ 'Red' AS RedNode
|
||||
FILTER @RedNode AND Shape EQ 'Cirle' AS RedCircleNode
|
||||
`,
|
||||
} {
|
||||
_, err := DecodeSelectFilterString(s)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue