horizontal-scaling: sink gRPC service + tests#663
214b269 to
4bb4a57
Compare
Signed-off-by: Otto van der Schaaf <ovanders@redhat.com>
4bb4a57 to
1aebb64
Compare
…sink-grpc-service Signed-off-by: Otto van der Schaaf <ovanders@redhat.com>
@eric846 please review and assign back to me once done
source/sink/service_impl.cc
Outdated
| while (request_reader->Read(&request)) { | ||
| ENVOY_LOG(info, "StoreExecutionResponseStream request {}", request.DebugString()); | ||
| const nighthawk::client::ExecutionResponse& response_to_store = request.execution_response(); | ||
| const auto status = sink_->StoreExecutionResultPiece(response_to_store); |
Can this just be absl::Status?
source/sink/service_impl.cc
Outdated
| break; | ||
| } | ||
| } | ||
| grpc::Status grpc_status = |
If this absl::Status-to-grpc::Status conversion were a helper function (that also logged the status), it might allow you to return right out of the middle of the loop.
source/sink/service_impl.cc
Outdated
| absl::StatusOr<nighthawk::client::ExecutionResponse> response = | ||
| mergeExecutionResponses(request.execution_id(), responses); | ||
| status.Update(response.status()); | ||
| if (status.ok()) { |
I think this loop feels more complicated than it is because of the indentation and running status variable. As soon as something goes wrong, we basically exit the function entirely, right? Could you just return immediately whenever a call returns non-ok and flatten the loop to one level of indentation?
Simplified it a bit in e12139c, using the suggestion above to create a helper. Let me know what you think.
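For illustration, a minimal sketch of the early-return shape discussed in this thread. It is not the code from e12139c: `Status`, `GrpcStatus`, `toGrpcStatus`, `storePiece` and `storeAll` are stand-in names so the snippet compiles without Abseil or gRPC. In the service itself the two types would be absl::Status and grpc::Status.

```cpp
#include <iostream>
#include <string>
#include <vector>

// Stand-ins for absl::Status and grpc::Status (assumption: only ok() and a
// message matter for this illustration).
struct Status {
  bool is_ok = true;
  std::string message;
  bool ok() const { return is_ok; }
};

struct GrpcStatus {
  bool is_ok = true;
  std::string message;
  bool ok() const { return is_ok; }
};

// Hypothetical helper: converts and logs in one place, so a caller can
// return its result directly from the middle of a loop.
GrpcStatus toGrpcStatus(const Status& status) {
  if (status.ok()) {
    return GrpcStatus{};
  }
  std::cerr << "failure: " << status.message << "\n";
  return GrpcStatus{false, status.message};
}

// Hypothetical store step: rejects empty pieces.
Status storePiece(const std::string& piece) {
  return piece.empty() ? Status{false, "empty piece"} : Status{};
}

// One level of indentation: leave the function on the first non-ok status
// instead of carrying a running status variable.
GrpcStatus storeAll(const std::vector<std::string>& pieces) {
  for (const std::string& piece : pieces) {
    const Status status = storePiece(piece);
    if (!status.ok()) {
      return toGrpcStatus(status);
    }
  }
  return GrpcStatus{};
}
```

With this shape the success path reads top to bottom, and every failure path ends in the same logged conversion.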
source/sink/service_impl.cc
Outdated
| if (!merge_target.has_options()) { | ||
| // If no options are set, that means this is the first part of the merge. | ||
| // Set some properties that should be equal amongst all Output instances. | ||
| *(merge_target.mutable_options()) = input_to_merge.options(); |
Nit: no need for ( ) around the left side I believe.
source/sink/service_impl.cc
Outdated
| } | ||
| } | ||
| // Append all input results into our own results. | ||
| for (const auto& result : input_to_merge.results()) { |
Could auto just be nighthawk::client::Result?
test/sink/sink_service_test.cc
Outdated
| absl::StatusOr<std::vector<ExecutionResponse>> response_from_mock_sink = | ||
| std::vector<ExecutionResponse>{{}}; | ||
| ExecutionResponse& response = response_from_mock_sink.value().at(0); | ||
| response.mutable_execution_id()->assign(kTestId); |
Can this just be .set_execution_id(kTestId) as on request_, or is there a distinction? Same question about all uses of .assign(), which I'm seeing for the first time.
.assign() accepts the absl::string_view we're passing it; with .set_execution_id we would have to create a std::string, because passing the string_view directly will not work. Personally I find this slightly nicer to look at, but if you feel strongly about it I can change it in this PR (there are some other spots where this is done too).
I see, I was afraid it would be string_view not working. No problem keeping .assign()
i don't have a problem here, but we aren't passing a string_view here, are we? we're just passing a const string.
Oh yes, changed this in all the appropriate spots in 8492073
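For illustration, a sketch of the distinction this thread is about, using std::string_view (which absl::string_view typically aliases in C++17 builds). `FakeResponse` is a hypothetical stand-in for a generated proto message and only offers a `const std::string&` setter. Setters actually generated by protobuf vary by version, so treat this as an assumption.

```cpp
#include <string>
#include <string_view>

// Hypothetical stand-in for a generated message with one string field.
class FakeResponse {
public:
  void set_execution_id(const std::string& value) { execution_id_ = value; }
  std::string* mutable_execution_id() { return &execution_id_; }
  const std::string& execution_id() const { return execution_id_; }

private:
  std::string execution_id_;
};

// With a string_view in hand, std::string::assign() works directly (C++17),
// while the setter needs an explicit std::string because the
// string_view-to-string conversion is explicit.
FakeResponse fromView(std::string_view id) {
  FakeResponse response;
  response.mutable_execution_id()->assign(id);
  // response.set_execution_id(id);            // would not compile
  response.set_execution_id(std::string(id));  // explicit copy is required
  return response;
}

// With a const std::string, the plain setter is enough.
FakeResponse fromString(const std::string& id) {
  FakeResponse response;
  response.set_execution_id(id);
  return response;
}
```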
test/sink/sink_service_test.cc
Outdated
| response.mutable_execution_id()->assign(kTestId); | ||
| response.mutable_output(); | ||
| request_.set_execution_id(kTestId); | ||
| auto r = stub_->SinkRequestStream(&context_); |
If this type is something like ClientReaderWriter<nighthawk::StoreExecutionRequest, nighthawk::StoreExecutionResponse> I think we still prefer to spell it out despite the verbosity. Also could you rename r to something more descriptive, possibly stream like https://grpc.io/docs/languages/cpp/basics/#streaming-rpcs?
test/sink/sink_service_test.cc
Outdated
| TEST(ResponseVectorHandling, NoResultsInOutputYieldsNone) { | ||
| ExecutionResponse result; | ||
| std::vector<ExecutionResponse> responses{result, result, result}; | ||
| absl::StatusOr<ExecutionResponse> response = mergeExecutionResponses("", responses); |
Could we label this /*execution_id=*/"" or just make the value "execution-id"?
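For illustration, the argument-comment style being asked for. `mergePieces` and `mergeWithEmptyId` are hypothetical stand-ins for mergeExecutionResponses and its call site, with strings in place of the proto types.

```cpp
#include <string>
#include <vector>

// Hypothetical stand-in for mergeExecutionResponses: prefixes the merged
// pieces with the execution id.
std::string mergePieces(const std::string& execution_id,
                        const std::vector<std::string>& pieces) {
  std::string merged = execution_id + ":";
  for (const std::string& piece : pieces) {
    merged += piece;
  }
  return merged;
}

std::string mergeWithEmptyId(const std::vector<std::string>& pieces) {
  // The inline comment names the parameter, so a bare "" is not a mystery
  // at the call site.
  return mergePieces(/*execution_id=*/"", pieces);
}
```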
|
|
||
| TEST(MergeOutputs, MergeDivergingOptionsInResultsFails) { | ||
| const std::string kTestId = "test-id"; | ||
| std::vector<ExecutionResponse> responses; |
responses and the variables response_1, response_2 seem unused in the mergeOutput tests. Could the tests just create nighthawk::client::Output variables directly?
source/sink/service_impl.cc
Outdated
| while (stream->Read(&request)) { | ||
| ENVOY_LOG(trace, "Inbound SinkRequest {}", request.DebugString()); | ||
| absl::StatusOr<std::vector<nighthawk::client::ExecutionResponse>> | ||
| status_or_execution_responses = sink_->LoadExecutionResult(request.execution_id()); |
A style accepted elsewhere for limiting the verbosity of StatusOr:
- omit status and _or from the variable name (compiler will protect against losing track of what is and isn't a StatusOr)
- no need to unpack the value into a second variable
- instead of .value(), use the overloaded * operator (if implemented)
- instead of .value()., use the overloaded -> operator (if implemented)
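For illustration, the two styles side by side. std::optional is used here as a stand-in because it exposes the same value(), * and -> accessors as absl::StatusOr; `loadResults`, `countVerbose` and `countTerse` are hypothetical names.

```cpp
#include <cstddef>
#include <optional>
#include <string>
#include <vector>

// Hypothetical loader: yields no value for an empty id.
std::optional<std::vector<std::string>> loadResults(const std::string& execution_id) {
  if (execution_id.empty()) {
    return std::nullopt;
  }
  return std::vector<std::string>{execution_id + "-a", execution_id + "-b"};
}

// Verbose style: "status_or" in the name, plus a second variable holding the
// unpacked value.
std::size_t countVerbose(const std::string& execution_id) {
  std::optional<std::vector<std::string>> status_or_results = loadResults(execution_id);
  if (!status_or_results.has_value()) {
    return 0;
  }
  const std::vector<std::string>& results = status_or_results.value();
  return results.size();
}

// Terser style: plain name, no second variable, -> instead of .value().
// (*results would give the whole vector where one is needed).
std::size_t countTerse(const std::string& execution_id) {
  std::optional<std::vector<std::string>> results = loadResults(execution_id);
  if (!results) {
    return 0;
  }
  return results->size();
}
```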
…sink-grpc-service Signed-off-by: Otto van der Schaaf <ovanders@redhat.com>
Signed-off-by: Otto van der Schaaf <ovanders@redhat.com>
Thank you for the review @eric846, this is ready for another pass.
Signed-off-by: Otto van der Schaaf <ovanders@redhat.com>
source/sink/service_impl.cc
Outdated
| return grpc::Status::OK; | ||
| }; | ||
|
|
||
| grpc::Status SinkServiceImpl::abslStatusToGrpcStatus(const absl::Status& status) { |
instead of being a private method on this class, can this just be a function in the anonymous namespace?
(unless i'm missing something, this function doesn't use anything on the object itself)
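For illustration, a sketch of the suggested layout: a helper that touches no object state lives in the anonymous namespace of the .cc file rather than in the class. `describeCode` and `FakeService` are hypothetical names, and the integer code stands in for a real status type.

```cpp
#include <string>

namespace {

// File-local helper: it uses nothing from any object, so it does not need to
// be a (private) member and stays out of the header entirely.
std::string describeCode(int code) { return code == 0 ? "OK" : "INTERNAL"; }

} // namespace

// Hypothetical service class that calls the free function.
class FakeService {
public:
  std::string handle(int code) const { return "status: " + describeCode(code); }
};
```

One effect of this layout is that the helper can change without touching the class declaration.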
| SinkServiceImpl::SinkServiceImpl(std::unique_ptr<Sink> sink) : sink_(std::move(sink)) {} | ||
|
|
||
| grpc::Status SinkServiceImpl::StoreExecutionResponseStream( | ||
| grpc::ServerContext*, grpc::ServerReader<nighthawk::StoreExecutionRequest>* request_reader, |
should we add a null pointer check for request_reader? (return invalid argument if it's nullptr)
Added a RELEASE_ASSERT here (and also in the other gRPC method): this gets generated, so AFAICT there's no way to hit this in tests.
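For illustration, a sketch of the alternative raised in the question above: returning an invalid-argument status on a null pointer rather than asserting. `Code`, `Status` and `consume` are hypothetical stand-ins, and the vector pointer stands in for the gRPC reader.

```cpp
#include <string>
#include <vector>

// Stand-in status type (assumption: just a code and a message).
enum class Code { kOk, kInvalidArgument };

struct Status {
  Code code = Code::kOk;
  std::string message;
};

// Hypothetical handler: sums the values it reads and reports a null pointer
// to the caller instead of crashing.
Status consume(const std::vector<int>* reader, int* sum) {
  if (reader == nullptr || sum == nullptr) {
    return Status{Code::kInvalidArgument, "reader and sum must not be null"};
  }
  *sum = 0;
  for (const int value : *reader) {
    *sum += value;
  }
  return Status{};
}
```

Unlike an assert, this branch can be exercised from a unit test by passing nullptr directly.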
source/sink/service_impl.cc
Outdated
| nighthawk::StoreExecutionRequest request; | ||
| while (request_reader->Read(&request)) { | ||
| ENVOY_LOG(info, "StoreExecutionResponseStream request {}", request.DebugString()); | ||
| const nighthawk::client::ExecutionResponse& response_to_store = request.execution_response(); |
nit: can just inline this in the next line
source/sink/service_impl.cc
Outdated
| const absl::Status status = sink_->StoreExecutionResultPiece(response_to_store); | ||
| if (!status.ok()) { | ||
| ENVOY_LOG(error, "StoreExecutionResponseStream failure: {}", status.ToString()); | ||
| return grpc::Status(grpc::StatusCode::INTERNAL, status.ToString()); |
isn't this a good place for your abslStatusToGrpcStatus?
source/sink/service_impl.cc
Outdated
| *merge_target.mutable_timestamp() = input_to_merge.timestamp(); | ||
| *merge_target.mutable_version() = input_to_merge.version(); | ||
| } else { | ||
| // Options used should not diverge for executions under a single execution id. |
| } | ||
| // Append all input results into our own results. | ||
| for (const nighthawk::client::Result& result : input_to_merge.results()) { | ||
| merge_target.add_results()->MergeFrom(result); |
to my knowledge we don't have code anywhere that alters result names based on which nighthawk they came from. As a result, I believe that if you're using three nighthawks, you'd wind up with three results with the name "global" and if those nighthawks each had two workers, you'd also have three of each "worker1" and "worker2" (or however they're named).
Oh, good catch, this loses some information unintentionally. Thank you for catching that!
I think we could improve here by having a new field in nighthawk::client::Result, where we can tag the originating (listening address of the) associated load gen service, so we can trace back where the result came from. As it stands, we do know due to the way that this is structured that the results are distinct, but we do not know which service was responsible for what results. I'll propose something to address this.
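For illustration, a sketch of the tagging idea with plain structs in place of the protos. The `origin` field and `mergeTagged` are hypothetical; no such field exists in nighthawk::client::Result as far as this thread shows.

```cpp
#include <string>
#include <vector>

// Stand-in for nighthawk::client::Result.
struct Result {
  std::string name;    // e.g. "global" or a worker name
  std::string origin;  // hypothetical: address of the originating load gen service
};

// Stand-in for nighthawk::client::Output.
struct Output {
  std::vector<Result> results;
};

// Appends every input result to merge_target, tagging each copy with the
// service it came from so equal names stay distinguishable.
void mergeTagged(const Output& input, const std::string& origin, Output& merge_target) {
  for (const Result& result : input.results) {
    Result tagged = result;
    tagged.origin = origin;
    merge_target.results.push_back(tagged);
  }
}
```

Two services that both report a result named "global" then yield two entries that differ in `origin`.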
| // Append all input results into our own results. | ||
| for (const nighthawk::client::Result& result : input_to_merge.results()) { | ||
| merge_target.add_results()->MergeFrom(result); | ||
| } |
In addition to the above, I'm concerned about this more architecturally. I would have thought the purpose of the full Execution Response would be to allow aggregation of all of the responses into a single cross-nighthawk global result object.
As it is, isn't this now a massive memory risk for the Sink service? Will it run out of memory holding onto all of these execution responses?
Without the ability to combine all results into a single global result, it seems to me that this would be safer if we instead provided a stream of execution responses back. That way, we wouldn't have to hold onto the memory for all of these different (potentially large) protos at the same time.
Before we act on any of this, I want to get @mum4k 's eyes on it. Because he's been more involved with you on this design, and we don't want to expand scope at all.
I can see the concern here; however in the current state there are no "raw" statistics being transferred,
just the summaries (one for each worker thread involved + a summary per process/load-gen-service).
The raw statistics have been what I was thinking about mostly when being concerned about memory.
That will be addressed transparently to this service when we introduce it, but I can see that is hard to assess by just looking at this PR: I have tried to intentionally build up the functionality to keep review more fun and focussed.
We don't tag on an aggregated summary across all load gen services yet, and as such we don't have the raw stats being used yet.
Having said that: if we should anticipate volumes of results to an extent that this is a problem, then your concern is very valid, and my assumption here is off. In that case I'd be happy to iterate. I think that is solvable in various ways without too much effort.
Lastly, if we can, I do have a preference: I think that it is useful to propagate the information at this level by default. The summaries can sometimes be misleading -- at least the way we do them right now. A practical example I've seen in the wild: one node running a cpu hog (prometheus), significantly impacting compute capacity of a load generator running on the same node. It's not a silver bullet, but being able to look at all the details improves chances of catching hints of such things.
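For illustration, a sketch of the memory trade-off debated in this thread, with strings in place of execution responses. `PieceSource`, `collectAll` and `forwardEach` are hypothetical names. The first function mirrors a merge-then-reply design, the second the streamed alternative.

```cpp
#include <functional>
#include <string>
#include <vector>

// Hypothetical source that yields `count` pieces one at a time.
class PieceSource {
public:
  explicit PieceSource(int count) : remaining_(count) {}

  // Writes the next piece into `piece`; returns false once exhausted.
  bool next(std::string& piece) {
    if (remaining_ == 0) {
      return false;
    }
    piece = "piece-" + std::to_string(remaining_--);
    return true;
  }

private:
  int remaining_;
};

// Merge-then-reply: every piece stays alive until the single reply is built.
std::vector<std::string> collectAll(PieceSource& source) {
  std::vector<std::string> all;
  std::string piece;
  while (source.next(piece)) {
    all.push_back(piece);
  }
  return all;
}

// Streamed: each piece is handed to `write` and then overwritten, so only one
// piece is held at a time. Returns the number of pieces forwarded.
int forwardEach(PieceSource& source, const std::function<void(const std::string&)>& write) {
  int forwarded = 0;
  std::string piece;
  while (source.next(piece)) {
    write(piece);
    ++forwarded;
  }
  return forwarded;
}
```

The streamed form keeps per-piece detail available to the receiver, which is the property the last comment argues for, without holding all pieces in the sink at once.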
Signed-off-by: Otto van der Schaaf <ovanders@redhat.com>
Signed-off-by: Otto van der Schaaf <ovanders@redhat.com>
This wires in a lot of the gRPC Sink service functionality, but not all of it.
I trimmed it down somewhat so we can focus on wiring in most of the flesh of it
in a bite-sized review.
Summarising statistics & handling of chunks of raw statistics data will
be added in one or more follow-ups to this.
Signed-off-by: Otto van der Schaaf ovanders@redhat.com