-
Notifications
You must be signed in to change notification settings - Fork 13
Introduce StorageBackend abstraction layer #173
Description
Background
The current DefaultHBaseCluster tightly couples the engine to HBase. To support alternative storage backends (Memory, SlateDB, MySQL, etc.), we need a clean abstraction layer that separates:
- Backend lifecycle management (connections, configuration)
- Bucket-level operations (get, put, scan, etc.)
Related: #172
Architecture
DefaultStorageBackendFactory (singleton)
├── initialize(properties) → creates StorageBackend
└── INSTANCE: StorageBackend
StorageBackend (interface)
├── getBucket(namespace, name): Mono<StorageBuckets>
├── getBucket(uri): Mono<StorageBuckets>
└── close()
HBaseStorageBackend : StorageBackend
MemoryStorageBackend : StorageBackend
StorageBuckets
├── edge: StorageBucket
└── lock: StorageBucket
StorageBucket (interface)
├── get, put, delete, scan, increment, batch, exists
└── setIfNotExists, deleteIfEquals
HBaseStorageBucket : StorageBucket
MemoryStorageBucket : StorageBucket
Plan
Step 1: Create interfaces
File: engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/StorageBackend.kt
package com.kakao.actionbase.v2.engine.storage
import reactor.core.publisher.Mono
import java.lang.AutoCloseable
interface StorageBackend : AutoCloseable {
fun getBucket(namespace: String, name: String): Mono<StorageBuckets>
fun getBucket(uri: String): Mono<StorageBuckets>
}File: engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/StorageBucket.kt
package com.kakao.actionbase.v2.engine.storage
import com.kakao.actionbase.core.storage.HBaseRecord
import com.kakao.actionbase.core.storage.MutationRequest
import reactor.core.publisher.Mono
interface StorageBucket {
fun get(key: ByteArray): Mono<ByteArray?>
fun get(keys: List<ByteArray>): Mono<List<HBaseRecord>>
fun put(key: ByteArray, value: ByteArray): Mono<Void>
fun delete(key: ByteArray): Mono<Void>
fun scan(prefix: ByteArray, limit: Int, start: ByteArray?, stop: ByteArray?): Mono<List<HBaseRecord>>
fun increment(key: ByteArray, delta: Long): Mono<Long>
fun batch(requests: List<MutationRequest>): Mono<Void>
fun exists(key: ByteArray): Mono<Boolean>
fun setIfNotExists(key: ByteArray, value: ByteArray): Mono<Boolean>
fun deleteIfEquals(key: ByteArray, expectedValue: ByteArray): Mono<Boolean>
}File: engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/StorageBuckets.kt
package com.kakao.actionbase.v2.engine.storage
data class StorageBuckets(
val edge: StorageBucket,
val lock: StorageBucket,
)Step 2: Create MemoryStorageBucket
File: engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/memory/MemoryStorageBucket.kt
package com.kakao.actionbase.v2.engine.storage.memory
import com.kakao.actionbase.core.storage.HBaseRecord
import com.kakao.actionbase.core.storage.MutationRequest
import com.kakao.actionbase.engine.datastore.impl.ByteArrayStore
import com.kakao.actionbase.v2.engine.storage.StorageBucket
import reactor.core.publisher.Mono
class MemoryStorageBucket(
private val store: ByteArrayStore,
) : StorageBucket {
override fun get(key: ByteArray): Mono<ByteArray?> =
Mono.fromCallable { store[key] }
override fun get(keys: List<ByteArray>): Mono<List<HBaseRecord>> =
Mono.fromCallable {
keys.mapNotNull { k -> store[k]?.let { HBaseRecord(key = k, value = it) } }
}
override fun put(key: ByteArray, value: ByteArray): Mono<Void> =
Mono.fromCallable { store[key] = value }.then()
override fun delete(key: ByteArray): Mono<Void> =
Mono.fromCallable { store.remove(key) }.then()
override fun scan(prefix: ByteArray, limit: Int, start: ByteArray?, stop: ByteArray?): Mono<List<HBaseRecord>> =
Mono.fromCallable { store.prefixScan(prefix).take(limit) }
override fun increment(key: ByteArray, delta: Long): Mono<Long> =
Mono.fromCallable { store.increment(key, delta) }
override fun batch(requests: List<MutationRequest>): Mono<Void> =
Mono.fromCallable {
requests.forEach {
when (it) {
is MutationRequest.Put -> store[it.key] = it.value
is MutationRequest.Delete -> store.remove(it.key)
is MutationRequest.Increment -> store.increment(it.key, it.value)
}
}
}.then()
override fun exists(key: ByteArray): Mono<Boolean> =
Mono.fromCallable { store[key] != null }
override fun setIfNotExists(key: ByteArray, value: ByteArray): Mono<Boolean> =
Mono.fromCallable { store.checkAndSet(key, null, value) }
override fun deleteIfEquals(key: ByteArray, expectedValue: ByteArray): Mono<Boolean> =
Mono.fromCallable { store.checkAndSet(key, expectedValue, null) }
}Step 3: Create MemoryStorageBackend
File: engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/memory/MemoryStorageBackend.kt
package com.kakao.actionbase.v2.engine.storage.memory
import com.kakao.actionbase.engine.datastore.impl.ByteArrayStore
import com.kakao.actionbase.v2.engine.storage.StorageBackend
import com.kakao.actionbase.v2.engine.storage.StorageBuckets
import reactor.core.publisher.Mono
class MemoryStorageBackend : StorageBackend {
private val store = ByteArrayStore()
override fun getBucket(namespace: String, name: String): Mono<StorageBuckets> {
val bucket = MemoryStorageBucket(store)
return Mono.just(StorageBuckets(bucket, bucket))
}
override fun getBucket(uri: String): Mono<StorageBuckets> =
getBucket("", "")
override fun close() {
// nothing to close
}
}Step 4: Create HBaseStorageBucket
File: engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/hbase/HBaseStorageBucket.kt
Wrap AsyncTable<AdvancedScanResultConsumer> and implement StorageBucket:
- Use
Constants.DEFAULT_COLUMN_FAMILYandConstants.DEFAULT_QUALIFIER - Convert HBase
Get,Put,Delete,Scan,Increment,CheckAndMutatetoStorageBucketmethods - Reference existing implementation in
HBaseHashLabelfor patterns
Step 5: Create HBaseStorageBackend
File: engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/hbase/HBaseStorageBackend.kt
package com.kakao.actionbase.v2.engine.storage.hbase
import com.kakao.actionbase.v2.engine.storage.StorageBackend
import com.kakao.actionbase.v2.engine.storage.StorageBuckets
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.AsyncConnection
import reactor.core.publisher.Mono
class HBaseStorageBackend private constructor(
private val connectionMono: Mono<AsyncConnection>,
private val namespace: String,
private val config: Configuration,
) : StorageBackend {
override fun getBucket(namespace: String, name: String): Mono<StorageBuckets> =
connectionMono.map { conn ->
val table = conn.getTable(TableName.valueOf(namespace, name))
val bucket = HBaseStorageBucket(table)
StorageBuckets(bucket, bucket)
}
override fun getBucket(uri: String): Mono<StorageBuckets> {
val (ns, name) = parseUri(uri)
return getBucket(ns, name)
}
override fun close() {
connectionMono.block()?.close()
}
private fun parseUri(uri: String): Pair<String, String> {
val parts = uri.removePrefix("datastore://").split("/")
require(parts.size == 2) { "Invalid URI: $uri" }
return parts[0] to parts[1]
}
companion object {
fun create(properties: Map<String, String>): HBaseStorageBackend {
// Copy initialization logic from DefaultHBaseCluster.initialize()
// - Kerberos setup
// - HBase version detection (2.4 vs 2.5)
// - Connection creation
}
}
}Step 6: Create DefaultStorageBackendFactory
File: engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/DefaultStorageBackendFactory.kt
package com.kakao.actionbase.v2.engine.storage
import com.kakao.actionbase.v2.engine.storage.hbase.HBaseStorageBackend
import com.kakao.actionbase.v2.engine.storage.memory.MemoryStorageBackend
import org.slf4j.LoggerFactory
object DefaultStorageBackendFactory {
private val logger = LoggerFactory.getLogger(DefaultStorageBackendFactory::class.java)
private lateinit var instance0: StorageBackend
val INSTANCE: StorageBackend get() = instance0
fun initialize(properties: Map<String, String>) {
val type = properties["type"] ?: "hbase"
logger.info("Initializing StorageBackend with type: {}", type)
instance0 = when (type) {
"memory" -> {
logger.info("Using MemoryStorageBackend")
MemoryStorageBackend()
}
"embedded" -> {
logger.info("Using MockStorageBackend (embedded)")
MockStorageBackend()
}
else -> {
logger.info("Using HBaseStorageBackend")
HBaseStorageBackend.create(properties)
}
}
}
fun close() {
if (::instance0.isInitialized) {
instance0.close()
}
}
}Step 7: Update GraphDefaults
File: engine/src/main/kotlin/com/kakao/actionbase/v2/engine/GraphDefaults.kt
Change:
// Before
val datastore: DefaultHBaseCluster
// After
val datastore: StorageBackendStep 8: Update Graph.kt
File: engine/src/main/kotlin/com/kakao/actionbase/v2/engine/Graph.kt
Change:
// Before
DefaultHBaseCluster.initialize(config.hbase)
// ...
DefaultHBaseCluster.INSTANCE
// After
DefaultStorageBackendFactory.initialize(config.hbase)
// ...
DefaultStorageBackendFactory.INSTANCEStep 9: Update HBaseOptions.kt
File: engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/hbase/HBaseOptions.kt
Change getTables() to return StorageBuckets instead of HBaseTables:
fun getBuckets(): Mono<StorageBuckets> =
DefaultStorageBackendFactory.INSTANCE.getBucket(namespace, tableName)Step 10: Update Label classes
Update classes that use HBaseTables to use StorageBuckets:
HBaseHashLabelHBaseIndexedLabelDatastoreHashLabelDatastoreIndexedLabel
Change:
// Before
private val tables: Mono<HBaseTables>
tables.flatMap { it.edge.get(get) }
// After
private val buckets: Mono<StorageBuckets>
buckets.flatMap { it.edge.get(key) }Step 11: Delete obsolete files
- Delete
engine/src/main/kotlin/com/kakao/actionbase/v2/engine/compat/DefaultHBaseCluster.kt - Delete
engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/hbase/HBaseTables.kt
Step 12: Run tests
./gradlew :engine:testTask Checklist
- Step 1: Create interfaces (StorageBackend, StorageBucket, StorageBuckets)
- Step 2: Create MemoryStorageBucket
- Step 3: Create MemoryStorageBackend
- Step 4: Create HBaseStorageBucket
- Step 5: Create HBaseStorageBackend
- Step 6: Create DefaultStorageBackendFactory
- Step 7: Update GraphDefaults
- Step 8: Update Graph.kt
- Step 9: Update HBaseOptions.kt
- Step 10: Update Label classes
- Step 11: Delete obsolete files
- Step 12: Run tests
Done When
-
DefaultStorageBackendFactory.initialize(properties)creates appropriate backend based ontype:type=hbase→HBaseStorageBackendtype=memory→MemoryStorageBackendtype=embedded→MockStorageBackend
-
All existing tests pass
-
No references to
DefaultHBaseClusterorHBaseTablesremain
Notes
Usage
hbase:
type: memory # memory | embedded | hbase (default)