-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmemcached2.py
More file actions
2407 lines (1915 loc) · 84.9 KB
/
memcached2.py
File metadata and controls
2407 lines (1915 loc) · 84.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
# Copyright 2013 Sean Reifschneider, tummy.com, ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__author__ = 'Sean Reifschneider <jafo@tummy.com>'
__version__ = 'X.XX'
__copyright__ = 'Copyright (C) 2013 Sean Reifschneider, tummy.com, ltd.'
__license__ = 'Apache'
'''
from datetime import time
.. module:: memcached2
:platform: Unix, Windows
:synopsis: Next-generation memcache module for Python 2 and 3.
.. moduleauthor:: Sean Reifschneider <jafo@tummy.com>
A re-implementation of the python-memcached module, designed to work with
Python 2 and 3. Note: It is tested against Python 2.7 and 3.3 during
development, there may be problems running against previous versions.
See the Memcache() class and the tests for examples of use.
Developed by Sean Reifschneider <jafo@tummy.com> in 2013.
Bugs/patches/code: https://github.com/linsomniac/python-memcached2
'''
import re
import socket
import select
import sys
import time
from binascii import crc32
import collections
from bisect import bisect
PY3 = sys.version > '3'
PY33 = sys.version > '3.3'
if PY3:
import queue
else:
import Queue as queue
if not PY33:
ConnectionResetError = socket.error
class BrokenPipeError(Exception):
'''INTERNAL: Python 2 does not define this exception, so we
create one of our own.'''
pass
def _from_bytes(data):
'''INTERNAL: Convert bytes to a regular string.'''
if PY3:
if isinstance(data, str):
return data
return str(data, 'latin-1')
return str(data)
def _to_bytes(data):
'''INTERNAL: Convert something to bytes type.'''
if PY3:
if isinstance(data, bytes):
return data
return bytes(data, 'latin-1')
return data
def _to_bool(s):
'''INTERNAL: Convert a stats boolean string into boolean.'''
if s in ['0', 'no']:
return False
if s in ['1', 'yes']:
return True
raise NotImplementedError('Unknown boolean value {0}'.format(repr(s)))
def _server_interaction(
buffers_by_server, send_threshold, send_minimum,
expected_keys, results, return_successful, return_failed):
'''INTERNAL: Write and read to sockets that are ready.
This is used by the :py:func:`~memcached2.Memcache.set_multi` code to
interact with the server when buffers overflow or to finish sending and
receiving data.
'''
return_exception = None
read_sockets = [x for x in buffers_by_server.keys() if x.backend]
write_sockets = [
x[0] for x in buffers_by_server.items()
if not send_threshold or len(x[1]) >= send_threshold]
read_ready, write_ready = select.select(
read_sockets, write_sockets, [])[:2]
# receive data from read-ready sockets
for server in read_ready:
try:
server.read_from_socket()
except ServerDisconnect:
return_exception = _deferred_exception(
server, results, buffers_by_server, expected_keys,
'ServerDisconnect received, probably server died')
while server.line_available() and expected_keys[server]:
line = server.read_until().rstrip()
key = expected_keys[server].pop(0)
if line.startswith('CLIENT_ERROR'):
return_exception = _deferred_exception(
server, results, buffers_by_server, expected_keys,
'CLIENT_ERROR received, possibly key is too long.')
if line == 'STORED':
if return_successful:
results[key] = None
elif return_failed:
results[key] = line
# send data to write-ready sockets
for server in write_ready:
data = buffers_by_server[server]
if not data:
continue
bytes_sent = server.backend.send(data)
del data[:bytes_sent]
return return_exception
def _deferred_exception(
server, results, buffers_by_server, expected_keys, message):
'''INTERNAL: Handle an exception on a socket during set_multi.
This is used by the :py:func:`~memcached2.Memcache.set_multi` code to
reset socket connections and reconnect, flushing data to re-gain sync.
'''
server.reset()
server.connect()
del buffers_by_server[server][:]
del expected_keys[server][:]
return MultiStorageException(message, results)
def _dictionary_values_empty(d):
'''INTERNAL: Return the values in the dictionary that are not false.
'''
return [x for x in d.values() if x]
def debug(msg):
'''INTERNAL: Write a debugging message to stderr with a stack trace.
'''
import inspect
import os
stack = (
' -> '.join(['{0}({1}:{2})'.format(x[3],
os.path.basename(x[1]), x[2])
for x in reversed(inspect.stack()[1:])]))
sys.stderr.write('{0} => {1}\n'.format(msg, stack))
sys.stderr.flush()
class MemcachedException(Exception):
'''Base exception that all other exceptions inherit from.
This is never raised directly.'''
class UnknownProtocol(MemcachedException):
'''An unknown protocol was specified in the memcached URI.
Sublcass of :class:`MemcachedException`.'''
class InvalidURI(MemcachedException):
'''An error was encountered in parsing the server URI.
Subclass of :class:`MemcachedException`.'''
class ServerDisconnect(MemcachedException):
'''The connection to the server closed.
Subclass of :class:`MemcachedException`.'''
class NoAvailableServers(MemcachedException):
'''There are no servers available to cache on, probably because all
are disconnected. This exception typically occurs after the code
which would do a reconnection is run.
Subclass of :class:`MemcachedException`.'''
class StoreException(MemcachedException):
'''Base class for storage related exceptions. Never raised directly.
Subclass of :class:`MemcachedException`.'''
class NotStored(StoreException):
'''Item was not stored, but not due to an error. Normally means the
condition for an "add" or "replace" was not met.. Subclass of
:class:`StoreException`.'''
class CASFailure(StoreException):
'''Item you are trying to store with a "cas" command has been modified
since you last fetched it (result=EXISTS). Subclass of
:class:`StoreException`.'''
class MultiStorageException(StoreException):
'''During a SET operation the server returned CLIENT_ERROR. This is
probably due to too long of a key being used. Subclass of
:class:`StoreException`.'''
def __init__(self, message=None, results={}):
self.message = message
self.results = results
class CASRefreshFailure(CASFailure):
'''When trying to refresh a CAS from the memcached, the retrieved value
did not match the value sent with the last update. This may happen if
another process has updated the value. Subclass of
:class:`CASFailure`.'''
class NotFound(StoreException):
'''Item you are trying to store with a "cas" command does not exist..
Subclass of :class:`StoreException`.'''
class NonNumeric(StoreException):
'''The item you are trying to incr/decr is not numeric..
Subclass of :class:`StoreException`.'''
class RetrieveException(MemcachedException):
'''Base class for retrieve related exceptions. This is never raised
directly.. Subclass of :class:`MemcachedException`.'''
class NoValue(RetrieveException):
'''Server has no data associated with this key..
Subclass of :class:`RetrieveException`.'''
class ExceptionsAreMissesMapping(collections.MutableMapping):
'''A dictionary-like interface which swallows server exceptions.
This is a dictionary-like interface to memcache, but it swallows
server exceptions, except in the case of coding errors. This is
meant for situations where you want to keep the code simple, and
treat cache misses, server errors, and the like as cache misses.
See :ref:`ExceptionsAreMissesMapping Introduction
<exceptionsaremissesmapping-introduction>`
and :ref:`ExceptionsAreMissesMapping Examples
<exceptionsaremissesmapping-examples>` for more information.
'''
def __init__(self, servers, selector=None, hasher=None):
ret = super(ExceptionsAreMissesMapping, self).__init__()
self.memcache = Memcache(servers, selector, hasher)
return ret
def __getitem__(self, key):
try:
return self.memcache.get(key)
except (NoValue, ServerDisconnect, NotStored, NotFound, CASFailure):
raise KeyError(key)
def __setitem__(self, key, value):
try:
self.memcache.set(key, value)
except (ServerDisconnect, NotStored, NotFound, CASFailure):
pass
def __delitem__(self, key):
try:
self.memcache.delete(key)
return True
except (ServerDisconnect, NoAvailableServers, NotFound):
return False
def __iter__(self):
raise NotImplementedError()
def __len__(self):
items = 0
try:
for server_stats in self.memcache.stats():
items += server_stats.get('curr_items', 0)
except (ServerDisconnect, NoAvailableServers):
pass
return items
class ValueSuperStr(str):
'''Wrapper around Memcache value results.
This acts as a string normally, containing the value read from the
server. However, it is augmented with additional attributes representing
additional data received from the server: `flags`, `key`, and
`cas_unique` (which may be None if it was not requested from the server).
If this is constructed with the `memcache`
:py:class:`~memcache2.ServerConnection` instance, then additional
methods may be used to update the value via this object. If `cas_unique`
is given, these updates are done using the CAS value.
'''
def __new__(self, value, key, flags, cas_unique=None, memcache=None):
'''Instantiate new instance.
:param value: The memcache `value`, which is the value of this
class when treated like a string.
:type value: str
:param key: The `key` associated with the `value` retrieved.
:type key: str
:param flags: `flags` associated with the `value` retrieved.
:type flags: int
:param cas_unique: The `cas_unique` value, if it was queried, or
None if no CAS information was retrieved.
:type cas_unique: int
:param memcache: The memcache server instance, used for future
operations on this key.
:type memcache: :py:class:`~memcache2.ServerConnection`
:returns: :py:class:`~memcached2.ValueSuperStr` instance
'''
data = super(ValueSuperStr, self).__new__(self, value)
data.key = key
data.flags = flags
data.cas_unique = cas_unique
data.cas_unavailable = False
data.memcache = memcache
return data
def set(self, value, flags=0, exptime=0, update_cas=False):
'''Update the value in the server.
The key that was used to retrieve this value is updated in the server.
If the value was retrieved from the server with `get_cas` enabled,
then this will update using the CAS.
:param update_cas: If `True`, a `get()` will be done after the
`set()`, and if the result is the same as what we set,
update the CAS value in this object. This is so that multiple
updates can be done with CAS set. This may result in a
:py:exc:`~memcached2.CASRefreshFailure`.
:type update_cas: boolean
See :py:method:`~memcache2.Memcache.set` for more information.
.. note::
This does not update this object's value.
.. note::
If this object was retrieved with `get_cas` set, then multiple
updates will trigger a :py:exception:`~memcache2.CASFailure`
unless `update_cas` is used.
:raises: :py:exc:`~memcached2.CASFailure`,
:py:exc:`~memcached2.CASRefreshFailure`
'''
if self.cas_unique:
if self.cas_unavailable:
raise CASFailure('CAS value already consumed.')
self.cas_unavailable = True
retval = self.memcache.set(
self.key, value, flags=flags, exptime=exptime,
cas_unique=self.cas_unique)
if self.cas_unique and update_cas:
result = self.memcache.get(self.key)
if result != value:
raise CASRefreshFailure(
'Value from server changed during CAS refresh')
self.cas_unique = result.cas_unique
self.cas_unavailable = False
return retval
def append(self, value):
'''Append `value` to the data stored for this key.
See :py:method:`~memcache2.Memcache.append` for more information.
.. note::
This does not update this object's value.
:raises: :py:exc:`~memcached2.CASFailure`
'''
if self.cas_unique is not None:
raise CASFailure('Not supported with CAS')
return self.memcache.append(self.key, value)
def prepend(self, value):
'''Prepend `value` to the data stored for this key.
See :py:method:`~memcache2.Memcache.prepend` for more information.
.. note::
This does not update this object's value.
:raises: :py:exc:`~memcached2.CASFailure`
'''
if self.cas_unique is not None:
raise CASFailure('Not supported with CAS')
return self.memcache.prepend(self.key, value)
def incr(self, value=1):
'''Increment the value for this key.
See :py:method:`~memcache2.Memcache.incr` for more information.
.. note::
This does not update this object's value.
:raises: :py:exc:`~memcached2.CASFailure`
'''
if self.cas_unique is not None:
raise CASFailure('Not supported with CAS')
return self.memcache.incr(self.key, value)
def decr(self, value=1):
'''Decrement the value for this key.
See :py:method:`~memcache2.Memcache.decr` for more information.
.. note::
This does not update this object's value.
:raises: :py:exc:`~memcached2.CASFailure`
'''
if self.cas_unique is not None:
raise CASFailure('Not supported with CAS')
return self.memcache.decr(self.key, value)
def delete(self):
'''Remove this key from the server.
See :py:method:`~memcache2.Memcache.delete` for more information.
:raises: :py:exc:`~memcached2.CASFailure`
'''
if self.cas_unique is not None:
raise CASFailure('Not supported with CAS')
return self.memcache.delete(self.key)
def delete_all(self):
'''Remove this key from all of the the servers.
See :py:method:`~memcache2.Memcache.delete_all` for more information.
:raises: :py:exc:`~memcached2.CASFailure`
'''
if self.cas_unique is not None:
raise CASFailure('Not supported with CAS')
return self.memcache.delete_all(self.key)
def touch(self, exptime):
'''Update the expiration time on an item.
See :py:method:`~memcache2.Memcache.touch` for more information.
:raises: :py:exc:`~memcached2.CASFailure`
'''
if self.cas_unique is not None:
raise CASFailure('Not supported with CAS')
return self.memcache.touch(self.key, exptime)
class ValueDictionary(dict):
'''Encode the response as a dictionary.
This is a simple dictionary of the result data from the memcache
server, including keys: "key", "value", "flags", and "cas_unique".
This is a way of getting additional data from the memcache server
for use in things like CAS updates.
'''
def __init__(self, value, key, flags, cas_unique=None, memcache=None):
'''Instantiate new instance.
:param value: The memcache `value`, which is the value of this
class when treated like a string.
:type value: str
:param key: The `key` associated with the `value` retrieved.
:type key: str
:param flags: `flags` associated with the `value` retrieved.
:type flags: int
:param cas_unique: The `cas_unique` value, if it was queried, or
None if no CAS information was retrieved.
:type cas_unique: int
:param memcache: The memcache server instance, used for future
operations on this key.
:type memcache: :py:class:`~memcache2.ServerConnection`
:returns: :py:class:`~memcached2.ValueSuperStr` instance
'''
super(ValueDictionary, self).__init__(
[
['key', key],
['value', value],
['flags', flags],
['cas_unique', cas_unique]
])
class HasherBase:
'''Turn memcache keys into hashes, for use in server selection.
Normally, the python-memcached2 classes will automatically select a
hasher to use. However, for special circumstances you may wish to
use a different hasher or develop your own.
This is an abstract base class, here largely for documentation purposes.
Hasher sub-classes such as :py:class:`~memcached2.HasherZero` and
:py:class:`~memcached2.HasherCMemcache`, implement a `hash` method
which does all the work.
See :py:func:`~memcached2.HasherBase.hash` for details of implementing
a subclass.
'''
def hash(self, key):
'''Hash a key into a number.
This must persistently turn a string into the same value. That value
is used to determine which server to use for this key.
:param key: memcache key
:type key: str
:returns: int -- Hashed version of `key`.
'''
raise NotImplementedError('This class is only meant to be subclassed')
class HasherZero(HasherBase):
'''Hasher that always returns 0, useful only for
:py:class:`~memcached2.SelectorFirst`.'''
def hash(self, key):
'''See :py:func:`memcached2.HasherBase.hash` for details of
this function.
'''
return 0
class HasherCMemcache(HasherBase):
'''Hasher compatible with the C memcache hash function.'''
def hash(self, key):
'''See :py:func:`memcached2.HasherBase.hash` for details of
this function.
'''
key = _to_bytes(key)
return ((((crc32(key) & 0xffffffff) >> 16) & 0x7fff) or 1)
class ServerPool:
'''A pool of servers that connections can be checked in/out from.
Implements a thread-safe pool for storing server connections. This
allows only the active users of sockets to be holding connections.
Fewer connections may be needed because of this.
'''
def __init__(self, reconnector=None):
'''Initialize ServerPool instance.
:param reconnector: The reconnector instance used by the pool
to determine when a re-connection to a server will be
attempted. If `None`, a default
:py:class:`~memcached2.ReconnectorTime` will be created.
:type reconnector: :py:class:`~memcached2.ReconnectorBase`
'''
self.server_pools = {}
if reconnector is None:
reconnector = ReconnectorTime()
self.reconnector = reconnector
def _add(self, server_uri):
'''INTERNAL: Add the specified server to the pool list.
'''
if server_uri in self.server_pools:
return
self.server_pools[server_uri] = queue.Queue()
def is_available(self, server_uri):
'''Is the server available for use. This checks internal state to
determine if a server should try to be used.
:param server_uri: The URI for the server to check.
:type server_uri: str
:returns: boolean -- True if the server is available for use.
'''
self._add(server_uri)
if not self.server_pools[server_uri].empty():
return True
return self.reconnector.connectable(server_uri)
def get(self, server_uri):
'''Retrieve a server for use. Either pulling it from the queue or
making a new connection object.
:param server_uri: The URI for the server we need a connection to.
:type server_uri: str
:returns: ServerConnection -- Server connection to use.
'''
self._add(server_uri)
pool = self.server_pools[server_uri]
if not pool.empty():
try:
return pool.get_nowait()
except queue.Empty:
pass
server = ServerConnection(server_uri)
return server
def put(self, connection):
'''Return a connection to the pool.
:param connection: Connection to return to the queue.
:type connection: :py:class:`~memcache2.ServerConnection`
'''
if connection.buffer:
raise ValueError('Socket returned to pool with data waiting')
server_uri = connection.uri
self._add(server_uri)
self.server_pools[server_uri].put(connection)
def empty(self):
'''Release all the server connections currently in the pool.'''
for pool in self.server_pools.values():
if pool.empty():
continue
try:
connection = pool.get_nowait()
connection.reset()
except queue.Empty:
pass
def __del__(self):
self.empty()
class SelectorBase:
'''Select which server to use.
These classes implement a variety of algorithms for determining which
server to use, based on the key being stored.
The selection is done based on a `key_hash`, as returned by the
:py:func:`memcached2.HasherBase.hash` function.
Normally, the python-memcached2 classes will automatically pick a
selector to use. However, for special circumstances you may wish to
use a specific Selector or develop your own.
This is an abstract base class, here largely for documentation purposes.
Selector sub-classes such as :py:class:`~memcached2.SelectorFirst` and
:py:class:`~memcached2.SelectorRehashDownServers`, implement a `select`
method which does all the work.
See :py:func:`~memcached2.SelectorBase.select` for details of implementing
a subclass.
'''
def select(self, server_uri_list, hasher, key, server_pool):
'''Select a server bashed on the `key_hash`.
Given the list of servers and a hash of of key, determine which
of the servers this key is associated with on.
:param server_uri_list: A list of the server URIs to select among.
:type server_uri_list: list of server URIs
:param hasher: Hasher function, such as
:py:func:`memcache2.HasherBase.hash`.
:type hasher: :py:func:`memcache2.HasherBase.hash`.
:param key: The key to hash.
:type key: str
:param server_pool: (None) A server connection pool. If not
specified, a global pool is used.
:type server_pool: :py:class:`~memcache2.ServerPool` object.
:returns: string -- The server_uri to use.
:raises: :py:exc:`~memcached2.NoAvailableServers`
'''
raise NotImplementedError('This class is only meant to be subclassed')
class SelectorFirst(SelectorBase):
'''Server selector that only returns the first server. Useful when there
is only one server to select amongst.
'''
def select(self, server_uri_list, hasher, key, server_pool):
'''See :py:func:`memcached2.SelectorBase.select` for details of
this function.
'''
server_uri = server_uri_list[0]
if server_pool.is_available(server_uri):
return server_uri
raise NoAvailableServers()
class SelectorRehashDownServers(SelectorBase):
'''Select a server, if it is down re-hash up to `hashing_retries` times.
This was the default in the original python-memcached module. If the
server that a key is housed on is down, it will re-hash the key after
adding an (ASCII) number of tries to the key and try that server.
This is most suitable if you want to inter-operate with the old
python-memcache client.
If no up server is found after `hashing_retries` attempts,
:py:exc:`memcached2.NoAvailableServers` is raised.
'''
def __init__(self, hashing_retries=10):
'''
:param hashing_retries: Retry hashing the key looking for another
server this many times.
:type hashing_retries: int
'''
self.hashing_retries = hashing_retries
def select(self, server_uri_list, hasher, key, server_pool):
'''See :py:func:`memcached2.SelectorBase.select` for details of
this function.
'''
server_uri = server_uri_list[hasher(key) % len(server_uri_list)]
if server_pool.is_available(server_uri):
return server_uri
for i in range(self.hashing_retries):
server_uri = server_uri_list[
hasher(key + str(i)) % len(server_uri_list)]
if server_pool.is_available(server_uri):
return server_uri
raise NoAvailableServers()
class SelectorFractalSharding(SelectorBase):
'''On a down server, re-partition that servers keyspace to other servers.
This uses an algorithm that basically maps every key in the keyspace to
a list of the servers that will answer queries for it. The first
available server in that list will be used. The list is such that
the keys that map to a server when it is up will get distributed across
other servers evenly, stabally, and predictably.
I called it Fractal because when a server is down you dig deeper and see a
new level of complexity in the keyspace mapping.
'''
def select(self, server_uri_list, hasher, key, server_pool):
'''See :py:func:`memcached2.SelectorBase.select` for details of
this function.
'''
key_hash = hasher(key)
remaining_uris = server_uri_list[:]
for i in range(len(server_uri_list)):
position = key_hash % len(remaining_uris)
server_uri = server_uri_list[position]
if server_pool.is_available(server_uri):
return server_uri
del(remaining_uris[position])
raise NoAvailableServers()
class SelectorConsistentHashing(SelectorBase):
'''Predictably select a server, even if its normal server is down.
This implements the Consistent Hash algorithm as
http://en.wikipedia.org/wiki/Consistent_hashing
This is done by splitting the key-space up into a number of buckets
(more than the number of servers but probably no more than the
number of servers squared). See Wikipedia for details on how this
algorithm operates.
The downside of this mechanism is that it requires building a fairly
large table at startup, so it is not suited to short lived code.
It also is fairly expensive to add and remove servers from the pool
(not implemented in this code). Note that it is NOT expensive to
fail a server, only to completely remove it.
'''
def __init__(self, total_buckets=None):
'''
:param total_buckets: How many buckets to create. Smaller values
decrease the startup overhead, but also mean that a down
server will tend to not evenly redistribute it's load across
other servers. The default value of None means the default
value of the number of servers squared.
:type total_buckets: int
'''
self.total_buckets = None
self.buckets = None
def _initialize_buckets(self, server_uri_list, hasher):
'''Create the consistent-hashing set of buckets, used for determining
what server to use.'''
if self.buckets is not None:
return
len_uri_list = len(server_uri_list)
if not self.total_buckets:
self.total_buckets = len_uri_list * len_uri_list
bucket_dict = {}
for i in range(self.total_buckets):
bucket_dict[hasher(str(i))] = i % len_uri_list
self.buckets = sorted(bucket_dict.items())
def select(self, server_uri_list, hasher, key, server_pool):
'''See :py:func:`memcached2.SelectorBase.select` for details of
this function.
'''
if self.buckets is None:
self._initialize_buckets(server_uri_list, hasher)
hashed_key = hasher(key)
offset = bisect(self.buckets, (hashed_key, 0))
len_buckets = len(self.buckets)
len_uri_list = len(server_uri_list)
already_tried_uris = set()
for i in range(len_buckets):
if len(already_tried_uris) == len_uri_list:
raise NoAvailableServers()
bucket_offset = (offset + i) % len_buckets
# the last bucket covers things up to the first bucket
if bucket_offset == 0 and hashed_key < self.buckets[0][0]:
bucket = self.buckets[-1]
else:
bucket = self.buckets[bucket_offset]
uri_offset = bucket[1]
server_uri = server_uri_list[uri_offset]
if uri_offset in already_tried_uris:
continue
already_tried_uris.update([uri_offset])
if server_pool.is_available(server_uri):
return server_uri
raise NoAvailableServers()
class ReconnectorBase:
'''Track server problems and determine when to reconnect.
This is a base class for classes that handle reconnecting to down
servers. This tracks down servers have been having problems and
determining when to defer connecting and when to retry.
The Reconnector tracks problems with servers and defers connections
when it's been having problems.
'''
def __init__(self):
pass
def connectable(self, server_url):
'''Is the specified server connectable?
:returns: boolean -- Returns whether a connection should be initiated.
:param server_url: Server to consult about connectability.
:type server_url: string: URL of server.
'''
raise NotImplementedError()
def had_error(self, server_url):
'''Called to report to the reconnector when there is an error on
a server.
:param server_url: Server related to this report.
:type server_url: string: URL of server.
'''
raise NotImplementedError()
def had_success(self, server_url):
'''Called when a successful communication occurs with a server.
:param server_url: Server related to this report.
:type server_url: string: URL of server.
'''
raise NotImplementedError()
class ReconnectorSimple(ReconnectorBase):
'''This is a simple reconnector that immediately tries connections on
down servers.
'''
def __init__(self):
ReconnectorBase.__init__(self)
self.server_data = {}
def connectable(self, server_url):
'''See :py:func:`memcached2.ReconnectorBase.connectable` for
details of this function.
'''
return True
def had_error(self, server_url):
'''See :py:func:`memcached2.ReconnectorBase.error` for details of
this function.
'''
pass
def had_success(self, server_url):
'''See :py:func:`memcached2.ReconnectorBase.had_success` for
details of this function.
'''
pass
class ReconnectorTime(ReconnectorBase):
'''Try reconnecting to a server after a specific number of seconds
since the last failed operation.
'''
def __init__(self, timeout=30):
'''
:param timeout: Number of seconds since last error.
:type timeout: float
'''
ReconnectorBase.__init__(self)
self.timeout = timeout
self.last_error = {}
def connectable(self, server_url):
'''See :py:func:`memcached2.ReconnectorBase.connectable` for
details of this function.
'''
last_error = self.last_error.get(server_url)
if last_error is None:
return True
now = time.time()
if now - last_error >= self.timeout:
self.had_success(server_url)
return True
return False