Skip to content

Required acks not enforced? #91

@lintmint

Description

@lintmint

I have a system with 3 kafka brokers and a program that sets required acks to 2 but it seems to always succeed as long as 1 broker is up.

As I am new to kafka, I thought I'd try a different program built from librdkafka's 0008-reqacks.c, and it seems to happily chug along too as long as 1 broker is up, regardless of what I set required acks to.

Any insight would be appreciated.

#define _GNU_SOURCE
#include <sys/time.h>
#include <time.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>

//#include "test.h"

/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is built from within the librdkafka source tree and thus differs. */
#include "rdkafka.h" /* for Kafka driver */


/* Message id that the next delivery report is expected to carry. */
static int msgid_next = 0;
/* Running count of detected failures; reported by main(). */
static int fails = 0;
/* Comma-separated broker list passed to rd_kafka_brokers_add().
 * Points at a string literal, so it must not be written through. */
const char *brokers = "10.21.51.12:9092,10.21.51.13:9092,10.21.51.14:9092";

/**
 * Delivery report callback.
 * Called once for each message to signal its delivery status.
 *
 * msg_opaque is the heap-allocated int message id that was handed to
 * rd_kafka_produce(); this callback reads it and then frees it.
 *
 * A failed delivery is logged and counted in `fails`. Regardless of the
 * delivery status, the id is compared with `msgid_next`: a mismatch is
 * logged, counted in `fails`, and leaves `msgid_next` untouched; a match
 * advances `msgid_next` by one.
 */
static void dr_cb (rd_kafka_t *rk, void *payload, size_t len,
                   rd_kafka_resp_err_t err, void *opaque, void *msg_opaque) {
   int msgid = *(int *)msg_opaque;

   /* Not needed here; silence unused-parameter warnings. */
   (void)rk;
   (void)payload;
   (void)len;
   (void)opaque;

   free(msg_opaque);

   if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
      /* Count the failure so the summary printed by main() reflects it. */
      fails++;
      fprintf(stderr, "Message delivery failed: %s\n",
              rd_kafka_err2str(err));
   }

   if (msgid != msgid_next) {
      fails++;
      fprintf(stderr, "Delivered msg %i, expected %i\n",
              msgid, msgid_next);
      return;
   }

   msgid_next = msgid + 1;
}



/**
 * Error callback: logs the error (as text from rd_kafka_err2str()) and
 * the accompanying reason string to stderr, one report per line.
 */
static void test_error_cb (rd_kafka_t *rk, int err,
                           const char *reason, void *opaque) {
   /* Not needed here; silence unused-parameter warnings. */
   (void)rk;
   (void)opaque;

   /* Terminate the line so consecutive reports do not run together. */
   fprintf(stderr, "rdkafka error: %s: %s\n", rd_kafka_err2str(err), reason);
}


/**
 * For each request.required.acks value from -1 to 2: creates a producer,
 * produces msgcnt messages to one partition of the topic, waits until the
 * out queue is drained, and reports delivery/ordering problems.
 *
 * Returns 0 when no failure was recorded, EXIT_FAILURE otherwise
 * (setup error, produce error, failed delivery or sequence mismatch).
 */
int main (int argc, char **argv) {
   //char *topic = "rdkafkatest1";
   const char *topic = "aim_config";
   const int partition = 0;
   const int msgcnt = 100;
   char errstr[512];
   char msg[128];
   int idbase = 0;
   int reqacks;
   int i;

   (void)argc;

   /* Try different request.required.acks settings (issue #75) */
   for (reqacks = -1 ; reqacks <= 2 ; reqacks++) {
      rd_kafka_conf_t *conf = rd_kafka_conf_new();
      rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();
      rd_kafka_t *rk;
      rd_kafka_topic_t *rkt;
      char tmp[10];

      rd_kafka_conf_set_error_cb(conf, test_error_cb);

      snprintf(tmp, sizeof(tmp), "%i", reqacks);

      /* Without this setting the run would not exercise what it claims
       * to, so treat a rejected value as fatal. */
      if (rd_kafka_topic_conf_set(topic_conf, "request.required.acks",
                                  tmp, errstr, sizeof(errstr)) !=
          RD_KAFKA_CONF_OK) {
         fprintf(stderr, "Failed to set request.required.acks=%s: %s\n",
                 tmp, errstr);
         return EXIT_FAILURE;
      }

      /* Set delivery report callback */
      rd_kafka_conf_set_dr_cb(conf, dr_cb);

      /* Create kafka instance */
      rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                        errstr, sizeof(errstr));
      if (!rk) {
         /* rk is dereferenced below; there is nothing to continue with. */
         fprintf(stderr, "Failed to create rdkafka instance: %s\n",
                 errstr);
         return EXIT_FAILURE;
      }

      fprintf(stderr, "Created kafka instance %s with required acks %i\n",
              rd_kafka_name(rk), reqacks);

      if (rd_kafka_brokers_add(rk, brokers) == 0) {
         fprintf(stderr, "%s No valid brokers specified\n", __func__);
         rd_kafka_destroy(rk);
         return EXIT_FAILURE;
      }

      rkt = rd_kafka_topic_new(rk, topic, topic_conf);
      if (!rkt) {
         fprintf(stderr, "Failed to create topic: %s\n",
                 strerror(errno));
         rd_kafka_destroy(rk);
         return EXIT_FAILURE;
      }

      /* Produce messages */
      for (i = 0 ; i < msgcnt ; i++) {
         int *msgidp = malloc(sizeof(*msgidp));

         if (!msgidp) {
            fprintf(stderr, "Failed to allocate message id: %s\n",
                    strerror(errno));
            fails++;
            break;
         }

         *msgidp = idbase + i;
         snprintf(msg, sizeof(msg),
                  "%s test message #%i (acks=%i)",
                  argv[0], *msgidp, reqacks);

         if (rd_kafka_produce(rkt, partition,
                              RD_KAFKA_MSG_F_COPY,
                              msg, strlen(msg), NULL, 0, msgidp) == -1) {
            fprintf(stderr, "Failed to produce message #%i: %s\n",
                    *msgidp, strerror(errno));
            /* The message was not accepted, so dr_cb will not get the
             * chance to free the id; release it here. */
            free(msgidp);
            fails++;
         }
      }

      fprintf(stderr, "Produced %i messages, waiting for deliveries\n",
              msgcnt);

      /* Serve delivery reports until the out queue is empty */
      while (rd_kafka_outq_len(rk) > 0)
         rd_kafka_poll(rk, 50);

      if (msgid_next != idbase + msgcnt) {
         fprintf(stderr, "Still waiting for messages: "
                 "next %i != end %i\n",
                 msgid_next, idbase + msgcnt);
         fails++;
      }

      if (fails)
         fprintf(stderr, "%i failures, see previous errors\n", fails);

      /* Destroy topic */
      rd_kafka_topic_destroy(rkt);

      /* Destroy rdkafka instance */
      fprintf(stderr, "Destroying kafka instance %s\n", rd_kafka_name(rk));
      rd_kafka_destroy(rk);

      /* Start the next acks setting from a known id so that a sequence
       * problem in this round does not cascade into the following ones. */
      idbase += msgcnt;
      msgid_next = idbase;
   }

   return fails ? EXIT_FAILURE : 0;
}

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions