kafka-d ~master

Native driver for Apache Kafka


To use this package, run the following command in your project's root directory:

Manual usage
Put the following dependency into your project's dependences section:

Kafka-d is a D Kafka client that depends on the vibe.d framework.

Build Status

Usage

First, bootstrap the client:

Configuration config;
// adjust config's properties if necessary
Client client = new Client([BrokerAddress("localhost", 9092)], "YOUR_CLIENT_ID", config);
Producing

For a full working example check `examples/producer/src/app.d`

Producer producer = new Producer(client, topic, partition);
string key = "myKey";
string value = "myValue";
producer.pushMessage(cast(ubyte[])key, cast(ubyte[])value);

To avoid double copying of data, you can fill your messages directly to the internal buffer using `reserveMessage() and commitMessage(). The pushMessage()` function uses these two functions internally.

The `reserveMessage(int keySize, int valueSize) function reserves portion of the internal buffer up to specified key and value sizes. Specifying -1 for any of the sizes, makes that field a null value. After the call, the producer.reservedKey and producer.reservedValue can be filled with user data (if the user specified a size greater than 0). The slices have the same sizes as specified in the reserveMessage` call. The messages in the buffer will not be sent until user calls commitMessage(), thus each reserve must be followed by a commit.

The `commitMessage()` commits reserved data in the internal buffer. The message becomes queued to be sent to the broker.

Producer producer = new Producer(client, topic, partition);
/// reserve key of size 4 bytes and value of size 8 bytes
producer.reserveMessage(4, 8);
// now the producer.reservedKey and producer.reservedValue are
// the slices pointing to the internal buffer. They are of type ubyte[] and
// their lengths are 4 and 8 respectively.
producer.reservedKey[] = 0; // fill the key with zeros, it uses D slice syntax
producer.reservedValue[] = 0xFF; // fill the value with ubyte.max
// commit the message
producer.commitMessage();

// reserve another message, this time the key is null (the value can be null too)
producer.reserveMessage(-1, 2);
// the reservedKey is not valid (will be sent as null)
// fill two bytes of the value
producer.reservedValue[0] = 0xBE;
producer.reservedValue[1] = 0xEF;
// commit the message
producer.commitMessage();

Consuming

For each topic and partition, run a worker task:

runWorkerTask((Client client, string topic, int partition) {
    Consumer consumer = new Consumer(client, topic, partition);
    for (;;) {
        Message message = consumer.getMessage();
        // consume the message
    }
}, client, "TOPIC", PARTITION);
Authors:
  • Piotr Szturmaj
  • Yannick Koechlin
Dependencies:
vibe-d
Versions:
0.0.5 2018-Aug-10
0.0.4 2016-May-19
0.0.3 2016-Apr-09
~master 2018-Aug-10
Show all 4 versions
Download Stats:
  • 0 downloads today

  • 0 downloads this week

  • 0 downloads this month

  • 90 downloads total

Score:
1.8
Short URL:
kafka-d.dub.pm