Messaging
Nitric provides two common options for scalable, decoupled, asynchronous messaging between services. Topics for publish/subscribe messaging, where new messages are immediately pushed to subscribers, and Queues for pull messaging where new messages are put on a queue and must be requested.
In some circumstances messages sent to a Topic may also be called Events, while messages sent to a Queue may be called Tasks. The structures of these messages are very similar, but the delivery and retry mechanisms are different. It can be helpful to refer to these messages differently to assist in understanding the context in which they are used.
Topics
A topic is a named resource where events can be published. They can be thought of as a subject that your services are communicating about.
Topics are often the first choice for communication between services, since they offer stateless, scalable and highly decoupled communication.
Subscriptions
A subscription is a binding between a topic and a service. You can think of it as a channel that notifies your services when something new arrives on the topic.
Creating a Topic
Nitric allows you to define named topics. When defining topics, you can give the service permissions for publishing. If permissions are not specified, subscribers can be created.
Here's an example of how to create a topic with permissions to publish messages.
import { topic } from '@nitric/sdk'const userCreatedTopic = topic('user-created').allow('publish')
Publishing a message
To send a message to a topic and notify all subscribers, use the publish()
method on the topic reference. The service must have permissions to publish
to the topic.
The below example publishes a message to a topic called user-created
.
import { topic } from '@nitric/sdk'const userCreatedTopic = topic('user-created').allow('publish')await userCreatedTopic.publish({email: 'new.user@example.com',})
Subscribing to a topic
To execute a function when new messages are published you can create subscribers. The delay between publishing a message and a subscriber being executed is usually only a few milliseconds. This makes subscribers perfect for responding to messages as they happen.
The below code shows a subscription that responds to messages when new users are created.
import { topic } from '@nitric/sdk'import { sendWelcomeEmail } from 'common'const userCreatedTopic = topic('user-created')userCreatedTopic.subscribe(async (ctx) => {// Extract data from the event payload for processingconst { email } = ctx.req.json()sendWelcomeEmail(email)})
Reliable subscribers
If a subscriber encounters an error or is terminated before it finishes processing a message, what happens? Is the event lost?
Nitric deploys topics to cloud services that support "at-least-once delivery". Messages are usually delivered exactly once, in the same order that they're published. However, to prevent lost messages, they're sometimes delivered more than once or out of order.
Typically, retries occur when a subscriber doesn't respond successfully, like when unhandled exceptions occur. You'll typically want to ensure messages aren't processed again by accident or partially processed, leaving the system in an unexpected state.
Building atomic publishers and idempotent subscribers can solve this.
Atomic publishers
Your publishers need to update your database and publish associated events. If a database update fails, the events should never be sent. If the database update succeeds, the events should always publish. The two shouldn't occur independently (i.e. one shouldn't fail while the other succeeds).
One solution to this problem is the Transactional Outbox Pattern.
Idempotent subscribers
Messages from a topic can be delivered more than once, but they should typically only be processed once. To do this your subscribers need to identify and disregard duplicate events.
Usually checking for duplicate payloads or IDs is enough. When you receive an event you've seen before don't process it, skip straight to returning a success
response from your subscriber.
import { topic } from '@nitric/sdk'import { isDuplicate } from '../common'const updates = topic('updates')updates.subscribe((ctx) => {if (isDuplicate(ctx.req)) {return ctx}// not a duplicate, process the event// ...})
If you're checking for duplicate IDs, ensure publishers can't resend failed events with new IDs.
You can read more about idempotent subscribers and patterns to handle it here.
Queues
Queues are another option for asynchronous messaging. Unlike topics, messages sent to a queue won't automatically trigger services to process them. Instead, services dequeue message by requesting them.
This makes queues ideal for batch workloads, often paired with schedules.
Queue disambiguation: Some systems and cloud providers use the term "queue" to refer to a messaging broker with features akin to a topic (i.e. they can push messages to subscribers). In Nitric, a queue is a resource that messages are sent to and pulled from, while a topic is a resource that messages are sent to and pushed from.
Creating a Queue
Nitric allows you to define named queues. When defining queues, you can give the service permissions for enqueueing and dequeueing messages.
Here's an example of how to create a queue with permissions for enqueueing and dequeueing.
import { queue } from '@nitric/sdk'const transactionQueue = queue('transactions').allow('enqueue', 'dequeue')
Enqueue Messages
To send a message to a queue, use the enqueue()
method on the queue reference. The function must have permissions to enqueue
to the queue.
The below example sends a message to a queue called transactions
.
import { queue } from '@nitric/sdk'const transactionQueue = queue('transactions').allow('enqueue')await transactionQueue.enqueue({message: 'hello world',})
Messages can also be sent in batches by providing an array.
import { queue } from '@nitric/sdk'const transactionQueue = queue('transactions').allow('enqueue')await transactionQueue.enqueue([{message: 'batch task 1',},{message: 'batch task 2',},])
Dequeueing and Acknowledging Messages
When you dequeue messages they are not immediately deleted from the queue. Instead, they are leased, which means they are temporarily hidden from other services until the lease expires.
To ensure proper handling, your code should mark a dequeued message as complete
after successfully processing it. This action permanently removes the message from the queue.
If a lease expires before a dequeued message is marked as complete, the message will reappear in the queue and can be dequeued again. This mechanism prevents messages from getting lost in case of failures. If your service encounters an error or is terminated before completing processing of a dequeued message, it will automatically reappear in the queue, ready to be processed again.
By following this approach, you can ensure reliable message processing and minimize the chances of losing data in failure scenarios.
The below example dequeues 10 messages using the dequeue()
method, then acknowledges them as complete using the complete()
method. You'll note that the service has permissions to dequeue messages.
import { queue } from '@nitric/sdk'const transactionQueue = queue('transactions').allow('dequeue')const tasks = await transactionQueue.dequeue(10)for (let task of tasks) {// process your task's dataconsole.log(task.message)// acknowledge when the task is completeawait task.complete()}
Choosing between queues and topics
It's common to ask when to use a queue or a topic. From a publisher's point of view, both queues and topics are almost identical. The difference is primarily on the receiver/subscriber side. Topics push new messages to their subscribers, immediately spinning up workers to process them, while queues rely on the receiver to ask for new messages to process.
For these reasons, we usually default to Topics. Queues are more suitable for batch workloads or situations where there are occasional surges of requests that can be processed at a later time.
Cloud Service Mapping
Each cloud provider comes with a set of default services used when deploying resources. You can find the default services for each cloud provider below.