package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"path"
	"time"

	"github.com/ydb-platform/ydb-go-sdk/v3"
	"github.com/ydb-platform/ydb-go-sdk/v3/table"
	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes"
)

// createTableAndCDC prepares the database for the example in two steps:
// it runs createTables and then createCosumers with consumersCount.
//
// A failure in either step is reported through log.Fatalf, so the process
// exits instead of returning an error to the caller.
func createTableAndCDC(ctx context.Context, db ydb.Connection, consumersCount int) {
	if err := createTables(ctx, db); err != nil {
		log.Fatalf("failed to create tables: %+v", err)
	}

	if err := createCosumers(ctx, db, consumersCount); err != nil {
		log.Fatalf("failed to create consumers: %+v", err)
	}
}

// createTables rebuilds the "bus" table from scratch.
//
// Steps, in order:
//  1. Drop the table at <database name>/bus inside a table-service Do call.
//     An error recognised by ydb.IsOperationErrorSchemeError is ignored
//     (presumably "table does not exist" — confirm against the SDK docs).
//  2. Create the table and add the "updates" changefeed (JSON format,
//     UPDATES mode) through the scripting service.
//  3. Upsert the two initial rows "bus1" and "bus2".
//
// The first failing step aborts the function; its error is returned wrapped
// with a message naming the step.
func createTables(ctx context.Context, db ydb.Connection) error {
	const (
		// schemaQuery creates the table and attaches the changefeed.
		schemaQuery = `
CREATE TABLE bus (id Text, freeSeats Int64, PRIMARY KEY(id));

ALTER TABLE 
	bus
ADD CHANGEFEED
	updates
WITH (
	FORMAT = "JSON",
	MODE = "UPDATES"
)
`
		// seedQuery fills the table with its initial rows.
		seedQuery = `
UPSERT INTO bus (id, freeSeats) VALUES ("bus1", 40), ("bus2", 60);
`
	)

	// dropBus removes the table; scheme errors are treated as success.
	dropBus := func(ctx context.Context, s table.Session) error {
		dropErr := s.DropTable(ctx, path.Join(db.Name(), "bus"))
		if !ydb.IsOperationErrorSchemeError(dropErr) {
			return dropErr
		}
		return nil
	}
	if err := db.Table().Do(ctx, dropBus); err != nil {
		return fmt.Errorf("failed to drop table: %w", err)
	}

	if _, err := db.Scripting().Execute(ctx, schemaQuery, nil); err != nil {
		return fmt.Errorf("failed to create table: %w", err)
	}

	if _, err := db.Scripting().Execute(ctx, seedQuery, nil); err != nil {
		return fmt.Errorf("failed insert rows: %w", err)
	}

	return nil
}

// createCosumers adds consumersCount consumers to the "bus/updates" topic.
//
// Consumers are added one at a time, each with its own Alter call, and are
// named consumerName(0) through consumerName(consumersCount-1). The first
// Alter error is returned as is and stops the loop; consumers added before
// the failure are not rolled back here. A non-positive consumersCount
// results in no calls and a nil error.
func createCosumers(ctx context.Context, db ydb.Connection, consumersCount int) error {
	const changefeedTopic = "bus/updates"

	for idx := 0; idx < consumersCount; idx++ {
		consumer := topictypes.Consumer{Name: consumerName(idx)}
		addConsumer := topicoptions.AlterWithAddConsumers(consumer)

		if err := db.Topic().Alter(ctx, changefeedTopic, addConsumer); err != nil {
			return err
		}
	}

	return nil
}

// connect opens a connection to YDB and returns it.
//
// The connection string is resolved in this order of precedence:
//  1. *ydbConnectionString, when non-empty;
//  2. the YDB_CONNECTION_STRING environment variable, when non-empty;
//  3. defaultConnectionString.
//
// The access token is resolved the same way from *ydbToken and then the
// YDB_TOKEN environment variable. When neither is set, no credentials option
// is passed to ydb.Open.
//
// ydb.Open is called with a context that times out after 10 seconds. If it
// fails, the error is logged with log.Fatalf and the process exits, so a
// returned connection is always the result of a successful Open.
func connect() ydb.Connection {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()

	connectionString := os.Getenv("YDB_CONNECTION_STRING")
	if *ydbConnectionString != "" {
		connectionString = *ydbConnectionString
	}
	if connectionString == "" {
		connectionString = defaultConnectionString
	}

	token := os.Getenv("YDB_TOKEN")
	if *ydbToken != "" {
		token = *ydbToken
	}

	// token already reflects the *ydbToken override, so the credentials
	// option is added exactly once.
	var ydbOptions []ydb.Option
	if token != "" {
		ydbOptions = append(ydbOptions, ydb.WithAccessTokenCredentials(token))
	}

	db, err := ydb.Open(ctx, connectionString, ydbOptions...)
	if err != nil {
		log.Fatalf("failed to connect to ydb: %+v", err)
	}
	log.Printf("connected to database")
	return db
}

// consumerName returns the name of the consumer with the given index:
// the prefix "consumer-" followed by the index in decimal, for example
// "consumer-0" for index 0.
func consumerName(index int) string {
	const nameFormat = "consumer-%v"

	name := fmt.Sprintf(nameFormat, index)
	return name
}

Related articles

ydb-go-sdk topicreader_trace

package topicreaderexamples import ( "context" "fmt" "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) // CommitNotify is example for receive commit

ydb-go-sdk service

package main import ( "bytes" "context" "embed" "encoding/hex" "fmt" "hash/fnv" "io" "net/http" "os" "path" "regexp" "strings" "sync" "text/template" "time" "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus" "gi

ydb-go-sdk balancer

package main import ( "net/http" "sync/atomic" ) type balancer struct { handlers []http.Handler counter int32 } func newBalancer(handlers ...http.Handler) *balancer { return &balancer{ handlers: handlers, } } func (b *balancer) ServeHTTP(wri

ydb-go-sdk topicreader_show

package topicreaderexamples import ( "context" "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader" ) // PartitionStopHandled is example of sdk

ydb-go-sdk webserver_cdc

package main import ( "context" "log" "time" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicsugar" ) func (s *server) cdcLoop() { ctx := context.Background() consumer := consumerName

ydb-go-sdk orders

package main import ( "bytes" "context" "log" "text/template" "time" ydb "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/table/options" "github.com/ydb-platform/ydb-g

ydb-go-sdk cdc reader

package main import ( "context" "fmt" "log" "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicsugar" ) func cdcRead(ctx context.Context, db ydb.Co

ydb-go-sdk stubs

package topicreaderexamples import ( "context" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader" ) func getEndOffset(b *topicreader.Batch) int64 { //nolint:unused panic("example stub") } func externalSystemCommit(ctx context.Context, topic

ydb-go-sdk cache

package main import ( "sync" "time" ) type Cache struct { timeout time.Duration m sync.Mutex setCounter int values map[string]CacheItem } func NewCache(timeout time.Duration) *Cache { return &Cache{ timeout: timeout, values:

ydb-go-sdk cities

package main import ( "context" "fmt" "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/table/options" "github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" "github.com/ydb-platform/ydb-go-sdk/v3/table/type