package main import ( "context" "fmt" "log" "os" "path" "time" "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes" ) func createTableAndCDC(ctx context.Context, db ydb.Connection, consumersCount int) { err := createTables(ctx, db) if err != nil { log.Fatalf("failed to create tables: %+v", err) } err = createCosumers(ctx, db, consumersCount) if err != nil { log.Fatalf("failed to create consumers: %+v", err) } } func createTables(ctx context.Context, db ydb.Connection) error { err := db.Table().Do(ctx, func(ctx context.Context, s table.Session) error { err := s.DropTable(ctx, path.Join(db.Name(), "bus")) if ydb.IsOperationErrorSchemeError(err) { err = nil } return err }) if err != nil { return fmt.Errorf("failed to drop table: %w", err) } _, err = db.Scripting().Execute(ctx, ` CREATE TABLE bus (id Text, freeSeats Int64, PRIMARY KEY(id)); ALTER TABLE bus ADD CHANGEFEED updates WITH ( FORMAT = "JSON", MODE = "UPDATES" ) `, nil) if err != nil { return fmt.Errorf("failed to create table: %w", err) } _, err = db.Scripting().Execute(ctx, ` UPSERT INTO bus (id, freeSeats) VALUES ("bus1", 40), ("bus2", 60); `, nil) if err != nil { return fmt.Errorf("failed insert rows: %w", err) } return nil } func createCosumers(ctx context.Context, db ydb.Connection, consumersCount int) error { for i := 0; i < consumersCount; i++ { err := db.Topic().Alter(ctx, "bus/updates", topicoptions.AlterWithAddConsumers(topictypes.Consumer{ Name: consumerName(i), })) if err != nil { return err } } return nil } func connect() ydb.Connection { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() connectionString := os.Getenv("YDB_CONNECTION_STRING") if *ydbConnectionString != "" { connectionString = *ydbConnectionString } if connectionString == "" { connectionString = defaultConnectionString } token := os.Getenv("YDB_TOKEN") if *ydbToken != "" { token = *ydbToken } var ydbOptions []ydb.Option if token != "" { ydbOptions = append(ydbOptions, ydb.WithAccessTokenCredentials(token)) } if *ydbToken != "" { ydbOptions = append(ydbOptions, ydb.WithAccessTokenCredentials(*ydbToken)) } db, err := ydb.Open(ctx, connectionString, ydbOptions...) if err != nil { log.Fatalf("failed to create to ydb: %+v", err) } log.Printf("connected to database") return db } func consumerName(index int) string { return fmt.Sprintf("consumer-%v", index) }
Related articles
ydb-go-sdk topicreader_trace
package topicreaderexamples import ( "context" "fmt" "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) // CommitNotify is example for receive commit
ydb-go-sdk service
package main import ( "bytes" "context" "embed" "encoding/hex" "fmt" "hash/fnv" "io" "net/http" "os" "path" "regexp" "strings" "sync" "text/template" "time" "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus" "gi
ydb-go-sdk balancer
package main import ( "net/http" "sync/atomic" ) type balancer struct { handlers []http.Handler counter int32 } func newBalancer(handlers ...http.Handler) *balancer { return &balancer{ handlers: handlers, } } func (b *balancer) ServeHTTP(wri
ydb-go-sdk topicreader_show
package topicreaderexamples import ( "context" "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader" ) // PartitionStopHandled is example of sdk
ydb-go-sdk webserver_cdc
package main import ( "context" "log" "time" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicsugar" ) func (s *server) cdcLoop() { ctx := context.Background() consumer := consumerName
ydb-go-sdk orders
package main import ( "bytes" "context" "log" "text/template" "time" ydb "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/table/options" "github.com/ydb-platform/ydb-g
ydb-go-sdk cdc reader
package main import ( "context" "fmt" "log" "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicsugar" ) func cdcRead(ctx context.Context, db ydb.Co
ydb-go-sdk stubs
package topicreaderexamples import ( "context" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader" ) func getEndOffset(b *topicreader.Batch) int64 { //nolint:unused panic("example stub") } func externalSystemCommit(ctx context.Context, topic
ydb-go-sdk cache
package main import ( "sync" "time" ) type Cache struct { timeout time.Duration m sync.Mutex setCounter int values map[string]CacheItem } func NewCache(timeout time.Duration) *Cache { return &Cache{ timeout: timeout, values:
ydb-go-sdk cities
package main import ( "context" "fmt" "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/table/options" "github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" "github.com/ydb-platform/ydb-go-sdk/v3/table/type