diff --git a/jdbc/pom.xml b/jdbc/pom.xml index 206895c..22a8770 100644 --- a/jdbc/pom.xml +++ b/jdbc/pom.xml @@ -23,6 +23,7 @@ basic-example failsafe-example spring-data-jpa-v5 + ydb-token-app diff --git a/jdbc/spring-jooq/pom.xml b/jdbc/spring-jooq/pom.xml index 0116693..aa7eb80 100644 --- a/jdbc/spring-jooq/pom.xml +++ b/jdbc/spring-jooq/pom.xml @@ -113,6 +113,7 @@ org.springframework.boot spring-boot-maven-plugin + 3.3.3 diff --git a/jdbc/ydb-token-app/pom.xml b/jdbc/ydb-token-app/pom.xml new file mode 100644 index 0000000..931441f --- /dev/null +++ b/jdbc/ydb-token-app/pom.xml @@ -0,0 +1,62 @@ + + 4.0.0 + + + tech.ydb.jdbc.examples + ydb-jdbc-examples + 1.1.0-SNAPSHOT + + + tech.ydb.apps + ydb-token-app + + YDB Token application + + + UTF-8 + 17 + + 2.7.18 + 0.9.3 + + tech.ydb.apps.Application + + + + + org.springframework.boot + spring-boot-starter-data-jpa + ${spring.boot.version} + + + org.springframework.retry + spring-retry + 2.0.7 + + + jakarta.xml.bind + jakarta.xml.bind-api + 2.3.2 + + + + tech.ydb.jdbc + ydb-jdbc-driver + + + tech.ydb.dialects + hibernate-ydb-dialect-v5 + ${ydb.hibernate.dialect.version} + + + + + + + org.springframework.boot + spring-boot-maven-plugin + ${spring.boot.version} + + + + \ No newline at end of file diff --git a/jdbc/ydb-token-app/src/main/java/tech/ydb/apps/Application.java b/jdbc/ydb-token-app/src/main/java/tech/ydb/apps/Application.java new file mode 100644 index 0000000..f860826 --- /dev/null +++ b/jdbc/ydb-token-app/src/main/java/tech/ydb/apps/Application.java @@ -0,0 +1,217 @@ +package tech.ydb.apps; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import javax.annotation.PreDestroy; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.retry.RetryCallback; +import org.springframework.retry.RetryContext; +import org.springframework.retry.RetryListener; +import org.springframework.retry.annotation.EnableRetry; + +import tech.ydb.apps.service.SchemeService; +import tech.ydb.apps.service.TokenService; +import tech.ydb.jdbc.YdbTracer; + +/** + * + * @author Aleksandr Gorshenin + */ +@EnableRetry +@SpringBootApplication +public class Application implements CommandLineRunner { + private static final Logger logger = LoggerFactory.getLogger(Application.class); + + private static final int THREADS_COUNT = 32; + private static final int RECORDS_COUNT = 1_000_000; + private static final int LOAD_BATCH_SIZE = 1000; + + private static final int WORKLOAD_DURATION_SECS = 60; // 60 seconds + + public static void main(String[] args) { + SpringApplication.run(Application.class, args).close(); + } + + private final Ticker ticker = new Ticker(logger); + + private final SchemeService schemeService; + private final TokenService tokenService; + + private final ExecutorService executor; + private final AtomicInteger threadCounter = new AtomicInteger(0); + private final AtomicInteger executionsCount = new AtomicInteger(0); + private final AtomicInteger retriesCount = new AtomicInteger(0); + + public Application(SchemeService schemeService, TokenService tokenService) { + this.schemeService = schemeService; + this.tokenService = tokenService; + + this.executor = Executors.newFixedThreadPool(THREADS_COUNT, this::threadFactory); + } + + @PreDestroy + public void close() throws Exception { + logger.info("CLI app is waiting for finishing"); + + executor.shutdown(); + executor.awaitTermination(5, TimeUnit.MINUTES); + + ticker.printTotal(); + ticker.close(); + + logger.info("Executed {} transactions with {} retries", executionsCount.get(), retriesCount.get()); + logger.info("CLI app has finished"); + } + + @Bean + public RetryListener retryListener() { + return new RetryListener() { + @Override + public boolean open(RetryContext ctx, RetryCallback callback) { + executionsCount.incrementAndGet(); + return true; + } + + @Override + public void onError(RetryContext ctx, RetryCallback callback, Throwable th) { + logger.debug("Retry operation with error {} ", printSqlException(th)); + retriesCount.incrementAndGet(); + } + }; + } + + private String printSqlException(Throwable th) { + Throwable ex = th; + while (ex != null) { + if (ex instanceof SQLException) { + return ex.getMessage(); + } + ex = ex.getCause(); + } + return th.getMessage(); + } + + @Override + public void run(String... args) { + logger.info("CLI app has started"); + + for (String arg : args) { + logger.info("execute {} step", arg); + + if ("clean".equalsIgnoreCase(arg)) { + schemeService.executeClean(); + } + + if ("init".equalsIgnoreCase(arg)) { + schemeService.executeInit(); + } + + if ("load".equalsIgnoreCase(arg)) { + ticker.runWithMonitor(this::loadData); + } + + if ("run".equalsIgnoreCase(arg)) { + ticker.runWithMonitor(this::runWorkloads); + } + + if ("test".equalsIgnoreCase(arg)) { + ticker.runWithMonitor(this::test); + } + } + } + + private Thread threadFactory(Runnable runnable) { + return new Thread(runnable, "app-thread-" + threadCounter.incrementAndGet()); + } + + private void loadData() { + List> futures = new ArrayList<>(); + int id = 0; + while (id < RECORDS_COUNT) { + final int first = id; + id += LOAD_BATCH_SIZE; + final int last = id < RECORDS_COUNT ? id : RECORDS_COUNT; + + futures.add(CompletableFuture.runAsync(() -> { + try (Ticker.Measure measure = ticker.getLoad().newCall()) { + tokenService.insertBatch(first, last); + logger.info("inserted tokens [{}, {})", first, last); + measure.inc(); + } + }, executor)); + } + + futures.forEach(CompletableFuture::join); + } + + private void test() { + YdbTracer.current().markToPrint("test"); + + final Random rnd = new Random(); + List randomIds = IntStream.range(0, 100) + .mapToObj(idx -> rnd.nextInt(RECORDS_COUNT)) + .collect(Collectors.toList()); + + tokenService.updateBatch(randomIds); + } + + private void runWorkloads() { + long finishAt = System.currentTimeMillis() + WORKLOAD_DURATION_SECS * 1000; + List> futures = new ArrayList<>(); + for (int i = 0; i < THREADS_COUNT; i++) { + futures.add(CompletableFuture.runAsync(() -> this.workload(finishAt), executor)); + } + + futures.forEach(CompletableFuture::join); + } + + private void workload(long finishAt) { + final Random rnd = new Random(); + while (System.currentTimeMillis() < finishAt) { + int mode = rnd.nextInt(10); + + try { + if (mode < 2) { + try (Ticker.Measure measure = ticker.getBatchUpdate().newCall()) { + List randomIds = IntStream.range(0, 100) + .mapToObj(idx -> rnd.nextInt(RECORDS_COUNT)) + .collect(Collectors.toList()); + tokenService.updateBatch(randomIds); + measure.inc(); + } + + } else if (mode < 6) { + int id = rnd.nextInt(RECORDS_COUNT); + try (Ticker.Measure measure = ticker.getFetch().newCall()) { + tokenService.fetchToken(id); + measure.inc(); + } + } else { + int id = rnd.nextInt(RECORDS_COUNT); + try (Ticker.Measure measure = ticker.getUpdate().newCall()) { + tokenService.updateToken(id); + measure.inc(); + } + } + } catch (RuntimeException ex) { + logger.debug("got exception {}", ex.getMessage()); + } + } + } +} diff --git a/jdbc/ydb-token-app/src/main/java/tech/ydb/apps/Ticker.java b/jdbc/ydb-token-app/src/main/java/tech/ydb/apps/Ticker.java new file mode 100644 index 0000000..1948c7e --- /dev/null +++ b/jdbc/ydb-token-app/src/main/java/tech/ydb/apps/Ticker.java @@ -0,0 +1,141 @@ +package tech.ydb.apps; + +import java.util.Arrays; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; + +import org.slf4j.Logger; + +/** + * + * @author Aleksandr Gorshenin + */ +public class Ticker { + public class Measure implements AutoCloseable { + private final Method method; + private final long startedAt; + private long count = 0; + + public Measure(Method method) { + this.method = method; + this.startedAt = System.currentTimeMillis(); + } + + public void inc() { + count += 1; + } + + @Override + public void close() { + if (count == 0) { + return; + } + + long ms = System.currentTimeMillis() - startedAt; + + method.count.add(count); + method.totalCount.add(count); + + method.timeMs.add(ms); + method.totalTimeMs.add(ms); + } + } + + public class Method { + private final String name; + + private final LongAdder totalCount = new LongAdder(); + private final LongAdder totalTimeMs = new LongAdder(); + + private final LongAdder count = new LongAdder(); + private final LongAdder timeMs = new LongAdder(); + + private volatile long lastPrinted = 0; + + public Method(String name) { + this.name = name; + } + + public Measure newCall() { + return new Measure(this); + } + + private void reset() { + count.reset(); + timeMs.reset(); + lastPrinted = System.currentTimeMillis(); + } + + private void print(Logger logger) { + if (count.longValue() > 0 && lastPrinted != 0) { + long ms = System.currentTimeMillis() - lastPrinted; + double rps = 1000 * count.longValue() / ms; + logger.info("{}\twas executed {} times\t with RPS {} ops", name, count.longValue(), rps); + } + + reset(); + } + + private void printTotal(Logger logger) { + if (totalCount.longValue() > 0) { + double average = 1.0d * totalTimeMs.longValue() / totalCount.longValue(); + logger.info("{}\twas executed {} times,\twith average time {} ms/op", name, totalCount.longValue(), average); + } + } + } + + private final Logger logger; + private final Method load = new Method("LOAD "); + private final Method fetch = new Method("FETCH "); + private final Method update = new Method("UPDATE"); + private final Method batchUpdate = new Method("BULK_UP"); + + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor( + r -> new Thread(r, "ticker") + ); + + public Ticker(Logger logger) { + this.logger = logger; + } + + public Method getLoad() { + return this.load; + } + + public Method getFetch() { + return this.fetch; + } + + public Method getUpdate() { + return this.update; + } + + public Method getBatchUpdate() { + return this.batchUpdate; + } + + public void runWithMonitor(Runnable runnable) { + Arrays.asList(load, fetch, update, batchUpdate).forEach(Method::reset); + final ScheduledFuture future = scheduler.scheduleAtFixedRate(this::print, 1, 10, TimeUnit.SECONDS); + runnable.run(); + future.cancel(false); + print(); + } + + public void close() throws InterruptedException { + scheduler.shutdownNow(); + scheduler.awaitTermination(20, TimeUnit.SECONDS); + } + + private void print() { + Arrays.asList(load, fetch, update, batchUpdate).forEach(m -> m.print(logger)); + } + + public void printTotal() { + logger.info("=========== TOTAL =============="); + Arrays.asList(load, fetch, update, batchUpdate).forEach(m -> m.printTotal(logger)); + } +} diff --git a/jdbc/ydb-token-app/src/main/java/tech/ydb/apps/annotation/YdbRetryable.java b/jdbc/ydb-token-app/src/main/java/tech/ydb/apps/annotation/YdbRetryable.java new file mode 100644 index 0000000..4225352 --- /dev/null +++ b/jdbc/ydb-token-app/src/main/java/tech/ydb/apps/annotation/YdbRetryable.java @@ -0,0 +1,36 @@ +package tech.ydb.apps.annotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.sql.SQLRecoverableException; + +import org.springframework.core.annotation.AliasFor; +import org.springframework.retry.annotation.Backoff; +import org.springframework.retry.annotation.Retryable; + +/** + * @author Aleksandr Gorshenin + * @see https://github.com/spring-projects/spring-retry/blob/main/README.md#further-customizations for details + */ +@Target({ ElementType.METHOD, ElementType.TYPE }) +@Retention(RetentionPolicy.RUNTIME) +@Retryable( + retryFor = SQLRecoverableException.class, + maxAttempts = 5, + backoff = @Backoff(delay = 100, multiplier = 2.0, maxDelay = 5000, random = true) +) +public @interface YdbRetryable { + @AliasFor(annotation = Retryable.class, attribute = "recover") + String recover() default ""; + + @AliasFor(annotation = Retryable.class, attribute = "label") + String label() default ""; + + @AliasFor(annotation = Retryable.class, attribute = "stateful") + boolean stateful() default false; + + @AliasFor(annotation = Retryable.class, attribute = "listeners") + String[] listeners() default {}; +} diff --git a/jdbc/ydb-token-app/src/main/java/tech/ydb/apps/entity/Token.java b/jdbc/ydb-token-app/src/main/java/tech/ydb/apps/entity/Token.java new file mode 100644 index 0000000..510d9a6 --- /dev/null +++ b/jdbc/ydb-token-app/src/main/java/tech/ydb/apps/entity/Token.java @@ -0,0 +1,81 @@ +package tech.ydb.apps.entity; + +import java.io.Serializable; +import java.util.UUID; + +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.Table; +import javax.persistence.Transient; + +import org.hibernate.annotations.DynamicUpdate; +import org.hibernate.annotations.Type; +import org.springframework.data.domain.Persistable; + +/** + * + * @author Aleksandr Gorshenin + */ +@Entity +@DynamicUpdate +@Table(name = "app_token") +public class Token implements Serializable, Persistable { + private static final long serialVersionUID = -1533981732225785339L; + + @Id + @Type(type="uuid-char") + private UUID id; + + @Column + private String username; + + @Column + private Integer version; + + @Transient + private final boolean isNew; + + @Override + public UUID getId() { + return this.id; + } + + public String getUserName() { + return this.username; + } + + public Integer getVersion() { + return this.version; + } + + @Override + public boolean isNew() { + return isNew; + } + + public Token() { + this.isNew = false; + } + + public Token(String username) { + this.id = getKey(username); + this.username = username; + this.version = 1; + this.isNew = true; + } + + public void incVersion() { + this.version ++; + } + + @Override + public String toString() { + return "Token{id=" + id.toString() + ", username='" + username + "', version=" + version + "}"; + } + + public static UUID getKey(String username) { + // UUID based on MD5 hash + return UUID.nameUUIDFromBytes((username + "_v").getBytes()); + } +} diff --git a/jdbc/ydb-token-app/src/main/java/tech/ydb/apps/repo/TokenRepository.java b/jdbc/ydb-token-app/src/main/java/tech/ydb/apps/repo/TokenRepository.java new file mode 100644 index 0000000..e6f3a13 --- /dev/null +++ b/jdbc/ydb-token-app/src/main/java/tech/ydb/apps/repo/TokenRepository.java @@ -0,0 +1,22 @@ +package tech.ydb.apps.repo; + +import java.util.UUID; + +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.CrudRepository; + +import tech.ydb.apps.entity.Token; + +/** + * + * @author Aleksandr Gorshenin + */ +public interface TokenRepository extends CrudRepository { + + @Query(value = "SCAN SELECT id FROM app_token", nativeQuery = true) + Iterable scanFindAll(); + + void saveAllAndFlush(Iterable list); + + void deleteAllByIdInBatch(Iterable ids); +} diff --git a/jdbc/ydb-token-app/src/main/java/tech/ydb/apps/service/SchemeService.java b/jdbc/ydb-token-app/src/main/java/tech/ydb/apps/service/SchemeService.java new file mode 100644 index 0000000..d1c8d39 --- /dev/null +++ b/jdbc/ydb-token-app/src/main/java/tech/ydb/apps/service/SchemeService.java @@ -0,0 +1,68 @@ +package tech.ydb.apps.service; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; + +import javax.persistence.EntityManager; + +import com.google.common.io.CharStreams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.io.Resource; +import org.springframework.core.io.ResourceLoader; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import tech.ydb.apps.annotation.YdbRetryable; + +/** + * + * @author Aleksandr Gorshenin + */ +@Service +public class SchemeService { + private static final Logger logger = LoggerFactory.getLogger(SchemeService.class); + + private final EntityManager em; + private final ResourceLoader rl; + + public SchemeService(EntityManager em, ResourceLoader rl) { + this.em = em; + this.rl = rl; + } + + @Transactional + @YdbRetryable + public void executeClean() { + String script = readResourceFile("sql/drop.sql"); + if (script == null) { + logger.warn("cannot find drop sql in classpath"); + return; + } + + em.createNativeQuery(script).executeUpdate(); + } + + @Transactional + @YdbRetryable + public void executeInit() { + String script = readResourceFile("sql/init.sql"); + if (script == null) { + logger.warn("cannot find init sql in classpath"); + return; + } + + em.createNativeQuery(script).executeUpdate(); + } + + private String readResourceFile(String location) { + Resource resource = rl.getResource("classpath:" + location); + try (InputStream is = resource.getInputStream()) { + return CharStreams.toString(new InputStreamReader(is, StandardCharsets.UTF_8)); + } catch (IOException e) { + return null; + } + } +} diff --git a/jdbc/ydb-token-app/src/main/java/tech/ydb/apps/service/TokenService.java b/jdbc/ydb-token-app/src/main/java/tech/ydb/apps/service/TokenService.java new file mode 100644 index 0000000..d2aff7d --- /dev/null +++ b/jdbc/ydb-token-app/src/main/java/tech/ydb/apps/service/TokenService.java @@ -0,0 +1,101 @@ +package tech.ydb.apps.service; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import tech.ydb.apps.annotation.YdbRetryable; +import tech.ydb.apps.entity.Token; +import tech.ydb.apps.repo.TokenRepository; + +/** + * + * @author Aleksandr Gorshenin + */ +@Service +public class TokenService { + private final static Logger logger = LoggerFactory.getLogger(TokenService.class); + private final TokenRepository repository; + + public TokenService(TokenRepository repository) { + this.repository = repository; + } + + private UUID getKey(int id) { + return Token.getKey("user_" + id); + } + + @YdbRetryable + @Transactional + public void insertBatch(int firstID, int lastID) { + List batch = new ArrayList<>(); + for (int id = firstID; id < lastID; id++) { + batch.add(new Token("user_" + id)); + } + repository.saveAll(batch); + } + + @YdbRetryable + @Transactional + public Token fetchToken(int id) { + Optional token = repository.findById(getKey(id)); + + if (!token.isPresent()) { + logger.warn("token {} is not found", id); + return null; + } + + return token.get(); + } + + @YdbRetryable + @Transactional + public void updateToken(int id) { + Token token = fetchToken(id); + if (token != null) { + token.incVersion(); + repository.save(token); + logger.trace("updated token {} -> {}", id, token.getVersion()); + } + } + + @YdbRetryable + @Transactional + public void updateBatch(List ids) { + List uuids = ids.stream().map(this::getKey).collect(Collectors.toList()); + + Iterable batch = repository.findAllById(uuids); + for (Token token: batch) { + logger.trace("update token {}", token); + token.incVersion(); + } + + repository.saveAllAndFlush(batch); + } + + @YdbRetryable + @Transactional + public void removeBatch(List ids) { + List uuids = ids.stream().map(this::getKey).collect(Collectors.toList()); + repository.deleteAllByIdInBatch(uuids); + } + + @YdbRetryable + @Transactional + public void listManyRecords() { + long count = 0; + for (String id : repository.scanFindAll()) { + count ++; + if (count % 1000 == 0) { + logger.info("scan readed {} records", count); + } + } + } +} diff --git a/jdbc/ydb-token-app/src/main/resources/application.properties b/jdbc/ydb-token-app/src/main/resources/application.properties new file mode 100644 index 0000000..0a88d58 --- /dev/null +++ b/jdbc/ydb-token-app/src/main/resources/application.properties @@ -0,0 +1,19 @@ +spring.datasource.url=jdbc:ydb:grpc://localhost:2136/local +spring.datasource.driver-class-name=tech.ydb.jdbc.YdbDriver + +spring.datasource.hikari.maximum-pool-size=100 +spring.datasource.hikari.data-source-properties.useQueryService=true +spring.datasource.hikari.data-source-properties.enableTxTracer=true + +spring.jpa.properties.hibernate.jdbc.batch_size=1000 +spring.jpa.properties.hibernate.order_updates=true +spring.jpa.properties.hibernate.order_inserts=true +spring.jpa.properties.hibernate.dialect=tech.ydb.hibernate.dialect.YdbDialect +#spring.jpa.show-sql = true + +logging.level.org.hibernate.engine=OFF + +#logging.level.tech.ydb.apps=TRACE +#logging.level.tech.ydb.jdbc.YdbDriver=TRACE +#logging.level.org.hibernate.SQL=DEBUG +#logging.level.org.hibernate.type=TRACE \ No newline at end of file diff --git a/jdbc/ydb-token-app/src/main/resources/sql/drop.sql b/jdbc/ydb-token-app/src/main/resources/sql/drop.sql new file mode 100644 index 0000000..3f4fd46 --- /dev/null +++ b/jdbc/ydb-token-app/src/main/resources/sql/drop.sql @@ -0,0 +1 @@ +DROP TABLE app_token; diff --git a/jdbc/ydb-token-app/src/main/resources/sql/init.sql b/jdbc/ydb-token-app/src/main/resources/sql/init.sql new file mode 100644 index 0000000..5426b4f --- /dev/null +++ b/jdbc/ydb-token-app/src/main/resources/sql/init.sql @@ -0,0 +1,6 @@ +CREATE TABLE app_token ( + id Text NOT NULL, + username Text, + version Int32, + PRIMARY KEY (id) +);