Skip to content

Commit 6dfbead

Browse files
frant-hartmsrknzl
andauthored
Implement putAllWithMetadata [HZ-2583] [5.3.z] (#25144)
The PR adds a new codec/message task and internal API in ClientMapProxy and reuses existing MergeOperation. Using the same operation as during split-brain/WAN replication makes it easy to explain the behavior. Client protocol PR: hazelcast/hazelcast-client-protocol#468 We exceptionally add a new API in patch to support the data migration tool on 5.3.z as the target cluster. Breaking changes (list specific methods/types/messages): - New API addition in a patch version . The new API is internal though Co-authored-by: Serkan Özel <serkan.ozel@hazelcast.com>
1 parent fc868ee commit 6dfbead

14 files changed

Lines changed: 31382 additions & 0 deletions

hazelcast/src/main/java/com/hazelcast/client/impl/protocol/DefaultMessageTaskFactoryProvider.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@
240240
import com.hazelcast.client.impl.protocol.codec.MapProjectCodec;
241241
import com.hazelcast.client.impl.protocol.codec.MapProjectWithPredicateCodec;
242242
import com.hazelcast.client.impl.protocol.codec.MapPutAllCodec;
243+
import com.hazelcast.client.impl.protocol.codec.MapPutAllWithMetadataCodec;
243244
import com.hazelcast.client.impl.protocol.codec.MapPutCodec;
244245
import com.hazelcast.client.impl.protocol.codec.MapPutIfAbsentCodec;
245246
import com.hazelcast.client.impl.protocol.codec.MapPutIfAbsentWithMaxIdleCodec;
@@ -624,6 +625,7 @@
624625
import com.hazelcast.client.impl.protocol.task.map.MapProjectionWithPredicateMessageTask;
625626
import com.hazelcast.client.impl.protocol.task.map.MapPublisherCreateMessageTask;
626627
import com.hazelcast.client.impl.protocol.task.map.MapPublisherCreateWithValueMessageTask;
628+
import com.hazelcast.client.impl.protocol.task.map.MapPutAllWithMetadataMessageTask;
627629
import com.hazelcast.client.impl.protocol.task.map.MapPutAllMessageTask;
628630
import com.hazelcast.client.impl.protocol.task.map.MapPutIfAbsentMessageTask;
629631
import com.hazelcast.client.impl.protocol.task.map.MapPutIfAbsentWithMaxIdleMessageTask;
@@ -1488,6 +1490,8 @@ private void initializeMapTaskFactories() {
14881490
(cm, con) -> new MapPutIfAbsentWithMaxIdleMessageTask(cm, node, con));
14891491
factories.put(MapPutTransientWithMaxIdleCodec.REQUEST_MESSAGE_TYPE,
14901492
(cm, con) -> new MapPutTransientWithMaxIdleMessageTask(cm, node, con));
1493+
factories.put(MapPutAllWithMetadataCodec.REQUEST_MESSAGE_TYPE,
1494+
(cm, con) -> new MapPutAllWithMetadataMessageTask(cm, node, con));
14911495
}
14921496

