Skip to content

Gsoc21 amqp 1#2

Open
wangxuw wants to merge 14 commits intomasterfrom
gsoc21-amqp_1
Open

Gsoc21 amqp 1#2
wangxuw wants to merge 14 commits intomasterfrom
gsoc21-amqp_1

Conversation

@wangxuw
Copy link
Owner

@wangxuw wangxuw commented Jun 15, 2021

Checklist

  • References tracker ticket
  • Updates documentation if necessary
  • Includes tests for new functionality or reproducer for bug

Show available Jenkins commands
  • jenkins retest this please
  • jenkins test classic perf
  • jenkins test crimson perf
  • jenkins test signed
  • jenkins test make check
  • jenkins test make check arm64
  • jenkins test submodules
  • jenkins test dashboard
  • jenkins test api
  • jenkins test docs
  • jenkins render docs
  • jenkins test ceph-volume all
  • jenkins test ceph-volume tox

@wangxuw wangxuw marked this pull request as ready for review June 15, 2021 10:13
use proton::tracker as delivery_tag
feat: add publish and publish_with_confirm unittest
int status_tracker_to_rgw(int s) {
if(s == AMQP_1_TRACKER_STATUS_ACCEPTED) {
return RGW_AMQP_1_STATUS_OK;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about the else part?

@@ -0,0 +1,634 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please convert tabs to 2 spaces

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: please convert tabs to 2 spaces

}
}

struct reply_callback_with_tag_t {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need this struct?

}
};

typedef std::vector<reply_callback_with_tag_t> CallbackList;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not needed?

}
}

struct connection_t : public proton::messaging_handler{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given its complexity, probably better to change that to a class - and have the default be "private"

}

void on_connection_open(proton::connection& conn) {
std::cout << "proton::connection on_connection_open" <<std::endl;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please provide into on the connection (ip/port)

}

void on_connection_error(proton::connection& conn) {
std::cout << "proton::connection on_connection_ERROR " <<std::endl;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please provide info on the connection and the error

std::string broker;
proton::sender sender;
// proton::connection_options options;
proton::work_queue* pwork_queue;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need the work queue explicit here?
isn't proton working with its own work queue?

}

// disconnect from a broker
bool disconnect(connection_ptr_t& conn) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove disconnect() API

void do_send(const proton::message& m, reply_callback_t cb) {
// t is the tracker of this message
auto t = sender.send(m);
std::lock_guard<std::mutex> lk(lock);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need a lock here?
everything should be using one thread

// ++queued;
// return RGW_AMQP_1_STATUS_OK;
// }
publish_internal(new message_wrapper_t(conn, topic, message, nullptr));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the publish() function is invoked from another thread. it should not call publish_internal() that should just empty the message queue (and assumes it is invoked from the internal thread)

static const int RGW_AMQP_1_STATUS_OK = 0x0;

int status_tracker_to_rgw(int s) {
if(s == AMQP_1_TRACKER_STATUS_ACCEPTED) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: please add space after the "if"

proton::work_queue* pwork_queue;
int queued;

const boost::optional<std::string> ca_location;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we support ssl?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ssl is not supported yet, this is just reserved for it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove anything which is not implemented - we will add that later

@yuvalif
Copy link
Collaborator

yuvalif commented Jul 18, 2021

please squash commits

EXPECT_TRUE(conn);
auto rc = rgw::amqp_1::publish_with_confirm(conn, "amqp1_0", "callback-message", callback);
EXPECT_EQ(rc, 0);
std::this_thread::sleep_for(std::chrono::seconds(1));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you probably need more than one second sleep here

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is 3s proper? or 5s?

num_tag = std::string("multi-sample-msg") + std::to_string(i);
rgw::amqp_1::publish(conn, to, num_tag);
}
auto rc = rgw::amqp_1::publish(conn, to, "this publish should fail with -0x1003");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test does not necessarily fail
it may pass if broker is down, and notifications are sent before this line is reached

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it depends on the sending speed of proton, so I set up the for-loop to do more than MAX_DEFAULT_QUEUE times to filling the queue, shall I delete this test as well?

@yuvalif
Copy link
Collaborator

yuvalif commented Jul 27, 2021

@EFS86340 could you please close this PR in favor of a PR to ceph/master ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants