Brace yourself, this is a long one.
The following guide was originally intended for client library developers to describe in detail all
the important features and functionality we expected in an NSQ client library.
While writing it we began to realize that it had value beyond client library developers. It
incorporates a comprehensive analysis of most of the capabilities of NSQ (both client and
server) and is therefore interesting and useful for end-users as well (or anyone using or interested
in infrastructure messaging platforms).
NSQ’s design pushes a lot of responsibility onto client libraries in order to maintain overall
cluster robustness and performance.
This guide attempts to outline the various responsibilities well-behaved client libraries need to
fulfill. Because publishing to
nsqd is trivial (just an HTTP POST to the
/put endpoint), this
document focuses on consumers.
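As a point of reference, a publish via the HTTP endpoint can be sketched in a few lines. This uses only the standard library; the default HTTP port (4151) and host are assumptions matching a typical local nsqd:

```python
from urllib import parse, request

def build_put_url(host, port, topic):
    # nsqd exposes a trivial HTTP publish endpoint: POST /put?topic=<name>
    return "http://%s:%d/put?%s" % (host, port, parse.urlencode({"topic": topic}))

def publish(topic, message, host="127.0.0.1", port=4151):
    # The raw message bytes are the POST body; nsqd replies with a status string.
    resp = request.urlopen(build_put_url(host, port, topic), data=message)
    return resp.read()
```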
By setting these expectations we hope to provide a foundation for achieving consistency across
languages for NSQ users.
At a high level, our philosophy with respect to configuration is to design the system to have the
flexibility to support different workloads, use sane defaults that run well “out of the box”, and
minimize the number of dials.
A client subscribes to a
topic on a
channel over a TCP connection to
nsqd instance(s). You can
only subscribe to one topic per connection, so multiple topic consumption needs to be structured
accordingly.
Using nsqlookupd for discovery is optional, so client libraries should support a configuration
where a client connects directly to one or more
nsqd instances or where it is configured to poll
one or more
nsqlookupd instances. When a client is configured to poll
nsqlookupd the polling
interval should be configurable. Additionally, because typical deployments of NSQ are in distributed
environments with many producers and consumers, the client library should automatically add jitter
based on a random % of the configured value. This will help avoid a thundering herd of connections.
For more detail see Discovery.
An important performance knob for clients is the number of messages it can receive before
nsqd expects a response. This pipelining facilitates buffered, batched, and asynchronous message
handling. By convention this value is called
max_in_flight and it affects how
RDY state is
managed. For more detail see RDY State.
Because NSQ is designed to gracefully handle failure, client libraries are expected to
implement retry handling for failed messages and provide options for bounding that behavior in terms
of number of attempts per message. For more detail see Message Handling.
Relatedly, when message processing fails, the client library is expected to automatically handle
re-queueing the message. NSQ supports sending a delay along with the
REQ command. Client libraries
are expected to provide options for what this delay should be set to initially (for the first
failure) and how it should change for subsequent failures. For more detail see Backoff.
Most importantly, the client library should support some method of configuring callback handlers for
message processing. The signature of these callbacks should be simple, typically accepting a single
parameter (an instance of a “message object”).
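A minimal sketch of such a handler follows. The Message class and its method names here are hypothetical stand-ins for a client library's message object, not any specific library's API:

```python
class Message:
    # Hypothetical stand-in for a client library's "message object".
    def __init__(self, body):
        self.body = body
        self.finished = False

    def finish(self):
        # In a real library this would send FIN to nsqd.
        self.finished = True

def handle_click(message):
    # A handler's signature is simple: it accepts a single message object.
    event = message.body.decode("utf-8")
    message.finish()
    return event
```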
An important component of NSQ is
nsqlookupd, which provides a discovery service for consumers to
locate producers of a given topic at runtime.
Although optional, using
nsqlookupd greatly reduces the amount of configuration required to
maintain and scale a large distributed NSQ cluster.
When a client uses
nsqlookupd for discovery, the client library should manage the process of polling all configured
nsqlookupd instances for an up-to-date set of
nsqd producers and should manage the
connections to those producers.
Querying an
nsqlookupd instance is straightforward. Perform an HTTP request to the lookup endpoint
with a query parameter of the topic the client is attempting to discover (i.e.
/lookup?topic=clicks). The response format is JSON:
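An illustrative response is shown below. The field names are what nsqlookupd returns, but the values are examples, and the exact envelope varies by version (older releases wrap this payload in a status_code/data envelope):

```json
{
  "channels": ["archive", "metrics"],
  "producers": [
    {
      "broadcast_address": "clicksapi01.domain.net",
      "hostname": "clicksapi01",
      "tcp_port": 4150,
      "http_port": 4151,
      "version": "0.2.28"
    }
  ]
}
```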
The
broadcast_address and
tcp_port should be used to connect to an
nsqd producer. Because, by design,
nsqlookupd instances don't coordinate their lists of producers, the client library should
union the lists it received from all
nsqlookupd queries to build the final list of
producers to connect to. The
broadcast_address:tcp_port combination should be used as the unique
key for this union.
A periodic timer should be used to repeatedly poll the configured
nsqlookupd so that clients will
automatically discover new producers. The client library should automatically initiate connections
to all newly found producers.
When client library execution begins it should bootstrap this polling process by kicking off an
initial set of requests to the configured
nsqlookupd instances.
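The polling interval jitter and the producer union can be sketched as follows. The function names are illustrative, and the 30% jitter fraction is an assumption, not a prescribed value:

```python
import random

def jittered_interval(base_seconds, jitter_pct=0.3):
    # Add a random fraction (up to jitter_pct) of the base polling interval
    # so a fleet of consumers doesn't hit nsqlookupd in lock step.
    return base_seconds + base_seconds * jitter_pct * random.random()

def union_producers(lookup_responses):
    # Union the producer lists returned by every nsqlookupd, keyed on
    # broadcast_address:tcp_port so duplicate entries collapse to one.
    producers = {}
    for producer_list in lookup_responses:
        for p in producer_list:
            key = "%s:%d" % (p["broadcast_address"], p["tcp_port"])
            producers[key] = p
    return list(producers.values())
```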
Once a client has an
nsqd producer to connect to (via discovery or manual configuration), it
should open a TCP connection to
broadcast_address:port. A separate TCP connection should be made to each
nsqd for each topic the client wants to consume.
When connecting to an
nsqd instance, the client library should send the following data, in order:
1. the magic identifier
2. an IDENTIFY command (and payload) and read/verify response
3. a SUB command (specifying desired topic) and read/verify response
4. an initial RDY count of 1 (see RDY State)
(low-level details on the protocol are available in the spec)
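Encoding these commands is mechanical. A sketch based on the protocol's wire format (newline-terminated ASCII commands; IDENTIFY carries a size-prefixed JSON body):

```python
import json
import struct

# The magic identifier, sent first, immediately after opening the TCP connection.
MAGIC_V2 = b"  V2"

def identify(payload):
    # IDENTIFY is followed by a 4-byte big-endian length and a JSON body.
    body = json.dumps(payload).encode("utf-8")
    return b"IDENTIFY\n" + struct.pack(">I", len(body)) + body

def sub(topic, channel):
    return ("SUB %s %s\n" % (topic, channel)).encode("utf-8")

def rdy(count):
    return ("RDY %d\n" % count).encode("utf-8")
```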
Client libraries should automatically handle reconnection as follows:
If the client is configured with a specific list of
nsqd instances, reconnection should be
handled by delaying the retry attempt in an exponential backoff manner (i.e. try to reconnect in
8s, 16s, 32s, etc., up to a max).
If the client is configured to discover instances via
nsqlookupd, reconnection should be
handled automatically based on the polling interval (i.e. if a client disconnects from an
nsqd instance discovered via
nsqlookupd, the client library should only attempt to reconnect if that instance is discovered by a
subsequent
nsqlookupd polling round). This ensures that clients can learn about producers that
are introduced to the topology and ones that are removed (or failed).
The
IDENTIFY command can be used to set
nsqd side metadata, modify client settings,
and negotiate features. It satisfies two needs:

1. In certain cases a client would like to modify how
nsqd interacts with it (currently this includes settings such as the heartbeat interval).
2. nsqd responds to the
IDENTIFY command with a JSON payload that includes important server
side configuration values that the client should respect while interacting with it.
After connecting, based on the user's configuration, a client library should send an
IDENTIFY
command (the body of an
IDENTIFY command is a JSON payload), e.g.:
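An example payload (all values here are illustrative):

```json
{
  "short_id": "app01",
  "long_id": "app01.example.net",
  "feature_negotiation": true,
  "heartbeat_interval": 30000
}
```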
The
feature_negotiation field indicates that the client can accept a JSON payload in return. The
short_id and
long_id are arbitrary text fields that are used by
nsqd (and
nsqadmin) to identify clients.
heartbeat_interval configures the interval between heartbeats on a per-client
basis. The
nsqd will respond
OK if it does not support feature negotiation (older versions); otherwise it responds with a
JSON payload of server side settings.
More detail on the use of the
max_rdy_count field is in the RDY State section.
Once a client is in a subscribed state, data flow in the NSQ protocol is asynchronous. For
consumers, this means that in order to build truly robust and performant client libraries they
should be structured using asynchronous network IO loops and/or “threads” (the scare quotes are used
to represent both OS-level threads and userland threads, like coroutines).
Additionally clients are expected to respond to periodic heartbeats from the
they’re connected to. By default this happens at 30 second intervals. The client can respond with
any command but, by convention, it’s easiest to simply respond with a
NOP whenever a heartbeat
is received. See the protocol spec for specifics on how to identify heartbeats.
A “thread” should be dedicated to reading data off the TCP socket, unpacking the data from the
frame, and performing the multiplexing logic to route the data as appropriate. This is also
conveniently the best spot to handle heartbeats. At the lowest level, reading the protocol
involves the following sequential steps:

1. read 4 bytes (a big endian uint32) to determine the size of the subsequent data
2. read that number of bytes
3. unpack the frame type from the first 4 bytes of the data
4. route the remaining payload based on the frame type (response, error, or message)
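A sketch of unpacking and routing a frame, answering heartbeats with NOP. The frame-type constants match the protocol; the `route` signature and its string return values are illustrative:

```python
import struct

FRAME_TYPE_RESPONSE = 0
FRAME_TYPE_ERROR = 1
FRAME_TYPE_MESSAGE = 2

def unpack_frame(data):
    # After reading the size-prefixed data off the wire, the first 4 bytes
    # are a big-endian int32 frame type; the rest is the frame payload.
    frame_type = struct.unpack(">l", data[:4])[0]
    return frame_type, data[4:]

def route(frame_type, payload, send):
    # Heartbeats arrive as response frames whose body is "_heartbeat_";
    # replying with any command (conventionally NOP) keeps the connection alive.
    if frame_type == FRAME_TYPE_RESPONSE and payload == b"_heartbeat_":
        send(b"NOP\n")
        return "heartbeat"
    if frame_type == FRAME_TYPE_MESSAGE:
        return "message"
    if frame_type == FRAME_TYPE_ERROR:
        return "error"
    return "response"
```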
Due to their asynchronous nature, it would take a bit of extra state tracking in order to
correlate protocol errors with the commands that generated them. Instead, we took the “fail fast”
approach so the overwhelming majority of protocol-level error handling is fatal. This means that if
the client sends an invalid command (or gets itself into an invalid state) the
nsqd instance it’s
connected to will protect itself (and the system) by forcibly closing the connection (and, if
possible, sending an error to the client). This, coupled with the connection handling mentioned
above, makes for a more robust and stable system.
The only errors that are not fatal are:
- E_FIN_FAILED – The client tried to send a FIN command for an invalid message ID.
- E_REQ_FAILED – The client tried to send a REQ command for an invalid message ID.
- E_TOUCH_FAILED – The client tried to send a TOUCH command for an invalid message ID.
Because these errors are most often timing issues, they are not considered fatal. These situations
typically occur when a message times out on the
nsqd side and is re-queued and delivered to
another client. The original recipient is no longer allowed to respond on behalf of that message.
When the IO loop unpacks a data frame containing a message, it should route that message to the
configured handler for processing.
An
nsqd producer expects to receive a reply within its configured message timeout (default: 60
seconds). There are a few possible scenarios:

1. The handler indicates that the message was processed successfully.
2. The handler indicates that the message processing was unsuccessful.
3. The handler decides that it needs more time to process the message.
4. The in-flight timeout expires and
nsqd automatically re-queues the message.

In the first 3 cases, the client library should send the appropriate command on the client's behalf
(FIN,
REQ, and
TOUCH respectively).
The
FIN command is the simplest of the bunch. It tells
nsqd that it can safely discard the
message.
FIN can also be used to discard a message that you do not want to process or retry.
The
REQ command tells
nsqd that the message should be re-queued (with an optional parameter
specifying the amount of time to defer additional attempts). If the optional parameter is not
specified by the client, the client library should automatically calculate the duration in relation
to the number of attempts to process the message (a multiple is typically sufficient). The client
library should discard messages that exceed the configured max attempts. When this occurs, a
user-supplied callback should be executed to notify and enable special handling.
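A sketch of this requeue-delay calculation. The base delay and cap are illustrative assumptions, not NSQ-mandated values; only the "multiple of the attempt count" shape comes from the text above:

```python
def requeue_delay(attempts, base_ms=90000, max_ms=15 * 60 * 1000):
    # A multiple of the number of attempts is typically sufficient,
    # capped so repeated failures don't defer a message indefinitely.
    return min(base_ms * attempts, max_ms)
```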
If the message handler requires more time than the configured message timeout, the
TOUCH command
can be used to reset the timer on the
nsqd side. This can be done repeatedly until the message is either
FIN'd or
REQ'd, up to the
nsqd producer's configured max timeout. Client libraries should provide a way to send
TOUCH on behalf of the client.
If the
nsqd instance receives no response, the message will time out and be automatically
re-queued for delivery to an available client.
Finally, a property of each message is the number of attempts. Client libraries should compare this
value against the configured max and discard messages that have exceeded it. When a message is
discarded there should be a callback fired. Typical default implementations of this callback might
include writing to a directory on disk, logging, etc. The user should be able to override the
default behavior.
Because messages are pushed from
nsqd to clients we needed a way to manage the flow of data in
userland rather than relying on low-level TCP semantics. A client’s
RDY state is NSQ’s flow
control mechanism.
As outlined in the configuration section, a consumer is configured with a
max_in_flight. This is a concurrency and performance knob, e.g. some downstream systems are able
to more-easily batch process messages and benefit greatly from a higher
max_in_flight.
When a client connects to
nsqd (and subscribes) it is placed in an initial
RDY state of
0. No
messages will be sent to the client.
Client libraries have a few responsibilities:
1. bootstrap and evenly distribute the configured
max_in_flight to all connections.
2. never allow the aggregate sum of
RDY counts for all connections (
total_rdy_count) to exceed the configured
max_in_flight.
3. never exceed the per connection
nsqd configured
max_rdy_count.
4. expose an API method to indicate message flow starvation.
There are a few considerations when choosing an appropriate
RDY count for a connection (in order
to evenly distribute
max_in_flight):

- the eventual number of connections is often not known ahead of time
- the configured
max_in_flight may be lower than your number of connections
To kickstart message flow a client library needs to send an initial
RDY count. Because the
eventual number of connections is often not known ahead of time it should start with a value of
1
so that the client library does not unfairly favor the first connection(s).
Additionally, after each message is processed, the client library should evaluate whether or not
it’s time to update
RDY state. An update should be triggered if the current value is
0 or if it
is below ~25% of the last value sent.
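This trigger condition can be sketched as a small predicate (the function and parameter names are illustrative):

```python
def should_update_rdy(current_rdy, last_rdy_sent):
    # Re-send RDY when the remaining count has drained to 0 or has fallen
    # below ~25% of the last value sent to this connection.
    return current_rdy == 0 or current_rdy < (last_rdy_sent * 0.25)
```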
The client library should always attempt to evenly distribute
RDY count across all connections.
Typically, this is implemented as
max_in_flight / num_conns.
However, when
max_in_flight < num_conns this simple formula isn’t sufficient. In this state,
client libraries should perform a dynamic runtime evaluation of producer “liveness” by measuring the
duration of time since it last received a message for a connection. After a configurable expiration,
it should re-distribute whatever
RDY count is available to a new (random) set of producers. By
doing this, you guarantee that you’ll (eventually) find producers with messages. Clearly this has a
latency cost for connections that sit idle until the redistribution.
The client library should maintain a ceiling for the maximum number of messages in flight for a
given consumer. Specifically, the aggregate sum of each connection’s
RDY count should never exceed
the configured
max_in_flight.
Below is example code in Python to determine whether or not the proposed RDY count is valid for a
given connection:
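A sketch of this validity check (the function and parameter names are illustrative; the two limits it enforces are the per-connection ceiling negotiated with nsqd and the consumer-wide max_in_flight contract):

```python
def rdy_is_valid(proposed, conn_max_rdy_count, total_rdy_count, max_in_flight):
    # Never exceed the per-connection max_rdy_count negotiated with nsqd.
    if proposed > conn_max_rdy_count:
        return False
    # Never let the aggregate RDY count exceed the configured max_in_flight.
    if (total_rdy_count + proposed) > max_in_flight:
        return False
    return True
```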
Additionally,
nsqd is configurable with a
--max-rdy-count (see feature
negotiation for more information on the handshake a client can perform to
ascertain this value). If the client sends a
RDY count that is outside of the acceptable range its
connection will be forcefully closed. For backwards compatibility, this value should be assumed to
be
2500 if the
nsqd instance does not support feature negotiation.
Finally, the client library should provide an API method to indicate message flow starvation. It is
insufficient for clients (in their message handlers) to simply compare the number of messages they
have in-flight vs. their configured
max_in_flight in order to decide to “process a batch”. There
are two cases when this is problematic:
1. When
max_in_flight > 1, due to variable
num_conns, there are cases where
max_in_flight is not evenly divisible by
num_conns. Because the contract states that you should never exceed
max_in_flight, you must round down, and you end up with cases
where the sum of all
RDY counts is less than
max_in_flight.
2. When
max_in_flight < num_conns, only a subset of connections have a non-zero
RDY count, so those active producers only have a fraction of the configured
max_in_flight.
In both cases, a client will never actually receive
max_in_flight # of messages. Therefore, the
client library should expose a method
is_starved that will evaluate whether any of the
connections are starved, as follows:
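A sketch of the check, modeled on pynsq's approach (the Connection class here is a minimal stand-in; the 0.85 threshold treats a connection as starved once it holds ~85% of the messages its last RDY count allowed):

```python
class Connection:
    # Minimal stand-in tracking the per-connection counters the check needs.
    def __init__(self, in_flight, last_rdy):
        self.in_flight = in_flight
        self.last_rdy = last_rdy

def is_starved(conns):
    # A connection is "starved" when it is holding ~85% or more of the
    # messages allowed by the last RDY count it was sent.
    for conn in conns:
        if conn.in_flight > 0 and conn.in_flight >= (conn.last_rdy * 0.85):
            return True
    return False
```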
The
is_starved method should be used by message handlers to reliably identify when to process a
batch of messages.
The question of what to do when message processing fails is a complicated one to answer. The
message handling section detailed client library behavior that would defer
the processing of failed messages for some (increasing) duration of time. The other piece of the
puzzle is whether or not to reduce throughput. The interplay between these two pieces of
functionality is crucial for overall system stability.
By slowing down the rate of processing, or “backing off”, the consumer allows the downstream system
to recover from transient failure. However, this behavior should be configurable as it isn’t always
desirable, such as situations where latency is prioritized.
Backoff should be implemented by sending
RDY 0 to the appropriate producers, stopping message
flow. The duration of time to remain in this state should be calculated based on the number of
repeated failures (exponential). Similarly, successful processing should reduce this duration until
the reader is no longer in a backoff state.
While a reader is in a backoff state, after the timeout expires, the client library should only ever
send
RDY 1 regardless of
max_in_flight. This effectively “tests the waters” before returning to
full throttle. Additionally, during a backoff timeout, the client library should ignore any success
or failure results with respect to calculating backoff duration (i.e. it should only take into
account one result per backoff timeout).
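The exponential duration calculation can be sketched as follows. The unit and cap are illustrative assumptions; only the shape (doubling per consecutive failure, shrinking on success) comes from the text above:

```python
def backoff_duration(failure_count, unit=1.0, max_duration=120.0):
    # Doubles with each consecutive failure: 2s, 4s, 8s, ... capped at
    # max_duration. Successful results decrement failure_count, shrinking
    # the duration until the reader leaves the backoff state.
    return min(unit * (2 ** failure_count), max_duration)
```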
Distributed systems are fun.
The interactions between the various components of an NSQ cluster work in concert to provide a
platform on which to build robust, performant, and stable infrastructure. We hope this guide sheds
some light on how important the client’s role is.
- Message – a high-level message object, which exposes stateful methods for responding to the
producer (
FIN,
REQ,
TOUCH, etc.) as well as metadata such as attempts and timestamp.
- Connection – a high-level wrapper around a TCP connection to a specific
nsqd, which manages state such as
RDY state, negotiated features, and various timings.
- Reader – the front-facing API a user interacts with, which handles discovery, creates
connections (and subscribes), bootstraps and manages
RDY state, parses raw incoming data, creates
Message objects, and dispatches messages to handlers.
We’re happy to help support anyone interested in building client libraries for NSQ. We’re looking
for contributors to continue to expand our language support as well as flesh out functionality in
existing libraries. The community has already open sourced client libraries for 7 languages. If
you have any specific questions, feel free to ping me on twitter.