Skip to content

H2 test case for corrupted file #3944

@sdmoralesma

Description

@sdmoralesma

I've attached a test case that creates a corrupted database file, and I would like some guidance on how to avoid this problem. In summary, the test case loads the database by submitting a batch insert every millisecond to an executor service with 50 threads (each one using a connection from the pool), and then interrupts the processing with a Timer.

The issue is not easy to reproduce and happens rarely, so the test case may need to be run multiple times before the corruption appears.

Uses JDK11 and H2 2.2.224

To execute you can use:

# Single execution
java -cp h2-2.2.224.jar H2DatabaseCorruption.java

# Use 'hyperfine' to run the case multiple times automatically:
hyperfine --ignore-failure \
  --prepare 'sleep 10' \
  --runs 10 \
  --show-output \
  'java -cp h2-2.2.224.jar H2DatabaseCorruption.java'
package com.company.testing;

import org.h2.jdbcx.JdbcConnectionPool;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.HashSet;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.logging.Logger;

/**
 * Stress test that tries to reproduce a corrupted H2 database file.
 *
 * <p>The test submits one batch-insert task per millisecond to a fixed pool of
 * {@link #NUM_WORKERS} worker threads, each borrowing a connection from a shared
 * {@link JdbcConnectionPool}. After {@link #TEST_DURATION} a timer stops the load and,
 * when forced exit is enabled, terminates the JVM with {@code System.exit(-1)} while
 * inserts may still be in flight.
 */
public class H2DatabaseCorruption {

    private static final Logger log = Logger.getLogger(H2DatabaseCorruption.class.getName());
    private static final String JDBC_URL = "jdbc:h2:file:~/temp/h2/test;" +
            "AUTO_SERVER=TRUE;" +
            "RETENTION_TIME=0;" +
            "LOCK_TIMEOUT=10000;" +
            "ANALYZE_AUTO=0;" +
            "MAX_MEMORY_ROWS=1000000;" +
            "CACHE_SIZE=1048576;" // value is in KB: 1024 * 1024 KB = 1 GB
            ;
    private static final Path DATABASE_PATH = Paths.get(System.getProperty("user.home") + "/temp/h2/");
    // Inserts the identifier only when no row with the same identifier exists yet.
    private static final String INSERT_SQL = "INSERT INTO MY_TABLE SELECT ? FROM dual " +
            "WHERE NOT EXISTS " +
            "(SELECT 1 FROM MY_TABLE dc WHERE dc.identifier=?)";
    // Format arguments: table name, primary-key constraint name.
    private static final String CREATE_TABLE_SQL = "CREATE TABLE IF NOT EXISTS %s ( \n" +
            " IDENTIFIER  VARCHAR NOT NULL, \n" +
            "   CONSTRAINT %s \n" +
            "     PRIMARY KEY (IDENTIFIER))";
    /** Number of batch-insert tasks that completed without an exception. */
    private static final AtomicInteger OK_COUNT = new AtomicInteger(0);
    /** Cleared by the timer task to stop the submission loop in {@link #process()}. */
    private static final AtomicBoolean IS_RUNNING = new AtomicBoolean(true);
    /** Failures raised by batch-insert tasks; only the count is reported at shutdown. */
    private static final Queue<Throwable> EXCEPTIONS = new ConcurrentLinkedQueue<>();
    private static final Random RANDOM = new Random();
    private static final int NUM_WORKERS = 50;
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(NUM_WORKERS);
    private static final JdbcConnectionPool CONNECTION_POOL = createConnectionPool(NUM_WORKERS);
    private static final Duration TEST_DURATION = Duration.ofSeconds(30);

    /**
     * Entry point. The steps to run are selected by the hard-coded {@code arguments} set
     * below; the command-line {@code args} are currently not consulted.
     *
     * @param args ignored
     * @throws Exception if the database directory or a table cannot be created, or the
     *                   main thread is interrupted while submitting work
     */
    public static void main(String[] args) throws Exception {
        // Remove an entry here to skip the corresponding step.
        Set<String> arguments = new HashSet<>();
        arguments.add("addHook");
        arguments.add("check-health");
        arguments.add("test");

        if (arguments.contains("addHook")) {
            Runtime.getRuntime().addShutdownHook(shutdownHookThread());
        }

        if (arguments.contains("check-health")) {
            log.info("Verifying database health by creating a new table");
            String tableName = randomAlphabetic(10);
            createDatabaseTable(tableName);
            countRowsInTable(tableName);
        }

        if (arguments.contains("test")) {
            boolean forceExit = true;

            log.info("Running test, forceExit=" + forceExit);
            setupDatabaseFiles();
            createDatabaseTable("MY_TABLE");
            configureTimerTask(forceExit);
            process();
        }
    }

    /**
     * Creates the directory that holds the database files if it does not exist yet.
     *
     * @throws IOException if the directory cannot be created
     */
    private static void setupDatabaseFiles() throws IOException {
        log.info("Setup started...");
        Files.createDirectories(DATABASE_PATH);
    }

    /**
     * Submits one batch-insert task per millisecond until {@link #IS_RUNNING} is cleared.
     * Successful tasks increment {@link #OK_COUNT}; failures are stored in
     * {@link #EXCEPTIONS}.
     *
     * @throws InterruptedException if the submitting thread is interrupted while sleeping
     */
    private static void process() throws InterruptedException {
        log.info("Start processing for " + TEST_DURATION.toSeconds() + " seconds...");
        try {
            while (IS_RUNNING.get()) {
                CompletableFuture
                        .supplyAsync(batchInsert(), EXECUTOR_SERVICE)
                        .thenAccept(i -> OK_COUNT.incrementAndGet())
                        .exceptionally(throwable -> {
                            EXCEPTIONS.add(throwable);
                            return null;
                        });

                TimeUnit.MILLISECONDS.sleep(1);
            }
        } finally {
            // Stop accepting new tasks so the non-daemon worker threads terminate once the
            // already-submitted work is done; otherwise the JVM would never exit when the
            // timer does not force it. Already-submitted tasks still run.
            EXECUTOR_SERVICE.shutdown();
        }
    }

