[BEAM-2724] Preparing statesampler to work with structured names#3786
pabloem wants to merge 7 commits into apache:master
Note: This PR is not yet ready for review and merge.
fc6a119 to 9e6f3c0
r: @charlesccychen The new io_target argument allows states to track time spent in IO such as side inputs, shuffle, and state. Tests have passed, and the latest commit only updates documentation.
| stage_name=self.prefix, | ||
| step_name=step_name, | ||
| io_target=io_target) | ||
| scoped_state = self.scoped_states_by_name.get(counter_name, None) |
We are conflating the type of what can be used as keys in self.scoped_states_by_name. Previously, this was just strings, but now, you are using this CounterName object, which doesn't seem correct. The CounterName class doesn't define __hash__() or even __cmp__() / __eq__(), so it is possible for you to create two semantically identical CounterName objects which do not compare as equal, and are not considered the same key by the self.scoped_states_by_name dict object.
This is related to the comment above noting that sometimes counter_name in this code is a CounterName, and at other times, it is a string. We should make this code unambiguous in this respect.
The CounterName class is a subclass of a namedtuple (which, itself contains only strings or other namedtuples) - therefore it does implement the __hash__, __eq__, and __cmp__. So two objects that are semantically equal will hash to the same value. So this should be safe.
Does that make sense? What do you think?
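The point about inherited hashing and equality can be checked with a small sketch. The `CounterName` below is a hypothetical stand-in, not the real class: its field names are only assumed from the keyword arguments visible in the diff.

```python
import collections

# Hypothetical stand-in for the real CounterName. The field names are an
# assumption based on the keyword arguments shown in the diff.
_CounterNameBase = collections.namedtuple(
    'CounterName', ['name', 'stage_name', 'step_name', 'io_target'])


class CounterName(_CounterNameBase):
  """Structured counter name that inherits tuple equality and hashing."""
  __slots__ = ()


# Two distinct objects built from the same field values.
a = CounterName('process-msecs', stage_name='s1', step_name='step1',
                io_target=None)
b = CounterName('process-msecs', stage_name='s1', step_name='step1',
                io_target=None)

scoped_states_by_name = {a: 'state-for-step1'}

# Equality and hashing are field-wise, so b finds the entry stored under a.
same_key = (a is not b) and a == b and hash(a) == hash(b)
lookup = scoped_states_by_name.get(b, None)
```

This only holds as long as the subclass does not override `__eq__` or `__hash__` and every field value is itself hashable.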
| if state_name is None: | ||
| # If state_name is None, the worker is still using old style | ||
| # msec counters. | ||
| counter_name = '%s-%s-msecs' % (self.prefix, step_name) |
Shouldn't this be '%s%s-msecs' to preserve the old behavior?
Also, in this legacy case where only step_name is provided, it is unclear what is used to index into self.scoped_states_by_name (i.e., is it step_name, as you do on line 203, or counter_name, as done on line 223?).
It is also a little messy that counter_name can be either a string (this branch) or a CounterName object (the else branch)...
- You can see on line 117 that we're chopping off the dash from prefixes, so now we add the dash here. I'm doing this to add the dash-free prefix to the CounterStructuredName that's reported to the service. This is only necessary until the clients stop using the legacy path.
- You are completely right about the confusion. Good catch. I've updated the code to reflect this.
- I understand. This is temporary while all the clients start providing state_name. This should take less than a couple of weeks once this change is pulled in, and then the legacy path can be removed fully.
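A minimal sketch of the prefix question, assuming the old prefix always ended in a dash and that the stripping on line 117 simply removes that trailing dash. Both helper functions are hypothetical and exist only for illustration.

```python
def legacy_counter_name(prefix_with_dash, step_name):
  # Old format: the prefix carries its own trailing dash.
  return '%s%s-msecs' % (prefix_with_dash, step_name)


def current_counter_name(prefix_with_dash, step_name):
  # Assumed new behavior: the trailing dash is stripped when the prefix is
  # stored, so the format string has to put it back.
  if prefix_with_dash.endswith('-'):
    stored_prefix = prefix_with_dash[:-1]
  else:
    stored_prefix = prefix_with_dash
  return '%s-%s-msecs' % (stored_prefix, step_name)


old_name = legacy_counter_name('s1-', 'step1')
new_name = current_counter_name('s1-', 'step1')
```

Under these assumptions both functions produce the same legacy counter name, while the stored prefix stays dash-free for use in the structured name.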
| def scoped_state(self, name): | ||
| """Returns a context manager managing transitions for a given state.""" | ||
| cdef ScopedState scoped_state = self.scoped_states_by_name.get(name, None) | ||
| def scoped_state(self, step_name, state_name=None, io_target=None): |
Please add a TODO to make state_name a required parameter after all callers have migrated.
| counter_name = '%s-%s-msecs' % (self.prefix, step_name) | ||
| scoped_state = self.scoped_states_by_name.get(step_name, None) | ||
| else: | ||
| counter_name = CounterName(state_name+'-msecs', |
nit: space before and after +.
| scoped_state.nsecs = 0 | ||
| pythread.PyThread_release_lock(self.lock) | ||
| self.scoped_states_by_name[name] = scoped_state | ||
| self.scoped_states_by_name[counter_name] = scoped_state |
Please see above comments regarding the actual type of counter_name.
pabloem left a comment
Comments addressed. Thanks for your time. Let me know what you think.
charlesccychen left a comment
Thanks, this LGTM after nit.
| def scoped_state(self, name): | ||
| """Returns a context manager managing transitions for a given state.""" | ||
| cdef ScopedState scoped_state = self.scoped_states_by_name.get(name, None) | ||
| # TODO(pabloem) - Make state_name required once all callers migrate, |
nit: Can you change this to follow the project style of # TODO(XXX): YYY?
| stage_name=self.prefix, | ||
| step_name=step_name, | ||
| io_target=io_target) | ||
| scoped_state = self.scoped_states_by_name.get(counter_name, None) |
pabloem wrote:
The CounterName class is a subclass of a namedtuple (which, itself contains only strings or other namedtuples) - therefore it does implement the __hash__, __eq__, and __cmp__. So two objects that are semantically equal will hash to the same value. So this should be safe.
Does that make sense? What do you think?
Thanks! I missed the inheritance--this makes sense. It will be much less confusing once we remove the first branch so that we can say that self.scoped_states_by_name is always keyed by CounterName.
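For illustration, here is a rough pure-Python sketch of what the lookup might look like once the legacy branch is gone and every key is a CounterName. It is not the actual Cython implementation: `SimpleStateSampler`, the dict-based state, and the namedtuple fields are all assumptions, and the locking seen in the diff is omitted.

```python
import collections

# Hypothetical stand-in for CounterName; field names are assumed.
CounterName = collections.namedtuple(
    'CounterName', ['name', 'stage_name', 'step_name', 'io_target'])


class SimpleStateSampler(object):
  """Toy sampler whose state dict is keyed only by CounterName."""

  def __init__(self, prefix):
    self.prefix = prefix
    self.scoped_states_by_name = {}

  def scoped_state(self, step_name, state_name, io_target=None):
    counter_name = CounterName(state_name + '-msecs',
                               stage_name=self.prefix,
                               step_name=step_name,
                               io_target=io_target)
    scoped_state = self.scoped_states_by_name.get(counter_name, None)
    if scoped_state is None:
      # A plain dict stands in for the real ScopedState object.
      scoped_state = {'name': counter_name, 'nsecs': 0}
      self.scoped_states_by_name[counter_name] = scoped_state
    return scoped_state


sampler = SimpleStateSampler('s1')
first = sampler.scoped_state('step1', 'process')
second = sampler.scoped_state('step1', 'process')
with_io = sampler.scoped_state('step1', 'process', io_target='shuffle')
```

Repeated calls with equal arguments return the same state object, and a different io_target yields a separate entry.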
R: @bjchambers for merge
8cf66aa to 1454095
1454095 to 29829fe
Run Python PostCommit
jenkins: retest this please