Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 9 additions & 92 deletions test/cpp/qps/driver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,30 +75,6 @@ static std::string get_host(const std::string& worker) {
return s;
}

static std::unordered_map<string, std::deque<int>> get_hosts_and_cores(
const deque<string>& workers) {
std::unordered_map<string, std::deque<int>> hosts;
for (auto it = workers.begin(); it != workers.end(); it++) {
const string host = get_host(*it);
if (hosts.find(host) == hosts.end()) {
auto stub = WorkerService::NewStub(
CreateChannel(*it, InsecureChannelCredentials()));
grpc::ClientContext ctx;
ctx.set_wait_for_ready(true);
CoreRequest dummy;
CoreResponse cores;
grpc::Status s = stub->CoreCount(&ctx, dummy, &cores);
GPR_ASSERT(s.ok());
std::deque<int> dq;
for (int i = 0; i < cores.cores(); i++) {
dq.push_back(i);
}
hosts[host] = dq;
}
}
return hosts;
}

static deque<string> get_workers(const string& name) {
char* env = gpr_getenv(name.c_str());
if (!env) return deque<string>();
Expand Down Expand Up @@ -241,9 +217,6 @@ std::unique_ptr<ScenarioResult> RunScenario(
}
}

// Setup the hosts and core counts
auto hosts_cores = get_hosts_and_cores(workers);

// if num_clients is set to <=0, do dynamic sizing: all workers
// except for servers are clients
if (num_clients <= 0) {
Expand Down Expand Up @@ -271,39 +244,6 @@ std::unique_ptr<ScenarioResult> RunScenario(
CreateChannel(workers[i], InsecureChannelCredentials()));

ServerConfig server_config = initial_server_config;
char* host;
char* driver_port;
char* cli_target;
gpr_split_host_port(workers[i].c_str(), &host, &driver_port);
string host_str(host);
int server_core_limit = initial_server_config.core_limit();
int client_core_limit = initial_client_config.core_limit();

if (server_core_limit == 0 && client_core_limit > 0) {
// In this case, limit the server cores if it matches the
// same host as one or more clients
const auto& dq = hosts_cores.at(host_str);
bool match = false;
int limit = dq.size();
for (size_t cli = 0; cli < num_clients; cli++) {
if (host_str == get_host(workers[cli + num_servers])) {
limit -= client_core_limit;
match = true;
}
}
if (match) {
GPR_ASSERT(limit > 0);
server_core_limit = limit;
}
}
if (server_core_limit > 0) {
auto& dq = hosts_cores.at(host_str);
GPR_ASSERT(dq.size() >= static_cast<size_t>(server_core_limit));
for (int core = 0; core < server_core_limit; core++) {
server_config.add_core_list(dq.front());
dq.pop_front();
}
}

ServerArgs args;
*args.mutable_setup() = server_config;
Expand All @@ -315,11 +255,15 @@ std::unique_ptr<ScenarioResult> RunScenario(
if (!servers[i].stream->Read(&init_status)) {
gpr_log(GPR_ERROR, "Server %zu did not yield initial status", i);
}
gpr_join_host_port(&cli_target, host, init_status.port());
client_config.add_server_targets(cli_target);
gpr_free(host);
gpr_free(driver_port);
gpr_free(cli_target);
// Fill in server targets only if they haven't already been configured.
if ((int)num_servers != client_config.server_targets_size()) {
std::string host;
char* cli_target;
host = get_host(workers[i]);
gpr_join_host_port(&cli_target, host.c_str(), init_status.port());
client_config.add_server_targets(cli_target);
gpr_free(cli_target);
}
}

// Targets are all set by now
Expand All @@ -339,33 +283,6 @@ std::unique_ptr<ScenarioResult> RunScenario(
CreateChannel(worker, InsecureChannelCredentials()));
ClientConfig per_client_config = client_config;

int server_core_limit = initial_server_config.core_limit();
int client_core_limit = initial_client_config.core_limit();
if ((server_core_limit > 0) || (client_core_limit > 0)) {
auto& dq = hosts_cores.at(get_host(worker));
if (client_core_limit == 0) {
// limit client cores if it matches a server host
bool match = false;
int limit = dq.size();
for (size_t srv = 0; srv < num_servers; srv++) {
if (get_host(worker) == get_host(workers[srv])) {
match = true;
}
}
if (match) {
GPR_ASSERT(limit > 0);
client_core_limit = limit;
}
}
if (client_core_limit > 0) {
GPR_ASSERT(dq.size() >= static_cast<size_t>(client_core_limit));
for (int core = 0; core < client_core_limit; core++) {
per_client_config.add_core_list(dq.front());
dq.pop_front();
}
}
}

// Reduce channel count so that total channels specified is held regardless
// of the number of clients available
size_t num_channels =
Expand Down
14 changes: 13 additions & 1 deletion test/cpp/qps/qps_json_driver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,27 @@ DEFINE_double(error_tolerance, 0.01,
"Defines threshold for stopping the search. When current search "
"range is narrower than the error_tolerance computed range, we "
"stop the search.");
DEFINE_string(benchmark_server_target_override, "",
"Optional uri to override the benchmark server target "
"given to client benchmark workers.");

namespace grpc {
namespace testing {

static std::unique_ptr<ScenarioResult> RunAndReport(const Scenario& scenario,
bool* success) {
std::cerr << "RUNNING SCENARIO: " << scenario.name() << "\n";

ClientConfig client_config = scenario.client_config();

if (FLAGS_benchmark_server_target_override != "") {
GPR_ASSERT(scenario.num_servers() == 1);
GPR_ASSERT(client_config.server_targets_size() == 0);
client_config.add_server_targets(FLAGS_benchmark_server_target_override);
}

auto result =
RunScenario(scenario.client_config(), scenario.num_clients(),
RunScenario(client_config, scenario.num_clients(),
scenario.server_config(), scenario.num_servers(),
scenario.warmup_seconds(), scenario.benchmark_seconds(),
scenario.spawn_local_worker_count());
Expand Down