package redisotel import ( "context" "fmt" "net" "runtime" "strings" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" semconv "go.opentelemetry.io/otel/semconv/v1.10.0" "go.opentelemetry.io/otel/trace" "github.com/redis/go-redis/extra/rediscmd/v9" "github.com/redis/go-redis/v9" ) const ( instrumName = "github.com/redis/go-redis/extra/redisotel" ) func InstrumentTracing(rdb redis.UniversalClient, opts ...TracingOption) error { switch rdb := rdb.(type) { case *redis.Client: opt := rdb.Options() connString := formatDBConnString(opt.Network, opt.Addr) rdb.AddHook(newTracingHook(connString, opts...)) return nil case *redis.ClusterClient: rdb.AddHook(newTracingHook("", opts...)) rdb.OnNewNode(func(rdb *redis.Client) { opt := rdb.Options() connString := formatDBConnString(opt.Network, opt.Addr) rdb.AddHook(newTracingHook(connString, opts...)) }) return nil case *redis.Ring: rdb.AddHook(newTracingHook("", opts...)) rdb.OnNewNode(func(rdb *redis.Client) { opt := rdb.Options() connString := formatDBConnString(opt.Network, opt.Addr) rdb.AddHook(newTracingHook(connString, opts...)) }) return nil default: return fmt.Errorf("redisotel: %T not supported", rdb) } } type tracingHook struct { conf *config spanOpts []trace.SpanStartOption } var _ redis.Hook = (*tracingHook)(nil) func newTracingHook(connString string, opts ...TracingOption) *tracingHook { baseOpts := make([]baseOption, len(opts)) for i, opt := range opts { baseOpts[i] = opt } conf := newConfig(baseOpts...) if conf.tracer == nil { conf.tracer = conf.tp.Tracer( instrumName, trace.WithInstrumentationVersion("semver:"+redis.Version()), ) } if connString != "" { conf.attrs = append(conf.attrs, semconv.DBConnectionStringKey.String(connString)) } return &tracingHook{ conf: conf, spanOpts: []trace.SpanStartOption{ trace.WithSpanKind(trace.SpanKindClient), trace.WithAttributes(conf.attrs...), }, } } func (th *tracingHook) DialHook(hook redis.DialHook) redis.DialHook { return func(ctx context.Context, network, addr string) (net.Conn, error) { if !trace.SpanFromContext(ctx).IsRecording() { return hook(ctx, network, addr) } ctx, span := th.conf.tracer.Start(ctx, "redis.dial", th.spanOpts...) defer span.End() conn, err := hook(ctx, network, addr) if err != nil { recordError(span, err) return nil, err } return conn, nil } } func (th *tracingHook) ProcessHook(hook redis.ProcessHook) redis.ProcessHook { return func(ctx context.Context, cmd redis.Cmder) error { if !trace.SpanFromContext(ctx).IsRecording() { return hook(ctx, cmd) } fn, file, line := funcFileLine("github.com/redis/go-redis") attrs := make([]attribute.KeyValue, 0, 8) attrs = append(attrs, semconv.CodeFunctionKey.String(fn), semconv.CodeFilepathKey.String(file), semconv.CodeLineNumberKey.Int(line), ) if th.conf.dbStmtEnabled { cmdString := rediscmd.CmdString(cmd) attrs = append(attrs, semconv.DBStatementKey.String(cmdString)) } opts := th.spanOpts opts = append(opts, trace.WithAttributes(attrs...)) ctx, span := th.conf.tracer.Start(ctx, cmd.FullName(), opts...) defer span.End() if err := hook(ctx, cmd); err != nil { recordError(span, err) return err } return nil } } func (th *tracingHook) ProcessPipelineHook( hook redis.ProcessPipelineHook, ) redis.ProcessPipelineHook { return func(ctx context.Context, cmds []redis.Cmder) error { if !trace.SpanFromContext(ctx).IsRecording() { return hook(ctx, cmds) } fn, file, line := funcFileLine("github.com/redis/go-redis") attrs := make([]attribute.KeyValue, 0, 8) attrs = append(attrs, semconv.CodeFunctionKey.String(fn), semconv.CodeFilepathKey.String(file), semconv.CodeLineNumberKey.Int(line), attribute.Int("db.redis.num_cmd", len(cmds)), ) summary, cmdsString := rediscmd.CmdsString(cmds) if th.conf.dbStmtEnabled { attrs = append(attrs, semconv.DBStatementKey.String(cmdsString)) } opts := th.spanOpts opts = append(opts, trace.WithAttributes(attrs...)) ctx, span := th.conf.tracer.Start(ctx, "redis.pipeline "+summary, opts...) defer span.End() if err := hook(ctx, cmds); err != nil { recordError(span, err) return err } return nil } } func recordError(span trace.Span, err error) { if err != redis.Nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) } } func formatDBConnString(network, addr string) string { if network == "tcp" { network = "redis" } return fmt.Sprintf("%s://%s", network, addr) } func funcFileLine(pkg string) (string, string, int) { const depth = 16 var pcs [depth]uintptr n := runtime.Callers(3, pcs[:]) ff := runtime.CallersFrames(pcs[:n]) var fn, file string var line int for { f, ok := ff.Next() if !ok { break } fn, file, line = f.Function, f.File, f.Line if !strings.Contains(fn, pkg) { break } } if ind := strings.LastIndexByte(fn, "/"); ind != -1 { fn = fn[ind+1:] } return fn, file, line }
Related articles
redis conn_check_dummy
//go:build !linux && !darwin && !dragonfly && !freebsd && !netbsd && !openbsd && !solaris && !illumos // +build !linux,!darwin,!dragonfly,!freebsd,!netbsd,!openbsd,!solaris,!illumos package pool import "net" func connCheck(conn net.Conn) error { retu
redis rediscmd_test
package rediscmd import ( "testing" . "github.com/bsm/ginkgo/v2" . "github.com/bsm/gomega" ) func TestGinkgo(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "redisext") } var _ = Describe("AppendArg", func() { DescribeTable("...", func(
redis pool_single
package pool import "context" type SingleConnPool struct { pool Pooler cn *Conn stickyErr error } var _ Pooler = (*SingleConnPool)(nil) func NewSingleConnPool(pool Pooler, cn *Conn) *SingleConnPool { return &SingleConnPool{ pool: po
redis iterator
package redis import ( "context" ) // ScanIterator is used to incrementally iterate over a collection of elements. type ScanIterator struct { cmd *ScanCmd pos int } // Err returns the last iterator error, if any. func (it *ScanIterator) Err() error
redis release
#!/bin/bash set -e help() { cat <<- EOF Usage: TAG=tag $0 Updates version in go.mod files and pushes a new brash to GitHub. VARIABLES: TAG git tag, for example, v1.0.0 EOF exit 0 } if [ -z "$TAG" ] then printf "TAG is required\n
redis once
/* Copyright 2014 The Camlistore Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENS
redis reader_test
package proto_test import ( "bytes" "io" "testing" "github.com/redis/go-redis/v9/internal/proto" ) func BenchmarkReader_ParseReply_Status(b *testing.B) { benchmarkParseReply(b, "+OK\r\n", false) } func BenchmarkReader_ParseReply_Int(b *testing.B
redis cluster
package redis import ( "context" "crypto/tls" "fmt" "math" "net" "net/url" "runtime" "sort" "strings" "sync" "sync/atomic" "time" "github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal/hashtag" "github.com/redis/g
redis race_test
package redis_test import ( "bytes" "fmt" "net" "strconv" "sync/atomic" "testing" "time" . "github.com/bsm/ginkgo/v2" . "github.com/bsm/gomega" "github.com/redis/go-redis/v9" ) var _ = Describe("races", func() { var client *redis.Client v
redis safe
//go:build appengine // +build appengine package util func BytesToString(b []byte) string { return string(b) } func StringToBytes(s string) []byte { return []byte(s) }