-
Notifications
You must be signed in to change notification settings - Fork 367
Expand file tree
/
Copy pathutils.py
More file actions
399 lines (324 loc) · 14.2 KB
/
utils.py
File metadata and controls
399 lines (324 loc) · 14.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import math
import operator
import numpy as np
from nevergrad.parametrization import parameter as p
from nevergrad.parametrization.utils import float_penalty as _float_penalty
import nevergrad.common.typing as tp
from nevergrad.common.tools import OrderedSet
class MultiValue:
    """Running statistics for one or several evaluations of a single point.

    Exposes:
    - count: number of times the point was evaluated
    - mean: running mean of the evaluations
    - square: running mean of the squared evaluations
    - variance: variance estimate of the evaluations
    - parameter: the corresponding (frozen) Parameter
    as well as optimistic and pessimistic confidence bounds on the value.

    Parameter
    ---------
    parameter: Parameter
        the parameter for one of the evaluations
    y: float
        the first evaluation of the value
    reference: Parameter
        reference used to compute the standardized data of the point
    """

    def __init__(self, parameter: p.Parameter, y: float, *, reference: p.Parameter) -> None:
        self.count = 1
        self.mean = y
        self._minimum = y
        self.square = y * y
        # TODO May be safer to use a default variance which depends on y for scale invariance?
        self.variance = 1.0e6
        parameter.freeze()  # the stored parameter must not mutate afterwards
        self.parameter = parameter
        self._ref = reference

    @property
    def x(self) -> np.ndarray:  # for compatibility
        """Standardized data of the parameter w.r.t. the stored reference."""
        return self.parameter.get_standardized_data(reference=self._ref)

    def _confidence_bound(self, sign: float) -> float:
        # mean shifted in the given direction by 0.1 x (count-scaled deviation)
        return float(self.mean + sign * 0.1 * np.sqrt((self.variance) / (1 + self.count)))

    @property
    def optimistic_confidence_bound(self) -> float:
        return self._confidence_bound(-1.0)

    @property
    def pessimistic_confidence_bound(self) -> float:
        return self._confidence_bound(1.0)

    def get_estimation(self, name: str) -> float:
        """Return the estimation for one of the supported modes.

        Note: pruning relies on the fact that only the 3 modes
        optimistic/pessimistic/average exist (plus minimum); if a new mode
        is added, update pruning accordingly.
        """
        if name == "optimistic":
            return self.optimistic_confidence_bound
        if name == "pessimistic":
            return self.pessimistic_confidence_bound
        if name == "minimum":
            return self._minimum
        if name == "average":
            return self.mean
        raise NotImplementedError

    def add_evaluation(self, y: float) -> None:
        """Merge a new evaluation into the running statistics.

        Parameter
        ---------
        y: float
            the new evaluation (clipped to [-1e10, 1e10] for numerical safety)
        """
        if y > 1e10:
            y = 1e10
        elif y < -1e10:
            y = -1e10
        if y < self._minimum:
            self._minimum = y
        ratio = self.count / (self.count + 1)
        self.mean = ratio * self.mean + y / float(self.count + 1)
        self.square = (self.count * self.square + y * y) / float(self.count + 1)
        self.count += 1
        try:
            # keep square >= mean**2 so the variance stays non-negative
            self.square = max(self.square, self.mean**2)
            correction = math.sqrt(float(self.count) / float(self.count - 1.0))
            self.variance = correction * (self.square - self.mean**2)
        except OverflowError:
            self.variance = 1e10

    def as_array(self, reference: p.Parameter) -> np.ndarray:
        """Standardized data of the parameter w.r.t. the provided reference."""
        return self.parameter.get_standardized_data(reference=reference)

    def __repr__(self) -> str:
        return f"MultiValue<mean: {self.mean}, count: {self.count}, parameter: {self.parameter}>"
