[#xx] Add support for SELECT-FILTER expressions
Signed-off-by: Alejandro Lopez <a.lopez@yadro.com>
This commit is contained in:
parent
998fe1a7ab
commit
6ba1adfa07
8 changed files with 554 additions and 169 deletions
|
@ -158,6 +158,57 @@ func (m NetMap) PlacementVectors(vectors [][]NodeInfo, pivot []byte) ([][]NodeIn
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SelectFilterNodes returns a two-dimensional list of nodes as a result of applying the
|
||||||
|
// given SelectFilterExpr to the NetMap.
|
||||||
|
// If the SelectFilterExpr contains only filters, the result contains a single row with the
|
||||||
|
// result of the last filter application.
|
||||||
|
// If the SelectFilterExpr contains only selectors, the result contains the selection rows
|
||||||
|
// of the last select application.
|
||||||
|
func (m NetMap) SelectFilterNodes(expr *SelectFilterExpr) ([][]NodeInfo, error) {
|
||||||
|
p := PlacementPolicy{
|
||||||
|
filters: expr.filters,
|
||||||
|
}
|
||||||
|
|
||||||
|
if expr.selector != nil {
|
||||||
|
p.selectors = append(p.selectors, *expr.selector)
|
||||||
|
}
|
||||||
|
|
||||||
|
c := newContext(m)
|
||||||
|
c.setCBF(expr.cbf)
|
||||||
|
|
||||||
|
if err := c.processFilters(p); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := c.processSelectors(p); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if expr.selector == nil {
|
||||||
|
var ret []NodeInfo
|
||||||
|
lastFilter := expr.filters[len(expr.filters)-1]
|
||||||
|
for _, ni := range m.nodes {
|
||||||
|
if c.match(c.processedFilters[lastFilter.GetName()], ni) {
|
||||||
|
ret = append(ret, ni)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return [][]NodeInfo{ret}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
sel, err := c.getSelection(*c.processedSelectors[expr.selector.GetName()])
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var ret [][]NodeInfo
|
||||||
|
for i, ns := range sel {
|
||||||
|
ret = append(ret, []NodeInfo{})
|
||||||
|
for _, n := range ns {
|
||||||
|
ret[i] = append(ret[i], n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret, nil
|
||||||
|
}
|
||||||
|
|
||||||
// ContainerNodes returns two-dimensional list of nodes as a result of applying
|
// ContainerNodes returns two-dimensional list of nodes as a result of applying
|
||||||
// given PlacementPolicy to the NetMap. Each line of the list corresponds to a
|
// given PlacementPolicy to the NetMap. Each line of the list corresponds to a
|
||||||
// replica descriptor. Line order corresponds to order of ReplicaDescriptor list
|
// replica descriptor. Line order corresponds to order of ReplicaDescriptor list
|
||||||
|
|
|
@ -6,6 +6,8 @@ options {
|
||||||
|
|
||||||
policy: UNIQUE? repStmt+ cbfStmt? selectStmt* filterStmt* EOF;
|
policy: UNIQUE? repStmt+ cbfStmt? selectStmt* filterStmt* EOF;
|
||||||
|
|
||||||
|
selectFilterExpr: cbfStmt? selectStmt? filterStmt* EOF;
|
||||||
|
|
||||||
repStmt:
|
repStmt:
|
||||||
REP Count = NUMBER1 // number of object replicas
|
REP Count = NUMBER1 // number of object replicas
|
||||||
(IN Selector = ident)?; // optional selector name
|
(IN Selector = ident)?; // optional selector name
|
||||||
|
|
Binary file not shown.
|
@ -12,6 +12,10 @@ func (v *BaseQueryVisitor) VisitPolicy(ctx *PolicyContext) interface{} {
|
||||||
return v.VisitChildren(ctx)
|
return v.VisitChildren(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (v *BaseQueryVisitor) VisitSelectFilterExpr(ctx *SelectFilterExprContext) interface{} {
|
||||||
|
return v.VisitChildren(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
func (v *BaseQueryVisitor) VisitRepStmt(ctx *RepStmtContext) interface{} {
|
func (v *BaseQueryVisitor) VisitRepStmt(ctx *RepStmtContext) interface{} {
|
||||||
return v.VisitChildren(ctx)
|
return v.VisitChildren(ctx)
|
||||||
}
|
}
|
||||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -11,6 +11,9 @@ type QueryVisitor interface {
|
||||||
// Visit a parse tree produced by Query#policy.
|
// Visit a parse tree produced by Query#policy.
|
||||||
VisitPolicy(ctx *PolicyContext) interface{}
|
VisitPolicy(ctx *PolicyContext) interface{}
|
||||||
|
|
||||||
|
// Visit a parse tree produced by Query#selectFilterExpr.
|
||||||
|
VisitSelectFilterExpr(ctx *SelectFilterExprContext) interface{}
|
||||||
|
|
||||||
// Visit a parse tree produced by Query#repStmt.
|
// Visit a parse tree produced by Query#repStmt.
|
||||||
VisitRepStmt(ctx *RepStmtContext) interface{}
|
VisitRepStmt(ctx *RepStmtContext) interface{}
|
||||||
|
|
||||||
|
|
|
@ -560,6 +560,44 @@ func (p *PlacementPolicy) DecodeString(s string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SelectFilterExpr is an expression containing only selectors and filters.
|
||||||
|
// It's useful to evaluate their effect before being used in a policy.
|
||||||
|
type SelectFilterExpr struct {
|
||||||
|
cbf uint32
|
||||||
|
selector *netmap.Selector
|
||||||
|
filters []netmap.Filter
|
||||||
|
}
|
||||||
|
|
||||||
|
// DecodeString decodes a string into a SelectFilterExpr.
|
||||||
|
// Returns an error if s is malformed.
|
||||||
|
func DecodeSelectFilterString(s string) (*SelectFilterExpr, error) {
|
||||||
|
var v policyVisitor
|
||||||
|
|
||||||
|
input := antlr.NewInputStream(s)
|
||||||
|
lexer := parser.NewQueryLexer(input)
|
||||||
|
lexer.RemoveErrorListeners()
|
||||||
|
lexer.AddErrorListener(&v)
|
||||||
|
stream := antlr.NewCommonTokenStream(lexer, 0)
|
||||||
|
|
||||||
|
pp := parser.NewQuery(stream)
|
||||||
|
pp.BuildParseTrees = true
|
||||||
|
|
||||||
|
pp.RemoveErrorListeners()
|
||||||
|
pp.AddErrorListener(&v)
|
||||||
|
sfExpr := pp.SelectFilterExpr().Accept(&v)
|
||||||
|
|
||||||
|
if len(v.errors) != 0 {
|
||||||
|
return nil, v.errors[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
parsed, ok := sfExpr.(*SelectFilterExpr)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("unexpected parsed instance type %T", sfExpr)
|
||||||
|
}
|
||||||
|
|
||||||
|
return parsed, nil
|
||||||
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// errUnknownFilter is returned when a value of FROM in a query is unknown.
|
// errUnknownFilter is returned when a value of FROM in a query is unknown.
|
||||||
errUnknownFilter = errors.New("filter not found")
|
errUnknownFilter = errors.New("filter not found")
|
||||||
|
@ -636,6 +674,39 @@ func (p *policyVisitor) VisitPolicy(ctx *parser.PolicyContext) any {
|
||||||
return pl
|
return pl
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *policyVisitor) VisitSelectFilterExpr(ctx *parser.SelectFilterExprContext) any {
|
||||||
|
if len(p.errors) != 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
sfExpr := new(SelectFilterExpr)
|
||||||
|
|
||||||
|
if cbfStmt := ctx.CbfStmt(); cbfStmt != nil {
|
||||||
|
cbf, ok := cbfStmt.(*parser.CbfStmtContext).Accept(p).(uint32)
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
sfExpr.cbf = cbf
|
||||||
|
}
|
||||||
|
|
||||||
|
if selStmt := ctx.SelectStmt(); selStmt != nil {
|
||||||
|
sel, ok := selStmt.Accept(p).(*netmap.Selector)
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
sfExpr.selector = sel
|
||||||
|
}
|
||||||
|
|
||||||
|
filtStmts := ctx.AllFilterStmt()
|
||||||
|
sfExpr.filters = make([]netmap.Filter, 0, len(filtStmts))
|
||||||
|
|
||||||
|
for _, f := range filtStmts {
|
||||||
|
sfExpr.filters = append(sfExpr.filters, *f.Accept(p).(*netmap.Filter))
|
||||||
|
}
|
||||||
|
|
||||||
|
return sfExpr
|
||||||
|
}
|
||||||
|
|
||||||
func (p *policyVisitor) VisitCbfStmt(ctx *parser.CbfStmtContext) any {
|
func (p *policyVisitor) VisitCbfStmt(ctx *parser.CbfStmtContext) any {
|
||||||
cbf, err := strconv.ParseUint(ctx.GetBackupFactor().GetText(), 10, 32)
|
cbf, err := strconv.ParseUint(ctx.GetBackupFactor().GetText(), 10, 32)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -78,3 +78,28 @@ func TestPlacementPolicyEncoding(t *testing.T) {
|
||||||
require.Equal(t, v, v2)
|
require.Equal(t, v, v2)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDecodeSelectFilterExpr(t *testing.T) {
|
||||||
|
for _, s := range []string{
|
||||||
|
"SELECT 1 FROM *",
|
||||||
|
"FILTER Color EQ 'Red' AS RedNode",
|
||||||
|
`
|
||||||
|
FILTER Color EQ 'Red' AS RedNode
|
||||||
|
FILTER @RedNode AND Shape EQ 'Cirle' AS RedCircleNode
|
||||||
|
`,
|
||||||
|
`
|
||||||
|
SELECT 1 FROM RedCircleNode
|
||||||
|
FILTER Color EQ 'Red' AS RedNode
|
||||||
|
FILTER @RedNode AND Shape EQ 'Cirle' AS RedCircleNode
|
||||||
|
`,
|
||||||
|
`
|
||||||
|
CBF 1
|
||||||
|
SELECT 1 FROM RedCircleNode
|
||||||
|
FILTER Color EQ 'Red' AS RedNode
|
||||||
|
FILTER @RedNode AND Shape EQ 'Cirle' AS RedCircleNode
|
||||||
|
`,
|
||||||
|
} {
|
||||||
|
_, err := DecodeSelectFilterString(s)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue