
oliverselinger/failsafe-executor


failsafe-executor

Persistent executor service for Java, inspired by the need for reliable, multi-node compatible execution of processes.

Features

  • Failsafe tasks. Requires only one database table for persistence.
  • Reliable execution. Guarantees at-least-once execution of a submitted task.
  • Multi-node compatible. Nodes coordinate via optimistic locking.
  • Retryable. Exceptions are captured, and failed tasks can be retried.
  • Task routing among nodes. An executor only picks up tasks registered on it.
  • Queues. Tasks submitted to a queue are executed sequentially.
  • Lightweight. Small code base.
  • No dependencies.
  • No reflection.
  • Single file.

Getting started

  1. Add the dependency
<dependency>
    <groupId>com.github.oliverselinger</groupId>
    <artifactId>failsafe-executor</artifactId>
    <version>3.0.0</version>
</dependency>
  2. Create the table in your database. See the table definitions for Oracle, Postgres, MySQL, or MariaDB. We recommend creating an index on created_date to make ordering fast.

  3. Instantiate and start the FailsafeExecutor, which then starts executing submitted tasks.

FailsafeExecutor failsafeExecutor = new FailsafeExecutor(dataSource);
failsafeExecutor.start();

Execute Tasks

Execution of a task requires two steps:

Register the task

First, register the task at application startup with a unique name and a consumer that accepts a single input argument for state transfer.

failsafeExecutor.registerTask("TaskName", param -> {
    // your business logic
});

An executor only picks up tasks that have been registered with a function. If a task should be executed on a different node, you only need to register the remote task's name via the registerRemoteTask method. This allows simple routing of tasks among nodes based on task names.

Make sure your business logic is idempotent, since it may be executed more than once per task.

As the parameter, we recommend using only a single ID that your business logic can interpret. Avoid complex (serialized) objects, since they can lead to complicated migration scenarios when your objects and business logic change.
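Because a task may run more than once, a handler that receives only an ID can check whether the work for that ID is already done before doing it. The following is a minimal in-memory sketch; `OrderShippingHandler` and its `shipped` set are hypothetical stand-ins for your own business logic and for state you would normally keep in your own database.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical business logic for a task whose parameter is a single order ID.
class OrderShippingHandler implements Consumer<String> {
    // In-memory stand-in for "already shipped" state; real code would keep this in
    // your own database and update it in the same transaction as the side effect.
    private final Set<String> shipped = ConcurrentHashMap.newKeySet();
    final AtomicInteger shipmentsSent = new AtomicInteger();

    @Override
    public void accept(String orderId) {
        // add() returns false if the ID is already present, so a repeated
        // execution of the same task does not ship the order twice.
        if (shipped.add(orderId)) {
            shipmentsSent.incrementAndGet(); // the actual side effect would go here
        }
    }

    public static void main(String[] args) {
        OrderShippingHandler handler = new OrderShippingHandler();
        handler.accept("order-42");
        handler.accept("order-42"); // simulated second execution of the same task
        System.out.println(handler.shipmentsSent.get()); // prints 1
    }
}
```

Assuming the task parameter is a String, such a handler could be passed to registerTask via a method reference, e.g. `failsafeExecutor.registerTask("ShipOrder", new OrderShippingHandler()::accept)`.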

Execute the task

Pass your task's name and your parameter to the FailsafeExecutor's execute method. The task is persisted and then executed at some point in the future.

String taskId = failsafeExecutor.execute("TaskName", parameter);

Optionally, you can provide a taskId that is used as a unique constraint in the database. On conflict (a task with this ID already exists in the database), the insertion is skipped; no exception is thrown and the method returns normally.

String taskId = failsafeExecutor.execute("UniqueTaskId", "TaskName", parameter);
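The skip-on-conflict behavior works like an insert-if-absent operation. This in-memory analogue uses ConcurrentHashMap.putIfAbsent in place of the database's unique constraint; `TaskStore` is a hypothetical class for illustration, not part of FailsafeExecutor.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory analogue of inserting a task with a caller-supplied ID.
// The database version relies on a unique constraint; putIfAbsent plays that role here.
class TaskStore {
    private final Map<String, String> tasksById = new ConcurrentHashMap<>();

    // A duplicate ID is silently ignored (no exception), so the first
    // submitted task for that ID is the one that is kept.
    String execute(String taskId, String taskName, String parameter) {
        tasksById.putIfAbsent(taskId, taskName + ":" + parameter);
        return taskId;
    }

    int size() {
        return tasksById.size();
    }

    String get(String taskId) {
        return tasksById.get(taskId);
    }

    public static void main(String[] args) {
        TaskStore store = new TaskStore();
        store.execute("invoice-7", "SendInvoice", "7");
        store.execute("invoice-7", "SendInvoice", "7"); // skipped, returns normally
        System.out.println(store.size()); // prints 1
    }
}
```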

Defer execution

You can plan a one-time execution in the future with the defer method.

String taskId = failsafeExecutor.defer("TaskName", parameter, plannedExecutionTime);

Schedule Tasks

You can schedule a task's execution. Pass your task's name, your Schedule and your runnable to FailsafeExecutor's schedule method.

String taskId = failsafeExecutor.schedule("TaskName", schedule, () -> {
    // your business logic
});

For recurring execution, let your Schedule always return the next planned execution time. See DailySchedule for an example.

As before, make sure your business logic is idempotent, since it may be executed more than once per scheduled execution.
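A recurring schedule boils down to a function from the current time to the next planned execution time. The sketch below assumes a hypothetical `NextTimeSchedule` interface with a single `next(LocalDateTime)` method; it is not the library's Schedule interface, whose exact signature should be taken from DailySchedule in the repository.

```java
import java.time.LocalDateTime;
import java.time.LocalTime;

// Hypothetical stand-in for a schedule: given the current time, return the next run.
interface NextTimeSchedule {
    LocalDateTime next(LocalDateTime now);
}

// Runs once a day at a fixed local time, similar in spirit to DailySchedule.
class AtTimeEveryDay implements NextTimeSchedule {
    private final LocalTime time;

    AtTimeEveryDay(LocalTime time) {
        this.time = time;
    }

    @Override
    public LocalDateTime next(LocalDateTime now) {
        LocalDateTime today = now.toLocalDate().atTime(time);
        // If today's slot is still ahead, use it; otherwise move to tomorrow.
        return today.isAfter(now) ? today : today.plusDays(1);
    }

    public static void main(String[] args) {
        NextTimeSchedule schedule = new AtTimeEveryDay(LocalTime.of(2, 30));
        System.out.println(schedule.next(LocalDateTime.of(2024, 5, 1, 1, 0)));  // prints 2024-05-01T02:30
        System.out.println(schedule.next(LocalDateTime.of(2024, 5, 1, 2, 30))); // prints 2024-05-02T02:30
    }
}
```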

Queue Tasks

You can submit tasks to a queue for serial execution. Tasks are executed in the order they are queued, and each task completes before the next one starts. A queue must be registered once before use.

// Register the queue once, then enqueue tasks
failsafeExecutor.registerQueue("QueueA");
failsafeExecutor.execute(..., "QueueA", ...);
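The ordering guarantee of a queue can be pictured as a single worker that takes tasks strictly one after another. This in-memory sketch uses a single-thread ExecutorService from the JDK to show that behavior; `SerialQueueDemo` is hypothetical and says nothing about how FailsafeExecutor implements queues internally.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// In-memory illustration of serial execution: one worker thread runs the
// submitted tasks one at a time, in submission order.
class SerialQueueDemo {
    static List<String> runInQueue(List<String> taskNames) {
        List<String> executionOrder = new CopyOnWriteArrayList<>();
        ExecutorService queue = Executors.newSingleThreadExecutor();
        for (String name : taskNames) {
            queue.submit(() -> {
                executionOrder.add(name); // stands in for the task's business logic
            });
        }
        queue.shutdown(); // accept no new tasks; already queued ones still run
        try {
            queue.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executionOrder;
    }

    public static void main(String[] args) {
        System.out.println(runInQueue(List.of("first", "second", "third"))); // prints [first, second, third]
    }
}
```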

Task failures

Any exception occurring during the execution of a task is captured. The exception's message and stack trace are saved to the task, and the task is marked as failed, so the FailsafeExecutor no longer executes it. To find failed tasks, use:

List<Task> failedTasks = failsafeExecutor.failedTasks();

Two options are offered to handle a failed task. Either retry it:

failsafeExecutor.retry(failedTask);

Or cancel it:

failsafeExecutor.cancel(failedTask);

Cancel deletes the task from the database.
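Capturing a failure essentially means storing the exception's message and its stack trace as text. A minimal sketch of that step with JDK classes only; `FailureRecord` and `runCapturing` are hypothetical names and not the library's Task type.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical record of a failed execution: message plus full stack trace text.
record FailureRecord(String message, String stackTrace) {}

class FailureCapture {
    // Runs the task body; returns null on success, or a FailureRecord on failure.
    static FailureRecord runCapturing(Runnable taskBody) {
        try {
            taskBody.run();
            return null;
        } catch (Exception e) {
            StringWriter buffer = new StringWriter();
            e.printStackTrace(new PrintWriter(buffer, true)); // stack trace as a String
            return new FailureRecord(e.getMessage(), buffer.toString());
        }
    }

    public static void main(String[] args) {
        FailureRecord failure = runCapturing(() -> {
            throw new IllegalStateException("payment service unavailable");
        });
        System.out.println(failure.message()); // prints payment service unavailable
    }
}
```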

Record other failures

It can also be useful to record other incidents or exceptions in the FailsafeExecutor's context, namely exceptions that are thrown not within a failsafe task but during regular synchronous program execution. Such exceptions can be recorded and made visible through the failsafeExecutor.failedTasks() method, and you can use the FailsafeExecutor's retry mechanism for them.

The method

failsafeExecutor.recordFailure(...);

persists a task in the database and marks it as failed, so the task is not executed. It can, however, be retried or cancelled.

Monitoring the execution

The result of an execution can be observed by subscribing a listener to the FailsafeExecutor:

failsafeExecutor.subscribe(executionListener);

The persisting method is called before a task is persisted in the database. When the execution ends, either succeeded or failed is called, depending on the outcome. Retrying a failed task triggers a call to retrying before the failure state is deleted from the database.

Handling uncaught exceptions

You can register an error handler via FailsafeExecutor.registerErrorHandler(...) to be notified when an uncaught exception occurs while tasks are being handled.

Health check

The FailsafeExecutor provides a health check.

failsafeExecutor.isRunning();

Shutdown of the executor

It is important to shut down the FailsafeExecutor properly by calling its shutdown method, e.g. from a shutdown hook:

Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
        failsafeExecutor.shutdown();
    }
});

Configuration

The FailsafeExecutor can be created using the all-args constructor. The following options are configurable:

| Option | Type | Default | Description |
|---|---|---|---|
| systemClock | SystemClock | LocalDateTime.now() | Clock used to retrieve the current time. |
| workerThreadCount | int | 5 | Number of threads executing tasks. |
| queueSize | int | 6 * workerThreadCount | Maximum number of tasks the FailsafeExecutor locks at the same time. |
| initialDelay | Duration | 10 sec | Delay before the FailsafeExecutor fetches tasks for the first time. |
| pollingInterval | Duration | 5 sec | How often the FailsafeExecutor checks for tasks to execute. |
| lockTimeout | Duration | 5 min | If a task is locked for execution but neither deleted nor updated, e.g. due to a system crash, it is considered for execution again after this timeout. The minimum recommended lockTimeout is 1 min. |

Note: A task's lock time is periodically refreshed by a scheduled heartbeat, which runs every lockTimeout / 4 (75 seconds with the default lockTimeout of 5 min).
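The derived values in the configuration can be checked with plain java.time arithmetic. This sketch computes the default queue size, the heartbeat interval, and whether a lock has gone stale; `LockMath` and its methods are hypothetical helpers, not library API, and the inclusive staleness boundary is an assumption.

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Hypothetical helpers reproducing the arithmetic described in the configuration table.
class LockMath {
    static int defaultQueueSize(int workerThreadCount) {
        return 6 * workerThreadCount;
    }

    // The heartbeat refreshes held locks every lockTimeout / 4.
    static Duration heartbeatInterval(Duration lockTimeout) {
        return lockTimeout.dividedBy(4);
    }

    // A lock that was neither refreshed nor released counts as stale once
    // lockTimeout has elapsed since its lock time (boundary chosen as inclusive here).
    static boolean isStale(LocalDateTime lockTime, Duration lockTimeout, LocalDateTime now) {
        return !now.isBefore(lockTime.plus(lockTimeout));
    }

    public static void main(String[] args) {
        System.out.println(defaultQueueSize(5));                      // prints 30
        System.out.println(heartbeatInterval(Duration.ofMinutes(5))); // prints PT1M15S
    }
}
```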

Logging Configuration

The FailsafeExecutor uses Java's built-in logging (java.util.logging). By default, no configuration is defined. We suggest redirecting logs via a JavaLoggingBridgeHandler.

Use an implementation of java.util.logging.Handler to route Java Util Logging (java.util.logging) messages into your preferred logging framework. This is how you install the handler:

LogManager.getLogManager().getLogger("").addHandler(new JavaLoggingBridgeHandler());
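JavaLoggingBridgeHandler is not a JDK class; it stands for whatever java.util.logging.Handler your logging framework provides (SLF4J, for instance, ships SLF4JBridgeHandler in its jul-to-slf4j module). To show what such a bridge does, here is a minimal hypothetical `ForwardingHandler` that forwards records to a Consumer<String> instead of a real framework.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.logging.Handler;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical bridge: forwards java.util.logging records to any String sink.
// A real bridge would call your logging framework (SLF4J, Log4j, ...) instead.
class ForwardingHandler extends Handler {
    private final Consumer<String> sink;

    ForwardingHandler(Consumer<String> sink) {
        this.sink = sink;
    }

    @Override
    public void publish(LogRecord record) {
        if (isLoggable(record)) { // honors this handler's level and filter
            sink.accept(record.getLevel() + " " + record.getLoggerName() + ": " + record.getMessage());
        }
    }

    @Override
    public void flush() {}

    @Override
    public void close() {}

    public static void main(String[] args) {
        List<String> captured = new ArrayList<>();
        Logger logger = Logger.getLogger("demo.failsafe");
        logger.setUseParentHandlers(false); // keep the demo out of the default console handler
        logger.addHandler(new ForwardingHandler(captured::add));
        logger.warning("lock lost");
        System.out.println(captured); // prints [WARNING demo.failsafe: lock lost]
    }
}
```

Installed globally, it would replace the handler in the line above: `LogManager.getLogManager().getLogger("").addHandler(new ForwardingHandler(System.out::println));`.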

Testability

A static method supports you in integration tests: wrap your business logic with waitForTasks. It registers a listener before running your business logic, so that every task created during its execution is tracked. After the business logic finishes, a barrier blocks the calling thread until all of these tasks have been executed.

waitForTasks(failsafeExecutor, ...);

Once all tasks have finished, the failed ones are collected and returned. This lets your test case fail immediately, for example.

The method ignores tasks whose planned execution date lies in the future; it does not wait for them.
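The barrier behind waitForTasks can be pictured as a latch that is counted down once per finished task. This sketch uses the JDK's CountDownLatch with plain threads standing in for executed tasks; `WaitForTasksSketch` is hypothetical and uses a fixed task list, whereas the real helper discovers tasks through a listener.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of "block until all tasks have finished,
// then return the failures".
class WaitForTasksSketch {
    static List<String> runAndWait(List<Runnable> tasks) {
        CountDownLatch remaining = new CountDownLatch(tasks.size());
        List<String> failures = new CopyOnWriteArrayList<>();
        for (Runnable task : tasks) {
            new Thread(() -> {
                try {
                    task.run();
                } catch (RuntimeException e) {
                    failures.add(e.getMessage()); // collect failures instead of losing them
                } finally {
                    remaining.countDown();        // finished, successfully or not
                }
            }).start();
        }
        try {
            remaining.await(10, TimeUnit.SECONDS); // the barrier
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return failures;
    }

    public static void main(String[] args) {
        List<Runnable> tasks = List.of(
                () -> {},
                () -> { throw new IllegalArgumentException("bad input"); });
        System.out.println(runAndWait(tasks)); // prints [bad input]
    }
}
```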

FAQ

How is an at-least-once execution guaranteed?

First, each task is persisted to the database before it is considered for execution. After that, the FailsafeExecutor tries to reserve the next task, ordered by creation date, by setting a lock timestamp in the database. Concurrent access by several FailsafeExecutors is controlled with optimistic locking. Only if the lock operation succeeds is the task submitted to the FailsafeExecutor's worker pool. If the FailsafeExecutor cannot execute all of its locked tasks, e.g. due to a system crash, a predefined lock timeout guarantees that each such task is again considered for execution by other FailsafeExecutors, which may be running on different nodes.
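The locking step can be illustrated in memory with a compare-and-set: each node reads the current lock state and tries to replace it, and only one replacement can win. The sketch uses AtomicReference.compareAndSet in place of a conditional database update; `LockState`, `TaskLock`, and `tryLock` are hypothetical, and the library's real columns and SQL may differ.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical lock state of one task row: who holds it and since when (null = unlocked).
record LockState(String owner, LocalDateTime lockTime) {}

class TaskLock {
    private final AtomicReference<LockState> state =
            new AtomicReference<>(new LockState(null, null));

    // Optimistic lock attempt: succeeds only if the task is unlocked or its lock
    // has timed out, and nobody changed the state between our read and our write.
    boolean tryLock(String node, LocalDateTime now, Duration lockTimeout) {
        LockState seen = state.get();
        boolean free = seen.lockTime() == null
                || !now.isBefore(seen.lockTime().plus(lockTimeout));
        return free && state.compareAndSet(seen, new LockState(node, now));
    }

    String owner() {
        return state.get().owner();
    }

    public static void main(String[] args) {
        TaskLock lock = new TaskLock();
        LocalDateTime t0 = LocalDateTime.of(2024, 1, 1, 12, 0);
        Duration timeout = Duration.ofMinutes(5);
        System.out.println(lock.tryLock("node-a", t0, timeout));                // prints true
        System.out.println(lock.tryLock("node-b", t0.plusMinutes(1), timeout)); // prints false (still locked)
        System.out.println(lock.tryLock("node-b", t0.plusMinutes(6), timeout)); // prints true (lock expired)
    }
}
```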

Are tasks executed in the insertion order?

No. The FailsafeExecutor orders tasks by creation date for locking, but locked tasks are then executed by a pool of threads, so execution order cannot be guaranteed. Running the FailsafeExecutor on multiple nodes adds further randomness. If you need FIFO execution, register and use a queue.

Is there any retry mechanism?

No. You can implement retries yourself inside a task's runnable or consumer function, or use a library such as resilience4j.
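A hand-rolled retry inside a task body can be a small loop with growing pauses. In this hypothetical sketch (`Retry.withBackoff` is not part of FailsafeExecutor), an action is retried up to a fixed number of attempts with doubling delays, and the last exception is rethrown so that, once attempts are exhausted, the task still fails and is recorded as a failed task.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper: retry an action with exponential backoff, then give up.
class Retry {
    static <T> T withBackoff(Supplier<T> action, int maxAttempts, Duration initialDelay) {
        Duration delay = initialDelay;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                if (attempt >= maxAttempts) {
                    throw e; // let the task fail so it is recorded as failed
                }
                sleep(delay);
                delay = delay.multipliedBy(2); // 1x, 2x, 4x, ... the initial delay
            }
        }
    }

    private static void sleep(Duration d) {
        try {
            Thread.sleep(d.toMillis());
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting to retry", ie);
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        String result = withBackoff(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("transient");
            }
            return "ok";
        }, 5, Duration.ofMillis(10));
        System.out.println(result + " after " + calls.get() + " attempts"); // prints ok after 3 attempts
    }
}
```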

Can method execute and schedule take part in a Spring-managed transaction?

Yes. Wrap your dataSource object with a TransactionAwareDataSourceProxy before passing it to FailsafeExecutor's constructor. The proxy adds awareness of Spring-managed transactions.

@Bean(destroyMethod = "stop")
public FailsafeExecutor failsafeExecutor(DataSource dataSource) {
    FailsafeExecutor failsafeExecutor = new FailsafeExecutor(...., new TransactionAwareDataSourceProxy(dataSource), ...);
    failsafeExecutor.start();
    return failsafeExecutor;
}

Is it running in production?

Yes.

What Java versions are supported?

Requires Java 17+.

How to test against a specific database?

Testcontainers are available for running tests against different database types. You can specify the database by setting the TEST_DB environment variable. By default, H2 is used if no database is specified.

Supported database types (from TestcontainersDbExtension.DatabaseType):

  • H2 (default)
  • POSTGRES
  • MYSQL
  • MARIADB
  • ORACLE

Example usage:

TEST_DB=MARIADB mvn test

About

Lightweight, reliable and multi-node compatible executor service for Java