def _get_nash(optimizer: tp.Any) -> tp.List[tp.Tuple[tp.Tuple[float, ...], int]]:
"""Returns an empirical distribution. limited using a threshold
equal to max_num_trials^(1/4).
"""
if not optimizer.archive:
return [(optimizer.current_bests["pessimistic"].x, 1)]
max_num_trial = max(p.count for p in optimizer.archive.values())
sum_num_trial = sum(p.count for p in optimizer.archive.values())
threshold = np.power(max_num_trial, 0.5)
if threshold <= np.power(sum_num_trial, 0.25):
return [(optimizer.provide_recommendation(), 1)]
# make deterministic at the price of sort complexity
return sorted(
((np.frombuffer(k), p.count) for k, p in optimizer.archive.bytesdict.items() if p.count >= threshold), # type: ignore
key=operator.itemgetter(1),
)
def sample_nash(optimizer: tp.Any) -> tp.Tuple[float, ...]:  # Somehow like fictitious play.
    """Sample a point from the empirical Nash distribution of the archive.

    Draws one of the well-sampled points returned by _get_nash with
    probability proportional to its evaluation count.

    Parameters
    ----------
    optimizer: Any
        optimizer providing the archive (see _get_nash)
    """
    nash = _get_nash(optimizer)
    if len(nash) == 1:
        return nash[0][0]
    counts = [float(entry[1]) for entry in nash]
    # hoist the total out of the comprehension: it was previously recomputed
    # for every element, making the normalization accidentally O(n^2)
    total = sum(counts)
    prob = [count / total for count in counts]
    index: int = np.random.choice(np.arange(len(prob)), p=prob)
    return nash[index][0]
class DelayedJob:
    """Future-like object which delays the computation until result() is called."""

    def __init__(self, func: tp.Callable[..., tp.Any], *args: tp.Any, **kwargs: tp.Any) -> None:
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self._result: tp.Optional[tp.Any] = None
        self._computed = False

    def done(self) -> bool:
        # always "done": the work happens lazily inside result()
        return True

    def result(self) -> tp.Any:
        """Compute the function on first call, then return the cached result."""
        if self._computed:
            return self._result
        self._result = self.func(*self.args, **self.kwargs)
        self._computed = True
        return self._result
class SequentialExecutor:
    """Executor which runs jobs sequentially and locally.

    submit() simply wraps the call into a DelayedJob, so the function is
    only executed when its result() is requested.
    """

    def submit(self, fn: tp.Callable[..., tp.Any], *args: tp.Any, **kwargs: tp.Any) -> DelayedJob:
        job = DelayedJob(fn, *args, **kwargs)
        return job