    /**
     * Creates the table {@code tableName} with a single {@code IDENTIFIER} primary-key
     * column if it does not exist.
     *
     * @param tableName name of the table; also used as prefix of the constraint name
     * @throws SQLException if the statement fails
     */
    private static void createDatabaseTable(String tableName) throws SQLException {
        log.info("Create table in database: " + tableName);
        try (Connection connection = CONNECTION_POOL.getConnection();
             Statement statement = connection.createStatement()) {
            statement.execute(String.format(CREATE_TABLE_SQL, tableName, tableName + "_PK"));
        }
    }

    /**
     * Schedules a one-shot task that fires after {@link #TEST_DURATION}, clears
     * {@link #IS_RUNNING} and optionally kills the JVM.
     *
     * @param forceExit when {@code true} the task calls {@code System.exit(-1)} right
     *                  after stopping the submission loop
     */
    private static void configureTimerTask(boolean forceExit) {
        log.info("Configuring timer task...");
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                log.info("Interrupt processing...");
                IS_RUNNING.set(false);
                if (forceExit) {
                    System.exit(-1);// Forced stop
                }
            }
        };

        // Daemon timer thread: it must not keep the JVM alive after the task has run
        // when forceExit is false.
        Timer timer = new Timer(true);
        timer.schedule(task, TEST_DURATION.toMillis());
    }

    /**
     * Runs {@code SELECT COUNT(*)} against {@code tableName} through a dedicated
     * single-connection pool and logs the result.
     *
     * @param tableName table to count
     * @throws RuntimeException wrapping the {@link SQLException} if the query fails
     */
    private static void countRowsInTable(String tableName) {
        log.info("Querying row count...");
        JdbcConnectionPool poolWithOneConnection = createConnectionPool(1);
        try (Connection connection = poolWithOneConnection.getConnection();
             Statement statement = connection.createStatement();
             ResultSet rs = statement.executeQuery(String.format("SELECT COUNT(*) FROM %s", tableName))) {
            if (!rs.next()) {
                throw new SQLException("COUNT(*) returned no row for table " + tableName);
            }
            int count = rs.getInt(1);
            log.info("Database is OK. Rows=" + count + " in Table=" + tableName);
        } catch (SQLException e) {
            throw new RuntimeException("Cannot query count", e);
        } finally {
            // The temporary pool is only needed for this check; release its connections.
            poolWithOneConnection.dispose();
        }
    }

    /**
     * Builds a task that inserts 1000 random identifiers in one JDBC batch.
     *
     * @return supplier yielding the number of entries in the batch result array; it
     *         throws a {@link RuntimeException} wrapping any {@link SQLException}
     */
    private static Supplier<Integer> batchInsert() {
        return () -> {
            try (Connection connection = CONNECTION_POOL.getConnection();
                 PreparedStatement prepStatement = connection.prepareStatement(INSERT_SQL)) {
                for (int i = 0; i < 1000; i++) {
                    String random = randomString(20);
                    // Same value for the inserted column and for the NOT EXISTS check.
                    prepStatement.setString(1, random);
                    prepStatement.setString(2, random);
                    prepStatement.addBatch();
                }
                return prepStatement.executeBatch().length;
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
        };
    }

    /**
     * Decodes {@code count} random bytes as UTF-8. Byte sequences that are not valid
     * UTF-8 are replaced by the decoder, so the result is arbitrary text rather than
     * exactly {@code count} characters.
     *
     * @param count number of random bytes to generate
     * @return the decoded string
     */
    private static String randomString(int count) {
        byte[] array = new byte[count];
        RANDOM.nextBytes(array);
        return new String(array, StandardCharsets.UTF_8);
    }

    /**
     * Creates a connection pool for {@link #JDBC_URL} without user name or password.
     *
     * @param maxConnections upper bound of pooled connections
     * @return the configured pool
     */
    private static JdbcConnectionPool createConnectionPool(int maxConnections) {
        JdbcConnectionPool connectionPool = JdbcConnectionPool.create(JDBC_URL, null, null);
        connectionPool.setMaxConnections(maxConnections);
        return connectionPool;
    }

    /**
     * Generates a random string of lower-case ASCII letters.
     *
     * @param targetStringLength number of characters to generate
     * @return string consisting of {@code targetStringLength} letters in {@code a-z}
     */
    private static String randomAlphabetic(int targetStringLength) {
        int leftLimit = 97; // letter 'a'
        int rightLimit = 122; // letter 'z'
        return RANDOM.ints(leftLimit, rightLimit + 1)
                .limit(targetStringLength)
                .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
                .toString();
    }

    /**
     * Builds the shutdown hook that reports how many batch tasks succeeded and failed.
     *
     * @return an unstarted thread suitable for {@code Runtime.addShutdownHook}
     */
    private static Thread shutdownHookThread() {
        return new Thread(() -> {
            log.info("Shutting down!. Running shutdown hook...");
            log.info(String.format("Processing finished. OK=%d , Exceptions=%d", OK_COUNT.get(), EXCEPTIONS.size()));
        });
    }

}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions