Hadooken handles the underlying plumbing needed to consume messages from a Kafka bus.
Add this line to your application's Gemfile:
gem 'hadooken'
And then execute:
$ bundle
Or install it yourself as:
$ gem install hadooken
From your project's directory, run the following:
$ bundle exec hadooken
Other available commands are:
$ bundle exec hadooken start
$ bundle exec hadooken stop
$ bundle exec hadooken restart
By default, Hadooken expects a configuration file at config/hadooken.yml, but you can point it at a different file when starting it, like so:
$ bundle exec hadooken -c config/a-config-file.yml
Other configuration options that can be provided as arguments are:
- -d or --daemon to daemonize hadooken
- -e or --environment to change environment
- -l or --logfile to change location of log file
- -p or --pidfile to change location of pid file
- -v or --version to print out current version of hadooken you have
- -h or --help to list above options
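As a rough illustration of the flags listed above, the following Ruby sketch parses them with the standard library's OptionParser. It is not Hadooken's own command-line code: the `parse_flags` helper and the keys of the returned hash are assumptions made for this example only.

```ruby
require 'optparse'

# Hypothetical helper: turns the documented flags into a plain Hash.
# -h/--help is left to OptionParser's built-in handling, and -v is omitted.
def parse_flags(argv)
  options = {}
  parser = OptionParser.new do |o|
    o.on('-c PATH')                   { |v| options[:config] = v }
    o.on('-d', '--daemon')            { options[:daemon] = true }
    o.on('-e', '--environment NAME')  { |v| options[:environment] = v }
    o.on('-l', '--logfile PATH')      { |v| options[:logfile] = v }
    o.on('-p', '--pidfile PATH')      { |v| options[:pidfile] = v }
  end
  parser.parse(argv) # non-destructive; leftover arguments are ignored here
  options
end

p parse_flags(%w[-d -e production -l log/hadooken.log -p tmp/hadooken.pid])
```

The log and pid file paths in the call are made-up values; they are passed together with `-d` because the configuration reference marks both as required when running as a daemon.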
Configurable options via configuration yml file:
- group_name: Name of the consumer group (the same group always reads from the same partition).
- daemon: To run as daemon.
- environment<String|Symbol>: Environment to run.
- logfile: Location of the log file. Required if daemon is true.
- pidfile: Location of the pid file. Required if daemon is true.
- workers:
  - key: Name of the worker
  - value:
    - type<enum(single_thread|multi_thread)>: Run the worker either with multiple threads or with a single thread.
    - threads: Number of threads, relevant only in multi_thread mode.
    - topics:
      - key: Name of the topic
      - value: Name of the consumer class
- kafka:
  - client: The client library to be used to connect to Kafka. Default is Kafka.
  - parameters valid for zendesk/ruby-kafka client
- test:
  - schema_path: The path of the JSON schema files.
- require_env: Custom path to require.
- heartbeat:
  - topic: The name of the topic that heartbeat messages will be published to
  - frequency: Publish frequency in seconds
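To make the option list above more concrete, here is a sketch of what a configuration file might contain, embedded in a small Ruby script that parses it with the standard YAML library. The key layout is inferred from the list above, and every value (group name, paths, thread count, topic and consumer names) is a made-up placeholder. The `worker_routes` helper is hypothetical and exists only to show how the nested worker/topic mapping reads.

```ruby
require 'yaml'

# Hypothetical contents of config/hadooken.yml; all values are placeholders.
EXAMPLE_CONFIG = <<~CONFIG
  group_name: my_app_consumers
  daemon: true
  environment: production
  logfile: log/hadooken.log
  pidfile: tmp/hadooken.pid
  workers:
    default:
      type: multi_thread
      threads: 4
      topics:
        user: UserConsumer
  heartbeat:
    topic: consumer_heartbeat
    frequency: 10
CONFIG

# Hypothetical helper: lists which consumer class handles which topic, per worker.
def worker_routes(config)
  config.fetch('workers').flat_map do |worker, options|
    options.fetch('topics').map do |topic, consumer|
      "#{worker} (#{options['type']}): #{topic} -> #{consumer}"
    end
  end
end

puts worker_routes(YAML.safe_load(EXAMPLE_CONFIG))
# prints "default (multi_thread): user -> UserConsumer"
```

The kafka section is left out because its parameters depend on the client library in use.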
You can also configure Hadooken from a Ruby script. Create a file under the initializers directory of your Rails app and fill it in like so:
require 'hadooken'
Hadooken.configure do |c|
  c.error_capturer = -> (error, context) { puts error.class }
  c.heartbeat = { topic: :consumer_heartbeat, frequency: 0.1 }
  c.logfile = 'tmp/hadooken.log'
  c.pidfile = 'tmp/hadooken.pid'
  c.daemon = true
  c.workers = {
    default: {
      type: :multi_thread,
      threads: 16,
      topics: {
        foo: "Bar"
      }
    }
  }
end
Hadooken comes with the Publisher DSL that you can use for producing messages.
class FooPublisher < Hadooken::Publisher
  self.topic = 'foo'
  self.message_name = 'foo_created'
  self.version = '1.2'
  self.serializer = FooBarSerializer
end
FooPublisher.publish(foo) # Will send the payload generated for `foo` object to Kafka
- topic: The name of the topic that the message will be sent to. This attribute is required.
- message_name: The name of the message. Defaults to the publisher class name with Publisher removed and underscored. The default message name for the FooUpdatedPublisher class would be foo_updated.
- version: Version of the message. Defaults to "1.0".
- serializer: The serializer class which will serialize the given object. The serializer class should respond to the as_json method, like ActiveModelSerializers. The default value for this attribute is the class name without the Publisher suffix but with the Serializer suffix. An example default value would be FooUpdatedSerializer for the FooUpdatedPublisher class.
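The default naming rules above can be sketched in plain Ruby. This is only an illustration of the convention as described, not Hadooken's implementation: the two helper methods are hypothetical, and the underscoring is a simplified stand-in for what a library such as ActiveSupport would do.

```ruby
# Hypothetical helper: "FooUpdatedPublisher" -> "foo_updated"
def default_message_name(publisher_class_name)
  publisher_class_name
    .sub(/Publisher\z/, '')               # drop the Publisher suffix
    .gsub(/([a-z\d])([A-Z])/, '\1_\2')    # insert "_" at lower-to-upper boundaries
    .downcase
end

# Hypothetical helper: "FooUpdatedPublisher" -> "FooUpdatedSerializer"
def default_serializer_name(publisher_class_name)
  publisher_class_name.sub(/Publisher\z/, '') + 'Serializer'
end

puts default_message_name('FooUpdatedPublisher')    # prints "foo_updated"
puts default_serializer_name('FooUpdatedPublisher') # prints "FooUpdatedSerializer"
```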
After you map topics to consumer classes, Hadooken calls the matching method on the mapped consumer class whenever it receives a message from Kafka.
Imagine you have the following configuration:
Hadooken.configure do |c|
  c.workers = {
    default: {
      type: :single_thread,
      topics: {
        user: "UserConsumer"
      }
    }
  }
end
And the following publisher class:
class UserCreatedPublisher < Hadooken::Publisher
  self.topic = "user"
  self.message_name = "user_created"
end
And the following consumer:
class UserConsumer < Hadooken::Consumer
  def user_created
    puts data
  end
end
Then, whenever you send a message using the UserCreatedPublisher, Hadooken will create an instance of UserConsumer and call the user_created method of that instance.
You can also change the method dispatching by registering message names with method names, like so:
class UserConsumer < Hadooken::Consumer
  register :user_created, :consume_user_created_message
  def consume_user_created_message
    puts data
  end
end
You can also use the callback support of the Hadooken::Consumer class, like so:
class UserConsumer < Hadooken::Consumer
  register :user_updated, :consume_user_updated_message
  before_consume :fetch_user, only: :user_updated
  def consume_user_updated_message
    puts data
  end
  private
  def fetch_user
    ...
  end
end
For more information about the consumer and its API, please have a look at lib/hadooken/consumer.rb.
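The dispatching rules described above (default method name equal to the message name, `register` overrides, and `before_consume` callbacks) can be pictured with a small stand-alone sketch. It does not use or reproduce Hadooken's code: `SketchConsumer`, `UserSketch`, and the `dispatch` class method are hypothetical names, and treating `only:` as a list of message names is an assumption based on the example above.

```ruby
# Hypothetical stand-in for a consumer base class; not Hadooken::Consumer.
class SketchConsumer
  class << self
    def registry
      @registry ||= {}
    end

    def callbacks
      @callbacks ||= []
    end

    # Map a message name to a custom method name.
    def register(message_name, method_name)
      registry[message_name.to_sym] = method_name.to_sym
    end

    # Run `callback` before consuming; `only` limits it to given message names.
    def before_consume(callback, only: nil)
      callbacks << [callback.to_sym, Array(only).map(&:to_sym)]
    end

    # Build a fresh instance per message and route it to the right method.
    def dispatch(message_name, data)
      new(data).consume(message_name.to_sym)
    end
  end

  attr_reader :data

  def initialize(data)
    @data = data
  end

  def consume(message_name)
    self.class.callbacks.each do |callback, only|
      send(callback) if only.empty? || only.include?(message_name)
    end
    # Fall back to a method named after the message when nothing is registered.
    send(self.class.registry.fetch(message_name, message_name))
  end
end

class UserSketch < SketchConsumer
  register :user_updated, :consume_user_updated_message
  before_consume :fetch_user, only: :user_updated

  def user_created
    "created #{data[:id]}"
  end

  def consume_user_updated_message
    "updated #{@user}"
  end

  private

  def fetch_user
    @user = "user-#{data[:id]}"
  end
end

puts UserSketch.dispatch(:user_created, id: 1) # prints "created 1"
puts UserSketch.dispatch(:user_updated, id: 7) # prints "updated user-7"
```

In this sketch the callback runs only for `user_updated`, so `user_created` is handled without calling `fetch_user`.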
Hadooken provides test helpers for RSpec and switches to a no-op Kafka client if you require hadooken/test or hadooken/test/rspec, which means messages won't go to Kafka instances but will instead stay in an in-memory queue.
If you require RSpec helpers, you can use the following test helpers:
- have_version: Tests if the version of the published message is correct. An example usage of the test helper would be:
  it { is_expected.to have_version("1.0") }
- have_name: Tests if the name of the published message is correct. An example usage of the test helper would be:
  it { is_expected.to have_name("expected_name") }
- be_delivered_to: Tests if the topic of the published message is correct. An example usage of the test helper would be:
  it { is_expected.to be_delivered_to("expected_topic") }
- have_schema: Tests if the schema of the published message is correct. By default, Hadooken assumes the schema files are located under the spec/fixtures/schemas directory and uses the given schema name to find the correct file. You can change it by setting the schema_path configuration like so: Hadooken.configuration.test[:schema_path] = "...". An example usage of the test helper would be:
  it { is_expected.to have_schema("schema.json") }
- In cluster mode with multiple workers, if one of the topics you've registered has just one partition, the entire worker will crash. It does not take down the entire cluster, but this should be fixed.
- Consumer constantization should be done in one place (while booting). For now we do it whenever we need it, though this does not affect consumer performance much.
- In cluster mode, send consumer data just once.
- Write unit tests (in progress).