def _tobytes(x: tp.ArrayLike) -> bytes:
x = np.asarray(x) # for compatibility
assert x.ndim == 1, f"Input shape: {x.shape}"
assert x.dtype == np.float64, f"Incorrect type {x.dtype} is not float"
return x.tobytes()
# Message raised by the Archive methods (keys/items/__iter__) that would
# require converting every bytes key back into a numpy array.
_ERROR_STR = (
    "Generating numpy arrays from the bytes keys is inefficient, "
    "work on archive.bytesdict.<keys,items>() directly and convert with "
    "np.frombuffer if you can. You can also use archive.<keys,items>_as_arrays() "
    "but it is less efficient."
)
# Type of the values stored in an Archive (e.g. MultiValue).
Y = tp.TypeVar("Y")
class Archive(tp.Generic[Y]):
    """A dict-like container whose keys are 1-D float64 numpy arrays.

    Arrays are not hashable, so the underlying `bytesdict` dict stores them
    as raw bytes; a key can be converted back with np.frombuffer(key).
    """

    def __init__(self) -> None:
        self.bytesdict: tp.Dict[bytes, Y] = {}

    def __setitem__(self, x: tp.ArrayLike, value: Y) -> None:
        key = _tobytes(x)
        self.bytesdict[key] = value

    def __getitem__(self, x: tp.ArrayLike) -> Y:
        key = _tobytes(x)
        return self.bytesdict[key]

    def __contains__(self, x: tp.ArrayLike) -> bool:
        key = _tobytes(x)
        return key in self.bytesdict

    def get(self, x: tp.ArrayLike, default: tp.Optional[Y] = None) -> tp.Optional[Y]:
        """Return the value stored for array x, or default when absent."""
        return self.bytesdict.get(_tobytes(x), default)

    def __len__(self) -> int:
        return len(self.bytesdict)

    def values(self) -> tp.ValuesView[Y]:
        return self.bytesdict.values()

    def keys(self) -> None:
        # deliberately disabled: converting keys to arrays is costly
        raise RuntimeError(_ERROR_STR)

    def items(self) -> None:
        # deliberately disabled: converting keys to arrays is costly
        raise RuntimeError(_ERROR_STR)

    def items_as_array(self) -> tp.Iterator[tp.Tuple[np.ndarray, Y]]:
        raise RuntimeError("For consistency, items_as_array is renamed to items_as_arrays")

    def items_as_arrays(self) -> tp.Iterator[tp.Tuple[np.ndarray, Y]]:
        """Iterate on (array, value) pairs, converting each bytes key.

        Convenient but inefficient: algorithms should iterate
        self.bytesdict.items() directly and convert keys with np.frombuffer(b).
        """
        for key, value in self.bytesdict.items():
            yield np.frombuffer(key), value

    def keys_as_array(self) -> tp.Iterator[np.ndarray]:
        raise RuntimeError("For consistency, keys_as_array is renamed to keys_as_arrays")

    def keys_as_arrays(self) -> tp.Iterator[np.ndarray]:
        """Iterate on keys converted to np.ndarray.

        Convenient but inefficient: algorithms should iterate
        self.bytesdict.keys() directly and convert keys with np.frombuffer(b).
        """
        for key in self.bytesdict:
            yield np.frombuffer(key)

    def __repr__(self) -> str:
        return f"Archive with bytesdict: {self.bytesdict!r}"

    def __str__(self) -> str:
        return f"Archive with bytesdict: {self.bytesdict}"

    def __iter__(self) -> None:
        raise RuntimeError(_ERROR_STR)
class Pruning:
    """Callable for pruning archives in the optimizer class.
    See Optimizer.pruning attribute, called at each "tell".

    Parameters
    ----------
    min_len: int
        minimum length of the pruned archive.
    max_len: int
        length at which pruning is activated (maximum allowed length for the archive).

    Note
    ----
    For each of the 3 criteria (optimistic, pessimistic and average), the min_len best
    (lowest) points are kept, which can lead to at most 3 * min_len points.
    """

    def __init__(self, min_len: int, max_len: int):
        self.min_len = min_len
        self.max_len = max_len
        self._num_prunings = 0  # for testing it is not called too often

    def __call__(self, archive: Archive[MultiValue]) -> Archive[MultiValue]:
        # only prune once the archive has grown past max_len
        return archive if len(archive) < self.max_len else self._prune(archive)

    def _prune(self, archive: Archive[MultiValue]) -> Archive[MultiValue]:
        # separate function to ease profiling
        self._num_prunings += 1
        criteria = ("optimistic", "pessimistic", "average")
        quantile_level = float(self.min_len + 1) / len(archive)
        cutoffs: tp.Dict[str, float] = {
            crit: np.quantile(  # type: ignore
                [v.get_estimation(crit) for v in archive.values()], quantile_level, method="lower"
            )
            for crit in criteria
        }
        pruned: Archive[MultiValue] = Archive()
        # strict comparison to make sure we prune even for values repeated maaany times
        # (this may occasionally remove all points, but nevermind for now)
        pruned.bytesdict = {
            key: value
            for key, value in archive.bytesdict.items()
            if any(value.get_estimation(crit) < cutoffs[crit] for crit in criteria)
        }
        return pruned

    @classmethod
    def sensible_default(cls, num_workers: int, dimension: int) -> "Pruning":
        """Very conservative pruning
        - keep at least 100 elements, or 7 times num_workers, whatever is biggest
        - keep at least 3 x min_len, or up to 10 x min_len if it does not exceed 1gb of data

        Parameters
        ----------
        num_workers: int
            number of evaluations which will be run in parallel at once
        dimension: int
            dimension of the optimization space
        """
        min_len = max(100, 7 * num_workers)  # safer to keep at least 7 times the workers
        # each point is stored twice: as key bytes and as Parameter
        max_len_1gb = 1024**3 // (dimension * 8 * 2)
        max_len = max(3 * min_len, min(10 * min_len, max_len_1gb))
        return cls(min_len, max_len)
