Compare commits

...

1 commit

Author SHA1 Message Date
6ba1adfa07 [#xx] Add support for SELECT-FILTER expressions
All checks were successful
/ DCO (pull_request) Successful in 4m59s
/ Lint (pull_request) Successful in 2m32s
/ Tests (1.19) (pull_request) Successful in 1m55s
/ Tests (1.20) (pull_request) Successful in 2m9s
Signed-off-by: Alejandro Lopez <a.lopez@yadro.com>
2023-07-14 10:36:57 +03:00
8 changed files with 554 additions and 169 deletions

View file

@ -158,6 +158,57 @@ func (m NetMap) PlacementVectors(vectors [][]NodeInfo, pivot []byte) ([][]NodeIn
return result, nil
}
// SelectFilterNodes returns a two-dimensional list of nodes as a result of applying the
// given SelectFilterExpr to the NetMap.
// If the SelectFilterExpr contains only filters, the result contains a single row with the
// result of the last filter application.
// If the SelectFilterExpr contains only selectors, the result contains the selection rows
// of the last select application.
func (m NetMap) SelectFilterNodes(expr *SelectFilterExpr) ([][]NodeInfo, error) {
p := PlacementPolicy{
filters: expr.filters,
}
if expr.selector != nil {
p.selectors = append(p.selectors, *expr.selector)
}
c := newContext(m)
c.setCBF(expr.cbf)
if err := c.processFilters(p); err != nil {
return nil, err
}
if err := c.processSelectors(p); err != nil {
return nil, err
}
if expr.selector == nil {
var ret []NodeInfo
lastFilter := expr.filters[len(expr.filters)-1]
for _, ni := range m.nodes {
if c.match(c.processedFilters[lastFilter.GetName()], ni) {
ret = append(ret, ni)
}
}
return [][]NodeInfo{ret}, nil
}
sel, err := c.getSelection(*c.processedSelectors[expr.selector.GetName()])
if err != nil {
return nil, err
}
var ret [][]NodeInfo
for i, ns := range sel {
ret = append(ret, []NodeInfo{})
for _, n := range ns {
ret[i] = append(ret[i], n)
}
}
return ret, nil
}
// ContainerNodes returns two-dimensional list of nodes as a result of applying
// given PlacementPolicy to the NetMap. Each line of the list corresponds to a
// replica descriptor. Line order corresponds to order of ReplicaDescriptor list

View file

@ -6,6 +6,8 @@ options {
policy: UNIQUE? repStmt+ cbfStmt? selectStmt* filterStmt* EOF;
selectFilterExpr: cbfStmt? selectStmt? filterStmt* EOF;
repStmt:
REP Count = NUMBER1 // number of object replicas
(IN Selector = ident)?; // optional selector name

Binary file not shown.

View file

@ -12,6 +12,10 @@ func (v *BaseQueryVisitor) VisitPolicy(ctx *PolicyContext) interface{} {
return v.VisitChildren(ctx)
}
func (v *BaseQueryVisitor) VisitSelectFilterExpr(ctx *SelectFilterExprContext) interface{} {
return v.VisitChildren(ctx)
}
func (v *BaseQueryVisitor) VisitRepStmt(ctx *RepStmtContext) interface{} {
return v.VisitChildren(ctx)
}

File diff suppressed because it is too large Load diff

View file

@ -11,6 +11,9 @@ type QueryVisitor interface {
// Visit a parse tree produced by Query#policy.
VisitPolicy(ctx *PolicyContext) interface{}
// Visit a parse tree produced by Query#selectFilterExpr.
VisitSelectFilterExpr(ctx *SelectFilterExprContext) interface{}
// Visit a parse tree produced by Query#repStmt.
VisitRepStmt(ctx *RepStmtContext) interface{}

View file

@ -560,6 +560,44 @@ func (p *PlacementPolicy) DecodeString(s string) error {
return nil
}
// SelectFilterExpr is an expression containing only selectors and filters.
// It's useful to evaluate their effect before being used in a policy.
type SelectFilterExpr struct {
cbf uint32
selector *netmap.Selector
filters []netmap.Filter
}
// DecodeString decodes a string into a SelectFilterExpr.
// Returns an error if s is malformed.
func DecodeSelectFilterString(s string) (*SelectFilterExpr, error) {
var v policyVisitor
input := antlr.NewInputStream(s)
lexer := parser.NewQueryLexer(input)
lexer.RemoveErrorListeners()
lexer.AddErrorListener(&v)
stream := antlr.NewCommonTokenStream(lexer, 0)
pp := parser.NewQuery(stream)
pp.BuildParseTrees = true
pp.RemoveErrorListeners()
pp.AddErrorListener(&v)
sfExpr := pp.SelectFilterExpr().Accept(&v)
if len(v.errors) != 0 {
return nil, v.errors[0]
}
parsed, ok := sfExpr.(*SelectFilterExpr)
if !ok {
return nil, fmt.Errorf("unexpected parsed instance type %T", sfExpr)
}
return parsed, nil
}
var (
// errUnknownFilter is returned when a value of FROM in a query is unknown.
errUnknownFilter = errors.New("filter not found")
@ -636,6 +674,39 @@ func (p *policyVisitor) VisitPolicy(ctx *parser.PolicyContext) any {
return pl
}
func (p *policyVisitor) VisitSelectFilterExpr(ctx *parser.SelectFilterExprContext) any {
if len(p.errors) != 0 {
return nil
}
sfExpr := new(SelectFilterExpr)
if cbfStmt := ctx.CbfStmt(); cbfStmt != nil {
cbf, ok := cbfStmt.(*parser.CbfStmtContext).Accept(p).(uint32)
if !ok {
return nil
}
sfExpr.cbf = cbf
}
if selStmt := ctx.SelectStmt(); selStmt != nil {
sel, ok := selStmt.Accept(p).(*netmap.Selector)
if !ok {
return nil
}
sfExpr.selector = sel
}
filtStmts := ctx.AllFilterStmt()
sfExpr.filters = make([]netmap.Filter, 0, len(filtStmts))
for _, f := range filtStmts {
sfExpr.filters = append(sfExpr.filters, *f.Accept(p).(*netmap.Filter))
}
return sfExpr
}
func (p *policyVisitor) VisitCbfStmt(ctx *parser.CbfStmtContext) any {
cbf, err := strconv.ParseUint(ctx.GetBackupFactor().GetText(), 10, 32)
if err != nil {

View file

@ -78,3 +78,28 @@ func TestPlacementPolicyEncoding(t *testing.T) {
require.Equal(t, v, v2)
})
}
func TestDecodeSelectFilterExpr(t *testing.T) {
for _, s := range []string{
"SELECT 1 FROM *",
"FILTER Color EQ 'Red' AS RedNode",
`
FILTER Color EQ 'Red' AS RedNode
FILTER @RedNode AND Shape EQ 'Cirle' AS RedCircleNode
`,
`
SELECT 1 FROM RedCircleNode
FILTER Color EQ 'Red' AS RedNode
FILTER @RedNode AND Shape EQ 'Cirle' AS RedCircleNode
`,
`
CBF 1
SELECT 1 FROM RedCircleNode
FILTER Color EQ 'Red' AS RedNode
FILTER @RedNode AND Shape EQ 'Cirle' AS RedCircleNode
`,
} {
_, err := DecodeSelectFilterString(s)
require.NoError(t, err)
}
}