diff --git a/go.mod b/go.mod index f3e6160..35f0342 100644 --- a/go.mod +++ b/go.mod @@ -4,11 +4,18 @@ go 1.22 require ( github.com/stretchr/testify v1.9.0 - golang.org/x/sync v0.10.0 + google.golang.org/grpc v1.69.2 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/net v0.30.0 // indirect + golang.org/x/sys v0.26.0 // indirect + golang.org/x/text v0.19.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 // indirect + google.golang.org/protobuf v1.35.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +require golang.org/x/sync v0.10.0 diff --git a/go.sum b/go.sum index 8c93871..7f2985d 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,47 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY= +go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE= +go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE= +go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY= +go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk= +go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0= +go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc= +go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8= +go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys= +go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= +golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= +golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= +golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 h1:X58yt85/IXCx0Y3ZwN6sEIKZzQtDEYaBWrDvErdXrRE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= +google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU= +google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= +google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= +google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= diff --git a/tagging/context.go b/tagging/context.go new file mode 100644 index 0000000..3b6c113 --- /dev/null +++ b/tagging/context.go @@ -0,0 +1,21 @@ +package tagging + +import "context" + +type tagContextKeyType struct{} + +var currentTagKey = tagContextKeyType{} + +func ContextWithIOTag(parent context.Context, ioTag string) context.Context { + return context.WithValue(parent, currentTagKey, ioTag) +} + +func IOTagFromContext(ctx context.Context) (string, bool) { + if ctx == nil { + panic("context must be non nil") + } + if tag, ok := ctx.Value(currentTagKey).(string); ok { + return tag, true + } + return "", false +} diff --git a/tagging/context_test.go b/tagging/context_test.go new file mode 100644 index 0000000..b13b253 --- /dev/null +++ b/tagging/context_test.go @@ -0,0 +1,23 @@ +package tagging + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestContext(t *testing.T) { + ctx := context.Background() + tag, ok := IOTagFromContext(ctx) + require.False(t, ok) + require.Equal(t, "", tag) + ctx = ContextWithIOTag(ctx, "tag1") + tag, ok = IOTagFromContext(ctx) + require.True(t, ok) + require.Equal(t, "tag1", tag) + ctx = ContextWithIOTag(ctx, "tag2") + tag, ok = IOTagFromContext(ctx) + require.True(t, ok) + require.Equal(t, "tag2", tag) +} diff --git a/tagging/grpc.go b/tagging/grpc.go new file mode 100644 index 0000000..5e255dd --- /dev/null +++ b/tagging/grpc.go @@ -0,0 +1,96 @@ +package tagging + +import ( + "context" + + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +const ( + ioTagHeader = "x-frostfs-io-tag" +) + +// NewUnaryClientInteceptor creates new gRPC unary interceptor to set an IO tag to gRPC metadata. +func NewUnaryClientInteceptor() grpc.UnaryClientInterceptor { + return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + return invoker(setIOTagToGRPCMetadata(ctx), method, req, reply, cc, opts...) + } +} + +// NewStreamClientInterceptor creates new gRPC stream interceptor to set an IO tag to gRPC metadata. +func NewStreamClientInterceptor() grpc.StreamClientInterceptor { + return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + return streamer(setIOTagToGRPCMetadata(ctx), desc, cc, method, opts...) + } +} + +// NewUnaryServerInterceptor creates new gRPC unary interceptor to extract an IO tag to gRPC metadata. +func NewUnaryServerInterceptor() grpc.UnaryServerInterceptor { + return func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + return handler(extractIOTagFromGRPCMetadata(ctx), req) + } +} + +// NewStreamServerInterceptor creates new gRPC stream interceptor to extract an IO tag to gRPC metadata. +func NewStreamServerInterceptor() grpc.StreamServerInterceptor { + return func(srv any, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + return handler(srv, &serverStream{origin: ss}) + } +} + +func setIOTagToGRPCMetadata(ctx context.Context) context.Context { + ioTag, ok := IOTagFromContext(ctx) + if !ok { + return ctx + } + md, ok := metadata.FromOutgoingContext(ctx) + if !ok { + md = metadata.MD{} + } + md.Set(ioTagHeader, ioTag) + return metadata.NewOutgoingContext(ctx, md) +} + +func extractIOTagFromGRPCMetadata(ctx context.Context) context.Context { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return ctx + } + + values := md.Get(ioTagHeader) + if len(values) > 0 { + return ContextWithIOTag(ctx, values[0]) + } + return ctx +} + +var _ grpc.ServerStream = &serverStream{} + +type serverStream struct { + origin grpc.ServerStream +} + +func (s *serverStream) Context() context.Context { + return extractIOTagFromGRPCMetadata(s.origin.Context()) +} + +func (s *serverStream) RecvMsg(m any) error { + return s.origin.RecvMsg(m) +} + +func (s *serverStream) SendHeader(md metadata.MD) error { + return s.origin.SendHeader(md) +} + +func (s *serverStream) SendMsg(m any) error { + return s.origin.SendMsg(m) +} + +func (s *serverStream) SetHeader(md metadata.MD) error { + return s.origin.SetHeader(md) +} + +func (s *serverStream) SetTrailer(md metadata.MD) { + s.origin.SetTrailer(md) +}