[Metrics] Cache metrics ports in a file at each node #13501
rkooo567 merged 5 commits into ray-project:master
Conversation
| "ports_by_node.json") | ||
|
|
||
| # Maps a Node.unique_id to a dict that maps port names to port numbers. | ||
| ports_by_node: Dict[str, Dict[str, int]] = defaultdict(dict) |
This syntax is not available in Python 3.6, I think?

My dev environment is 3.6, so I think it's okay!

Hmm, not sure what the issue was before, but I definitely remember this syntax caused some issues... Let's see if CI says something.
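For what it's worth, the annotated assignment in question uses PEP 526 variable annotations, which were introduced in Python 3.6, so the syntax itself should parse there. A minimal sketch (the node id and port values are illustrative):

```python
from collections import defaultdict
from typing import Dict

# PEP 526 variable annotations (the `name: Type = value` form) were
# added in Python 3.6; on 3.5 and earlier this line is a SyntaxError.
ports_by_node: Dict[str, Dict[str, int]] = defaultdict(dict)

# defaultdict(dict) creates the inner dict on first access.
ports_by_node["node-abc"]["metrics_agent_port"] = 62137
print(ports_by_node["node-abc"])
```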
python/ray/node.py (outdated)

```python
    self._metrics_export_port = self._get_cached_port(
        "metrics_export_port", default_port=ray_params.metrics_export_port)

    self._metrics_export_port = ray_params.metrics_export_port
```
We don't need this block, right?

Oh, didn't we say that whatever approach we take for metrics_agent_port we should take for metrics_export_port too? I'm not fully familiar with the second one yet, so it's your call. Also, I'll do the rename of metrics_agent to dashboard_agent in a separate PR so that this diff is easier to read.

You already assigned this port on line 190, right? Am I missing something?

Lines 193-195 will make line 190 meaningless, IIUC?

Ah I see, you're totally right, I forgot to remove lines 193-195. I'll do that.
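For context, the port-caching approach discussed above can be sketched roughly like this. This is a simplified illustration under stated assumptions, not Ray's actual `_get_cached_port` implementation; `get_cached_port` and `cache_path` are hypothetical names:

```python
import json
import os
from collections import defaultdict
from typing import Dict


def get_cached_port(cache_path: str, node_id: str, port_name: str,
                    default_port: int) -> int:
    """Sketch of a per-node port cache: return the port recorded for
    (node_id, port_name) if one exists, otherwise record and return
    default_port. Illustrative only, not the actual Ray code."""
    # Maps a node's unique id to a dict that maps port names to port numbers.
    ports_by_node: Dict[str, Dict[str, int]] = defaultdict(dict)
    if os.path.exists(cache_path):
        with open(cache_path) as f:
            ports_by_node.update(json.load(f))
    if port_name not in ports_by_node[node_id]:
        ports_by_node[node_id][port_name] = default_port
        with open(cache_path, "w") as f:
            json.dump(ports_by_node, f)
    return ports_by_node[node_id][port_name]
```

With a cache like this, the first caller records its default port and every later caller on the same node gets that same port back, regardless of its own default.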
Yeah, something is not working as expected in either ...

```python
import ray
from ray.cluster_utils import Cluster
from ray.util.metrics import Count
from ray.test_utils import wait_for_condition, fetch_prometheus

num_iterations = 2
counters = [None] * num_iterations
for i in range(num_iterations):
    # Start a cluster, record and test a metric, and shut it down.
    print("STARTING UP")
    cluster = Cluster()
    cluster.add_node()
    cluster.wait_for_nodes()
    ray.init(address=cluster.address)
    counter_name = f"test_driver_counter_{i}"
    counters[i] = Count(counter_name, description="desc")
    counters[i].record(i)

    def test():
        node_info_list = ray.nodes()
        prom_addresses = []
        for node_info in node_info_list:
            metrics_export_port = node_info["MetricsExportPort"]
            addr = node_info["NodeManagerAddress"]
            prom_addresses.append(f"{addr}:{metrics_export_port}")
        components_dict, metric_names, metric_samples = fetch_prometheus(
            prom_addresses)
        return any(counter_name in full_name for full_name in metric_names)

    wait_for_condition(test, timeout=30)
    print("SHUTTING DOWN")
    ray.shutdown()
    cluster.shutdown()
```
It seems the issue is just related to ... So to summarize, calling ...
edoakes left a comment
I'm a little bit confused -- is this for the address="auto" case or for the "new driver" case? If it's for the address="auto" case the most logical solution would be to store the port mapping in the GCS and fetch it on startup. I'm not sure we want this behavior for the "new driver" case at all.
I think you mentioned there's some reason not to do that, could you remind me what that is?
No, I think I might be confused -- what's the difference between the two cases? This PR is for when more than one driver script runs on the same Ray cluster and on the same node, so there's a new driver that calls ... Or maybe the source of confusion is that this PR currently fails tests because of a separate issue, #13532, which involves the stats not shutting down correctly (in particular, between different test functions when running pytest).
The difference is that in the ... Assuming this is just solving the first case, the solution seems reasonable, but ideally we would store this in the GCS instead of a local file for consistency.
Ah, I think I see what you're saying. The cache is keyed by session, so in the case of calling ray.init(), ray.shutdown(), and then ray.init(), the ports from the first ray.init() are not read by the second ray.init() (it's a new session, so it has a separate cache). Does that address the concern?
Btw, @allenyin55 said this should be fixed ASAP. Can ... Is there anything else we should handle in this PR?
Before merging this PR, PR #13565 needs to be merged so that tests on the current PR pass (since shutdown() is called between each test). After that, this PR is ready to be merged.
Is #13565 a regression? Otherwise, we can just merge this PR first, right?
Relevant tests failing @architkulkarni
cc @edoakes @architkulkarni Can you just refactor the test to not do ray.init & ray.shutdown again and again, and just merge the PR first? (so that we can unblock Allen). Lingxuan also thinks the current fix can cause some issues, so it'd take some time to resolve the blocker.
Good idea. I moved the relevant test to the beginning of the file so that it runs first, before any ...
Only ...
* cache metric ports in a file at each node
* remove old assignment of export port
* lint
* lint
* move e2e test to top of file to avoid shutdown bug
Revert "[Metrics] Cache metrics ports in a file at each node (ray-project#13501)". This reverts commit 4d32c72.
Why are these changes needed?
Different drivers started on the same node need to use the same metrics port. Before this PR, each subsequent driver script after the first would choose a new unused port for the metrics agent, so metrics defined in these driver scripts would not display.
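To illustrate the intended behavior (a hypothetical self-contained sketch, not Ray's actual code; `metrics_agent_port` and `CACHE` are made-up names): both drivers consult the same per-node cache file, so the second driver reuses the first driver's port instead of picking a fresh one.

```python
import json
import os
import tempfile

# Hypothetical stand-in for the per-node cache file described above.
CACHE = os.path.join(tempfile.mkdtemp(), "ports_by_node.json")


def metrics_agent_port(node_id: str, preferred_port: int) -> int:
    """Return the cached port for node_id, recording preferred_port
    on first use. Illustrative only."""
    ports = {}
    if os.path.exists(CACHE):
        with open(CACHE) as f:
            ports = json.load(f)
    port = ports.setdefault(node_id, {}).setdefault(
        "metrics_agent_port", preferred_port)
    with open(CACHE, "w") as f:
        json.dump(ports, f)
    return port


first = metrics_agent_port("node-1", 50001)   # first driver records 50001
second = metrics_agent_port("node-1", 60002)  # second driver reuses 50001
print(first, second)
```

Before the fix, the second driver would behave like the `preferred_port` branch every time, so its metrics were exported on a port nobody was scraping.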
Related issue number
Closes #13372
Checks
I've run scripts/format.sh to lint the changes in this PR.