@@ -2,6 +2,7 @@ package com.kakao.actionbase.v2.engine.v3
22
33import com.kakao.actionbase.core.edge.mapper.EdgeRecordMapper
44import com.kakao.actionbase.core.edge.mutation.EdgeMutationBuilder
5+ import com.kakao.actionbase.core.edge.mutation.EdgeMutationRecords
56import com.kakao.actionbase.core.edge.payload.EdgeMutationStatus
67import com.kakao.actionbase.core.edge.payload.MultiEdgeMutationStatus
78import com.kakao.actionbase.core.edge.record.EdgeGroupRecord
@@ -53,18 +54,7 @@ class V3CompatibleTableBinding(
5354 .flatMap {
5455 findHashEdge(encodedHashEdgeKey)
5556 }.map {
56- // extract the state from the encoded value
57- val stateValue = mapper.state.decoder.decodeValue(it)
58- State (
59- stateValue.active,
60- stateValue.version,
61- stateValue.createdAt,
62- stateValue.deletedAt,
63- stateValue.properties
64- .mapNotNull { (key, value) ->
65- codeToFieldNameMap[key]?.let { name -> name to value }
66- }.toMap(),
67- )
57+ decodeCurrentState(it, mapper, codeToFieldNameMap)
6858 }.switchIfEmpty(Mono .defer { Mono .just(State .initial) })
6959 .map { before ->
7060 val after =
@@ -76,42 +66,7 @@ class V3CompatibleTableBinding(
7666 val beforeRecord = EdgeStateRecord .of(source, target, before, entity.id)
7767 val afterRecord = EdgeStateRecord .of(source, target, after, entity.id)
7868 val mutationRecords = EdgeMutationBuilder .build(beforeRecord, afterRecord, schema.direction, schema.indexes, schema.groups)
79- val mutations = mutableListOf<Mutation >()
80- val record = mapper.state.encoder.encode(mutationRecords.stateRecord)
81- mutations + =
82- Put (record.key)
83- .addColumn(Constants .DEFAULT_COLUMN_FAMILY , Constants .DEFAULT_QUALIFIER , record.value)
84- mutations + =
85- mutationRecords.createIndexRecords.map {
86- val record = mapper.index.encoder.encode(it)
87- Put (record.key)
88- .addColumn(Constants .DEFAULT_COLUMN_FAMILY , Constants .DEFAULT_QUALIFIER , record.value)
89- }
90- mutations + =
91- mutationRecords.countRecords.map {
92- val key = mapper.count.encoder.encodeKey(it.key)
93- Increment (key)
94- .addColumn(Constants .DEFAULT_COLUMN_FAMILY , Constants .DEFAULT_QUALIFIER , it.value)
95- }
96- mutations + =
97- mutationRecords.deleteIndexRecordKeys.map {
98- val key = mapper.index.encoder.encodeKey(it)
99- Delete (key)
100- }
101- mutations + =
102- mutationRecords.groupRecords.groupBy { it.key to it.ttl }.map { (groupKey, records) ->
103- val (key, ttl) = groupKey
104- val encodedKey = mapper.group.encoder.encodeKey(key)
105- val increment = Increment (encodedKey)
106- records.mergeQualifiers().forEach { (mergedQualifier, mergedValue) ->
107- val qualifier = mapper.group.encoder.encodeQualifier(mergedQualifier)
108- increment.addColumn(Constants .DEFAULT_COLUMN_FAMILY , qualifier, mergedValue)
109- }
110- if (ttl != null && ttl != Long .MAX_VALUE && ttl > 0 ) {
111- increment.ttl = ttl
112- }
113- increment
114- }
69+ val mutations = buildHBaseMutations(mutationRecords, mapper)
11570 handleDeferredRequests(mutations)
11671 .thenReturn(
11772 EdgeMutationStatus (source, target, events.size, mutationRecords.status, before, after, mutationRecords.acc),
@@ -147,18 +102,7 @@ class V3CompatibleTableBinding(
147102 .flatMap {
148103 findHashEdge(encodedHashEdgeKey)
149104 }.map {
150- // extract the state from the encoded value
151- val stateValue = mapper.state.decoder.decodeValue(it)
152- State (
153- stateValue.active,
154- stateValue.version,
155- stateValue.createdAt,
156- stateValue.deletedAt,
157- stateValue.properties
158- .mapNotNull { (key, value) ->
159- codeToFieldNameMap[key]?.let { name -> name to value }
160- }.toMap(),
161- )
105+ decodeCurrentState(it, mapper, codeToFieldNameMap)
162106 }.switchIfEmpty(Mono .defer { Mono .just(State .initial) })
163107 .map { before ->
164108 val after =
@@ -170,42 +114,7 @@ class V3CompatibleTableBinding(
170114 val beforeRecord = EdgeStateRecord .of(key, key, before, entity.id)
171115 val afterRecord = EdgeStateRecord .of(key, key, after, entity.id)
172116 val mutationRecords = EdgeMutationBuilder .buildForMultiEdge(beforeRecord, afterRecord, schema.direction, schema.indexes, schema.groups)
173- val mutations = mutableListOf<Mutation >()
174- val record = mapper.state.encoder.encode(mutationRecords.stateRecord)
175- mutations + =
176- Put (record.key)
177- .addColumn(Constants .DEFAULT_COLUMN_FAMILY , Constants .DEFAULT_QUALIFIER , record.value)
178- mutations + =
179- mutationRecords.createIndexRecords.map {
180- val record = mapper.index.encoder.encode(it)
181- Put (record.key)
182- .addColumn(Constants .DEFAULT_COLUMN_FAMILY , Constants .DEFAULT_QUALIFIER , record.value)
183- }
184- mutations + =
185- mutationRecords.countRecords.map {
186- val key = mapper.count.encoder.encodeKey(it.key)
187- Increment (key)
188- .addColumn(Constants .DEFAULT_COLUMN_FAMILY , Constants .DEFAULT_QUALIFIER , it.value)
189- }
190- mutations + =
191- mutationRecords.deleteIndexRecordKeys.map {
192- val key = mapper.index.encoder.encodeKey(it)
193- Delete (key)
194- }
195- mutations + =
196- mutationRecords.groupRecords.groupBy { it.key to it.ttl }.map { (groupKey, records) ->
197- val (key, ttl) = groupKey
198- val encodedKey = mapper.group.encoder.encodeKey(key)
199- val increment = Increment (encodedKey)
200- records.mergeQualifiers().forEach { (mergedQualifier, mergedValue) ->
201- val qualifier = mapper.group.encoder.encodeQualifier(mergedQualifier)
202- increment.addColumn(Constants .DEFAULT_COLUMN_FAMILY , qualifier, mergedValue)
203- }
204- if (ttl != null && ttl != Long .MAX_VALUE && ttl > 0 ) {
205- increment.ttl = ttl
206- }
207- increment
208- }
117+ val mutations = buildHBaseMutations(mutationRecords, mapper)
209118 handleDeferredRequests(mutations)
210119 .thenReturn(
211120 MultiEdgeMutationStatus (key, events.size, mutationRecords.status, before, after, mutationRecords.acc),
@@ -219,6 +128,67 @@ class V3CompatibleTableBinding(
219128 }
220129 }
221130
131+ private fun decodeCurrentState (
132+ encodedState : ByteArray ,
133+ mapper : EdgeRecordMapper ,
134+ codeToFieldNameMap : Map <Int , String >,
135+ ): State {
136+ val stateValue = mapper.state.decoder.decodeValue(encodedState)
137+ return State (
138+ stateValue.active,
139+ stateValue.version,
140+ stateValue.createdAt,
141+ stateValue.deletedAt,
142+ stateValue.properties
143+ .mapNotNull { (key, value) ->
144+ codeToFieldNameMap[key]?.let { name -> name to value }
145+ }.toMap(),
146+ )
147+ }
148+
149+ private fun buildHBaseMutations (
150+ mutationRecords : EdgeMutationRecords ,
151+ mapper : EdgeRecordMapper ,
152+ ): MutableList <Mutation > {
153+ val mutations = mutableListOf<Mutation >()
154+ val record = mapper.state.encoder.encode(mutationRecords.stateRecord)
155+ mutations + =
156+ Put (record.key)
157+ .addColumn(Constants .DEFAULT_COLUMN_FAMILY , Constants .DEFAULT_QUALIFIER , record.value)
158+ mutations + =
159+ mutationRecords.createIndexRecords.map {
160+ val indexRecord = mapper.index.encoder.encode(it)
161+ Put (indexRecord.key)
162+ .addColumn(Constants .DEFAULT_COLUMN_FAMILY , Constants .DEFAULT_QUALIFIER , indexRecord.value)
163+ }
164+ mutations + =
165+ mutationRecords.countRecords.map {
166+ val key = mapper.count.encoder.encodeKey(it.key)
167+ Increment (key)
168+ .addColumn(Constants .DEFAULT_COLUMN_FAMILY , Constants .DEFAULT_QUALIFIER , it.value)
169+ }
170+ mutations + =
171+ mutationRecords.deleteIndexRecordKeys.map {
172+ val key = mapper.index.encoder.encodeKey(it)
173+ Delete (key)
174+ }
175+ mutations + =
176+ mutationRecords.groupRecords.groupBy { it.key to it.ttl }.map { (groupKey, records) ->
177+ val (key, ttl) = groupKey
178+ val encodedKey = mapper.group.encoder.encodeKey(key)
179+ val increment = Increment (encodedKey)
180+ records.mergeQualifiers().forEach { (mergedQualifier, mergedValue) ->
181+ val qualifier = mapper.group.encoder.encodeQualifier(mergedQualifier)
182+ increment.addColumn(Constants .DEFAULT_COLUMN_FAMILY , qualifier, mergedValue)
183+ }
184+ if (ttl != null && ttl != Long .MAX_VALUE && ttl > 0 ) {
185+ increment.ttl = ttl
186+ }
187+ increment
188+ }
189+ return mutations
190+ }
191+
222192 companion object {
223193 private fun State.specialStateValueToNull (): State =
224194 State (
0 commit comments