Streams

NNG provides a common streams API for working with byte-oriented streams. In NNG, streams are bidirectional connections for exchanging a stream of bytes.

Some common examples of streams are TCP connections, UNIX domain sockets, Windows named pipes, and WebSockets.

The API documented here is to facilitate applications that wish to work with these at a lower-level than Scalability Protocols, in a way that is both portable and agnostic about the specific underlying transport mechanism.

tip

When working with Scalability Protocols directly, it is unlikely that there will be any need for using these Streams APIs.

Stream Type

typedef struct nng_stream nng_stream

The base nng_stream type represents a bidirectional, byte-oriented, reliable connection.

note

The nng_stream object is used for raw byte stream connections, and should not be confused with a pipe object created on a socket using the nng_listen, nng_dial or related functions.

Sending and Receiving Data

void nng_stream_send(nng_stream *s, nng_aio *aio);
void nng_stream_recv(nng_stream *s, nng_aio *aio);

The nng_stream_send function starts sending data asynchronously over the stream s. The data is sent from the scatter/gather vector located in the nng_aio aio, which must have been previously set using nng_aio_set_iov.

The nng_stream_recv function starts receiving data [asynchronously over the stream s into the scatter/gather vector located in the nng_aio aio, which must have been previously set using nng_aio_set_iov.

These functions return immediately, with no return value. Completion of the operation is signaled via the aio, and the final result may be obtained via nng_aio_result.

The I/O operation may complete as soon as at least one byte has been transferred, or an error has occurred. Therefore, the number of bytes transferred may be less than requested. The actual number of bytes transferred can be determined with nng_aio_count.

Closing a Stream

void nng_stream_close(nng_stream *s);
void nng_stream_stop(nng_stream *s);
void nng_stream_free(nng_stream *s);

The nng_stream_close function closes a stream, but does not destroy it. This function returns immediately. Operations that are pending against the stream, such as nng_stream_send or nng_stream_recv operations will be canceled asynchronously, if possible. Those operations will result in NNG_ECLOSED.

The nng_stream_stop function not only closes the stream, but waits for any operations pending against it to complete, and for any underlying asynchronous registered I/O to be fully deregistered. As some systems use separate threads for asynchronous I/O, stopping the stream is necessary before those resources can be freed. Until the stream is stopped, there could still be I/O operations in flight, making it unsafe to deallocate memory.

The nng_stream_free function stops the stream like nng_stream_stop, but also deallocates the stream itself.

note

Because nng_stream_stop and nng_stream_free both may block waiting for outstanding I/O to complete or be aborted, these functions are unsafe to call from functions that may not block, such as the completion function registered with an nng_aio when it is created.

Getting Stream Options

int nng_stream_get_bool(nng_stream *s, const char *opt, bool *valp);
int nng_stream_get_int(nng_stream *s, const char *opt, int *valp);
int nng_stream_get_ms(nng_stream *s, const char *opt, nng_duration *valp);
int nng_stream_get_size(nng_stream *s, const char *opt, size_t *valp);
int nng_stream_get_addr(nng_stream *s, const char *opt, nng_sockaddr *valp);
int nng_stream_get_string(nng_stream *s, const char *opt, char **valp);

These functions are used to obtain value of an option named opt from the stream s, and store it in the location referenced by valp.

These functions access an option as a specific type. The transport layer will have details about which options are available, and which type they may be accessed using.

In the case of nng_stream_get_string, the string is created as if by nng_strdup, and must be freed by the caller using nng_strfree when no longer needed.

Stream Factories

typedef struct nng_stream_dialer nng_stream_dialer;
typedef struct nng_stream_listener nng_stream_listener;

The nng_stream_listener object and nng_stream_listener objects can be thought of as factories that create nng_stream streams.

The nng_stream_listener object a handle to a listener, which creates streams by accepting incoming connection requests. In a BSD socket implementation, this is the entity responsible for doing bind, listen and accept. Normally a listener may be used to accept multiple, possibly many, concurrent connections.

The nng_stream_dialer object is a handle to a dialer, which creates streams by making outgoing connection requests. While there isn’t a specific BSD socket analogue, this can be thought of as a factory for TCP sockets created by opening them with socket and then calling connect on them.

Creating a Stream Factory

int nng_stream_dialer_alloc(nng_stream_dialer **dialerp, const char *url);
int nng_stream_dialer_alloc_url(nng_stream_dialer **dialerp, const nng_url *url);
int nng_stream_listener_alloc(nng_stream_listener **lstenerp, const char *url);
int nng_stream_listener_alloc_url(nng_stream_listener **listenerp, const nng_url *url);

The nng_stream_dialer_alloc and nng_stream_dialer_alloc_url functions create a stream dialer, associated the URL specified by url represented as a string, or as an nng_url object, respectively. The dialer is returned in the location dialerp references.

The nng_stream_listener_alloc and nng_stream_listener_alloc_url functions create a stream listener, associated the URL specified by url represented as a string, or as an nng_url object, respectively. The listener is returned in the location listenerp references.

Example 1: Creating a TCP Listener

This shows creating a TCP listener that listens on INADDR_ANY, port 444.

nng_listener listener;
int rv = nng_stream_listener_alloc(&listener, "tcp://:444");

Closing a Stream Factory

void nng_stream_dialer_close(nng_stream_listener *dialer);
void nng_stream_dialer_stop(nng_stream_listener *dialer);
void nng_stream_dialer_free(nng_stream_listener *dialer);
void nng_stream_listener_close(nng_stream_listener *listener);
void nng_stream_listener_stop(nng_stream_listener *listener);
void nng_stream_listener_free(nng_stream_listener *listener);

The nng_stream_dialer_close and nng_stream_listener_close functions close the stream dialer or listener, preventing it from creating new connections. This will generally include closing any underlying file used for creating such connections. However, some requests may still be pending when this function returns, as it does not wait for the shutdown to complete.

The nng_stream_dialer_stop and nng_stream_listener_stop functions performs the same action, but also wait until all outstanding requests are serviced, and the dialer or listener is completely stopped. Because they blocks, these functions must not be called in contexts where blocking is not allowed.

The nng_stream_dialer_free and nng_stream_listener_free function performs the same action as nng_stream_dialer_stop or nng_stream_listener_stop, but also deallocates the dialer or listener, and any associated resources.

tip

A best practice for shutting down an application safely is to stop everything before deallocating. This ensures that no callbacks are running that could reference an object after it is deallocated.

Making Outgoing Connections

void nng_stream_dialer_dial(nng_stream_dialer *dialer, nng_aio *aio);

The nng_stream_dialer_dial initiates an outgoing connection asynchronously, using the nng_aio aio. If it successfully establishes a connection, it creates an nng_stream, which can be obtained as the first output result on aio using the nng_aio_get_output function with index zero.

tip

An nng_stream_dialer can be used multiple times to make multiple concurrent connection requests, but they all must reference the same URL.

Example 3: Connecting to Google

This demonstrates making an outbound connection to “google.com” on TCP port 80. Error handling is elided for clarity.

nng_aio *aio;
nng_stream_dialer *dialer;
nng_stream *stream;

nng_stream_dialer_alloc(&dialer, "tcp://google.com:80");

nng_aio_alloc(&aio, NULL, NULL);

// make a single outbound connection
nng_stream_dialer_dial(dialer, aio);
nng_aio_wait(aio); // wait for the asynch operation to complete
if (nng_aio_result(aio) != 0) {
    // ... handle the error
}
stream = nng_aio_get_output(aio, 0);

Accepting Incoming Connections

int nng_stream_listener_listen(nng_stream_listener *listener);
void nng_stream_listener_accept(nng_stream_listener *listener, nng_aio *aio);

Accepting incoming connections is performed in two steps. The first step, nng_stream_listener_listen is to setup for listening. For a TCP implementation of this, for example, this would perform the bind and the listen steps. This will bind to the address represented by the URL that was specific when the listener was created with nng_stream_listener_alloc.

In the second step, nng_stream_listener_accept accepts an incoming connection on listener asynchronously, using the nng_aio aio. If an incoming connection is accepted, it will be represented as an nng_stream, which can be obtained from the aio as the first output result using the nng_aio_get_output function with index zero.

Example 3: Accepting an Inbound Stream

For clarity this example uses a synchronous approach using nng_aio_wait, but a typical server application would most likely use a callback to accept the incoming stream, and start another instance of nng_stream_listener_accept.

nng_aio *aio;
nng_listener *listener;
nng_stream *stream;

nng_stream_listener_alloc(&listener, "tcp://:8181");
nng_aio_alloc(&aio, NULL, NULL); // normally would use a callback

// listen (binding to the URL in the process)
if (nng_stream_listener_listen(listener)) {
    // ... handle the error
}

// now accept a single incoming connection as a stream object
nng_stream_listener_accept(l, aio);
nng_aio_wait(aio); // wait for the asynch operation to complete
if (nng_aio_result(aio) != 0) {
    // ... handle the error
}
stream = nng_aio_get_output(aio, 0);

Stream Factory Options

int nng_stream_dialer_get_addr(nng_stream_dialer *dialer, const char *opt, nng_sockaddr *valp);
int nng_stream_dialer_get_bool(nng_stream_dialer *dialer, const char *opt, bool *valp);
int nng_stream_dialer_get_int(nng_stream_dialer *dialer, const char *opt, int *valp);
int nng_stream_dialer_get_ms(nng_stream_dialer *dialer, const char *opt, nng_duration *valp);
int nng_stream_dialer_get_size(nng_stream_dialer *dialer, const char *opt, size_t *valp);
int nng_stream_dialer_get_string(nng_stream_dialer *dialer, const char *opt, char **valp);

int nng_stream_listener_get_addr(nng_stream_listener *listener, const char *opt, nng_sockaddr *valp);
int nng_stream_listener_get_bool(nng_stream_listener *listener, const char *opt, bool *valp);
int nng_stream_listener_get_int(nng_stream_listener *listener, const char *opt, int *valp);
int nng_stream_listener_get_ms(nng_stream_listener *listener, const char *opt, nng_duration *valp);
int nng_stream_listener_get_size(nng_stream_listener *listener, const char *opt, size_t *valp);
int nng_stream_listener_get_string(nng_stream_listener *listener, const char *opt, char **valp);

int nng_stream_dialer_set_addr(nng_stream_dialer *dialer, const char *opt, const nng_sockaddr *val);
int nng_stream_dialer_set_bool(nng_stream_dialer *dialer, const char *opt, bool val);
int nng_stream_dialer_set_int(nng_stream_dialer *dialer, const char *opt, int val);
int nng_stream_dialer_set_ms(nng_stream_dialer *dialer, const char *opt, nng_duration val);
int nng_stream_dialer_set_size(nng_stream_dialer *dialer, const char *opt, size_t val);
int nng_stream_dialer_set_string(nng_stream_dialer *dialer, const char *opt, const char *val);

int nng_stream_listener_set_addr(nng_stream_listener *listener, const char *opt, const nng_sockaddr *val);
int nng_stream_listener_set_bool(nng_stream_listener *listener, const char *opt, bool val);
int nng_stream_listener_set_int(nng_stream_listener *listener, const char *opt, int val);
int nng_stream_listener_set_ms(nng_stream_listener *listener, const char *opt, nng_duration val);
int nng_stream_listener_set_size(nng_stream_listener *listener, const char *opt, size_t val);
int nng_stream_listener_set_string(nng_stream_listener *listener, const char *opt, const char *val);

These functions are used to retrieve or change the value of an option named opt from the stream dialer or listener. The nng_stream_dialer_get_ and nng_stream_listener_get_ function families retrieve the value, and store it in the location valp references. The nng_stream_dialer_set_ and nng_stream_listener_set_ function families change the value for the dialer or listener, taking it from val.

These functions access an option as a specific type. The transport layer will have details about which options are available, and which type they may be accessed using.

In the case of nng_stream_dialer_get_string and nng_stream_listener_get_string, the string is created as if by nng_strdup, and must be freed by the caller using nng_strfree when no longer needed.

In the case of nng_stream_dialer_set_string and nng_stream_listener_set_string, the string contents are copied if necessary, so that the caller need not retain the value referenced once the function returns.

In the case of nng_stream_dialer_set_addr and nng_stream_listener_set_addr, the contents of addr are copied if necessary, so that the caller need not retain the value referenced once the function returns.

Example 4: Socket Activation

Some nng_stream_listener objects, depending on the underlying transport and platform, can support a technique known as “socket activation”, where the file descriptor used for listening and accepting is supplied externally, such as by a system service manager. In this case, the application supplies the file descriptor or SOCKET object using the NNG_OPT_LISTEN_FD option, instead of calling nng_stream_listener_listen.

tip

Scalability Protocols transports based upon stream implementations that support socket activation can also benefit from this approach.

nng_stream_listener *listener;
int fd;

// This is a systemd API, not part of NNG.
// See systemd documentation for an explanation.
// fd at this point has already had bind() and listen() called.
fd = SD_LISTEN_FDS_START + 0;

nng_stream_listener_alloc(&listener, "tcp://");
nng_stream_listener_set_int(listener, NNG_OPT_LISTEN_FD, fd);

// can now start doing nng_stream_listener_accept...

TLS Configuration

int nng_stream_dialer_get_tls(nng_stream_listener *dialer, nng_tls_config **tlsp);
int nng_stream_dialer_set_tls(nng_stream_listener *dialer, nng_tls_config *tls);
int nng_stream_listener_get_tls(nng_stream_listener *listener, nng_tls_config **tlsp);
int nng_stream_listener_set_tls(nng_stream_listener *listener, nng_tls_config *tls);

Both nng_stream_dialer and nng_stream_listener objects may support configuration of TLS parameters. The nng_stream_dialer_set_tls and nng_stream_listener_set_tls functions support setting the configuration of a nng_tls_config object supplied by tls on dialer or listener. This must be performed before the listener starts listening with nng_stream_listener_listen, or the dialer starts an outgoing connection as a result of nng_stream_dialer_dial.

The configuration object that was previously established (which may be a default if one was not explicitly configured) can be obtained with the nng_stream_dialer_get_tls and nng_stream_listener_get_tls. They will return a pointer to the nng_tls_config object in question at the location referenced by tlsp.

note

TLS configuration cannot be changed once it has started being used by a listener or dialer. This applies to both configuring a different TLS configuration object, as well as mutating the existing nng_tls_config object.