Conversation
use proton::tracker as delivery_tag feat: add publish and publish_with_confirm unittest
src/rgw/rgw_amqp_1.cc
Outdated
| int status_tracker_to_rgw(int s) { | ||
| if(s == AMQP_1_TRACKER_STATUS_ACCEPTED) { | ||
| return RGW_AMQP_1_STATUS_OK; | ||
| } |
There was a problem hiding this comment.
what about the else part?
| @@ -0,0 +1,634 @@ | |||
| // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- | |||
| // vim: ts=8 sw=2 smarttab ft=cpp | |||
There was a problem hiding this comment.
please convert tabs to 2 spaces
There was a problem hiding this comment.
style: please convert tabs to 2 spaces
src/rgw/rgw_amqp_1.cc
Outdated
| } | ||
| } | ||
|
|
||
| struct reply_callback_with_tag_t { |
There was a problem hiding this comment.
why do you need this struct?
src/rgw/rgw_amqp_1.cc
Outdated
| } | ||
| }; | ||
|
|
||
| typedef std::vector<reply_callback_with_tag_t> CallbackList; |
src/rgw/rgw_amqp_1.cc
Outdated
| } | ||
| } | ||
|
|
||
| struct connection_t : public proton::messaging_handler{ |
There was a problem hiding this comment.
given its complexity, probably better to change that to a class - and have the default be "private"
src/rgw/rgw_amqp_1.cc
Outdated
| } | ||
|
|
||
| void on_connection_open(proton::connection& conn) { | ||
| std::cout << "proton::connection on_connection_open" <<std::endl; |
There was a problem hiding this comment.
please provide into on the connection (ip/port)
src/rgw/rgw_amqp_1.cc
Outdated
| } | ||
|
|
||
| void on_connection_error(proton::connection& conn) { | ||
| std::cout << "proton::connection on_connection_ERROR " <<std::endl; |
There was a problem hiding this comment.
please provide info on the connection and the error
src/rgw/rgw_amqp_1.cc
Outdated
| std::string broker; | ||
| proton::sender sender; | ||
| // proton::connection_options options; | ||
| proton::work_queue* pwork_queue; |
There was a problem hiding this comment.
why do you need the work queue explicit here?
isn't proton working with its own work queue?
src/rgw/rgw_amqp_1.cc
Outdated
| } | ||
|
|
||
| // disconnect from a broker | ||
| bool disconnect(connection_ptr_t& conn) { |
There was a problem hiding this comment.
please remove disconnect() API
src/rgw/rgw_amqp_1.cc
Outdated
| void do_send(const proton::message& m, reply_callback_t cb) { | ||
| // t is the tracker of this message | ||
| auto t = sender.send(m); | ||
| std::lock_guard<std::mutex> lk(lock); |
There was a problem hiding this comment.
why do you need a lock here?
everything should be using one thread
src/rgw/rgw_amqp_1.cc
Outdated
| // ++queued; | ||
| // return RGW_AMQP_1_STATUS_OK; | ||
| // } | ||
| publish_internal(new message_wrapper_t(conn, topic, message, nullptr)); |
There was a problem hiding this comment.
the publish() function is invoked from another thread. it should not call publish_internal() that should just empty the message queue (and assumes it is invoked from the internal thread)
src/rgw/rgw_amqp_1.cc
Outdated
| static const int RGW_AMQP_1_STATUS_OK = 0x0; | ||
|
|
||
| int status_tracker_to_rgw(int s) { | ||
| if(s == AMQP_1_TRACKER_STATUS_ACCEPTED) { |
There was a problem hiding this comment.
style: please add space after the "if"
src/rgw/rgw_amqp_1.cc
Outdated
| proton::work_queue* pwork_queue; | ||
| int queued; | ||
|
|
||
| const boost::optional<std::string> ca_location; |
There was a problem hiding this comment.
ssl is not supported yet, this is just reserved for it.
There was a problem hiding this comment.
please remove anything which is not implemented - we will add that later
|
please squash commits |
src/test/rgw/test_rgw_amqp_1.cc
Outdated
| EXPECT_TRUE(conn); | ||
| auto rc = rgw::amqp_1::publish_with_confirm(conn, "amqp1_0", "callback-message", callback); | ||
| EXPECT_EQ(rc, 0); | ||
| std::this_thread::sleep_for(std::chrono::seconds(1)); |
There was a problem hiding this comment.
you probably need more than one second sleep here
src/test/rgw/test_rgw_amqp_1.cc
Outdated
| num_tag = std::string("multi-sample-msg") + std::to_string(i); | ||
| rgw::amqp_1::publish(conn, to, num_tag); | ||
| } | ||
| auto rc = rgw::amqp_1::publish(conn, to, "this publish should fail with -0x1003"); |
There was a problem hiding this comment.
this test does not necessarily fail
it may pass if broker is down, and notifications are sent before this line is reached
There was a problem hiding this comment.
yes, it depends on the sending speed of proton, so I set up the for-loop to do more than MAX_DEFAULT_QUEUE times to filling the queue, shall I delete this test as well?
|
@EFS86340 could you please close this PR in favor of a PR to ceph/master ? |
Checklist
Show available Jenkins commands
jenkins retest this pleasejenkins test classic perfjenkins test crimson perfjenkins test signedjenkins test make checkjenkins test make check arm64jenkins test submodulesjenkins test dashboardjenkins test apijenkins test docsjenkins render docsjenkins test ceph-volume alljenkins test ceph-volume tox