Tansu: An Apache Kafka Compatible API Server written in Rust

Tansu is a GNU AGPL licensed, Apache Kafka compatible server written in 100% 🦀 Rust. Tansu is still in early development (github). Please expect breaking changes, sharp edges and bugs.

Using the Apache Kafka CLI tools, we can create topics, produce messages and use the console group consumer, fetching and committing offsets back to Tansu.

Working (with plenty of sharp edges):

  • topic creation with multiple partitions
  • topic metadata
  • produce (records rather than message sets)
  • fetch (records rather than message sets)
  • consumer group coordinator: join, sync, heartbeat, leave, offset commit/fetch
  • coordination of cluster state via Raft

In progress:

  • Replication of topic segments: local, S3, …
  • Transactions
  • Many APIs have a canned response (e.g., describe cluster) at present
  • Anything beyond the script found in this README

The following section describes how the Apache Kafka protocol is implemented in Tansu. Subsequent articles will cover other details, including the group consumer implementation.

Protocol

The Apache Kafka protocol is described by a number of JSON documents and a set of primitive types. Each API is separately versioned, allowing the protocol to evolve while maintaining backward compatibility with older clients. That evolution has included moving to a flexible format (using varints rather than fixed-length encodings) and, more recently, to optional tagged fields. An optional handshake (the ApiVersions API) asks the server which versions of each API it supports, allowing the client to adapt where necessary.
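As an aside on what flexible means on the wire, here is a minimal sketch of the unsigned varint encoding those versions use for lengths and tags. It is a generic LEB128-style codec rather than Tansu's own code; signed values layer zig-zag encoding on top:

// Minimal unsigned varint codec as used by flexible protocol versions:
// seven payload bits per byte, high bit set while more bytes follow.
fn write_unsigned_varint(mut value: u32, out: &mut Vec<u8>) {
    while value >= 0x80 {
        out.push((value as u8 & 0x7f) | 0x80);
        value >>= 7;
    }
    out.push(value as u8);
}

fn read_unsigned_varint(input: &[u8]) -> Option<(u32, usize)> {
    let mut value = 0u32;
    for (i, byte) in input.iter().enumerate().take(5) {
        value |= u32::from(byte & 0x7f) << (7 * i);
        if byte & 0x80 == 0 {
            return Some((value, i + 1));
        }
    }
    None // truncated, or longer than five bytes
}

fn main() {
    let mut buf = Vec::new();
    write_unsigned_varint(300, &mut buf);
    assert_eq!(buf, [0xac, 0x02]);
    assert_eq!(read_unsigned_varint(&buf), Some((300, 2)));
}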

A message is stored in Kafka using the produce API. The client must poll the server to receive messages using the fetch API. The actual message data is described by a separate message format which includes control batches (used by transactions), records and headers.
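As a rough illustration of the shape involved, following the Kafka record format rather than Tansu's exact definitions:

// Rough shape of a single record within a batch: deltas are relative to the
// batch base offset and base timestamp, and each record carries optional headers.
pub struct RecordHeader {
    pub key: String,
    pub value: Option<Vec<u8>>,
}

pub struct Record {
    pub timestamp_delta: i64,
    pub offset_delta: i32,
    pub key: Option<Vec<u8>>,
    pub value: Option<Vec<u8>>,
    pub headers: Vec<RecordHeader>,
}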

In Kafka, the server is relatively simple, with a lot of responsibility delegated to the client. For example, the partition used when producing a message is chosen by the client, and clients are responsible for operating much of the consumer group protocol.
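A sketch of what client-side partition selection looks like; the real Java client hashes keys with murmur2, and the hasher and round-robin fallback here are only stand-ins:

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Sketch of client-side partition selection: the broker never picks the
// partition for a produced message, the producing client does.
fn partition_for(key: Option<&[u8]>, partitions: u32, next: &mut u32) -> u32 {
    match key {
        Some(key) => {
            let mut hasher = DefaultHasher::new();
            key.hash(&mut hasher);
            (hasher.finish() % u64::from(partitions)) as u32
        }
        None => {
            // keyless records are spread across partitions by the client
            *next = (*next + 1) % partitions;
            *next
        }
    }
}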

In Tansu, the protocol layer is built from the JSON message definitions found in Apache Kafka 3.8.0, which are transformed into Rust structs and enums with lots of help from the prettyplease, proc-macro2, quote and syn crates, all of which come together in build.rs. Originally this was structured as a function-like proc macro, but I couldn't get rust-analyzer to play nicely with it. The code is currently split into Tansu Kafka Model (the public model that couldn't be part of the proc macro crate) and Tansu Kafka Sans IO (formerly the proc macro, which now just includes the generated output); the two should probably be combined into Sans IO.
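To give a feel for the code generation, here is a heavily abridged sketch of a build.rs that turns a hand-rolled, toy definition into Rust source using those crates; the real build script walks the Kafka JSON files and generates far more than a single struct:

// build.rs (abridged sketch): generate a struct and write it into OUT_DIR.
use proc_macro2::TokenStream;
use quote::{format_ident, quote};

fn generate(name: &str, fields: &[(&str, &str)]) -> String {
    let ident = format_ident!("{}", name);
    let fields = fields.iter().map(|(field, ty)| {
        let field = format_ident!("{}", field);
        let ty = ty.parse::<TokenStream>().expect("type");
        quote! { pub #field: #ty }
    });

    let tokens = quote! {
        #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
        pub struct #ident {
            #(#fields),*
        }
    };

    // prettyplease formats the token stream so the generated file is readable
    prettyplease::unparse(&syn::parse2(tokens).expect("file"))
}

fn main() {
    let source = generate(
        "ReplicaState",
        &[("replica_id", "i32"), ("log_end_offset", "i64")],
    );
    let out = std::path::Path::new(&std::env::var("OUT_DIR").expect("OUT_DIR"))
        .join("generated.rs");
    std::fs::write(out, source).expect("write");
}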

The protocol structs and enums are Kafka API version agnostic, with heavy use of Option for those parts that are only present in certain versions. Originally, the proc macro emitted a separate enum variant for each version of a message, but this became complex when writing the server-side code and matching the 17 different variants of a fetch request (and 17 more for the response). Using the serde crate, the wire format is implemented by the deserializer and serializer.

When deserializing a request, the API version is part of the header; it determines the format the rest of the message takes and whether tagged fields can also be present. A response must use the same API version as the request.
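As a rough illustration of the framing (not Tansu's actual code), the API key and version can be peeled off the front of a frame before the body is handed to the deserializer:

use std::io::{Cursor, Read, Result};

// Sketch: a 4-byte big-endian size, then the request header begins with
// api_key, api_version and correlation_id, which drive deserialization
// of everything that follows.
fn read_frame(reader: &mut impl Read) -> Result<(i16, i16, i32, Vec<u8>)> {
    let mut size = [0u8; 4];
    reader.read_exact(&mut size)?;

    let mut frame = vec![0u8; i32::from_be_bytes(size) as usize];
    reader.read_exact(&mut frame)?;

    let mut cursor = Cursor::new(&frame[..]);
    let mut short = [0u8; 2];
    cursor.read_exact(&mut short)?;
    let api_key = i16::from_be_bytes(short);
    cursor.read_exact(&mut short)?;
    let api_version = i16::from_be_bytes(short);
    let mut int = [0u8; 4];
    cursor.read_exact(&mut int)?;
    let correlation_id = i32::from_be_bytes(int);

    Ok((api_key, api_version, correlation_id, frame))
}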

Tagged fields are handled with a mezzanine type for each API: the wire format is serialized and deserialized via the mezzanine, and serde container attributes (try_from/from and into) convert between it and the actual struct that carries the optional tagged fields.

Using the frame header as an example (the header allows tagged fields, but none are currently defined by Kafka):

#[derive(Deserialize, .., Serialize)]
#[serde(try_from = "HeaderMezzanine")]
#[serde(into = "HeaderMezzanine")]
pub enum Header {
    Request {
        api_key: i16,
        api_version: i16,
        correlation_id: i32,
        client_id: Option<String>,
    },
    Response {
        correlation_id: i32,
    },
}

With the header mezzanine type:

#[derive(Deserialize, .., Serialize)]
pub enum HeaderMezzanine {
    Request {
        api_key: i16,
        api_version: i16,
        correlation_id: i32,
        client_id: Option<String>,
        tag_buffer: Option<TagBuffer>,
    },
    Response {
        correlation_id: i32,
        tag_buffer: Option<TagBuffer>,
    },
}
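The try_from and into container attributes rely on a pair of conversions between the two types. A sketch of what they look like, with a plain String standing in for the real error type:

// Sketch of the conversions that the container attributes rely on; the
// real error type and any future tagged fields are glossed over here.
impl TryFrom<HeaderMezzanine> for Header {
    type Error = String;

    fn try_from(mezzanine: HeaderMezzanine) -> Result<Self, Self::Error> {
        match mezzanine {
            HeaderMezzanine::Request {
                api_key,
                api_version,
                correlation_id,
                client_id,
                ..
            } => Ok(Header::Request {
                api_key,
                api_version,
                correlation_id,
                client_id,
            }),

            HeaderMezzanine::Response { correlation_id, .. } => {
                Ok(Header::Response { correlation_id })
            }
        }
    }
}

impl From<Header> for HeaderMezzanine {
    fn from(header: Header) -> Self {
        match header {
            Header::Request {
                api_key,
                api_version,
                correlation_id,
                client_id,
            } => HeaderMezzanine::Request {
                api_key,
                api_version,
                correlation_id,
                client_id,
                // no tagged fields are currently defined for the header
                tag_buffer: None,
            },

            Header::Response { correlation_id } => HeaderMezzanine::Response {
                correlation_id,
                tag_buffer: None,
            },
        }
    }
}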

In Tansu, each API request and response is represented by a variant of the Body enum. Using a fetch request as an example:

  • replica_id is only used in versions 0-14
  • max_wait_ms is valid in versions 0+
  • cluster_id is a tagged field in versions 12+
  • replica_state is a tagged field in versions 15+

#[derive(Deserialize, .., Serialize)]
#[serde(from = "mezzanine::Body")]
#[serde(into = "mezzanine::Body")]
pub enum Body {
    FetchRequest {
        replica_id: Option<i32>,
        max_wait_ms: i32,
        // ..lots of other non tagged fields

        cluster_id: Option<String>,
        replica_state: Option<ReplicaState>,
    },
}

Where the optional tagged field replica_state is defined as:

#[derive(Deserialize, .., Serialize)]
pub struct ReplicaState {
    pub replica_id: i32,
    pub log_end_offset: i64,
    pub last_fetch_timestamp: Option<i64>,
    pub last_caught_up_timestamp: Option<i64>,
}

When Body is serialized or deserialized by serde, it is converted via a mezzanine type in which cluster_id and replica_state are folded into (or pulled out of) a tag buffer; it is the mezzanine that is actually written to, or read from, the wire:

mod mezzanine {
    #[derive(Deserialize, .., Serialize)]
    pub enum Body {
        FetchRequest {
            replica_id: Option<i32>,
            max_wait_ms: i32,
            // ..lots of other non tagged fields

            tag_buffer: Option<TagBuffer>,
        },
    }
}
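For context, the tag buffer itself has a small, well-defined wire layout (this is the Kafka protocol's definition rather than a Tansu type):

// Wire layout of a tag buffer: an unsigned varint count of tagged fields,
// then for each field a varint tag, a varint length and that many bytes of
// payload. Unknown tags are skipped, which is what keeps tagged fields
// backward compatible.
struct TaggedField {
    tag: u32,      // identifies the field, e.g. cluster_id or replica_state
    data: Vec<u8>, // the value, encoded with the normal protocol rules
}

struct WireTagBuffer(Vec<TaggedField>); // written in ascending tag order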

A similar approach is used for the compression of message batches in the fetch and produce APIs. The deflated batch structure is defined as:

#[derive(Deserialize, .., Serialize)]
pub struct Batch {
    pub base_offset: i64,
    pub batch_length: i32,
    ..
    pub record_count: u32,
    pub record_data: Bytes,
}

The record_data represents the compressed records (using gzip, lz4, snappy or zstd). The inflated batch structure is defined as:

#[derive(Deserialize, .., Serialize)]
#[serde(try_from = "deflated::Batch")]
pub struct Batch {
    pub base_offset: i64,
    pub batch_length: i32,
    ..
    pub records: Vec<Record>,
}
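Inflating record_data is then a matter of running it through whichever codec the batch attributes name. A rough sketch of the gzip case using the flate2 crate (which may or may not be the codec crate Tansu uses):

use std::io::Read;

use flate2::read::GzDecoder;

// Sketch: inflate gzip-compressed record data back into the concatenated
// records; the batch attributes say which codec applies, gzip is assumed here.
fn inflate_gzip(record_data: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut records = Vec::new();
    GzDecoder::new(record_data).read_to_end(&mut records)?;
    Ok(records)
}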

Switching between the deflated and inflated batches is as simple as a try_into or try_from. An inflated batch is automatically deflated by serde when it is serialized. The server doesn't need to inflate batches during a fetch or produce, so record data is kept as-is using the bytes crate; batches are inflated separately, during key compaction.
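As a rough usage sketch, with the module and error names as placeholders for whatever the crate actually exposes:

// Sketch: keep batches deflated on the produce/fetch path and only run the
// TryFrom conversion (the same one serde uses) when the records themselves
// are needed, e.g. during key compaction.
fn compact(batch: deflated::Batch) -> Result<(), Error> {
    let batch = inflated::Batch::try_from(batch)?;

    for record in &batch.records {
        // key compaction inspects record keys here
        let _ = record;
    }

    Ok(())
}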

The key takeaways here are:

  • the protocol types are automatically generated from the Apache Kafka protocol definitions in 100% Rust.
  • the protocol crate is sans IO, serializing to the Write trait and deserializing from the Read trait.
  • the protocol crate follows serde conventions, implementing the Apache Kafka wire format in the deserializer and serializer and adapting the output to the version of the API being used.
  • tag buffers and compression are implemented with mezzanine types that convert to and from strictly typed Rust structures via serde container attributes.