Grimsby: An Erlang Port Controller written in Rust


An Erlang Port provides the basic mechanism for communication from Erlang with the external world.

From the Erlang Reference Manual:

The Erlang process creating a port is said to be the port owner, or the connected process of the port. All communication to and from the port must go through the port owner. If the port owner terminates, so does the port (and the external program, if it is written correctly).

The external program resides in another OS process. By default, it reads from standard input (file descriptor 0) and writes to standard output (file descriptor 1). The external program is to terminate when the port is closed.

Ports and Port Drivers, Erlang Reference Manual

An Erlang Port works exactly as designed: closing the port terminates the program. Often this is exactly what you need. Sometimes, however, you need to close standard input (file descriptor 0) and still allow the program to continue running.

For example, the following use of erlang:open_port/2 with /bin/cat works perfectly, because cat echoes its input without waiting for standard input to be closed:

1> Port = erlang:open_port(
       {spawn_executable, "/bin/cat"},
       [binary, eof, use_stdio, exit_status, stream]).

2> erlang:port_command(Port, "hello world!").
true

3> flush().
Shell got {Port, {data, <<"hello world!">>}}
ok

4> erlang:port_close(Port).
true

5> flush().
ok

Whereas /usr/bin/sum does not, because it only writes the checksum for the whole input once standard input has been closed:

1> Port = erlang:open_port(
       {spawn_executable, "/usr/bin/sum"},
       [binary, eof, use_stdio, exit_status, stream]).

2> erlang:port_command(Port, "hello world!").
true

3> flush().
ok

4> erlang:port_close(Port).
true
sum: stdout: Broken pipe

5> flush().
%% ...nothing has been received...
ok

erlang:port_close/1 closes standard input, but it also closes standard output. The data message containing the checksum is never received: it is lost when sum's write to the closed pipe fails with the broken pipe error shown above.

Grimsby is an Erlang Port controller written in Rust that can close the standard input of a spawned process while retaining its standard output (and error).

With grimsby, an executable can be spawned, closing stdin if and when necessary, while capturing output on both stdout and stderr:

1> {ok, Spawn} = grimsby_command_sup:start_child(
                   #{executable => "/usr/bin/sum"}).

%% send iodata to the spawned process...
2> ok = grimsby_command:send(
          Spawn,
          ["hello", <<" world!">>]).

%% close stdin...
3> ok = grimsby_command:close(Spawn).

%% important to wait for the spawned process to exit...
4> {ok, 0} = grimsby_command:wait_for_exit(Spawn).

%% output is captured from stdout and stderr as iodata:
5> grimsby_command:info(Spawn).
#{exit => 0,
  eof => [stderr, stdin, stdout],
  stderr => [],
  stdout => [[],<<"3785 1\n">>]}
  
6> grimsby_command:stop(Spawn).

Grimsby

Architecture

The module grimsby_port orchestrates the port protocol between Erlang and the Rust process, exchanging BERT (Binary ERlang Term) messages, each framed with a 4-byte big-endian length (BURP).
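The framing itself is simple enough to sketch with the standard library. The following is a minimal sketch, not grimsby's code: each frame is a 4-byte big-endian length followed by that many bytes of BERT, which is the same framing the {packet, 4} option of erlang:open_port/2 produces on the Erlang side. In a controller the reader and writer would presumably be the process's stdin and stdout; write_frame and read_frame are hypothetical names.

```rust
use std::io::{self, Read, Write};

/// Write one BURP-style frame: a 4-byte big-endian length, then the payload.
pub fn write_frame<W: Write>(w: &mut W, payload: &[u8]) -> io::Result<()> {
    // Refuse payloads whose length cannot be represented in 4 bytes.
    let len = u32::try_from(payload.len())
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "frame too large"))?;
    w.write_all(&len.to_be_bytes())?;
    w.write_all(payload)?;
    w.flush()
}

/// Read one frame. Returns Ok(None) if the input ends before a complete
/// 4-byte length header (e.g. the other side closed the port).
pub fn read_frame<R: Read>(r: &mut R) -> io::Result<Option<Vec<u8>>> {
    let mut header = [0u8; 4];
    match r.read_exact(&mut header) {
        Ok(()) => {}
        Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
        Err(e) => return Err(e),
    }
    let len = u32::from_be_bytes(header) as usize;
    let mut payload = vec![0u8; len];
    // A truncated payload is an error, not a clean end of input.
    r.read_exact(&mut payload)?;
    Ok(Some(payload))
}
```

Treating end of input before a header as None, while a truncated payload is an error, lets a controller shut down quietly when the port is closed.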

Protocol

The following messages are exchanged over the port:

spawn

{spawn,
 ChildId :: reference(),
 InteractionId :: reference(),
 #{executable := string(),

   %% optional list of arguments (default [])
   args => [string()],

   %% optional map of environment variables
   envs => #{string() => string()},

   %% optional arg0 name
   arg0 => string(),

   %% optional working directory of the process
   cd => file:filename()}}

On receipt of this message, the Rust port controller spawns a new process using executable as the full path to the executable, starts threads to handle its stdin, stdout and stderr, and associates ChildId with the new process.

In response:

{InteractionId :: reference(), ok}

If the process spawned without error. InteractionId is the reference correlating this response with the request.

{InteractionId :: reference(), {error, term()}}

If the process failed to spawn for some reason. InteractionId again correlates the response with the request.
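On the Rust side, the options in the spawn message line up with the builder methods of std::process::Command. A minimal sketch, assuming the map has already been decoded into a hypothetical SpawnOptions struct (grimsby's own types may differ): args, envs and cd map to args, envs and current_dir, and arg0 maps to the Unix-only CommandExt::arg0.

```rust
use std::collections::BTreeMap;
use std::process::{Command, Stdio};

/// Hypothetical Rust mirror of the options map in the spawn message.
#[derive(Debug, Default)]
pub struct SpawnOptions {
    pub executable: String,
    pub args: Vec<String>,              // args => [string()], default []
    pub envs: BTreeMap<String, String>, // envs => #{string() => string()}
    pub arg0: Option<String>,           // arg0 => string()
    pub cd: Option<String>,             // cd => file:filename()
}

/// Build (but do not start) a Command with stdin, stdout and stderr all
/// piped, so that each stream can later be handed to its own thread.
pub fn build_command(opts: &SpawnOptions) -> Command {
    let mut cmd = Command::new(&opts.executable);
    cmd.args(&opts.args)
        .envs(&opts.envs)
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped());
    if let Some(dir) = &opts.cd {
        cmd.current_dir(dir);
    }
    if let Some(arg0) = &opts.arg0 {
        set_arg0(&mut cmd, arg0);
    }
    cmd
}

#[cfg(unix)]
fn set_arg0(cmd: &mut Command, arg0: &str) {
    // CommandExt::arg0 is only available on Unix platforms.
    use std::os::unix::process::CommandExt;
    cmd.arg0(arg0);
}

#[cfg(not(unix))]
fn set_arg0(_cmd: &mut Command, _arg0: &str) {
    // No arg0 support here: the executable path is used as argv[0].
}
```

Keeping construction separate from spawn makes the mapping easy to check without starting a process.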

eof

{eof, ChildId :: reference(), Stream :: stream()}

The Erlang side receives this message when stdout or stderr of the spawned process reaches end of file. ChildId is the reference that identified the spawned process in the spawn message.

output

{stdout | stderr, ChildId :: reference(), Output :: binary()}

The Erlang side receives this message with output from the spawned process on either stdout or stderr. ChildId is the reference that identified the spawned process in the spawn message.

exit

{exit, ChildId :: reference(), integer() | signal}

The Erlang side receives this message when the spawned process has either exited normally with a status code or been killed by a signal. ChildId is the reference that identified the spawned process in the spawn message.

error

{error, ChildId :: reference(), Stream :: stream()}

The Erlang side receives this message when an error occurs on a stream of the spawned process. ChildId is the reference that identified the spawned process in the spawn message.
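A sketch of how the Rust side might build these four messages, assuming a cut-down, hypothetical Term type and Event enum (not grimsby's own). ChildId is opaque to the controller, so it is simply cloned back into each tuple; mapping a missing exit code to the atom signal mirrors std::process::ExitStatus::code, which returns None on Unix when a signal terminated the process.

```rust
/// A minimal, hypothetical subset of the Term type used on the Rust side.
#[derive(Debug, Clone, PartialEq)]
pub enum Term {
    Atom(String),
    Integer(i64),
    Binary(Vec<u8>),
    Tuple(Vec<Term>),
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Stream {
    Stdin,
    Stdout,
    Stderr,
}

impl Stream {
    fn name(self) -> &'static str {
        match self {
            Stream::Stdin => "stdin",
            Stream::Stdout => "stdout",
            Stream::Stderr => "stderr",
        }
    }
}

/// Hypothetical events observed by the stream and wait threads.
pub enum Event {
    Eof(Stream),
    Output(Stream, Vec<u8>), // only Stdout or Stderr in practice
    Exit(Option<i32>),       // None: terminated by a signal
    Error(Stream),
}

fn atom(name: &str) -> Term {
    Term::Atom(name.to_string())
}

/// Build the tuple sent to Erlang; `child_id` is echoed back unchanged.
pub fn to_term(child_id: &Term, event: Event) -> Term {
    let elements = match event {
        Event::Eof(stream) => vec![atom("eof"), child_id.clone(), atom(stream.name())],
        Event::Output(stream, bytes) => {
            vec![atom(stream.name()), child_id.clone(), Term::Binary(bytes)]
        }
        Event::Exit(Some(code)) => {
            vec![atom("exit"), child_id.clone(), Term::Integer(i64::from(code))]
        }
        Event::Exit(None) => vec![atom("exit"), child_id.clone(), atom("signal")],
        Event::Error(stream) => vec![atom("error"), child_id.clone(), atom(stream.name())],
    };
    Term::Tuple(elements)
}
```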

send

{send, ChildId :: reference(), InteractionId :: reference(), Data :: binary()}

On receipt of this message the Rust port controller will send Data to the stdin of the spawned process identified by ChildId from the spawn message.

In response:

{InteractionId :: reference(), ok}

If the data was queued to be sent to the spawned process. If the data cannot be written later, an error may be sent asynchronously.

{InteractionId :: reference(), {error, term()}}

If the data could not be queued to the process.
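The distinction between "queued" and "written" falls out naturally if each child's stdin is owned by a thread fed through a channel. A minimal sketch, assuming hypothetical StdinCommand, start_stdin_thread and queue names: queue succeeds as soon as the channel accepts the data, which corresponds to the {InteractionId, ok} reply, while a later write failure ends the thread with an error that would have to be reported asynchronously. Sending Close drops the writer, which closes the child's stdin without touching stdout or stderr.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical messages for the thread that owns a child's stdin.
pub enum StdinCommand {
    Send(Vec<u8>),
    Close,
}

/// Start a thread owning `stdin`. It writes each queued chunk in order and
/// drops `stdin` (closing the pipe) on Close or when every sender is gone.
pub fn start_stdin_thread<W>(mut stdin: W) -> (Sender<StdinCommand>, JoinHandle<io::Result<()>>)
where
    W: Write + Send + 'static,
{
    let (tx, rx) = channel();
    let handle = thread::spawn(move || -> io::Result<()> {
        for command in rx {
            match command {
                StdinCommand::Send(data) => {
                    // A failure here is the "cannot be written later" case.
                    stdin.write_all(&data)?;
                    stdin.flush()?;
                }
                StdinCommand::Close => break,
            }
        }
        // `stdin` is dropped when the closure returns, closing the pipe.
        Ok(())
    });
    (tx, handle)
}

/// Queue data for the stdin thread. Ok only means "queued"; Err means the
/// thread has already gone (stdin closed or a previous write failed).
pub fn queue(tx: &Sender<StdinCommand>, data: Vec<u8>) -> Result<(), String> {
    tx.send(StdinCommand::Send(data))
        .map_err(|_| "stdin is closed".to_string())
}
```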

close

{close, ChildId :: reference(), InteractionId :: reference(), Stream :: stream()}

On receipt of this message the Rust port controller will close the stream of the spawned process identified by ChildId from the spawn message.

In response:

{InteractionId :: reference(), ok}

If the request was queued to be sent to the spawned process.

{InteractionId :: reference(), {error, term()}}

If the request could not be queued to the process.

wait for exit

{wait_for_exit, ChildId :: reference(), InteractionId :: reference()}

On receipt of this message the Rust port controller will wait for the exit of the spawned process identified by ChildId from the spawn message.

In response:

{InteractionId :: reference(), ok}

If the request was queued to be sent to the spawned process.

{InteractionId :: reference(), {error, term()}}

If the request could not be queued to the process.

kill

{kill, ChildId :: reference(), InteractionId :: reference()}

On receipt of this message the Rust port controller will kill the spawned process identified by ChildId from the spawn message.

In response:

{InteractionId :: reference(), ok}

If the request was queued to be sent to the spawned process.

{InteractionId :: reference(), {error, term()}}

If the request could not be queued to the process.
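The wait for exit and kill requests map onto std::process::Child::wait and Child::kill. A sketch, assuming the controller keeps the Child for each ChildId (that bookkeeping is not shown) and using a hypothetical Exit enum for the integer() | signal element of the exit message:

```rust
use std::io;
use std::process::{Child, ExitStatus};

/// Hypothetical Rust view of the last element of {exit, ChildId, integer() | signal}.
#[derive(Debug, PartialEq)]
pub enum Exit {
    Status(i32),
    Signal,
}

/// On Unix, ExitStatus::code() is None when the process was killed by a signal.
pub fn classify(status: ExitStatus) -> Exit {
    match status.code() {
        Some(code) => Exit::Status(code),
        None => Exit::Signal,
    }
}

/// The "wait for exit" request: block until the child exits.
pub fn wait_for_exit(child: &mut Child) -> io::Result<Exit> {
    Ok(classify(child.wait()?))
}

/// The "kill" request: Child::kill sends SIGKILL on Unix; waiting afterwards
/// reaps the process so it does not linger as a zombie.
pub fn kill(child: &mut Child) -> io::Result<Exit> {
    child.kill()?;
    wait_for_exit(child)
}
```

Because wait blocks, a controller would run it on its own thread and report the result as an exit message rather than in the immediate reply.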

Erlang Port Side

grimsby_port

grimsby_port allocates resources on demand: using postpone, it opens the Erlang Port only once it has a client request to service.

%% init into the "unready" state...
%%
init([]) ->
    process_flag(trap_exit, true),
    {ok,
     unready,
     #{requests => gen_statem:reqids_new(),
       inflight => #{}}}.

%% postpone any call request, opening the port first...
%% nei(X) -> {next_event, internal, X}.
%%
handle_event({call, _}, _, unready, _) ->
    {keep_state_and_data, [nei(open_port), postpone]};

%% open the port and transition to the "ready" state,
%% allowing postponed requests to be consumed...
%%
handle_event(internal, open_port, unready, Data) ->
    {next_state,
     ready,
     Data#{port => erlang:open_port(...)}};
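Rust has no gen_statem postpone, but the same "open on first use" idea can be sketched with Option::get_or_insert_with. This is an analogy, not part of grimsby: OnDemand is a hypothetical type whose None state stands for "unready", and the resource is opened exactly once, by whichever request arrives first.

```rust
/// Opens a resource only when the first request needs it: `None` plays the
/// part of grimsby_port's "unready" state and `Some` the "ready" state.
pub struct OnDemand<T, F: FnMut() -> T> {
    resource: Option<T>,
    open: F,
}

impl<T, F: FnMut() -> T> OnDemand<T, F> {
    pub fn new(open: F) -> Self {
        OnDemand { resource: None, open }
    }

    pub fn is_ready(&self) -> bool {
        self.resource.is_some()
    }

    /// Service one request, opening the resource first if necessary
    /// (the analogue of next_event(open_port) followed by the postponed call).
    pub fn call<R>(&mut self, request: impl FnOnce(&mut T) -> R) -> R {
        let open = &mut self.open;
        let resource = self.resource.get_or_insert_with(|| open());
        request(resource)
    }
}
```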

grimsby_command

While grimsby_port is intended for clients wanting an asynchronous callback interface, grimsby_command presents a less callback-heavy interface (though still necessarily asynchronous in places).

The init/1 function issues an asynchronous request to grimsby_port to spawn (run) an executable.


init([Arg]) ->
    {ok,
     unready,
     #{requests => gen_statem:send_request(
                     grimsby_port,
                     {run, maps:with(
                              [executable,
                               args,
                               envs,
                               arg0,
                               cd],
                              Arg)},
                     #{request => run},
                     gen_statem:reqids_new()),
       eof => ordsets:new(),
       stdout => [],
       stderr => []}}.

It remains in the unready state (postponing any subsequent client requests) until the run request has completed:


handle_event(info,
             Msg,
             unready,
             #{requests := Existing} = Data) ->
    case gen_statem:check_response(Msg,
                                   Existing,
                                   true) of

        {{reply, {ok, SpawnId}},
         #{request := run},
         Updated} ->
            %% run request has spawned a process ok,
            %% move into ready state so that we can
            %% accept further commands:
            %%
            {next_state,
             ready,
             Data#{requests := Updated, spawn => SpawnId}};

        {{reply, {{error, _} = Error, _}},
         #{request := run},
         Updated} ->
            %% the run request failed, transition to
            %% error replying to subsequent client
            %% requests with this error
            %%
            {next_state, Error, Data#{requests := Updated}}
    end;
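The unready, ready and error behaviour can be modelled as a small state machine with an explicit queue standing in for gen_statem's postponed events. A Rust sketch with hypothetical names (CommandProxy, State, run_reply), where a spawn id is just a u64: requests are queued while unready and replayed in order once the run reply arrives, each answered either normally or with the stored error.

```rust
use std::collections::VecDeque;

/// Hypothetical states mirroring grimsby_command: unready until the run
/// request replies, then ready (with a spawn id) or failed with an error.
#[derive(Debug, Clone, PartialEq)]
pub enum State {
    Unready,
    Ready { spawn_id: u64 },
    Failed(String),
}

pub struct CommandProxy {
    state: State,
    postponed: VecDeque<String>,
}

impl CommandProxy {
    pub fn new() -> Self {
        CommandProxy { state: State::Unready, postponed: VecDeque::new() }
    }

    /// A client request: postponed (None) while unready, answered otherwise.
    pub fn request(&mut self, req: &str) -> Option<Result<String, String>> {
        match &self.state {
            State::Unready => {
                self.postponed.push_back(req.to_string());
                None
            }
            State::Ready { spawn_id } => Some(Ok(format!("{} handled by {}", req, spawn_id))),
            State::Failed(reason) => Some(Err(reason.clone())),
        }
    }

    /// The reply to the run request: change state, then replay postponed
    /// requests in their original order.
    pub fn run_reply(&mut self, reply: Result<u64, String>) -> Vec<Result<String, String>> {
        self.state = match reply {
            Ok(spawn_id) => State::Ready { spawn_id },
            Err(reason) => State::Failed(reason),
        };
        let mut answers = Vec::new();
        while let Some(req) = self.postponed.pop_front() {
            if let Some(answer) = self.request(&req) {
                answers.push(answer);
            }
        }
        answers
    }
}
```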

Rust Port Side

BERT

Erlang has a reasonably small number of types that need to be implemented. For the purposes of this article, we shall examine just the tuple type.

pub enum Term {
    ...
    Tuple(Vec<Term>),
    ...
}

decode

The Binary Erlang Terms are decoded in Rust using the nom parsing crate. In the following code section, the calls to nom mean:

  • alt((small_tuple_ext, large_tuple_ext))(i): decode using small_tuple_ext and, if that fails, try large_tuple_ext as an alternative
  • tag([super::SMALL_TUPLE_EXT])(i)?: only continue decoding if the first byte (u8) matches

use nom::{
    branch::alt,
    bytes::complete::tag,
    number::complete::{be_u32, u8},
    IResult,
};

// assumed: Term, the tag constants and `term` (the parser for
// any term) are defined in this module or its parent
use super::Term;

fn tuple(i: &[u8]) -> IResult<&[u8], Term> {
    // alternate variants of tuple that are
    // present in BERT...
    alt((small_tuple_ext, large_tuple_ext))(i)
}

fn small_tuple_ext(i: &[u8]) -> IResult<&[u8], Term> {
    // if the input doesn't start with
    // SMALL_TUPLE try the next alt...
    let (i, _) = tag([super::SMALL_TUPLE_EXT])(i)?;

    // for a small tuple the length is just 8 bits:
    let (i, arity) = u8(i)?;
    let (i, contents) = tuple_contents(i, arity as u32)?;

    Ok((i, Term::Tuple(contents)))
}

fn large_tuple_ext(i: &[u8]) -> IResult<&[u8], Term> {
    // if the input doesn't start with
    // LARGE_TUPLE try the next alt...
    let (i, _) = tag([super::LARGE_TUPLE_EXT])(i)?;

    // for a large tuple the length is 32 bits, big-endian:
    let (i, arity) = be_u32(i)?;
    let (i, contents) = tuple_contents(i, arity)?;

    Ok((i, Term::Tuple(contents)))
}

fn tuple_contents(
   mut i: &[u8],
   arity: u32) -> IResult<&[u8], Vec<Term>> {
    let mut contents = Vec::new();
    for _ in 0..arity {
        // parse all alternates of term:
        let (remaining, content) = term(i)?;
        i = remaining;
        contents.push(content);
    }
    Ok((i, contents))
}
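The same decoding can be written with only the standard library, which makes the byte layout explicit. This is a self-contained sketch covering just small integers, UTF-8 atoms and both tuple forms (not grimsby's decoder, which uses nom); like nom, it returns (remaining, value) pairs. The tag values 97, 104, 105 and 119 are those of SMALL_INTEGER_EXT, SMALL_TUPLE_EXT, LARGE_TUPLE_EXT and SMALL_ATOM_UTF8_EXT in the Erlang external term format; a complete message from term_to_binary/1 also starts with the version byte 131, which is assumed to have been consumed already.

```rust
pub const SMALL_INTEGER_EXT: u8 = 97;
pub const SMALL_TUPLE_EXT: u8 = 104;
pub const LARGE_TUPLE_EXT: u8 = 105;
pub const SMALL_ATOM_UTF8_EXT: u8 = 119;

/// A tiny, hypothetical subset of Term.
#[derive(Debug, Clone, PartialEq)]
pub enum Term {
    SmallInteger(u8),
    Atom(String),
    Tuple(Vec<Term>),
}

/// Split `n` bytes off the front of `i`: (remaining, taken).
fn take(i: &[u8], n: usize) -> Result<(&[u8], &[u8]), String> {
    if i.len() < n {
        return Err(format!("need {} bytes, have {}", n, i.len()));
    }
    let (taken, rest) = i.split_at(n);
    Ok((rest, taken))
}

/// Decode one term, returning (remaining input, term) like a nom parser.
pub fn term(i: &[u8]) -> Result<(&[u8], Term), String> {
    let (i, tag) = take(i, 1)?;
    match tag[0] {
        SMALL_INTEGER_EXT => {
            let (i, value) = take(i, 1)?;
            Ok((i, Term::SmallInteger(value[0])))
        }
        SMALL_ATOM_UTF8_EXT => {
            let (i, len) = take(i, 1)?;
            let (i, name) = take(i, usize::from(len[0]))?;
            let name = std::str::from_utf8(name).map_err(|e| e.to_string())?;
            Ok((i, Term::Atom(name.to_string())))
        }
        // small tuple: 8-bit arity
        SMALL_TUPLE_EXT => {
            let (i, arity) = take(i, 1)?;
            tuple_contents(i, u32::from(arity[0]))
        }
        // large tuple: 32-bit big-endian arity
        LARGE_TUPLE_EXT => {
            let (i, arity) = take(i, 4)?;
            let arity = u32::from_be_bytes([arity[0], arity[1], arity[2], arity[3]]);
            tuple_contents(i, arity)
        }
        other => Err(format!("unsupported tag {}", other)),
    }
}

fn tuple_contents(mut i: &[u8], arity: u32) -> Result<(&[u8], Term), String> {
    // Grow as elements arrive instead of trusting `arity` for capacity.
    let mut contents = Vec::new();
    for _ in 0..arity {
        let (remaining, element) = term(i)?;
        i = remaining;
        contents.push(element);
    }
    Ok((i, Term::Tuple(contents)))
}
```

Growing the vector as elements arrive, rather than preallocating from the arity read off the wire, avoids a huge allocation from a malformed LARGE_TUPLE_EXT header.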

We can use pattern matching in Rust again for dispatching protocol commands. Let's examine matching the following Erlang tuple, received as BERT on the Rust side of the Port:

{spawn, reference(), reference(), #{string() => string()}}

In the following code fragment, the elements of the spawn tuple above are passed in the elements parameter of the function.

fn dispatch(
    mapping: &mut BTreeMap<Term, Mapping>,
    port_tx: Sender<Term>,
    elements: &[Term],
) -> Option<Term> {

    // elements[..] is the slice itself: matching on it
    // lets each arm use a slice pattern of a fixed length
    match elements[..] {
        ...
        // match a slice of 4 items, where the first
        // is an atom "spawn", the last is a Map,
        // the middle two can be anything
        //
        [Term::Atom(ref command),
         ref child_id,
         ref interaction_id,
         Term::Map(ref kv)] if command == "spawn" => {
            spawn(mapping,
                  port_tx,
                  child_id,
                  interaction_id,
                  kv)
        }
        ...

This looks pretty close to the equivalent Erlang pattern match:

dispatch(Mapping,
         PortTx,
         {spawn, ChId, InId, #{} = KV}) ->
    do_spawn(Mapping, PortTx, ChId, InId, KV);

...
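A self-contained version of the Rust dispatch shows slice patterns doing the work. The Term subset, the Request enum and decode_command are hypothetical, and a map is represented as a list of pairs for simplicity; each arm matches a specific length and leading atom, and anything else falls through to None.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum Term {
    Atom(String),
    Integer(i64),
    Binary(Vec<u8>),
    Map(Vec<(Term, Term)>), // simplification: key/value pairs
}

/// Hypothetical decoded requests, borrowing from the decoded tuple.
#[derive(Debug, PartialEq)]
pub enum Request<'a> {
    Spawn { child_id: &'a Term, interaction_id: &'a Term, options: &'a [(Term, Term)] },
    Send { child_id: &'a Term, interaction_id: &'a Term, data: &'a [u8] },
    Kill { child_id: &'a Term, interaction_id: &'a Term },
}

pub fn decode_command(elements: &[Term]) -> Option<Request<'_>> {
    match elements {
        // {spawn, ChildId, InteractionId, #{...}}
        [Term::Atom(command), child_id, interaction_id, Term::Map(kv)] if command == "spawn" => {
            Some(Request::Spawn { child_id, interaction_id, options: kv.as_slice() })
        }
        // {send, ChildId, InteractionId, Data}
        [Term::Atom(command), child_id, interaction_id, Term::Binary(data)] if command == "send" => {
            Some(Request::Send { child_id, interaction_id, data: data.as_slice() })
        }
        // {kill, ChildId, InteractionId}
        [Term::Atom(command), child_id, interaction_id] if command == "kill" => {
            Some(Request::Kill { child_id, interaction_id })
        }
        _ => None,
    }
}
```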

encode

An Erlang tuple is encoded from a Vec of Term. Tuples with no more than 255 elements are encoded as a SMALL_TUPLE_EXT (with an unsigned 8-bit length). Tuples with more than 255 elements are encoded as a LARGE_TUPLE_EXT with a 32-bit big-endian length.

use std::io::{Error, Write};

// assumed location, like the tag constants
use super::Term;

fn tuple(
    w: &mut dyn Write,
    elements: &[Term],
) -> Result<(), Error> {
    match elements.len() {
        // a small tuple has a length
        // of no more than 255 elements
        length @ 0..=255 => {
            u8(w, &super::SMALL_TUPLE_EXT)?;
            u8(w, &(length as u8))?;
        }
        length => {
            u8(w, &super::LARGE_TUPLE_EXT)?;
            u32(w, &(length as u32))?;
        }
    }
    for element in elements {
        term(w, element)?;
    }
    Ok(())
}

fn u8(w: &mut dyn Write, value: &u8) -> Result<(), Error> {
    w.write_all(&value.to_be_bytes())
}

fn u32(w: &mut dyn Write, value: &u32) -> Result<(), Error> {
    w.write_all(&value.to_be_bytes())
}
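A self-contained sketch of the same encoding rule, writing into a Vec<u8> so the bytes can be checked. Instead of a range pattern it lets u8::try_from decide whether the arity fits in one byte; the two-variant Term is a hypothetical subset, and tag 97 is SMALL_INTEGER_EXT from the external term format.

```rust
use std::io::{self, Write};

const SMALL_INTEGER_EXT: u8 = 97;
const SMALL_TUPLE_EXT: u8 = 104;
const LARGE_TUPLE_EXT: u8 = 105;

#[derive(Debug, Clone, PartialEq)]
pub enum Term {
    SmallInteger(u8),
    Tuple(Vec<Term>),
}

pub fn term(w: &mut dyn Write, t: &Term) -> io::Result<()> {
    match t {
        Term::SmallInteger(n) => w.write_all(&[SMALL_INTEGER_EXT, *n]),
        Term::Tuple(elements) => tuple(w, elements),
    }
}

fn tuple(w: &mut dyn Write, elements: &[Term]) -> io::Result<()> {
    match u8::try_from(elements.len()) {
        // up to 255 elements: one-byte arity
        Ok(arity) => w.write_all(&[SMALL_TUPLE_EXT, arity])?,
        // otherwise: four-byte big-endian arity
        Err(_) => {
            let arity = u32::try_from(elements.len())
                .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "tuple too large"))?;
            w.write_all(&[LARGE_TUPLE_EXT])?;
            w.write_all(&arity.to_be_bytes())?;
        }
    }
    for element in elements {
        term(w, element)?;
    }
    Ok(())
}
```

For example, the 2-tuple {1, 2} encodes as [104, 2, 97, 1, 97, 2], while a 256-element tuple starts with [105, 0, 0, 1, 0].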

Spawn

Each request to spawn on the Rust side of the Port calls spawn on a std::process::Command, creating a new OS process. A native OS thread is created for each of stdin, stdout and stderr, each communicating with the Port through channels.
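The thread-per-stream design can be sketched with std::process and std::sync::mpsc alone. This sketch (hypothetical run, forward and Output names) starts a reader thread for each of stdout and stderr, writes the input from the calling thread for brevity rather than from a dedicated stdin thread, then drops stdin. Closing only stdin is exactly what /usr/bin/sum needs: its output still arrives on the channel afterwards.

```rust
use std::io::{self, Read, Write};
use std::process::{Command, Stdio};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical events forwarded from the reader threads.
#[derive(Debug, PartialEq)]
pub enum Output {
    Stdout(Vec<u8>),
    Stderr(Vec<u8>),
    Eof(&'static str),
}

/// Forward everything read from `reader` as chunks, then report end of file.
fn forward<R: Read>(mut reader: R, tx: Sender<Output>, wrap: fn(Vec<u8>) -> Output, name: &'static str) {
    let mut buf = [0u8; 4096];
    loop {
        match reader.read(&mut buf) {
            Ok(0) => break,
            Ok(n) => {
                if tx.send(wrap(buf[..n].to_vec())).is_err() {
                    return; // nobody is listening any more
                }
            }
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(_) => break,
        }
    }
    let _ = tx.send(Output::Eof(name));
}

/// Spawn `program`, write `input` to its stdin, close stdin only, and
/// collect stdout/stderr events until both streams reach end of file.
pub fn run(program: &str, args: &[&str], input: &[u8]) -> io::Result<(Receiver<Output>, Option<i32>)> {
    let mut child = Command::new(program)
        .args(args)
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()?;

    // One reader thread per output stream, started before any input is written.
    let (tx, rx) = channel();
    let stdout = child.stdout.take().expect("stdout is piped");
    let stderr = child.stderr.take().expect("stderr is piped");
    let tx_err = tx.clone();
    let out = thread::spawn(move || forward(stdout, tx, Output::Stdout, "stdout"));
    let err = thread::spawn(move || forward(stderr, tx_err, Output::Stderr, "stderr"));

    // Write the input, then drop stdin: the child sees end of file while
    // its stdout and stderr stay open.
    let mut stdin = child.stdin.take().expect("stdin is piped");
    let written = stdin.write_all(input);
    drop(stdin);

    let status = child.wait()?;
    out.join().expect("stdout reader panicked");
    err.join().expect("stderr reader panicked");
    written?;
    Ok((rx, status.code()))
}
```

Starting the reader threads before writing matters: otherwise a child that fills its stdout pipe while its stdin is still being written would block, and the writer would block with it.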

Future Directions

  • Tokio (or a similar asynchronous runtime), rather than spawning a thread for each of stdin, stdout and stderr.

Examples

Shrugs

shrugs is an Apache-licensed, self-hosted git server written in Erlang and available as a Docker container. It uses Grimsby to invoke git (specifically git-receive-pack, git-upload-archive and git-upload-pack), which require stdin to be closed.

See Also

Exec

exec is a great port manager written in C/C++, which early versions of shrugs used. I was inspired to write grimsby to understand the Rust memory safety model.