A Kafka-Compatible Broker With a PostgreSQL Storage Engine


Tansu is an Apache Kafka API compatible broker with a PostgreSQL storage engine. Acting as a drop-in replacement, existing clients connect to Tansu, producing and fetching messages stored in PostgreSQL. Tansu is in early development, licensed under the GNU AGPL. Written in async 🚀 Rust 🦀.

While retaining API compatibility, the current PostgreSQL storage engine is very different from Apache Kafka's:

  • Messages are not stored in segments, so retention and compaction policies can be applied immediately (see the sketch after this list).
  • Message ordering is total over all topics, not restricted to a single topic partition.
  • Brokers do not replicate messages, relying on continuous archiving instead.
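
For example, because each message is an individual row rather than part of an immutable segment, retention and compaction can in principle be expressed as plain SQL deletes. The statements below are an illustrative sketch under that assumption, not Tansu's actual implementation (the table names follow the schema shown later; the interval and parameters are made up):

-- Hypothetical sketch: time-based retention as an immediate delete,
-- rather than waiting for a whole segment to expire.
delete from record
using topic
where record.topic = topic.id
  and topic.name = $1
  and record.timestamp < now() - interval '7 days';

-- Hypothetical sketch: compaction as "keep the latest record per key",
-- deleting any keyed record superseded by a later one.
delete from record r
using topic t
where r.topic = t.id
  and t.name = $1
  and r.k is not null
  and r.id < (select max(r2.id)
                from record r2
               where r2.topic = r.topic
                 and r2.partition = r.partition
                 and r2.k = r.k);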

Our initial use cases are relatively low volume Kafka deployments where total message ordering could be useful. Other non-functional requirements might require a different storage engine. Tansu has been designed to work with multiple storage engines which are in development:

  • A PostgreSQL engine where message ordering is either per topic, or per topic partition (as in Kafka).
  • An object store for S3 or compatible services.
  • A segmented disk store (as in Kafka with broker replication).

We store a Kafka message using the following record schema:

create table record (
  id bigserial primary key not null, -- also used as the fetch offset (see below)
  topic uuid references topic(id),   -- owning topic
  partition integer,                 -- topic partition index
  producer_id bigint,                -- Kafka producer id
  sequence integer,                  -- Kafka producer sequence
  timestamp timestamp,               -- record timestamp
  k bytea,                           -- record key
  v bytea,                           -- record value
  last_updated timestamp default current_timestamp not null,
  created_at timestamp default current_timestamp not null
);
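
As a minimal sketch (not Tansu's actual code) of how a produce might map onto this schema, each record could become a single insert, with the generated id acting as the record's offset, as suggested by the fetch SQL below:

-- Hypothetical sketch: storing one record from a produce request.
-- The broker would bind the parameters; returning id yields the
-- offset assigned by the bigserial sequence.
insert into record
  (topic, partition, producer_id, sequence, timestamp, k, v)
values
  ($1, $2, $3, $4, $5, $6, $7)
returning id;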

The k and v columns are the key and value stored by the client. The SQL used for a fetch looks like:

with sized as (
  select
    record.id,
    timestamp,
    k,
    v,
    sum(coalesce(length(k), 0) + coalesce(length(v), 0))
      over (order by record.id) as bytes
  from cluster, record, topic
  where cluster.name = $1
    and topic.name = $2
    and record.partition = $3
    and record.id >= $4
    and topic.cluster = cluster.id
    and record.topic = topic.id
) select * from sized where bytes < $5;

One of the parameters to the Kafka Fetch API is the maximum number of bytes to return. We use a WITH query here to restrict the size of the result set: the window function keeps a running total of the record sizes, and the outer select returns only the rows whose running total stays below that maximum.
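
To illustrate the running total with made-up sizes of 100, 200 and 300 bytes (this standalone query is just a demonstration of the window function, not part of Tansu):

-- Illustrative only: a running total over three in-line rows.
select id, sum(size) over (order by id) as bytes
  from (values (1, 100), (2, 200), (3, 300)) as t (id, size);

-- id | bytes
-- ---+------
--  1 |   100
--  2 |   300
--  3 |   600
--
-- With a limit ($5) of 450 bytes, only ids 1 and 2 would be returned.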

Tansu is available as a minimal from-scratch Docker image, hosted on the GitHub Container Registry. An example compose.yaml is available from here:

docker compose up

Using the regular Apache Kafka CLI, you can create topics, then produce and consume messages with Tansu:

kafka-topics \
  --bootstrap-server localhost:9092 \
  --partitions=3 \
  --replication-factor=1 \
  --create --topic test

Producer:

echo "hello world" | kafka-console-producer \
    --bootstrap-server localhost:9092 \
    --topic test

Consumer:

kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic test \
  --from-beginning \
  --property print.timestamp=true \
  --property print.key=true \
  --property print.offset=true \
  --property print.partition=true \
  --property print.headers=true \
  --property print.value=true

Or using librdkafka to produce:

echo "Lorem ipsum dolor..." | \
  ./examples/rdkafka_example -P \
  -t test \
  -b localhost:9092 \
  -z gzip

Consumer:

./examples/rdkafka_example \
  -C \
  -t test \
  -b localhost:9092

Tansu is in early development. Gaps that we are aware of:

  • Transactions are not currently implemented.
  • While the consumer group protocol is implemented, it isn't currently suitable for more than one Tansu broker when using the PostgreSQL storage engine. We intend to fix this soon, as part of migrating the group coordinator away from the existing file system segment storage engine on which it was originally built.
  • We haven’t looked at the new “server side” consumer coordinator.
  • We split batches into individual records when storing them in PostgreSQL. This allows full access to the record data from within SQL, but also means that we decompress each batch on produce. We recreate batches on fetch, but don't currently compress the result.
  • We don't currently support idempotent producers.
  • We have started looking at the benchmarks from the OpenMessaging Benchmark Framework, using the single topic 1KB profile, but haven't applied any tuning as a result yet.

Please raise an issue if you encounter a problem.