Skip to content

Commit 9908c4c

Browse files
committed
refactor: deduplicate v3 mutation paths and lock handling
1 parent ec58d98 commit 9908c4c

2 files changed

Lines changed: 317 additions & 268 deletions

File tree

engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/V3CompatibleTableBinding.kt

Lines changed: 66 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package com.kakao.actionbase.v2.engine.v3
22

33
import com.kakao.actionbase.core.edge.mapper.EdgeRecordMapper
44
import com.kakao.actionbase.core.edge.mutation.EdgeMutationBuilder
5+
import com.kakao.actionbase.core.edge.mutation.EdgeMutationRecords
56
import com.kakao.actionbase.core.edge.payload.EdgeMutationStatus
67
import com.kakao.actionbase.core.edge.payload.MultiEdgeMutationStatus
78
import com.kakao.actionbase.core.edge.record.EdgeGroupRecord
@@ -53,18 +54,7 @@ class V3CompatibleTableBinding(
5354
.flatMap {
5455
findHashEdge(encodedHashEdgeKey)
5556
}.map {
56-
// extract the state from the encoded value
57-
val stateValue = mapper.state.decoder.decodeValue(it)
58-
State(
59-
stateValue.active,
60-
stateValue.version,
61-
stateValue.createdAt,
62-
stateValue.deletedAt,
63-
stateValue.properties
64-
.mapNotNull { (key, value) ->
65-
codeToFieldNameMap[key]?.let { name -> name to value }
66-
}.toMap(),
67-
)
57+
decodeCurrentState(it, mapper, codeToFieldNameMap)
6858
}.switchIfEmpty(Mono.defer { Mono.just(State.initial) })
6959
.map { before ->
7060
val after =
@@ -76,42 +66,7 @@ class V3CompatibleTableBinding(
7666
val beforeRecord = EdgeStateRecord.of(source, target, before, entity.id)
7767
val afterRecord = EdgeStateRecord.of(source, target, after, entity.id)
7868
val mutationRecords = EdgeMutationBuilder.build(beforeRecord, afterRecord, schema.direction, schema.indexes, schema.groups)
79-
val mutations = mutableListOf<Mutation>()
80-
val record = mapper.state.encoder.encode(mutationRecords.stateRecord)
81-
mutations +=
82-
Put(record.key)
83-
.addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER, record.value)
84-
mutations +=
85-
mutationRecords.createIndexRecords.map {
86-
val record = mapper.index.encoder.encode(it)
87-
Put(record.key)
88-
.addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER, record.value)
89-
}
90-
mutations +=
91-
mutationRecords.countRecords.map {
92-
val key = mapper.count.encoder.encodeKey(it.key)
93-
Increment(key)
94-
.addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER, it.value)
95-
}
96-
mutations +=
97-
mutationRecords.deleteIndexRecordKeys.map {
98-
val key = mapper.index.encoder.encodeKey(it)
99-
Delete(key)
100-
}
101-
mutations +=
102-
mutationRecords.groupRecords.groupBy { it.key to it.ttl }.map { (groupKey, records) ->
103-
val (key, ttl) = groupKey
104-
val encodedKey = mapper.group.encoder.encodeKey(key)
105-
val increment = Increment(encodedKey)
106-
records.mergeQualifiers().forEach { (mergedQualifier, mergedValue) ->
107-
val qualifier = mapper.group.encoder.encodeQualifier(mergedQualifier)
108-
increment.addColumn(Constants.DEFAULT_COLUMN_FAMILY, qualifier, mergedValue)
109-
}
110-
if (ttl != null && ttl != Long.MAX_VALUE && ttl > 0) {
111-
increment.ttl = ttl
112-
}
113-
increment
114-
}
69+
val mutations = buildHBaseMutations(mutationRecords, mapper)
11570
handleDeferredRequests(mutations)
11671
.thenReturn(
11772
EdgeMutationStatus(source, target, events.size, mutationRecords.status, before, after, mutationRecords.acc),
@@ -147,18 +102,7 @@ class V3CompatibleTableBinding(
147102
.flatMap {
148103
findHashEdge(encodedHashEdgeKey)
149104
}.map {
150-
// extract the state from the encoded value
151-
val stateValue = mapper.state.decoder.decodeValue(it)
152-
State(
153-
stateValue.active,
154-
stateValue.version,
155-
stateValue.createdAt,
156-
stateValue.deletedAt,
157-
stateValue.properties
158-
.mapNotNull { (key, value) ->
159-
codeToFieldNameMap[key]?.let { name -> name to value }
160-
}.toMap(),
161-
)
105+
decodeCurrentState(it, mapper, codeToFieldNameMap)
162106
}.switchIfEmpty(Mono.defer { Mono.just(State.initial) })
163107
.map { before ->
164108
val after =
@@ -170,42 +114,7 @@ class V3CompatibleTableBinding(
170114
val beforeRecord = EdgeStateRecord.of(key, key, before, entity.id)
171115
val afterRecord = EdgeStateRecord.of(key, key, after, entity.id)
172116
val mutationRecords = EdgeMutationBuilder.buildForMultiEdge(beforeRecord, afterRecord, schema.direction, schema.indexes, schema.groups)
173-
val mutations = mutableListOf<Mutation>()
174-
val record = mapper.state.encoder.encode(mutationRecords.stateRecord)
175-
mutations +=
176-
Put(record.key)
177-
.addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER, record.value)
178-
mutations +=
179-
mutationRecords.createIndexRecords.map {
180-
val record = mapper.index.encoder.encode(it)
181-
Put(record.key)
182-
.addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER, record.value)
183-
}
184-
mutations +=
185-
mutationRecords.countRecords.map {
186-
val key = mapper.count.encoder.encodeKey(it.key)
187-
Increment(key)
188-
.addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER, it.value)
189-
}
190-
mutations +=
191-
mutationRecords.deleteIndexRecordKeys.map {
192-
val key = mapper.index.encoder.encodeKey(it)
193-
Delete(key)
194-
}
195-
mutations +=
196-
mutationRecords.groupRecords.groupBy { it.key to it.ttl }.map { (groupKey, records) ->
197-
val (key, ttl) = groupKey
198-
val encodedKey = mapper.group.encoder.encodeKey(key)
199-
val increment = Increment(encodedKey)
200-
records.mergeQualifiers().forEach { (mergedQualifier, mergedValue) ->
201-
val qualifier = mapper.group.encoder.encodeQualifier(mergedQualifier)
202-
increment.addColumn(Constants.DEFAULT_COLUMN_FAMILY, qualifier, mergedValue)
203-
}
204-
if (ttl != null && ttl != Long.MAX_VALUE && ttl > 0) {
205-
increment.ttl = ttl
206-
}
207-
increment
208-
}
117+
val mutations = buildHBaseMutations(mutationRecords, mapper)
209118
handleDeferredRequests(mutations)
210119
.thenReturn(
211120
MultiEdgeMutationStatus(key, events.size, mutationRecords.status, before, after, mutationRecords.acc),
@@ -219,6 +128,67 @@ class V3CompatibleTableBinding(
219128
}
220129
}
221130

131+
private fun decodeCurrentState(
132+
encodedState: ByteArray,
133+
mapper: EdgeRecordMapper,
134+
codeToFieldNameMap: Map<Int, String>,
135+
): State {
136+
val stateValue = mapper.state.decoder.decodeValue(encodedState)
137+
return State(
138+
stateValue.active,
139+
stateValue.version,
140+
stateValue.createdAt,
141+
stateValue.deletedAt,
142+
stateValue.properties
143+
.mapNotNull { (key, value) ->
144+
codeToFieldNameMap[key]?.let { name -> name to value }
145+
}.toMap(),
146+
)
147+
}
148+
149+
private fun buildHBaseMutations(
150+
mutationRecords: EdgeMutationRecords,
151+
mapper: EdgeRecordMapper,
152+
): MutableList<Mutation> {
153+
val mutations = mutableListOf<Mutation>()
154+
val record = mapper.state.encoder.encode(mutationRecords.stateRecord)
155+
mutations +=
156+
Put(record.key)
157+
.addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER, record.value)
158+
mutations +=
159+
mutationRecords.createIndexRecords.map {
160+
val indexRecord = mapper.index.encoder.encode(it)
161+
Put(indexRecord.key)
162+
.addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER, indexRecord.value)
163+
}
164+
mutations +=
165+
mutationRecords.countRecords.map {
166+
val key = mapper.count.encoder.encodeKey(it.key)
167+
Increment(key)
168+
.addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER, it.value)
169+
}
170+
mutations +=
171+
mutationRecords.deleteIndexRecordKeys.map {
172+
val key = mapper.index.encoder.encodeKey(it)
173+
Delete(key)
174+
}
175+
mutations +=
176+
mutationRecords.groupRecords.groupBy { it.key to it.ttl }.map { (groupKey, records) ->
177+
val (key, ttl) = groupKey
178+
val encodedKey = mapper.group.encoder.encodeKey(key)
179+
val increment = Increment(encodedKey)
180+
records.mergeQualifiers().forEach { (mergedQualifier, mergedValue) ->
181+
val qualifier = mapper.group.encoder.encodeQualifier(mergedQualifier)
182+
increment.addColumn(Constants.DEFAULT_COLUMN_FAMILY, qualifier, mergedValue)
183+
}
184+
if (ttl != null && ttl != Long.MAX_VALUE && ttl > 0) {
185+
increment.ttl = ttl
186+
}
187+
increment
188+
}
189+
return mutations
190+
}
191+
222192
companion object {
223193
private fun State.specialStateValueToNull(): State =
224194
State(

0 commit comments

Comments
 (0)