Streams
NNG provides a common streams API for working with byte-oriented streams. In NNG, streams are bidirectional connections for exchanging a stream of bytes.
Some common examples of streams are TCP connections, UNIX domain sockets, Windows named pipes, and WebSockets.
The API documented here is intended for applications that need to work with these connections at a lower level than Scalability Protocols, in a way that is both portable and agnostic about the specific underlying transport mechanism.
tip
When working with Scalability Protocols directly, it is unlikely that there will be any need for using these Streams APIs.
Stream Type
typedef struct nng_stream nng_stream;
The base nng_stream type represents a bidirectional, byte-oriented, reliable connection.
note
The nng_stream object is used for raw byte stream connections, and should not be confused with a pipe object created on a socket using the nng_listen, nng_dial, or related functions.
Sending and Receiving Data
void nng_stream_send(nng_stream *s, nng_aio *aio);
void nng_stream_recv(nng_stream *s, nng_aio *aio);
The nng_stream_send function starts sending data asynchronously over the stream s. The data is sent from the scatter/gather vector located in the nng_aio aio, which must have been previously set using nng_aio_set_iov.
The nng_stream_recv function starts receiving data asynchronously over the stream s into the scatter/gather vector located in the nng_aio aio, which must have been previously set using nng_aio_set_iov.
These functions return immediately, with no return value. Completion of the operation is signaled via the aio, and the final result may be obtained via nng_aio_result.
The I/O operation may complete as soon as at least one byte has been transferred, or an error has occurred. Therefore, the number of bytes transferred may be less than requested. The actual number of bytes transferred can be determined with nng_aio_count.
Closing a Stream
void nng_stream_close(nng_stream *s);
void nng_stream_stop(nng_stream *s);
void nng_stream_free(nng_stream *s);
The nng_stream_close function closes a stream, but does not destroy it. This function returns immediately. Operations that are pending against the stream, such as nng_stream_send or nng_stream_recv operations, will be canceled asynchronously, if possible. Those operations will fail with NNG_ECLOSED.
The nng_stream_stop function not only closes the stream, but also waits for any operations pending against it to complete, and for any underlying asynchronous registered I/O to be fully deregistered. As some systems use separate threads for asynchronous I/O, stopping the stream is necessary before those resources can be freed. Until the stream is stopped, there could still be I/O operations in flight, making it unsafe to deallocate memory.
The nng_stream_free function stops the stream like nng_stream_stop, but also deallocates the stream itself.
note
Because nng_stream_stop and nng_stream_free may both block waiting for outstanding I/O to complete or be aborted, these functions are unsafe to call from functions that may not block, such as the completion function registered with an nng_aio when it is created.
Getting Stream Options
int nng_stream_get_bool(nng_stream *s, const char *opt, bool *valp);
int nng_stream_get_int(nng_stream *s, const char *opt, int *valp);
int nng_stream_get_ms(nng_stream *s, const char *opt, nng_duration *valp);
int nng_stream_get_size(nng_stream *s, const char *opt, size_t *valp);
int nng_stream_get_addr(nng_stream *s, const char *opt, nng_sockaddr *valp);
int nng_stream_get_string(nng_stream *s, const char *opt, char **valp);
These functions obtain the value of an option named opt from the stream s, and store it in the location referenced by valp.
These functions access an option as a specific type. The documentation for each transport describes which options are available, and which type may be used to access each of them.
In the case of nng_stream_get_string, the string is created as if by nng_strdup, and must be freed by the caller using nng_strfree when no longer needed.
Stream Factories
typedef struct nng_stream_dialer nng_stream_dialer;
typedef struct nng_stream_listener nng_stream_listener;
The nng_stream_dialer and nng_stream_listener objects can be thought of as factories that create nng_stream streams.
The nng_stream_listener object is a handle to a listener, which creates streams by accepting incoming connection requests. In a BSD socket implementation, this is the entity responsible for doing bind, listen and accept. Normally a listener may be used to accept multiple, possibly many, concurrent connections.
The nng_stream_dialer object is a handle to a dialer, which creates streams by making outgoing connection requests. While there isn’t a specific BSD socket analogue, this can be thought of as a factory for TCP sockets created by opening them with socket and then calling connect on them.
Creating a Stream Factory
int nng_stream_dialer_alloc(nng_stream_dialer **dialerp, const char *url);
int nng_stream_dialer_alloc_url(nng_stream_dialer **dialerp, const nng_url *url);
int nng_stream_listener_alloc(nng_stream_listener **listenerp, const char *url);
int nng_stream_listener_alloc_url(nng_stream_listener **listenerp, const nng_url *url);
The nng_stream_dialer_alloc and nng_stream_dialer_alloc_url functions create a stream dialer associated with the URL specified by url, represented as a string or as an nng_url object, respectively. The dialer is returned in the location dialerp references.
The nng_stream_listener_alloc and nng_stream_listener_alloc_url functions create a stream listener associated with the URL specified by url, represented as a string or as an nng_url object, respectively. The listener is returned in the location listenerp references.
Example 1: Creating a TCP Listener
This shows creating a TCP listener for INADDR_ANY, port 444.
nng_stream_listener *listener;
int rv = nng_stream_listener_alloc(&listener, "tcp://:444");
Closing a Stream Factory
void nng_stream_dialer_close(nng_stream_dialer *dialer);
void nng_stream_dialer_stop(nng_stream_dialer *dialer);
void nng_stream_dialer_free(nng_stream_dialer *dialer);
void nng_stream_listener_close(nng_stream_listener *listener);
void nng_stream_listener_stop(nng_stream_listener *listener);
void nng_stream_listener_free(nng_stream_listener *listener);
The nng_stream_dialer_close and nng_stream_listener_close functions close the stream dialer or listener, preventing it from creating new connections. This will generally include closing any underlying file used for creating such connections. However, some requests may still be pending when these functions return, as they do not wait for the shutdown to complete.
The nng_stream_dialer_stop and nng_stream_listener_stop functions perform the same action, but also wait until all outstanding requests are serviced, and the dialer or listener is completely stopped. Because they block, these functions must not be called in contexts where blocking is not allowed.
The nng_stream_dialer_free and nng_stream_listener_free functions perform the same action as nng_stream_dialer_stop or nng_stream_listener_stop, but also deallocate the dialer or listener, and any associated resources.
tip
A best practice for shutting down an application safely is to stop everything before deallocating. This ensures that no callbacks are running that could reference an object after it is deallocated.
Making Outgoing Connections
void nng_stream_dialer_dial(nng_stream_dialer *dialer, nng_aio *aio);
The nng_stream_dialer_dial function initiates an outgoing connection asynchronously, using the nng_aio aio. If it successfully establishes a connection, it creates an nng_stream, which can be obtained as the first output result on aio using the nng_aio_get_output function with index zero.
tip
An nng_stream_dialer can be used multiple times to make multiple concurrent connection requests, but they all must reference the same URL.
Example 2: Connecting to Google
This demonstrates making an outbound connection to “google.com” on TCP port 80. Error handling is elided for clarity.
nng_aio *aio;
nng_stream_dialer *dialer;
nng_stream *stream;
nng_stream_dialer_alloc(&dialer, "tcp://google.com:80");
nng_aio_alloc(&aio, NULL, NULL);
// make a single outbound connection
nng_stream_dialer_dial(dialer, aio);
nng_aio_wait(aio); // wait for the asynchronous operation to complete
if (nng_aio_result(aio) != 0) {
    // ... handle the error
}
stream = nng_aio_get_output(aio, 0);
Accepting Incoming Connections
int nng_stream_listener_listen(nng_stream_listener *listener);
void nng_stream_listener_accept(nng_stream_listener *listener, nng_aio *aio);
Accepting incoming connections is performed in two steps. In the first step, nng_stream_listener_listen prepares the listener to accept connections. For a TCP implementation, for example, this performs the bind and listen steps, binding to the address represented by the URL that was specified when the listener was created with nng_stream_listener_alloc.
In the second step, nng_stream_listener_accept accepts an incoming connection on listener asynchronously, using the nng_aio aio. If an incoming connection is accepted, it will be represented as an nng_stream, which can be obtained from the aio as the first output result using the nng_aio_get_output function with index zero.
Example 3: Accepting an Inbound Stream
For clarity this example uses a synchronous approach using nng_aio_wait, but a typical server application would most likely use a callback to accept the incoming stream, and start another instance of nng_stream_listener_accept.
nng_aio *aio;
nng_stream_listener *listener;
nng_stream *stream;
nng_stream_listener_alloc(&listener, "tcp://:8181");
nng_aio_alloc(&aio, NULL, NULL); // normally would use a callback
// listen (binding to the URL in the process)
if (nng_stream_listener_listen(listener)) {
    // ... handle the error
}
// now accept a single incoming connection as a stream object
nng_stream_listener_accept(listener, aio);
nng_aio_wait(aio); // wait for the asynchronous operation to complete
if (nng_aio_result(aio) != 0) {
    // ... handle the error
}
stream = nng_aio_get_output(aio, 0);
Stream Factory Options
int nng_stream_dialer_get_addr(nng_stream_dialer *dialer, const char *opt, nng_sockaddr *valp);
int nng_stream_dialer_get_bool(nng_stream_dialer *dialer, const char *opt, bool *valp);
int nng_stream_dialer_get_int(nng_stream_dialer *dialer, const char *opt, int *valp);
int nng_stream_dialer_get_ms(nng_stream_dialer *dialer, const char *opt, nng_duration *valp);
int nng_stream_dialer_get_size(nng_stream_dialer *dialer, const char *opt, size_t *valp);
int nng_stream_dialer_get_string(nng_stream_dialer *dialer, const char *opt, char **valp);
int nng_stream_listener_get_addr(nng_stream_listener *listener, const char *opt, nng_sockaddr *valp);
int nng_stream_listener_get_bool(nng_stream_listener *listener, const char *opt, bool *valp);
int nng_stream_listener_get_int(nng_stream_listener *listener, const char *opt, int *valp);
int nng_stream_listener_get_ms(nng_stream_listener *listener, const char *opt, nng_duration *valp);
int nng_stream_listener_get_size(nng_stream_listener *listener, const char *opt, size_t *valp);
int nng_stream_listener_get_string(nng_stream_listener *listener, const char *opt, char **valp);
int nng_stream_dialer_set_addr(nng_stream_dialer *dialer, const char *opt, const nng_sockaddr *val);
int nng_stream_dialer_set_bool(nng_stream_dialer *dialer, const char *opt, bool val);
int nng_stream_dialer_set_int(nng_stream_dialer *dialer, const char *opt, int val);
int nng_stream_dialer_set_ms(nng_stream_dialer *dialer, const char *opt, nng_duration val);
int nng_stream_dialer_set_size(nng_stream_dialer *dialer, const char *opt, size_t val);
int nng_stream_dialer_set_string(nng_stream_dialer *dialer, const char *opt, const char *val);
int nng_stream_listener_set_addr(nng_stream_listener *listener, const char *opt, const nng_sockaddr *val);
int nng_stream_listener_set_bool(nng_stream_listener *listener, const char *opt, bool val);
int nng_stream_listener_set_int(nng_stream_listener *listener, const char *opt, int val);
int nng_stream_listener_set_ms(nng_stream_listener *listener, const char *opt, nng_duration val);
int nng_stream_listener_set_size(nng_stream_listener *listener, const char *opt, size_t val);
int nng_stream_listener_set_string(nng_stream_listener *listener, const char *opt, const char *val);
These functions are used to retrieve or change the value of an option named opt on the stream dialer or listener. The nng_stream_dialer_get_ and nng_stream_listener_get_ function families retrieve the value, and store it in the location valp references. The nng_stream_dialer_set_ and nng_stream_listener_set_ function families change the value for the dialer or listener, taking it from val.
These functions access an option as a specific type. The documentation for each transport describes which options are available, and which type may be used to access each of them.
In the case of nng_stream_dialer_get_string and nng_stream_listener_get_string, the string is created as if by nng_strdup, and must be freed by the caller using nng_strfree when no longer needed.
In the case of nng_stream_dialer_set_string and nng_stream_listener_set_string, the string contents are copied if necessary, so that the caller need not retain the value referenced once the function returns.
In the case of nng_stream_dialer_set_addr and nng_stream_listener_set_addr, the contents of val are copied if necessary, so that the caller need not retain the value referenced once the function returns.
Example 4: Socket Activation
Some nng_stream_listener objects, depending on the underlying transport and platform, can support a technique known as “socket activation”, where the file descriptor used for listening and accepting is supplied externally, such as by a system service manager. In this case, the application supplies the file descriptor or SOCKET object using the NNG_OPT_LISTEN_FD option, instead of calling nng_stream_listener_listen.
tip
Scalability Protocols transports based upon stream implementations that support socket activation can also benefit from this approach.
nng_stream_listener *listener;
int fd;
// This is a systemd API, not part of NNG.
// See systemd documentation for an explanation.
// fd at this point has already had bind() and listen() called.
fd = SD_LISTEN_FDS_START + 0;
nng_stream_listener_alloc(&listener, "tcp://");
nng_stream_listener_set_int(listener, NNG_OPT_LISTEN_FD, fd);
// can now start doing nng_stream_listener_accept...
TLS Configuration
int nng_stream_dialer_get_tls(nng_stream_dialer *dialer, nng_tls_config **tlsp);
int nng_stream_dialer_set_tls(nng_stream_dialer *dialer, nng_tls_config *tls);
int nng_stream_listener_get_tls(nng_stream_listener *listener, nng_tls_config **tlsp);
int nng_stream_listener_set_tls(nng_stream_listener *listener, nng_tls_config *tls);
Both nng_stream_dialer and nng_stream_listener objects may support configuration of TLS parameters.
The nng_stream_dialer_set_tls and nng_stream_listener_set_tls functions set the TLS configuration of dialer or listener to the nng_tls_config object supplied by tls. This must be performed before the listener starts listening with nng_stream_listener_listen, or the dialer starts an outgoing connection as a result of nng_stream_dialer_dial.
The configuration object that was previously established (which may be a default if one was not explicitly configured) can be obtained with the nng_stream_dialer_get_tls and nng_stream_listener_get_tls functions. They will return a pointer to the nng_tls_config object in question at the location referenced by tlsp.
note
TLS configuration cannot be changed once it has started being used by a listener or dialer. This applies both to configuring a different TLS configuration object and to mutating the existing nng_tls_config object.