@@ -94,15 +94,23 @@ async def GetProfilingStats(self, request, context):
9494 return reporter_pb2 .GetProfilingStatsReply (
9595 profiling_stats = profiling_stats , std_out = stdout , std_err = stderr )
9696
97- async def ReportOCMetrics (self , request , context ):
98- # This function receives a GRPC containing OpenCensus (OC) metrics
99- # from a Ray process, then exposes those metrics to Prometheus .
97+ async def ReportMetrics (self , request , context ):
98+ # NOTE: Exceptions are not propagated properly
99+ # when we don't catch them here.
100100 try :
101- self ._metrics_agent .record_metric_points_from_protobuf (
102- request .metrics )
103- except Exception :
101+ metrcs_description_required = (
102+ self ._metrics_agent .record_metrics_points (
103+ request .metrics_points ))
104+ except Exception as e :
105+ logger .error (e )
104106 logger .error (traceback .format_exc ())
105- return reporter_pb2 .ReportOCMetricsReply ()
107+
108+ # If metric descriptions are missing, we should notify the C++ processes
109+ # that we need them. The C++ processes will then report them here.
110+ # We need them when (1) a new metric is reported (application metric), or
111+ # (2) a reporter goes down and is restarted (currently not implemented).
112+ return reporter_pb2 .ReportMetricsReply (
113+ metrcs_description_required = metrcs_description_required )
106114
107115 @staticmethod
108116 def _get_cpu_percent ():
@@ -117,7 +125,8 @@ def _get_gpu_usage():
117125 try :
118126 gpus = gpustat .new_query ().gpus
119127 except Exception as e :
120- logger .debug (f"gpustat failed to retrieve GPU information: { e } " )
128+ logger .debug (
129+ "gpustat failed to retrieve GPU information: {}" .format (e ))
121130 for gpu in gpus :
122131 # Note: the keys in this dict contain periods, which throw
123132 # off JavaScript, so we change "."s to "_"s.
@@ -224,8 +233,12 @@ def _get_all_stats(self):
224233 "cmdline" : self ._get_raylet_cmdline (),
225234 }
226235
227- async def _perform_iteration (self , aioredis_client ):
236+ async def _perform_iteration (self ):
228237 """Get any changes to the log files and push updates to Redis."""
238+ aioredis_client = await aioredis .create_redis_pool (
239+ address = self ._dashboard_agent .redis_address ,
240+ password = self ._dashboard_agent .redis_password )
241+
229242 while True :
230243 try :
231244 stats = self ._get_all_stats ()
@@ -236,8 +249,5 @@ async def _perform_iteration(self, aioredis_client):
236249 reporter_consts .REPORTER_UPDATE_INTERVAL_MS / 1000 )
237250
238251 async def run (self , server ):
239- aioredis_client = await aioredis .create_redis_pool (
240- address = self ._dashboard_agent .redis_address ,
241- password = self ._dashboard_agent .redis_password )
242252 reporter_pb2_grpc .add_ReporterServiceServicer_to_server (self , server )
243- await self ._perform_iteration (aioredis_client )
253+ await self ._perform_iteration ()
0 commit comments