Skip to content

Commit 36f0ef5

Browse files
committed
preliminary changes
1 parent 46d6858 commit 36f0ef5

2 files changed

Lines changed: 256 additions & 124 deletions

File tree

datasketch/storage.py

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -944,6 +944,15 @@ def __setstate__(self, state):
944944
class RedisListStorage(OrderedStorage, RedisStorage):
945945
def __init__(self, config, name=None):
946946
RedisStorage.__init__(self, config, name=name)
947+
# Set up encoders/decoders similar to Cassandra
948+
# Bucket tables store user keys as values, so decode them to strings
949+
# Key tables store hash values as values, so keep them as bytes
950+
if b"bucket" in name:
951+
self._val_decoder = lambda x: x.decode("utf-8") if isinstance(x, bytes) else x
952+
self._val_encoder = lambda x: x.encode("utf-8") if isinstance(x, str) else x
953+
else:
954+
self._val_decoder = lambda x: x
955+
self._val_encoder = lambda x: x
947956

948957
def keys(self):
949958
return self._redis.hkeys(self._name)
@@ -957,14 +966,16 @@ def status(self):
957966
return status
958967

959968
def get(self, key):
960-
return self._get_items(self._redis, self.redis_key(key))
969+
items = self._get_items(self._redis, self.redis_key(key))
970+
return [self._val_decoder(item) for item in items]
961971

962972
def getmany(self, *keys):
963973
pipe = self._redis.pipeline()
964974
pipe.multi()
965975
for key in keys:
966976
self._get_items(pipe, self.redis_key(key))
967-
return pipe.execute()
977+
results = pipe.execute()
978+
return [[self._val_decoder(item) for item in items] for items in results]
968979

969980
@staticmethod
970981
def _get_items(r, k):
@@ -984,10 +995,11 @@ def _remove(self, r, *keys):
984995
def remove_val(self, key, val, **kwargs):
985996
buffer = kwargs.pop("buffer", False)
986997
redis_key = self.redis_key(key)
998+
encoded_val = self._val_encoder(val)
987999
if buffer:
988-
self._buffer.lrem(redis_key, val)
1000+
self._buffer.lrem(redis_key, encoded_val)
9891001
else:
990-
self._redis.lrem(redis_key, val)
1002+
self._redis.lrem(redis_key, encoded_val)
9911003
if not self._redis.exists(redis_key):
9921004
self._redis.hdel(self._name, redis_key)
9931005

@@ -1005,7 +1017,8 @@ def insert(self, key, *vals, **kwargs):
10051017
def _insert(self, r, key, *values):
10061018
redis_key = self.redis_key(key)
10071019
r.hset(self._name, key, redis_key)
1008-
r.rpush(redis_key, *values)
1020+
encoded_values = [self._val_encoder(val) for val in values]
1021+
r.rpush(redis_key, *encoded_values)
10091022

10101023
def size(self):
10111024
return self._redis.hlen(self._name)
@@ -1042,17 +1055,19 @@ def _get_items(r, k):
10421055
def remove_val(self, key, val, **kwargs):
10431056
buffer = kwargs.pop("buffer", False)
10441057
redis_key = self.redis_key(key)
1058+
encoded_val = self._val_encoder(val)
10451059
if buffer:
1046-
self._buffer.srem(redis_key, val)
1060+
self._buffer.srem(redis_key, encoded_val)
10471061
else:
1048-
self._redis.srem(redis_key, val)
1062+
self._redis.srem(redis_key, encoded_val)
10491063
if not self._redis.exists(redis_key):
10501064
self._redis.hdel(self._name, redis_key)
10511065

10521066
def _insert(self, r, key, *values):
10531067
redis_key = self.redis_key(key)
10541068
r.hset(self._name, key, redis_key)
1055-
r.sadd(redis_key, *values)
1069+
encoded_values = [self._val_encoder(val) for val in values]
1070+
r.sadd(redis_key, *encoded_values)
10561071

10571072
@staticmethod
10581073
def _get_len(r, k):

0 commit comments

Comments
 (0)