Grimsby: An Erlang Port Controller written in Rust
An Erlang Port provides the basic mechanism for communication between Erlang and the external world.
From the Erlang Reference Manual:
The Erlang process creating a port is said to be the port owner, or the connected process of the port. All communication to and from the port must go through the port owner. If the port owner terminates, so does the port (and the external program, if it is written correctly).
The external program resides in another OS process. By default, it reads from standard input (file descriptor 0) and writes to standard output (file descriptor 1). The external program is to terminate when the port is closed.
Ports and Port Drivers — Erlang Reference Manual
An Erlang Port works exactly as designed: closing the port terminates the external program. Often this is exactly what you need. Sometimes, however, you need to close standard input (file descriptor 0) while allowing the program to continue running.
For example, the following, using erlang:open_port/2 with /bin/cat, works perfectly because cat echoes its input while standard input remains open:
1> Port = erlang:open_port(
{spawn_executable, "/bin/cat"},
[binary, eof, use_stdio, exit_status, stream]).
2> erlang:port_command(Port, "hello world!").
true
3> flush().
Shell got {Port, {data, <<"hello world!">>}}
ok
4> erlang:port_close(Port).
true
5> flush().
ok
Whereas /usr/bin/sum will not, because it requires standard input to be closed before responding with the checksum for the whole input:
1> Port = erlang:open_port(
{spawn_executable, "/usr/bin/sum"},
[binary, eof, use_stdio, exit_status, stream]).
2> erlang:port_command(Port, "hello world!").
true
3> flush().
ok
4> erlang:port_close(Port).
true
sum: stdout: Broken pipe
5> flush().
%% ...nothing has been received...
ok
erlang:port_close/1 closes standard input, but it also closes standard output (and error). As a result, the data message with the checksum is never received: it is lost in the broken pipe.
Grimsby is an Erlang Port controller written in Rust that can close the standard input of a spawned program while retaining its standard output (and error).
With grimsby, an executable can be spawned, closing stdin if and when necessary, while capturing output on both stdout and stderr:
1> {ok, Spawn} = grimsby_command_sup:start_child(
#{executable => "/usr/bin/sum"}).
%% send iodata to the spawned process...
2> ok = grimsby_command:send(
Spawn,
["hello", <<" world!">>]).
%% close stdin...
3> ok = grimsby_command:close(Spawn).
%% important to wait for the spawned process to exit...
4> {ok, 0} = grimsby_command:wait_for_exit(Spawn).
%% output is captured from stdout and stderr as iodata:
5> grimsby_command:info(Spawn).
#{exit => 0,
eof => [stderr, stdin, stdout],
stderr => [],
stdout => [[],<<"3785 1\n">>]}
6> grimsby_command:stop(Spawn).
Grimsby
The module grimsby_port orchestrates the port protocol between Erlang and the Rust process using BERT framed with a 4-byte big-endian length (BURP).
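On the Rust side, BURP framing amounts to a length prefix around each BERT payload. The following is a minimal sketch using only the standard library; write_frame and read_frame are hypothetical names rather than Grimsby's own functions. The 4-byte big-endian header is the same one Erlang uses for a port opened with the {packet, 4} option.

```rust
use std::io::{self, Read, Write};

/// Write one frame: a 4-byte big-endian length followed by the payload.
fn write_frame(w: &mut impl Write, payload: &[u8]) -> io::Result<()> {
    let len = u32::try_from(payload.len())
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "frame too large"))?;
    w.write_all(&len.to_be_bytes())?;
    w.write_all(payload)?;
    w.flush()
}

/// Read one frame, returning Ok(None) when the input ends before a header
/// (for example, the Erlang side has closed the port).
fn read_frame(r: &mut impl Read) -> io::Result<Option<Vec<u8>>> {
    let mut header = [0u8; 4];
    match r.read_exact(&mut header) {
        Ok(()) => {}
        Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
        Err(e) => return Err(e),
    }
    let len = u32::from_be_bytes(header) as usize;
    let mut payload = vec![0u8; len];
    r.read_exact(&mut payload)?;
    Ok(Some(payload))
}

fn main() -> io::Result<()> {
    // 131 is the external term format version byte; 97, 42 encodes the integer 42
    let mut buf: Vec<u8> = Vec::new();
    write_frame(&mut buf, &[131, 97, 42])?;
    assert_eq!(buf, [0, 0, 0, 3, 131, 97, 42]);

    let mut reader = &buf[..];
    assert_eq!(read_frame(&mut reader)?, Some(vec![131, 97, 42]));
    assert_eq!(read_frame(&mut reader)?, None);
    Ok(())
}
```

In practice stdin and stdout of the Rust process would be wrapped in buffered readers and writers before being handed to functions like these.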
Protocol
The following messages are exchanged over the port:
spawn
{spawn,
ChildId :: reference(),
InteractionId :: reference(),
#{executable := string(),
%% optional list of arguments (default [])
args => [string()],
%% optional map of environment variables
envs => #{string() => string()},
%% optional arg0 name
arg0 => string(),
%% optional working directory of the process
cd => file:filename()}}
On receipt of this message the Rust port controller will spawn a new process using executable as the full path to the executable, with threads monitoring stdin, stdout and stderr, associating ChildId with the new process.
In response:
{InteractionId :: reference(), ok}
If the process has spawned without error, where InteractionId is the correlating reference for the request.
{InteractionId :: reference(), {error, term()}}
If the process failed to spawn for some reason, where InteractionId is the correlating reference for the request.
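A minimal sketch of how the spawn options might map onto std::process::Command, assuming the map has already been decoded into a hypothetical SpawnOptions struct (not part of Grimsby). arg0 comes from the Unix-only CommandExt trait, and all three standard streams are piped so that each can be monitored. Nothing is started until spawn() is called.

```rust
use std::collections::BTreeMap;
use std::ffi::OsStr;
use std::os::unix::process::CommandExt; // Unix only: provides Command::arg0
use std::path::PathBuf;
use std::process::{Command, Stdio};

/// Hypothetical decoded form of the spawn options map.
struct SpawnOptions {
    executable: String,
    args: Vec<String>, // default []
    envs: BTreeMap<String, String>,
    arg0: Option<String>,
    cd: Option<PathBuf>,
}

/// Build (but do not start) the command described by the options.
fn command(opts: &SpawnOptions) -> Command {
    let mut cmd = Command::new(&opts.executable);
    cmd.args(&opts.args)
        .envs(&opts.envs)
        // pipe all three streams so each can be monitored
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped());
    if let Some(arg0) = &opts.arg0 {
        cmd.arg0(arg0);
    }
    if let Some(cd) = &opts.cd {
        cmd.current_dir(cd);
    }
    cmd
}

fn main() {
    let opts = SpawnOptions {
        executable: "/usr/bin/sum".into(),
        args: vec!["-r".into()],
        envs: BTreeMap::from([("LC_ALL".to_string(), "C".to_string())]),
        arg0: Some("sum".into()),
        cd: Some(PathBuf::from("/tmp")),
    };
    let cmd = command(&opts);
    assert_eq!(cmd.get_program(), OsStr::new("/usr/bin/sum"));
    assert_eq!(cmd.get_args().collect::<Vec<_>>(), [OsStr::new("-r")]);
    // cmd.spawn() would start the process; nothing has run yet
}
```

If spawn() then fails (a missing executable, say), its io::Error is the natural source of the {error, term()} reply.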
eof
{eof, ChildId :: reference(), Stream :: stream()}
This message is received by the Erlang side indicating end of file for stdout or stderr, where ChildId is the reference used to identify the spawned process in the spawn message.
output
{stdout | stderr, ChildId :: reference(), Output :: binary()}
This message is received by the Erlang side indicating the output from the spawned process, either from stdout or stderr, where ChildId is the reference used to identify the spawned process in the spawn message.
exit
{exit, ChildId :: reference(), integer() | signal}
This message is received by the Erlang side indicating that the spawned process either exited normally with a status code, or has been killed by a signal, where ChildId is the reference used to identify the spawned process in the spawn message.
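On Unix, the integer() | signal distinction falls out of std::process::ExitStatus: code() returns None when the process was terminated by a signal. A sketch, with a hypothetical Exit enum standing in for the term sent to Erlang:

```rust
use std::os::unix::process::ExitStatusExt; // Unix only: signal() and from_raw()
use std::process::ExitStatus;

/// Hypothetical Rust view of `integer() | signal` in the exit message.
#[derive(Debug, PartialEq)]
enum Exit {
    Code(i32),
    Signal,
}

fn exit_reason(status: ExitStatus) -> Exit {
    match status.code() {
        // the process exited normally with a status code
        Some(code) => Exit::Code(code),
        // on Unix, code() is None when the process was killed by a signal
        None => Exit::Signal,
    }
}

fn main() {
    // from_raw takes a raw wait status: exit code in bits 8..16,
    // terminating signal in the low 7 bits
    assert_eq!(exit_reason(ExitStatus::from_raw(3 << 8)), Exit::Code(3));
    let killed = ExitStatus::from_raw(9); // SIGKILL
    assert_eq!(killed.signal(), Some(9));
    assert_eq!(exit_reason(killed), Exit::Signal);
}
```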
error
{error, ChildId :: reference(), Stream :: stream()}
This message is received by the Erlang side indicating an error on a stream of the spawned process, where ChildId is the reference used to identify the spawned process in the spawn message.
send
{send, ChildId :: reference(), InteractionId :: reference(), Data :: binary()}
On receipt of this message the Rust port controller will send Data to the stdin of the spawned process identified by ChildId from the spawn message.
In response:
{InteractionId :: reference(), ok}
If the data was queued to be sent to the spawned process. An error may be sent asynchronously later if the data cannot be written.
{InteractionId :: reference(), {error, term()}}
If the data could not be queued to the process.
close
{close, ChildId :: reference(), InteractionId :: reference(), Stream :: stream()}
On receipt of this message the Rust port controller will close the stream of the spawned process identified by ChildId from the spawn message.
In response:
{InteractionId :: reference(), ok}
If the request was queued to be sent to the spawned process.
{InteractionId :: reference(), {error, term()}}
If the request could not be queued to the process.
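The "queued" replies for send and close suggest a channel in front of each child's stdin. A minimal sketch, assuming a hypothetical StdinCommand enum and one dedicated thread per child: an ok reply would only mean the command reached the channel, and a write error would surface later from the thread. Closing stdin is simply dropping the writer, which closes the pipe without touching stdout or stderr.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{channel, Receiver};

/// Hypothetical commands queued for a child's stdin.
enum StdinCommand {
    Send(Vec<u8>),
    Close,
}

/// Body of a per-child stdin thread: write queued data until Close arrives
/// (or every sender is dropped). Returning the writer lets the caller drop it;
/// dropping a ChildStdin closes the pipe, leaving stdout and stderr untouched.
fn run_stdin<W: Write>(mut w: W, rx: Receiver<StdinCommand>) -> io::Result<W> {
    for command in rx {
        match command {
            // a failed write ends the loop: the asynchronous error
            StdinCommand::Send(data) => w.write_all(&data)?,
            // anything queued after Close is never written
            StdinCommand::Close => break,
        }
    }
    w.flush()?;
    Ok(w)
}

fn main() -> io::Result<()> {
    let (tx, rx) = channel();
    // an ok reply would only mean the command reached this queue
    tx.send(StdinCommand::Send(b"hello".to_vec())).unwrap();
    tx.send(StdinCommand::Send(b" world!".to_vec())).unwrap();
    tx.send(StdinCommand::Close).unwrap();
    tx.send(StdinCommand::Send(b"ignored".to_vec())).unwrap();
    // a Vec<u8> stands in for the child's stdin here
    let written = run_stdin(Vec::<u8>::new(), rx)?;
    assert_eq!(written, b"hello world!");
    Ok(())
}
```

With a real child, the body would run on its own thread, for example thread::spawn(move || run_stdin(child_stdin, rx).map(drop)), so the ChildStdin is dropped as soon as the loop ends.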
wait for exit
{wait_for_exit, ChildId :: reference(), InteractionId :: reference()}
On receipt of this message the Rust port controller will wait for the exit of the spawned process identified by ChildId from the spawn message.
In response:
{InteractionId :: reference(), ok}
If the request was queued to be sent to the spawned process.
{InteractionId :: reference(), {error, term()}}
If the request could not be queued to the process.
kill
{kill, ChildId :: reference(), InteractionId :: reference()}
On receipt of this message the Rust port controller will kill the spawned process identified by ChildId from the spawn message.
In response:
{InteractionId :: reference(), ok}
If the request was queued to be sent to the spawned process.
{InteractionId :: reference(), {error, term()}}
If the request could not be queued to the process.
Erlang Port Side
grimsby_port
grimsby_port allocates resources on demand, using postpone so that the Erlang Port is opened only once there is a client request to service.
%% init into the "unready" state...
%%
init([]) ->
process_flag(trap_exit, true),
{ok,
unready,
#{requests => gen_statem:reqids_new(),
inflight => #{}}}.
%% postpone any call request, opening the port first...
%% nei(X) -> {next_event, internal, X}.
%%
handle_event({call, _}, _, unready, _) ->
{keep_state_and_data, [nei(open_port), postpone]};
%% open the port and transition to the "ready" state,
%% allowing postponed requests to be consumed...
%%
handle_event(internal, open_port, unready, Data) ->
{next_state,
ready,
Data#{port => erlang:open_port(...)}};
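To make the postpone semantics concrete, here is a small Rust model of the state machine above. It is not part of Grimsby; it assumes gen_statem's documented ordering, where an event inserted with next_event is handled before anything already queued, and postponed events are retried first after a state change.

```rust
use std::collections::VecDeque;

/// Events, loosely mirroring gen_statem: client calls, plus the internal
/// open_port event injected by {next_event, internal, open_port}.
#[derive(Debug)]
enum Event {
    Call(&'static str),
    OpenPort,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Unready,
    Ready,
}

struct Machine {
    state: State,
    queue: VecDeque<Event>,
    postponed: Vec<Event>,
    ports_opened: usize,
    handled: Vec<&'static str>,
}

impl Machine {
    fn new() -> Self {
        Machine {
            state: State::Unready,
            queue: VecDeque::new(),
            postponed: Vec::new(),
            ports_opened: 0,
            handled: Vec::new(),
        }
    }

    fn run(&mut self) {
        while let Some(event) = self.queue.pop_front() {
            match (self.state, event) {
                // [nei(open_port), postpone]: the inserted event runs next
                (State::Unready, call @ Event::Call(_)) => {
                    self.postponed.push(call);
                    self.queue.push_front(Event::OpenPort);
                }
                // "open" the port and become ready; the state change puts
                // postponed events back at the front, oldest first
                (State::Unready, Event::OpenPort) => {
                    self.ports_opened += 1;
                    self.state = State::Ready;
                    for postponed in self.postponed.drain(..).rev() {
                        self.queue.push_front(postponed);
                    }
                }
                (State::Ready, Event::Call(request)) => self.handled.push(request),
                // the port is already open
                (State::Ready, Event::OpenPort) => {}
            }
        }
    }
}

fn main() {
    let mut m = Machine::new();
    // two client calls arrive before the port has been opened
    m.queue.extend([Event::Call("a"), Event::Call("b")]);
    m.run();
    assert_eq!(m.ports_opened, 1);
    assert_eq!(m.handled, ["a", "b"]);
    assert_eq!(m.state, State::Ready);
}
```

Only the first call is postponed: by the time "b" is dequeued the machine is already ready, and the port is opened exactly once.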
grimsby_command
While grimsby_port is intended for clients wanting an asynchronous callback interface, grimsby_command presents a less callback-heavy interface (though still necessarily asynchronous in places).
The init/1 function issues an asynchronous request to grimsby_port to spawn (run) an executable.
init([Arg]) ->
{ok,
unready,
#{requests => gen_statem:send_request(
grimsby_port,
{run, maps:with(
[executable,
args,
envs,
arg0,
cd],
Arg)},
#{request => run},
gen_statem:reqids_new()),
eof => ordsets:new(),
stdout => [],
stderr => []}}.
It remains in the unready state (postponing any subsequent client requests) until the run request has completed:
handle_event(info,
Msg,
unready,
#{requests := Existing} = Data) ->
case gen_statem:check_response(Msg,
Existing,
true) of
{{reply, {ok, SpawnId}},
#{request := run},
Updated} ->
%% run request has spawned a process ok,
%% move into ready state so that we can
%% accept further commands:
%%
{next_state,
ready,
Data#{requests := Updated, spawn => SpawnId}};
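{{reply, {error, _} = Error},
#{request := run},
Updated} ->
%% the run request failed, transition to
%% an error state, replying to subsequent
%% client requests with this error
%%
{next_state, Error, Data#{requests := Updated}};
no_reply ->
%% not the run response, retry the
%% message once the state changes
%%
{keep_state_and_data, postpone}
end;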
Rust Port Side
Bert
Erlang has a reasonably small number of types that need to be implemented. For the purposes of this article we shall just examine the tuple type.
pub enum Term {
...
Tuple(Vec<Term>),
...
}
decode
The Binary Erlang Terms are decoded in Rust using the nom parsing crate. In the following code section, the calls to nom mean:
- alt((small_tuple_ext, large_tuple_ext))(i): decode using small_tuple_ext, and if that fails, try large_tuple_ext as an alternative
- tag([super::SMALL_TUPLE_EXT])(i)?: only continue decoding if the first byte (u8) matches
// assuming nom 7, where number::complete::u32 takes an
// Endianness argument, so be_u32 is used for the arity
use nom::{
branch::alt,
bytes::complete::tag,
number::complete::{be_u32, u8},
IResult,
};
fn tuple(i: &[u8]) -> IResult<&[u8], Term> {
// alternate variants of tuple that are
// present in BERT...
alt((small_tuple_ext, large_tuple_ext))(i)
}
fn small_tuple_ext(i: &[u8]) -> IResult<&[u8], Term> {
// if the input doesn't start with
// SMALL_TUPLE try the next alt...
let (i, _) = tag([super::SMALL_TUPLE_EXT])(i)?;
// for a small tuple the length is just 8 bits:
let (i, arity) = u8(i)?;
let (i, contents) = tuple_contents(i, arity as u32)?;
Ok((i, Term::Tuple(contents)))
}
fn large_tuple_ext(i: &[u8]) -> IResult<&[u8], Term> {
// if the input doesn't start with
// LARGE_TUPLE try the next alt...
let (i, _) = tag([super::LARGE_TUPLE_EXT])(i)?;
// for a large tuple the length is 32 bits, big-endian
let (i, arity) = be_u32(i)?;
let (i, contents) = tuple_contents(i, arity)?;
Ok((i, Term::Tuple(contents)))
}
fn tuple_contents(
mut i: &[u8],
arity: u32) -> IResult<&[u8], Vec<Term>> {
let mut contents = Vec::new();
for _ in 0..arity {
// parse all alternates of term:
let (remaining, content) = term(i)?;
i = remaining;
contents.push(content);
}
Ok((i, contents))
}
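For readers without nom to hand, the same recursive structure can be sketched with plain slice handling. This swaps nom for a hand-rolled parser and supports only a handful of tags (SMALL_INTEGER_EXT, INTEGER_EXT, SMALL_ATOM_UTF8_EXT and the two tuple tags, with their values from the external term format) on a trimmed-down Term; it is an illustration, not Grimsby's decoder.

```rust
// tag values from the Erlang external term format
const SMALL_INTEGER_EXT: u8 = 97;
const INTEGER_EXT: u8 = 98;
const SMALL_TUPLE_EXT: u8 = 104;
const LARGE_TUPLE_EXT: u8 = 105;
const SMALL_ATOM_UTF8_EXT: u8 = 119;

#[derive(Debug, PartialEq)]
enum Term {
    Integer(i32),
    Atom(String),
    Tuple(Vec<Term>),
}

/// Split off the first n bytes, failing if the input is too short.
fn take(i: &[u8], n: usize) -> Option<(&[u8], &[u8])> {
    if i.len() < n {
        None
    } else {
        Some((&i[n..], &i[..n]))
    }
}

/// Decode one term, returning the remaining input (None on bad input).
fn term(i: &[u8]) -> Option<(&[u8], Term)> {
    let (&tag, i) = i.split_first()?;
    match tag {
        SMALL_INTEGER_EXT => {
            let (i, b) = take(i, 1)?;
            Some((i, Term::Integer(i32::from(b[0]))))
        }
        INTEGER_EXT => {
            let (i, b) = take(i, 4)?;
            Some((i, Term::Integer(i32::from_be_bytes(b.try_into().ok()?))))
        }
        SMALL_ATOM_UTF8_EXT => {
            let (i, len) = take(i, 1)?;
            let (i, name) = take(i, usize::from(len[0]))?;
            Some((i, Term::Atom(String::from_utf8(name.to_vec()).ok()?)))
        }
        // small tuple: 8 bit arity
        SMALL_TUPLE_EXT => {
            let (i, arity) = take(i, 1)?;
            tuple_contents(i, u32::from(arity[0]))
        }
        // large tuple: 32 bit big-endian arity
        LARGE_TUPLE_EXT => {
            let (i, arity) = take(i, 4)?;
            tuple_contents(i, u32::from_be_bytes(arity.try_into().ok()?))
        }
        _ => None,
    }
}

fn tuple_contents(mut i: &[u8], arity: u32) -> Option<(&[u8], Term)> {
    let mut contents = Vec::new();
    for _ in 0..arity {
        let (remaining, content) = term(i)?;
        i = remaining;
        contents.push(content);
    }
    Some((i, Term::Tuple(contents)))
}

/// Decode a complete term: version byte 131, one term, no trailing bytes.
fn decode(bytes: &[u8]) -> Option<Term> {
    let (&version, rest) = bytes.split_first()?;
    if version != 131 {
        return None;
    }
    match term(rest)? {
        (&[], t) => Some(t),
        _ => None,
    }
}

fn main() {
    // {ok, 1}: version, small tuple of arity 2, atom ok, small integer 1
    let bytes = [131, 104, 2, 119, 2, b'o', b'k', 97, 1];
    assert_eq!(
        decode(&bytes),
        Some(Term::Tuple(vec![Term::Atom("ok".into()), Term::Integer(1)]))
    );
}
```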
We can use pattern matching in Rust again for dispatching protocol commands. Let's examine matching the following Erlang tuple, received as BERT on the Rust side of the Port:
{spawn, reference(), reference(), #{string() => string()}}
In the following code fragment, the elements of the spawn tuple above are passed in the elements parameter of the function.
fn dispatch(
mapping: &mut BTreeMap<Term, Mapping>,
port_tx: Sender<Term>,
elements: &[Term],
) -> Option<Term> {
// elements[..] matches on the slice itself; each
// pattern below fixes the length it accepts
match elements[..] {
...
// match a slice of exactly 4 elements, where the
// first is the atom spawn, the last is a Map,
// and the middle two can be anything
[Term::Atom(ref command),
ref child_id,
ref interaction_id,
Term::Map(ref kv)] if command == "spawn" => {
spawn(mapping,
port_tx,
child_id,
interaction_id,
kv)
}
...
This looks pretty close to the equivalent Erlang pattern match:
dispatch(Mapping,
PortTx,
{spawn, ChId, InId, #{} = KV}) ->
do_spawn(Mapping, PortTx, ChId, InId, KV);
...
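The dispatch above can be tried in isolation with a cut-down Term. In this sketch Term, Request and the kill arm are hypothetical stand-ins. It matches on the slice directly, relying on default binding modes instead of ref, and shows that each slice pattern also checks the number of elements:

```rust
/// A cut-down term type; Reference is a simplified stand-in.
#[derive(Debug, PartialEq)]
enum Term {
    Atom(String),
    Reference(u64),
    Map(Vec<(Term, Term)>),
}

/// Hypothetical decoded commands, borrowing from the incoming elements.
#[derive(Debug, PartialEq)]
enum Request<'a> {
    Spawn {
        child_id: &'a Term,
        interaction_id: &'a Term,
        options: &'a [(Term, Term)],
    },
    Kill {
        child_id: &'a Term,
        interaction_id: &'a Term,
    },
}

fn dispatch(elements: &[Term]) -> Option<Request<'_>> {
    match elements {
        // exactly four elements: the atom spawn, any two terms, then a map
        [Term::Atom(command), child_id, interaction_id, Term::Map(kv)] if command == "spawn" => {
            Some(Request::Spawn { child_id, interaction_id, options: kv })
        }
        // exactly three elements: the atom kill and any two terms
        [Term::Atom(command), child_id, interaction_id] if command == "kill" => {
            Some(Request::Kill { child_id, interaction_id })
        }
        // an unknown command, or the wrong number of elements
        _ => None,
    }
}

fn main() {
    let spawn = [
        Term::Atom("spawn".into()),
        Term::Reference(1),
        Term::Reference(2),
        Term::Map(Vec::new()),
    ];
    assert!(matches!(dispatch(&spawn), Some(Request::Spawn { .. })));
    // the right atom with the wrong arity matches no arm
    assert_eq!(dispatch(&spawn[..3]), None);
}
```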
encode
An Erlang tuple is encoded from a Vec of Term. Tuples having no more than 255 elements are encoded as a SMALL_TUPLE_EXT (with an unsigned 8-bit length). Tuples with more than 255 elements are encoded as a LARGE_TUPLE_EXT with a 32-bit big-endian length.
use std::io::{Error, Write};
fn tuple(
w: &mut dyn Write,
elements: &[Term]) -> Result<(), Error> {
match elements.len() {
// a small tuple has a length
// of no more than 255 elements
length @ 0..=255 => {
u8(w, &super::SMALL_TUPLE_EXT)?;
u8(w, &(length as u8))?;
}
length => {
u8(w, &super::LARGE_TUPLE_EXT)?;
u32(w, &(length as u32))?;
}
}
for element in elements {
term(w, element)?;
}
Ok(())
}
fn u8(w: &mut dyn Write, value: &u8) -> Result<(), Error> {
w.write_all(&value.to_be_bytes())
}
fn u32(w: &mut dyn Write, value: &u32) -> Result<(), Error> {
w.write_all(&value.to_be_bytes())
}
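A self-contained version of the encoder, assuming a trimmed-down Term with only small integers and tuples (SMALL_INTEGER_EXT is tag 97 in the external term format). It keeps the same small/large split on the element count, and an encode helper adds the version byte 131 that a complete external term starts with:

```rust
use std::io::{self, Write};

// tag values from the Erlang external term format
const SMALL_INTEGER_EXT: u8 = 97;
const SMALL_TUPLE_EXT: u8 = 104;
const LARGE_TUPLE_EXT: u8 = 105;

/// A trimmed-down term: just small integers and tuples.
enum Term {
    SmallInteger(u8),
    Tuple(Vec<Term>),
}

fn term(w: &mut dyn Write, t: &Term) -> io::Result<()> {
    match t {
        Term::SmallInteger(n) => w.write_all(&[SMALL_INTEGER_EXT, *n]),
        Term::Tuple(elements) => tuple(w, elements),
    }
}

fn tuple(w: &mut dyn Write, elements: &[Term]) -> io::Result<()> {
    match elements.len() {
        // up to 255 elements: tag byte and an 8 bit arity
        length @ 0..=255 => w.write_all(&[SMALL_TUPLE_EXT, length as u8])?,
        // otherwise: tag byte and a 32 bit big-endian arity
        length => {
            let length = u32::try_from(length)
                .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "tuple too large"))?;
            w.write_all(&[LARGE_TUPLE_EXT])?;
            w.write_all(&length.to_be_bytes())?;
        }
    }
    for element in elements {
        term(w, element)?;
    }
    Ok(())
}

/// Encode a complete external term, starting with the version byte 131.
fn encode(t: &Term) -> io::Result<Vec<u8>> {
    let mut buf = vec![131u8];
    term(&mut buf, t)?;
    Ok(buf)
}

fn main() -> io::Result<()> {
    let pair = Term::Tuple(vec![Term::SmallInteger(1), Term::SmallInteger(2)]);
    assert_eq!(encode(&pair)?, [131, 104, 2, 97, 1, 97, 2]);

    let big = Term::Tuple((0..256).map(|_| Term::SmallInteger(0)).collect());
    let bytes = encode(&big)?;
    assert_eq!(bytes[..6], [131, 105, 0, 0, 1, 0]);
    Ok(())
}
```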
Spawn
Each request to spawn on the Rust side of the Port will call spawn on std::process::Command, creating a new OS process. A new native OS thread is created for each of stdin, stdout and stderr, communicating with the Port through channels.
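A sketch of this thread-per-stream arrangement using only the standard library. Event, monitor, Captured and run are hypothetical names, and wc -c stands in for sum (it too waits for stdin to close before printing anything). Writing to stdin on its own thread and then dropping the handle closes stdin, while the reader threads carry on capturing stdout and stderr:

```rust
use std::io::{self, Read, Write};
use std::process::{Command, Stdio};
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Hypothetical events a stream thread reports over its channel,
/// loosely mirroring the stdout/stderr, eof and error messages.
enum Event {
    Output(&'static str, Vec<u8>),
    Eof(&'static str),
    Error(&'static str, io::Error),
}

/// Forward everything read from one stream, then report eof (or an error).
fn monitor(name: &'static str, mut stream: impl Read, tx: Sender<Event>) {
    let mut buf = [0u8; 4096];
    loop {
        let event = match stream.read(&mut buf) {
            Ok(0) => Event::Eof(name),
            Ok(n) => Event::Output(name, buf[..n].to_vec()),
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => Event::Error(name, e),
        };
        let last = !matches!(event, Event::Output(..));
        // stop after eof or an error, or if the receiver has gone away
        if tx.send(event).is_err() || last {
            return;
        }
    }
}

/// Hypothetical summary, similar in spirit to grimsby_command:info/1.
struct Captured {
    stdout: Vec<u8>,
    stderr: Vec<u8>,
    eof: Vec<&'static str>,
    exit: Option<i32>,
}

fn run(program: &str, args: &[&str], input: &[u8]) -> io::Result<Captured> {
    let mut child = Command::new(program)
        .args(args)
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()?;

    // one thread per output stream, both sending to the same channel
    let (tx, rx) = channel();
    let stdout = child.stdout.take().expect("stdout is piped");
    let stderr = child.stderr.take().expect("stderr is piped");
    let tx_stdout = tx.clone();
    let readers = [
        thread::spawn(move || monitor("stdout", stdout, tx_stdout)),
        thread::spawn(move || monitor("stderr", stderr, tx)),
    ];

    // stdin gets its own thread; dropping the handle closes stdin only
    let mut stdin = child.stdin.take().expect("stdin is piped");
    let data = input.to_vec();
    let writer = thread::spawn(move || {
        let written = stdin.write_all(&data);
        drop(stdin);
        written
    });

    let mut captured = Captured { stdout: Vec::new(), stderr: Vec::new(), eof: Vec::new(), exit: None };
    // the loop ends once both reader threads have dropped their senders
    for event in rx {
        match event {
            Event::Output("stdout", bytes) => captured.stdout.extend(bytes),
            Event::Output(_, bytes) => captured.stderr.extend(bytes),
            Event::Eof(name) => captured.eof.push(name),
            Event::Error(name, e) => return Err(io::Error::new(e.kind(), format!("{name}: {e}"))),
        }
    }
    writer.join().expect("stdin thread panicked")?;
    for reader in readers {
        reader.join().expect("reader thread panicked");
    }
    captured.exit = child.wait()?.code();
    Ok(captured)
}

fn main() -> io::Result<()> {
    // like sum, `wc -c` only prints its count once stdin has been closed
    let captured = run("wc", &["-c"], b"hello world!")?;
    println!("stdout: {}", String::from_utf8_lossy(&captured.stdout).trim());
    println!("stderr: {:?}", String::from_utf8_lossy(&captured.stderr));
    println!("eof: {:?}, exit: {:?}", captured.eof, captured.exit);
    Ok(())
}
```

The count for "hello world!" is 12; the output is trimmed because wc implementations differ in how they pad it.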
Future Directions
- Tokio (or a similar asynchronous runtime), rather than spawning a thread for each of stdin, stdout and stderr.
Examples
Shrugs
shrugs is an Apache-licensed, self-hosted git server, written in Erlang and available as a Docker container. It uses Grimsby to invoke git (specifically git-receive-pack, git-upload-archive and git-upload-pack), which require stdin to be closed.
See Also
Exec
exec is a great port manager written in C/C++, which early versions of shrugs used. I was inspired to write grimsby to understand the Rust memory safety model.