14931497
private void initializeGeneralTaskFactories() {
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.hazelcast.client.impl.protocol.codec;
18+
19+
import com.hazelcast.client.impl.protocol.ClientMessage;
20+
import com.hazelcast.client.impl.protocol.Generated;
21+
import com.hazelcast.client.impl.protocol.codec.builtin.*;
22+
import com.hazelcast.client.impl.protocol.codec.custom.*;
23+
24+
import javax.annotation.Nullable;
25+
26+
import static com.hazelcast.client.impl.protocol.ClientMessage.*;
27+
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.*;
28+
29+
/*
30+
* This file is auto-generated by the Hazelcast Client Protocol Code Generator.
31+
* To change this file, edit the templates or the protocol
32+
* definitions on the https://github.com/hazelcast/hazelcast-client-protocol
33+
* and regenerate it.
34+
*/
35+
36+
/**
37+
* Copies all of the mappings from the specified entry list to this map, including metadata.
38+
* The implementation uses MergeOperation with PassThroughMergePolicy, so the effect of
39+
* this call is equivalent to synchronizing given entries using WAN replication.
40+
* Please note that all the keys in the request should belong to the partition id to which this request is being sent, all keys
41+
* matching to a different partition id shall be ignored. The API implementation using this request may need to send multiple
42+
* of these request messages for filling a request for a key set if the keys belong to different partitions.
43+
*/
44+
@Generated("a8d1d03ec3de96de55d76858ff81db9a")
45+
public final class MapPutAllWithMetadataCodec {
46+
//hex: 0x014900
47+
public static final int REQUEST_MESSAGE_TYPE = 84224;
48+
//hex: 0x014901
49+
public static final int RESPONSE_MESSAGE_TYPE = 84225;
50+
private static final int REQUEST_INITIAL_FRAME_SIZE = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
51+
private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
52+
53+
private MapPutAllWithMetadataCodec() {
54+
}
55+
56+
@edu.umd.cs.findbugs.annotations.SuppressFBWarnings({"URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD"})
57+
public static class RequestParameters {
58+
59+
/**
60+
* name of map
61+
*/
62+
public java.lang.String name;
63+
64+
/**
65+
* entries with metadata
66+
*/
67+
public java.util.List<com.hazelcast.map.impl.SimpleEntryView<com.hazelcast.internal.serialization.Data, com.hazelcast.internal.serialization.Data>> entries;
68+
}
69+
70+
public static ClientMessage encodeRequest(java.lang.String name, java.util.Collection<com.hazelcast.map.impl.SimpleEntryView<com.hazelcast.internal.serialization.Data, com.hazelcast.internal.serialization.Data>> entries) {
71+
ClientMessage clientMessage = ClientMessage.createForEncode();
72+
clientMessage.setContainsSerializedDataInRequest(true);
73+
clientMessage.setRetryable(false);
74+
clientMessage.setOperationName("Map.PutAllWithMetadata");
75+
ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
76+
encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
77+
encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
78+
clientMessage.add(initialFrame);
79+
StringCodec.encode(clientMessage, name);
80+
ListMultiFrameCodec.encode(clientMessage, entries, SimpleEntryViewCodec::encode);
81+
return clientMessage;
82+
}
83+
84+
public static MapPutAllWithMetadataCodec.RequestParameters decodeRequest(ClientMessage clientMessage) {
85+
ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
86+
RequestParameters request = new RequestParameters();
87+
//empty initial frame
88+
iterator.next();
89+
request.name = StringCodec.decode(iterator);
90+
request.entries = ListMultiFrameCodec.decode(iterator, SimpleEntryViewCodec::decode);
91+
return request;
92+
}
93+
94+
public static ClientMessage encodeResponse() {
95+
ClientMessage clientMessage = ClientMessage.createForEncode();
96+
ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
97+
encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, RESPONSE_MESSAGE_TYPE);
98+
clientMessage.add(initialFrame);
99+
100+
return clientMessage;
101+
}
102+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.hazelcast.client.impl.protocol.task.map;
18+
19+
import com.hazelcast.client.impl.protocol.ClientMessage;
20+
import com.hazelcast.client.impl.protocol.codec.MapPutAllWithMetadataCodec;
21+
import com.hazelcast.instance.impl.Node;
22+
import com.hazelcast.internal.nio.Connection;
23+
import com.hazelcast.internal.serialization.Data;
24+
import com.hazelcast.internal.serialization.SerializationService;
25+
import com.hazelcast.internal.util.Timer;
26+
import com.hazelcast.map.impl.MapContainer;
27+
import com.hazelcast.map.impl.MapService;
28+
import com.hazelcast.map.impl.SimpleEntryView;
29+
import com.hazelcast.map.impl.operation.MergeOperation;
30+
import com.hazelcast.security.permission.ActionConstants;
31+
import com.hazelcast.security.permission.MapPermission;
32+
import com.hazelcast.spi.impl.operationservice.Operation;
33+
import com.hazelcast.spi.merge.PassThroughMergePolicy;
34+
import com.hazelcast.spi.merge.SplitBrainMergePolicy;
35+
import com.hazelcast.spi.merge.SplitBrainMergeTypes.MapMergeTypes;
36+
37+
import java.security.Permission;
38+
import java.util.ArrayList;
39+
import java.util.List;
40+
41+
import static com.hazelcast.spi.impl.merge.MergingValueFactory.createMergingEntry;
42+
43+
public class MapPutAllWithMetadataMessageTask
44+
extends AbstractMapPartitionMessageTask<MapPutAllWithMetadataCodec.RequestParameters> {
45+
46+
private volatile long startTimeNanos;
47+
48+
public MapPutAllWithMetadataMessageTask(ClientMessage clientMessage, Node node, Connection connection) {
49+
super(clientMessage, node, connection);
50+
}
51+
52+
@Override
53+
protected Operation prepareOperation() {
54+
SerializationService ss = nodeEngine.getSerializationService();
55+
56+
List<MapMergeTypes<Object, Object>> mergingEntries = new ArrayList<>();
57+
for (SimpleEntryView<Data, Data> entry : parameters.entries) {
58+
mergingEntries.add(createMergingEntry(ss, entry));
59+
}
60+
61+
SplitBrainMergePolicy mergePolicy = nodeEngine.getSplitBrainMergePolicyProvider()
62+
.getMergePolicy(PassThroughMergePolicy.class.getName());
63+
64+
return new MergeOperation(parameters.name, mergingEntries, mergePolicy, true);
65+
}
66+
67+
@Override
68+
protected MapPutAllWithMetadataCodec.RequestParameters decodeClientMessage(ClientMessage clientMessage) {
69+
return MapPutAllWithMetadataCodec.decodeRequest(clientMessage);
70+
}
71+
72+
@Override
73+
protected ClientMessage encodeResponse(Object response) {
74+
return MapPutAllWithMetadataCodec.encodeResponse();
75+
}
76+
77+
@Override
78+
public String getServiceName() {
79+
return MapService.SERVICE_NAME;
80+
}
81+
82+
@Override
83+
protected void beforeProcess() {
84+
startTimeNanos = Timer.nanos();
85+
}
86+
87+
@Override
88+
protected Object processResponseBeforeSending(Object response) {
89+
MapService mapService = getService(MapService.SERVICE_NAME);
90+
MapContainer mapContainer = mapService.getMapServiceContext().getMapContainer(parameters.name);
91+
if (mapContainer.getMapConfig().isStatisticsEnabled()) {
92+
mapService.getMapServiceContext().getLocalMapStatsProvider().getLocalMapStatsImpl(parameters.name)
93+
.incrementPutLatencyNanos(parameters.entries.size(),
94+
Timer.nanosElapsed(startTimeNanos));
95+
}
96+
return response;
97+
}
98+
99+
@Override
100+
public Permission getRequiredPermission() {
101+
return new MapPermission(parameters.name, ActionConstants.ACTION_PUT);
102+
}
103+
104+
@Override
105+
public String getDistributedObjectName() {
106+
return parameters.name;
107+
}
108+
109+
@Override
110+
public String getMethodName() {
111+
return "putAllWithMetadata";
112+
}
113+
114+
@Override
115+
public Object[] getParameters() {
116+
List<SimpleEntryView<Data, Data>> entries = new ArrayList<>(parameters.entries);
117+
return new Object[]{entries};
118+
}
119+
}

hazelcast/src/main/java/com/hazelcast/client/impl/proxy/ClientMapProxy.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import com.hazelcast.client.impl.protocol.codec.MapProjectCodec;
6262
import com.hazelcast.client.impl.protocol.codec.MapProjectWithPredicateCodec;
6363
import com.hazelcast.client.impl.protocol.codec.MapPutAllCodec;
64+
import com.hazelcast.client.impl.protocol.codec.MapPutAllWithMetadataCodec;
6465
import com.hazelcast.client.impl.protocol.codec.MapPutCodec;
6566
import com.hazelcast.client.impl.protocol.codec.MapPutIfAbsentCodec;
6667
import com.hazelcast.client.impl.protocol.codec.MapPutIfAbsentWithMaxIdleCodec;
@@ -160,6 +161,7 @@
160161
import java.util.Map;
161162
import java.util.Set;
162163
import java.util.UUID;
164+
import java.util.concurrent.CompletableFuture;
163165
import java.util.concurrent.Future;
164166
import java.util.concurrent.TimeUnit;
165167
import java.util.concurrent.atomic.AtomicInteger;
@@ -185,6 +187,7 @@
185187
import static java.lang.Thread.currentThread;
186188
import static java.util.Collections.emptyMap;
187189
import static java.util.concurrent.TimeUnit.MILLISECONDS;
190+
import static java.util.stream.Collectors.groupingBy;
188191

189192
/**
190193
* Proxy implementation of {@link IMap}.
@@ -1670,6 +1673,76 @@ private void putAllInternal(@Nonnull Map<? extends K, ? extends V> map,
16701673
protected void finalizePutAll(Map<? extends K, ? extends V> map, Map<Integer, List<Entry<Data, Data>>> entryMap) {
16711674
}
16721675

1676+
public CompletableFuture<Void> putAllWithMetadataAsync(@Nonnull Collection<? extends EntryView<K, V>> entries) {
1677+
checkNotNull(entries, "Null argument entries is not allowed");
1678+
ClientPartitionService partitionService = getContext().getPartitionService();
1679+
1680+
Map<Integer, List<SimpleEntryView<Data, Data>>> entriesByPartition =
1681+
entries.stream()
1682+
.map(e -> {
1683+
checkNotNull(e.getKey(), NULL_KEY_IS_NOT_ALLOWED);
1684+
checkNotNull(e.getValue(), NULL_VALUE_IS_NOT_ALLOWED);
1685+
1686+
Data keyData = toData(e.getKey());
1687+
if (e instanceof SimpleEntryView
1688+
&& e.getKey() instanceof Data
1689+
&& e.getValue() instanceof Data
1690+
) {
1691+
return (SimpleEntryView<Data, Data>) e;
1692+
} else {
1693+
return new SimpleEntryView<>(keyData, toData(e.getValue()))
1694+
.withCost(e.getCost())
1695+
.withCreationTime(e.getCreationTime())
1696+
.withExpirationTime(e.getExpirationTime())
1697+
.withHits(e.getHits())
1698+
.withLastAccessTime(e.getLastAccessTime())
1699+
.withLastStoredTime(e.getLastStoredTime())
1700+
.withLastUpdateTime(e.getLastUpdateTime())
1701+
.withVersion(e.getVersion())
1702+
.withTtl(e.getTtl())
1703+
.withMaxIdle(e.getMaxIdle());
1704+
}
1705+
})
1706+
.collect(groupingBy(
1707+
(SimpleEntryView<Data, Data> e) -> partitionService.getPartitionId(e.getKey())
1708+
));
1709+
1710+
AtomicInteger counter = new AtomicInteger(entriesByPartition.size());
1711+
InternalCompletableFuture<Void> resultFuture = new InternalCompletableFuture<>();
1712+
if (counter.get() == 0) {
1713+
resultFuture.complete(null);
1714+
}
1715+
for (Entry<Integer, ? extends List<SimpleEntryView<Data, Data>>> entry : entriesByPartition.entrySet()) {
1716+
Integer partitionId = entry.getKey();
1717+
ClientMessage request = MapPutAllWithMetadataCodec.encodeRequest(name, entry.getValue());
1718+
ClientInvocationFuture future = new ClientInvocation(getClient(), request, getName(), partitionId)
1719+
.invoke();
1720+
1721+
future.whenCompleteAsync((clientMessage, throwable) -> {
1722+
if (throwable != null) {
1723+
resultFuture.completeExceptionally(throwable);
1724+
return;
1725+
}
1726+
if (counter.decrementAndGet() == 0) {
1727+
finalizePutAll(
1728+
entries,
1729+
entriesByPartition
1730+
);
1731+
if (!resultFuture.isDone()) {
1732+
resultFuture.complete(null);
1733+
}
1734+
}
1735+
}, ConcurrencyUtil.getDefaultAsyncExecutor());
1736+
}
1737+
1738+
return resultFuture;
1739+
}
1740+
1741+
protected void finalizePutAll(
1742+
Collection<? extends EntryView<K, V>> entries, Map<Integer,
1743+
List<SimpleEntryView<Data, Data>>> entryMap) {
1744+
}
1745+
16731746
@Override
16741747
public void clear() {
16751748
ClientMessage request = MapClearCodec.encodeRequest(name);

hazelcast/src/main/java/com/hazelcast/client/map/impl/nearcache/NearCachedClientMapProxy.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.hazelcast.client.impl.spi.impl.ClientInvocationFuture;
2828
import com.hazelcast.client.impl.spi.impl.ListenerMessageCodec;
2929
import com.hazelcast.config.NearCacheConfig;
30+
import com.hazelcast.core.EntryView;
3031
import com.hazelcast.internal.adapter.IMapDataStructureAdapter;
3132
import com.hazelcast.internal.monitor.impl.LocalMapStatsImpl;
3233
import com.hazelcast.internal.nearcache.NearCache;
@@ -39,6 +40,7 @@
3940
import com.hazelcast.logging.ILogger;
4041
import com.hazelcast.map.EntryProcessor;
4142
import com.hazelcast.map.LocalMapStats;
43+
import com.hazelcast.map.impl.SimpleEntryView;
4244
import com.hazelcast.query.Predicate;
4345
import com.hazelcast.spi.impl.InternalCompletableFuture;
4446
import com.hazelcast.core.ReadOnly;
@@ -560,6 +562,24 @@ protected void finalizePutAll(Map<? extends K, ? extends V> map, Map<Integer, Li
560562
}
561563
}
562564

565+
@Override
566+
protected void finalizePutAll(
567+
Collection<? extends EntryView<K, V>> entries,
568+
Map<Integer, List<SimpleEntryView<Data, Data>>> entriesByPartition
569+
) {
570+
if (serializeKeys) {
571+
for (List<SimpleEntryView<Data, Data>> partitionEntries : entriesByPartition.values()) {
572+
for (EntryView<Data, Data> entry : partitionEntries) {
573+
invalidateNearCache(entry.getKey());
574+
}
575+
}
576+
} else {
577+
for (EntryView<K, V> entry : entries) {
578+
invalidateNearCache(entry.getKey());
579+
}
580+
}
581+
}
582+
563583
@Override
564584
public void clear() {
565585
nearCache.clear();

0 commit comments

Comments
 (0)