class UidQueue:
    """Queue of uids used to cycle through a population. Keeps track of:
    - told uids (a FIFO queue)
    - asked uids (an ordered set)
    Telling moves a uid from the asked set to the told queue; asking pops
    from the told queue when possible, otherwise recycles the oldest asked
    uid, and records the uid as asked.
    """

    def __init__(self) -> None:
        # instantiated through typing so the deque stays picklable
        # (this syntax does not always work)
        self.told = tp.Deque[str]()
        self.asked: OrderedSet[str] = OrderedSet()

    def clear(self) -> None:
        """Removes all uids from both queues."""
        self.asked.clear()
        self.told.clear()

    def ask(self) -> str:
        """Return a uid: from the told queue when non-empty, otherwise the
        oldest asked uid; the uid is then recorded as asked.
        """
        if self.told:
            uid = self.told.popleft()
        elif self.asked:
            uid = next(iter(self.asked))
        else:
            raise RuntimeError("Both asked and told queues are empty.")
        self.asked.add(uid)
        return uid

    def tell(self, uid: str) -> None:
        """Moves the uid from the asked set to the told queue."""
        self.told.append(uid)
        if uid in self.asked:
            self.asked.discard(uid)

    def discard(self, uid: str) -> None:
        """Drop the uid from whichever queue holds it (told removal raises
        ValueError when the uid is in neither queue).
        """
        if uid in self.asked:
            self.asked.discard(uid)
        else:
            self.told.remove(uid)
class ConstraintManager:
    """Try max_trials random explorations for satisfying constraints.
    If the finally chosen point does not satisfy constraints, it is penalized
    as shown in the penalty function, using the coefficients below.
    Possibly unstable.
    """

    def __init__(self) -> None:
        self.max_trials = 1000
        self.penalty_factor = 1.0
        self.penalty_exponent = 1.001

    def __repr__(self) -> str:
        settings = ",".join(f"{x}={y}" for x, y in self.__dict__.items())
        return "Constraints:" + settings

    def update(
        self,
        max_trials: tp.Optional[int] = None,
        penalty_factor: tp.Optional[float] = None,
        penalty_exponent: tp.Optional[float] = None,
    ) -> None:
        """
        Parameters
        ----------
        max_trials: int
            number of random tries for satisfying constraints.
        penalty_factor: float
            multiplicative factor on the constraint penalization.
        penalty_exponent: float
            exponent, usually close to 1 and slightly greater than 1.
        """
        provided = {
            "max_trials": max_trials,
            "penalty_factor": penalty_factor,
            "penalty_exponent": penalty_exponent,
        }
        # only overwrite attributes that were explicitly provided
        for attr, new_value in provided.items():
            if new_value is not None:
                setattr(self, attr, new_value)
        if self.penalty_exponent < 1:
            raise ValueError("Penalty exponent needs to be equal or greater than 1")

    def penalty(self, parameter: p.Parameter, num_ask: int, budget: tp.Optional[int]) -> float:
        """Computes the penalty associated with a Parameter, for constraint management"""
        effective_budget = 1 if budget is None else budget
        # the penalty grows with the number of asks, relative to the budget scale
        coeff = self.penalty_factor * (self.penalty_exponent ** (num_ask / np.sqrt(effective_budget)))
        val = parameter.value
        violations = (_float_penalty(checker(val)) for checker in parameter._constraint_checkers)  # type: ignore
        return coeff * sum(violations)