Introduce StorageBackend abstraction layer #173

@em3s

Background

The current DefaultHBaseCluster tightly couples the engine to HBase. To support alternative storage backends (Memory, SlateDB, MySQL, etc.), we need a clean abstraction layer that separates:

  1. Backend lifecycle management (connections, configuration)
  2. Bucket-level operations (get, put, scan, etc.)

Related: #172

Architecture

DefaultStorageBackendFactory (singleton)
├── initialize(properties) → creates StorageBackend
└── INSTANCE: StorageBackend

StorageBackend (interface)
├── getBucket(namespace, name): Mono<StorageBuckets>
├── getBucket(uri): Mono<StorageBuckets>
└── close()

HBaseStorageBackend : StorageBackend
MemoryStorageBackend : StorageBackend

StorageBuckets
├── edge: StorageBucket
└── lock: StorageBucket

StorageBucket (interface)
├── get, put, delete, scan, increment, batch, exists
└── setIfNotExists, deleteIfEquals

HBaseStorageBucket : StorageBucket
MemoryStorageBucket : StorageBucket

Plan

Step 1: Create interfaces

File: engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/StorageBackend.kt

package com.kakao.actionbase.v2.engine.storage

import reactor.core.publisher.Mono
import java.lang.AutoCloseable

interface StorageBackend : AutoCloseable {
    fun getBucket(namespace: String, name: String): Mono<StorageBuckets>
    fun getBucket(uri: String): Mono<StorageBuckets>
}

File: engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/StorageBucket.kt

package com.kakao.actionbase.v2.engine.storage

import com.kakao.actionbase.core.storage.HBaseRecord
import com.kakao.actionbase.core.storage.MutationRequest
import reactor.core.publisher.Mono

interface StorageBucket {
    // Reactor's Mono never emits null, so a missing key completes the Mono empty.
    fun get(key: ByteArray): Mono<ByteArray?>
    fun get(keys: List<ByteArray>): Mono<List<HBaseRecord>>
    fun put(key: ByteArray, value: ByteArray): Mono<Void>
    fun delete(key: ByteArray): Mono<Void>
    fun scan(prefix: ByteArray, limit: Int, start: ByteArray?, stop: ByteArray?): Mono<List<HBaseRecord>>
    fun increment(key: ByteArray, delta: Long): Mono<Long>
    fun batch(requests: List<MutationRequest>): Mono<Void>
    fun exists(key: ByteArray): Mono<Boolean>
    // Writes value only if key is absent; emits true when the write happened.
    fun setIfNotExists(key: ByteArray, value: ByteArray): Mono<Boolean>
    // Deletes key only if its current value equals expectedValue; emits true when the delete happened.
    fun deleteIfEquals(key: ByteArray, expectedValue: ByteArray): Mono<Boolean>
}

File: engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/StorageBuckets.kt

package com.kakao.actionbase.v2.engine.storage

data class StorageBuckets(
    val edge: StorageBucket,
    val lock: StorageBucket,
)

Step 2: Create MemoryStorageBucket

File: engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/memory/MemoryStorageBucket.kt

package com.kakao.actionbase.v2.engine.storage.memory

import com.kakao.actionbase.core.storage.HBaseRecord
import com.kakao.actionbase.core.storage.MutationRequest
import com.kakao.actionbase.engine.datastore.impl.ByteArrayStore
import com.kakao.actionbase.v2.engine.storage.StorageBucket
import reactor.core.publisher.Mono

class MemoryStorageBucket(
    private val store: ByteArrayStore,
) : StorageBucket {

    override fun get(key: ByteArray): Mono<ByteArray?> =
        Mono.fromCallable { store[key] }

    override fun get(keys: List<ByteArray>): Mono<List<HBaseRecord>> =
        Mono.fromCallable {
            keys.mapNotNull { k -> store[k]?.let { HBaseRecord(key = k, value = it) } }
        }

    override fun put(key: ByteArray, value: ByteArray): Mono<Void> =
        Mono.fromCallable { store[key] = value }.then()

    override fun delete(key: ByteArray): Mono<Void> =
        Mono.fromCallable { store.remove(key) }.then()

    // NOTE: start and stop are currently ignored; only prefix and limit are applied.
    override fun scan(prefix: ByteArray, limit: Int, start: ByteArray?, stop: ByteArray?): Mono<List<HBaseRecord>> =
        Mono.fromCallable { store.prefixScan(prefix).take(limit) }

    override fun increment(key: ByteArray, delta: Long): Mono<Long> =
        Mono.fromCallable { store.increment(key, delta) }

    override fun batch(requests: List<MutationRequest>): Mono<Void> =
        Mono.fromCallable {
            requests.forEach {
                when (it) {
                    is MutationRequest.Put -> store[it.key] = it.value
                    is MutationRequest.Delete -> store.remove(it.key)
                    is MutationRequest.Increment -> store.increment(it.key, it.value)
                }
            }
        }.then()

    override fun exists(key: ByteArray): Mono<Boolean> =
        Mono.fromCallable { store[key] != null }

    override fun setIfNotExists(key: ByteArray, value: ByteArray): Mono<Boolean> =
        Mono.fromCallable { store.checkAndSet(key, null, value) }

    override fun deleteIfEquals(key: ByteArray, expectedValue: ByteArray): Mono<Boolean> =
        Mono.fromCallable { store.checkAndSet(key, expectedValue, null) }
}
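
MemoryStorageBucket relies on ByteArrayStore, whose API is not shown in this issue. The sketch below is a hypothetical stand-in (named SketchByteArrayStore so it is not mistaken for the real class) for the operations the bucket calls: indexed get/set, remove, prefixScan, increment, and checkAndSet. It assumes keys are ordered by unsigned lexicographic byte comparison, counters are stored as 8-byte big-endian longs, and prefixScan returns key/value pairs rather than HBaseRecord; the real class may differ on all three points.

```kotlin
import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentSkipListMap

// ByteArray is not Comparable, so order keys by unsigned lexicographic content.
val unsignedLexicographic = Comparator<ByteArray> { a, b ->
    for (i in 0 until minOf(a.size, b.size)) {
        val cmp = (a[i].toInt() and 0xFF) - (b[i].toInt() and 0xFF)
        if (cmp != 0) return@Comparator cmp
    }
    a.size - b.size
}

fun ByteArray.hasPrefix(prefix: ByteArray): Boolean =
    size >= prefix.size && prefix.indices.all { this[it] == prefix[it] }

// Hypothetical stand-in for ByteArrayStore; not the project's actual class.
class SketchByteArrayStore {
    private val map = ConcurrentSkipListMap<ByteArray, ByteArray>(unsignedLexicographic)

    operator fun get(key: ByteArray): ByteArray? = map[key]

    operator fun set(key: ByteArray, value: ByteArray) {
        map[key] = value
    }

    fun remove(key: ByteArray): ByteArray? = map.remove(key)

    // Keys sharing a prefix are contiguous in sorted order, starting at the prefix itself.
    fun prefixScan(prefix: ByteArray): List<Pair<ByteArray, ByteArray>> =
        map.tailMap(prefix, true).entries
            .asSequence()
            .takeWhile { it.key.hasPrefix(prefix) }
            .map { it.key to it.value }
            .toList()

    // Treats a missing key as 0; assumes stored counters are 8-byte big-endian longs.
    fun increment(key: ByteArray, delta: Long): Long {
        val updated = map.compute(key) { _, old ->
            val current = old?.let { ByteBuffer.wrap(it).long } ?: 0L
            ByteBuffer.allocate(Long.SIZE_BYTES).putLong(current + delta).array()
        }
        return ByteBuffer.wrap(checkNotNull(updated)).long
    }

    // expected == null means "key must be absent"; update == null means "remove the key".
    fun checkAndSet(key: ByteArray, expected: ByteArray?, update: ByteArray?): Boolean {
        var applied = false
        map.compute(key) { _, current ->
            applied = if (expected == null) current == null
            else current != null && current.contentEquals(expected)
            if (applied) update else current
        }
        return applied
    }
}
```

With this shape, setIfNotExists maps to checkAndSet(key, null, value) and deleteIfEquals maps to checkAndSet(key, expectedValue, null), matching the calls in MemoryStorageBucket.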

Step 3: Create MemoryStorageBackend

File: engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/memory/MemoryStorageBackend.kt

package com.kakao.actionbase.v2.engine.storage.memory

import com.kakao.actionbase.engine.datastore.impl.ByteArrayStore
import com.kakao.actionbase.v2.engine.storage.StorageBackend
import com.kakao.actionbase.v2.engine.storage.StorageBuckets
import reactor.core.publisher.Mono

class MemoryStorageBackend : StorageBackend {
    private val store = ByteArrayStore()

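    // namespace and name are ignored: every call returns a bucket over the same shared store,
    // and edge and lock share that bucket as well.
    override fun getBucket(namespace: String, name: String): Mono<StorageBuckets> {
        val bucket = MemoryStorageBucket(store)
        return Mono.just(StorageBuckets(bucket, bucket))
    }

    // The URI is not parsed; all URIs resolve to the same shared store.
    override fun getBucket(uri: String): Mono<StorageBuckets> =
        getBucket("", "")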

    override fun close() {
        // nothing to close
    }
}
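
MemoryStorageBackend above shares a single ByteArrayStore across every namespace and name. If isolation between buckets turns out to matter (for example, in tests that reuse keys across tables), one option is a small registry keyed by (namespace, name). The sketch below is only an illustration under that assumption: BucketRegistry and storeFor are hypothetical names, and the type parameter S stands in for ByteArrayStore.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical registry: one store per (namespace, name); S stands in for ByteArrayStore.
class BucketRegistry<S : Any>(private val newStore: () -> S) {
    private val stores = ConcurrentHashMap<Pair<String, String>, S>()

    // Returns the existing store for the bucket, creating it on first access.
    fun storeFor(namespace: String, name: String): S =
        stores.computeIfAbsent(namespace to name) { newStore() }
}
```

Repeated lookups for the same pair return the same store instance, while different pairs get separate stores.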

Step 4: Create HBaseStorageBucket

File: engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/hbase/HBaseStorageBucket.kt

Wrap AsyncTable<AdvancedScanResultConsumer> and implement StorageBucket:

  • Use Constants.DEFAULT_COLUMN_FAMILY and Constants.DEFAULT_QUALIFIER
  • Implement each StorageBucket method with the corresponding HBase operation (Get, Put, Delete, Scan, Increment, CheckAndMutate)
  • Reference existing implementation in HBaseHashLabel for patterns

Step 5: Create HBaseStorageBackend

File: engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/hbase/HBaseStorageBackend.kt

package com.kakao.actionbase.v2.engine.storage.hbase

import com.kakao.actionbase.v2.engine.storage.StorageBackend
import com.kakao.actionbase.v2.engine.storage.StorageBuckets
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.AsyncConnection
import reactor.core.publisher.Mono

class HBaseStorageBackend private constructor(
    private val connectionMono: Mono<AsyncConnection>,
    private val namespace: String,
    private val config: Configuration,
) : StorageBackend {

    override fun getBucket(namespace: String, name: String): Mono<StorageBuckets> =
        connectionMono.map { conn ->
            val table = conn.getTable(TableName.valueOf(namespace, name))
            val bucket = HBaseStorageBucket(table)
            StorageBuckets(bucket, bucket)
        }

    override fun getBucket(uri: String): Mono<StorageBuckets> {
        val (ns, name) = parseUri(uri)
        return getBucket(ns, name)
    }

    override fun close() {
        connectionMono.block()?.close()
    }

    private fun parseUri(uri: String): Pair<String, String> {
        val parts = uri.removePrefix("datastore://").split("/")
        require(parts.size == 2) { "Invalid URI: $uri" }
        return parts[0] to parts[1]
    }

    companion object {
        fun create(properties: Map<String, String>): HBaseStorageBackend {
            // Copy initialization logic from DefaultHBaseCluster.initialize()
            // - Kerberos setup
            // - HBase version detection (2.4 vs 2.5)
            // - Connection creation
            TODO("Port initialization logic from DefaultHBaseCluster.initialize()")
        }
    }
}
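
To illustrate the URI format that parseUri accepts, the standalone sketch below repeats the same logic as a top-level function. The name parseDatastoreUri is hypothetical and exists only so the behavior can be tried outside the class.

```kotlin
// Standalone copy of the parseUri logic from HBaseStorageBackend, for illustration only.
fun parseDatastoreUri(uri: String): Pair<String, String> {
    // removePrefix leaves the string unchanged when the prefix is absent.
    val parts = uri.removePrefix("datastore://").split("/")
    require(parts.size == 2) { "Invalid URI: $uri" }
    return parts[0] to parts[1]
}
```

Under this logic, datastore://ns/table yields ("ns", "table"), a bare ns/table is accepted as well, and a trailing slash or extra path segment fails the require check with IllegalArgumentException.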

Step 6: Create DefaultStorageBackendFactory

File: engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/DefaultStorageBackendFactory.kt

package com.kakao.actionbase.v2.engine.storage

import com.kakao.actionbase.v2.engine.storage.hbase.HBaseStorageBackend
import com.kakao.actionbase.v2.engine.storage.memory.MemoryStorageBackend
import org.slf4j.LoggerFactory

object DefaultStorageBackendFactory {
    private val logger = LoggerFactory.getLogger(DefaultStorageBackendFactory::class.java)
    private lateinit var instance0: StorageBackend

    val INSTANCE: StorageBackend get() = instance0

    fun initialize(properties: Map<String, String>) {
        val type = properties["type"] ?: "hbase"
        logger.info("Initializing StorageBackend with type: {}", type)

        instance0 = when (type) {
            "memory" -> {
                logger.info("Using MemoryStorageBackend")
                MemoryStorageBackend()
            }
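            "embedded" -> {
                // MockStorageBackend is neither imported nor defined in this plan; it must already exist or be added.
                logger.info("Using MockStorageBackend (embedded)")
                MockStorageBackend()
            }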
            else -> {
                logger.info("Using HBaseStorageBackend")
                HBaseStorageBackend.create(properties)
            }
        }
    }

    fun close() {
        if (::instance0.isInitialized) {
            instance0.close()
        }
    }
}

Step 7: Update GraphDefaults

File: engine/src/main/kotlin/com/kakao/actionbase/v2/engine/GraphDefaults.kt

Change:

// Before
val datastore: DefaultHBaseCluster

// After
val datastore: StorageBackend

Step 8: Update Graph.kt

File: engine/src/main/kotlin/com/kakao/actionbase/v2/engine/Graph.kt

Change:

// Before
DefaultHBaseCluster.initialize(config.hbase)
// ...
DefaultHBaseCluster.INSTANCE

// After
DefaultStorageBackendFactory.initialize(config.hbase)
// ...
DefaultStorageBackendFactory.INSTANCE

Step 9: Update HBaseOptions.kt

File: engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/hbase/HBaseOptions.kt

Replace getTables(), which returns HBaseTables, with getBuckets(), which returns StorageBuckets:

fun getBuckets(): Mono<StorageBuckets> =
    DefaultStorageBackendFactory.INSTANCE.getBucket(namespace, tableName)

Step 10: Update Label classes

Update classes that use HBaseTables to use StorageBuckets:

  • HBaseHashLabel
  • HBaseIndexedLabel
  • DatastoreHashLabel
  • DatastoreIndexedLabel

Change:

// Before
private val tables: Mono<HBaseTables>
tables.flatMap { it.edge.get(get) }

// After
private val buckets: Mono<StorageBuckets>
buckets.flatMap { it.edge.get(key) }

Step 11: Delete obsolete files

  • Delete engine/src/main/kotlin/com/kakao/actionbase/v2/engine/compat/DefaultHBaseCluster.kt
  • Delete engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/hbase/HBaseTables.kt

Step 12: Run tests

./gradlew :engine:test

Task Checklist

  • Step 1: Create interfaces (StorageBackend, StorageBucket, StorageBuckets)
  • Step 2: Create MemoryStorageBucket
  • Step 3: Create MemoryStorageBackend
  • Step 4: Create HBaseStorageBucket
  • Step 5: Create HBaseStorageBackend
  • Step 6: Create DefaultStorageBackendFactory
  • Step 7: Update GraphDefaults
  • Step 8: Update Graph.kt
  • Step 9: Update HBaseOptions.kt
  • Step 10: Update Label classes
  • Step 11: Delete obsolete files
  • Step 12: Run tests

Done When

  1. DefaultStorageBackendFactory.initialize(properties) creates the appropriate backend based on type:

    • type=hbase → HBaseStorageBackend
    • type=memory → MemoryStorageBackend
    • type=embedded → MockStorageBackend

  2. All existing tests pass

  3. No references to DefaultHBaseCluster or HBaseTables remain

Notes

Usage

hbase:
  type: memory    # memory | embedded | hbase (default)
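
To make the mapping from this setting to a backend concrete, the following sketch mirrors the when branch from Step 6. BackendKind and resolveBackendKind are hypothetical names that do not appear in the plan, and the sketch assumes the hbase section reaches the factory as a Map<String, String>, as in Step 8.

```kotlin
// Hypothetical names for illustration; the plan's factory instantiates backends directly.
enum class BackendKind { HBASE, MEMORY, EMBEDDED }

fun resolveBackendKind(properties: Map<String, String>): BackendKind =
    when (properties["type"] ?: "hbase") {
        "memory" -> BackendKind.MEMORY
        "embedded" -> BackendKind.EMBEDDED
        // As in Step 6, any other value (including a typo) falls through to HBase.
        else -> BackendKind.HBASE
    }
```

A stricter variant could reject unknown values instead of defaulting to HBase, which would surface configuration typos earlier.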

Metadata

Labels: enhancement
