Mechanik20051988/gh 5645 several iproto threads new#5877
Conversation
e77b431 to
5003a9b
Compare
alyapunov
left a comment
There was a problem hiding this comment.
A left several comments. I think there are ways to improve this patchset.
src/rmean.c
Outdated
| int64_t mean = 0; | ||
| for (size_t j = 1; j <= RMEAN_WINDOW; j++) | ||
| mean += rmean->stats[name].value[j]; | ||
| mean += __atomic_load_n(&rmean->stats[name].value[j], __ATOMIC_ACQUIRE); |
There was a problem hiding this comment.
I think we may assume that values 1..RMEAN_WINDOW are always accessed from main thread and thus should not be synchronized.
src/rmean.c
Outdated
|
|
||
| rmean->stats[name].value[0] += value; | ||
| rmean->stats[name].total += value; | ||
| __atomic_add_fetch(&rmean->stats[name].value[0], value, __ATOMIC_RELEASE); |
There was a problem hiding this comment.
Why not __ATOMIC_RELAXED? I don't see a reason of using __ATOMIC_RELEASE here.
src/rmean.c
Outdated
| for (size_t j = 0; j < RMEAN_WINDOW + 1; j++) | ||
| rmean->stats[i].value[j] = 0; | ||
| rmean->stats[i].total = 0; | ||
| __atomic_store_n(&rmean->stats[i].value[j], 0, __ATOMIC_RELEASE); |
There was a problem hiding this comment.
I think that only value[0] should have a fence, since only it can be modified from other thread.
src/rmean.c
Outdated
| rmean_roll(int64_t *value, double dt) | ||
| { | ||
| value[0] /= dt; | ||
| int64_t tmp = __atomic_load_n(&value[0], __ATOMIC_ACQUIRE); |
There was a problem hiding this comment.
That function was not a good piece of code, and you make it worse.
What's a point of several atomic accesses of value[0] in this function? If you are expecting that somebody else is changing it - what would be a correct way to handle it?
I don't like thoughtless changing of access to everything suspicious to store+RELEASE and load+ACQUIRE. A code should do everything it must and not more.
src/rmean.h
Outdated
| rmean_total(struct rmean *rmean, size_t name) | ||
| { | ||
| return rmean->stats[name].total; | ||
| return __atomic_load_n(&rmean->stats[name].total, __ATOMIC_ACQUIRE); |
There was a problem hiding this comment.
Similarly, it think __ATOMIC_RELAXED would be enough. Not a big deal btw.
src/box/iproto.cc
Outdated
| static inline void | ||
| iproto_thread_init_routes(struct iproto_thread *iproto_thread) | ||
| { | ||
| assert(iproto_thread != NULL); |
There was a problem hiding this comment.
I didn't verify each line of this function. I hope it's correct.
src/box/iproto.cc
Outdated
| return mempool_count(&iproto_msg_pool); | ||
| size_t count = 0; | ||
| for( auto& iproto_thread : iproto_threads ) | ||
| count += mempool_count(&iproto_thread->iproto_msg_pool); |
There was a problem hiding this comment.
What about synchronization here?
src/box/iproto.cc
Outdated
| return mempool_count(&iproto_connection_pool); | ||
| size_t count = 0; | ||
| for( auto& iproto_thread : iproto_threads ) | ||
| count += mempool_count(&iproto_thread->iproto_connection_pool); |
There was a problem hiding this comment.
What about synchronization here?
src/box/iproto.cc
Outdated
| size_t mem = 0; | ||
| for( auto& iproto_thread : iproto_threads ) | ||
| mem += slab_cache_used(&iproto_thread->net_cord.slabc) + | ||
| slab_cache_used(&iproto_thread->net_slabc); |
There was a problem hiding this comment.
What about synchronization here?
src/box/iproto.cc
Outdated
| if (uri != NULL) { | ||
| if (evio_service_bind(&binary, uri) != 0) | ||
| diag_raise(); | ||
| for( auto& iproto_thread : iproto_threads ) { |
There was a problem hiding this comment.
I don't like this solutions. Actually I don't know how to do it good, but I didn't think a lot. I see the following problems to be fixed:
- You set here
iproto_thread'smembers and it's not obvious why it is safe. Please don't explain me, I understand, but you should make the code understandable by itself. - This new meesage
IPROTO_CFG_STOPis not documented fine. It seems that doesn't require the service to be started. At the same timeIPROTO_CFG_LISTENrequires (!) service to be stopped. That's very confusing. - You do a lot work right here, while doing a lot if work in
IPROTO_CFG_LISTENhandler. That's not obvious either.
5edb2a0 to
34b285c
Compare
d0fa199 to
eb657a2
Compare
| } | ||
|
|
||
| void | ||
| iproto_listen(const char *uri) |
There was a problem hiding this comment.
Two funcs with the same name - I'd rather rename one of them
| test_run:cmd('start server test with args="8"') | ||
| server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] | ||
| fibers_count = fibers_count * 2 | ||
| assert(test_run:grep_log("test", "stopping input on connection") == nil) |
There was a problem hiding this comment.
Could you please explain what is the purpose of the test and what are you checking?
There was a problem hiding this comment.
The reason why we are making this patch is that: there are users that have specific workloads where iproto thread
is the bottleneck of throughput: iproto thread's code is 100% loaded while TX thread's core is not. I don't know how to check full iproto thread loading in test, except for indirect signs, one of which is the message in the log "stopping connection on input". So in test first of all i start instance with one iproto thread, and send requests in a large number of fibers at the same time to this instance. I increase the number of fibers by 4 times until I get "stopping connection on input" in log. Then i restart instance with 8 itproto threads increase the number of fibers by 4 times again and check that there is no such message. Based on this, I conclude that 8 threads can withstand at least 4 times the more load
There was a problem hiding this comment.
If you have any ideas on how to test this better, that would be great!
27b5d3d to
5d87828
Compare
src/box/iproto.cc
Outdated
| evio_service_init(loop(), &iproto_thread->binary, "binary", | ||
| iproto_on_accept, iproto_thread); | ||
|
|
||
| static const unsigned int endpoint_name_max = ENDPOINT_NAME_MAX; |
There was a problem hiding this comment.
Why do you need this var? Why not directly use ENDPOINT_NAME_MAX?
src/box/iproto.cc
Outdated
| iproto_thread->stopped_connections = | ||
| RLIST_HEAD_INITIALIZER(iproto_thread-> | ||
| stopped_connections); | ||
| static const unsigned int endpoint_name_max = 10; |
src/box/iproto.cc
Outdated
| cpipe_set_max_input(&net_pipe, iproto_msg_max / 2); | ||
| /** Initialize the iproto subsystem and start network io thread */ | ||
| void | ||
| iproto_init(unsigned int threads_count) |
There was a problem hiding this comment.
cfg_geti returns int, so better make this function take int param. Also I'd add some check verifying rationality of threads_count value. I mean raise warning/error in case user passes too large value. For instance, I doubt that it makes sense to set value more than 2*cores. I don't insist on this check tho.
9bddee4 to
5c90dc2
Compare
The fields of the rmean structure can be accessed from multiple threads, so we must use atomic operations to get/set fields in this structure. Also in the comments to the functions i wrote in which threads they should be called to correctly access the fields of the rmean structure.
There was two problems with struct rmean: - For correct access for rmean struct fields, this struct should be created in tx thread. - In case when rmean_new return NULL in net_cord_f tarantool hangs and does not terminate in any way except on SIGKILL. Also net_slabc cache was not destroyed. Moved allocation and deallocation of rmean structure to iproto_init/iproto_free respectively. Added slab_cache_destroy for net_slabc for graceful resource releases.
31244da to
bb1c6e4
Compare
bb1c6e4 to
0081b6f
Compare
There are users that have specific workloads where iproto thread is the bottleneck of throughput: iproto thread's code is 100% loaded while TX thread's core is not. For such cases it would be nice to have a capability to create several iproto threads. Closes #5645 @TarantoolBot document Title: implement ability to run multiple iproto threads Implement ability to run multiple iproto threads, which is useful in some specific workloads where iproto thread is the bottleneck of throughput. To specify count of iproto threads, user should used iproto_threads option in box.cfg. For example if user want to start 8 iproto threads, he must enter `box.cfg{iproto_threads=8}`. Default iproto threads count == 1. This option is not dynamic, so user can't change it after first setting, until server restart. Distribution of connections per threads is managed by OS kernel.
0081b6f to
754f64c
Compare
|
Well, hello. I suppose the review wasn't really thorough, was it?
|
iproto: implement ability to run multiple iproto threads
There are users that have specific workloads where iproto thread
is the bottleneck of throughput: iproto thread's code is 100% loaded
while TX thread's core is not. For such cases it would be nice to have
a capability to create several iproto threads.
Closes #5645