AMQP 0-9-1 TypeScript client both for Node.js and browsers (using WebSocket). This library is intended to replace all other Node.js AMQP libraries.
This library is Promise-based and hence works very well with async/await. It's secure by default, for instance, publishes aren't fulfilled until either the data has been sent on the wire (so that back propagation is respected), or if the channel has Publish Confirms enabled, it isn't fulfilled until the server has acknowledged that the message has been enqueued.
The library was developed so to make it easy for developers who already are familiar with AMQP to write browser apps that communicates directly with an AMQP server over WebSocket.
The library is developed and supported by CloudAMQP, the largest hosted RabbitMQ provider in the world.
npm install @cloudamqp/amqp-client --saveStart node with --enable-source-maps to get proper stacktraces as the library is transpiled from TypeScript.
This library provides two APIs:
- High-level (
AMQPSession): automatic reconnection, consumer recovery — usequeue()/exchange()handles for reconnect-safe operations - Low-level (
AMQPClient/AMQPWebSocketClient): direct channel access withqueueDeclare,basicPublish,basicConsume, etc.
Use AMQPSession.connect(url, options) to get a session with automatic reconnection and consumer recovery. The transport is chosen from the URL scheme (amqp:// / amqps:// → TCP; ws:// / wss:// → WebSocket):
import { AMQPSession } from "@cloudamqp/amqp-client"
const session = await AMQPSession.connect("amqp://localhost")
// Declare a queue and publish a message (waits for broker confirmation)
const q = await session.queue("my-queue")
await q.publish("Hello World", { deliveryMode: 2 })
// Subscribe with a callback — consumer recovers automatically on reconnect.
// Messages are acked after the callback returns. If it throws, the message is
// nacked and requeued. Call msg.ack() / msg.nack() yourself to override.
const sub = await q.subscribe(async (msg) => {
console.log(msg.bodyString())
})
// Or subscribe with an async iterator — messages are acked when the loop advances.
// Call msg.ack() / msg.nack() before the next iteration to override.
const iterSub = await q.subscribe()
for await (const msg of iterSub) {
console.log(msg.bodyString())
}
// Exchanges work the same way
const x = await session.topicExchange("events")
await x.publish("user signed up", { routingKey: "events.user.created" })
// When done
await session.stop()const session = await AMQPSession.connect("amqp://localhost", {
reconnectInterval: 1000, // initial delay before reconnecting (ms)
maxReconnectInterval: 30000, // maximum delay between attempts (ms)
backoffMultiplier: 2, // exponential backoff multiplier
maxRetries: 0, // 0 = infinite retries
})
session.onconnect = () => console.log("Reconnected!")
session.onfailed = (err) => console.error("Gave up:", err?.message)Subscriptions created via queue.subscribe() are automatically re-established after reconnection. Include prefetch in the subscribe params to set QoS on each connection:
const q = await session.queue("my-queue", { durable: true })
const sub = await q.subscribe({ prefetch: 10 }, async (msg) => {
// process msg — acked on return, nacked and requeued on throw
})
// sub.consumerTag and sub.channel reflect the current consumer (updated on reconnect)
// await sub.cancel() // stops consuming and removes from auto-recoveryFor full control over channels and resources, use the transport clients directly:
import { AMQPClient } from "@cloudamqp/amqp-client"
const amqp = new AMQPClient("amqp://localhost")
const conn = await amqp.connect()
const ch = await conn.channel()
// Declare a queue
const q = await ch.queueDeclare("my-queue")
// Publish
await ch.basicPublish("", q.name, "Hello World", { deliveryMode: 2 })
// Consume with a callback
const consumer = await ch.basicConsume(q.name, { noAck: false }, async (msg) => {
console.log(msg.bodyToString())
await msg.ack()
await consumer.cancel()
})
await consumer.wait()
// Or consume with an async iterator
const consumer = await ch.basicConsume(q.name, { noAck: false })
for await (const msg of consumer.messages) {
console.log(msg.bodyToString())
await msg.ack()
break // breaking automatically cancels the consumer
}
await conn.close()This library can be used in the browser to access an AMQP server over WebSockets. For servers such as RabbitMQ that doesn't support WebSockets natively a WebSocket TCP relay have to be used as a proxy. All CloudAMQP servers has this proxy configured. More information can be found in this blog post.
For web browsers a compiled and rolled up version is available at https://github.com/cloudamqp/amqp-client.js/releases.
AMQPSession works with WebSocket URLs out of the box — pass a ws:// or wss:// URL and transport is chosen automatically:
import { AMQPSession } from "@cloudamqp/amqp-client"
const session = await AMQPSession.connect("wss://my.cloudamqp.com/ws/", {
vhost: "my-vhost",
})
const q = await session.queue("my-queue")
const sub = await q.subscribe({ noAck: true }, (msg) => {
console.log(msg.bodyString())
})For lower-level control without reconnection, use AMQPWebSocketClient directly:
<!DOCTYPE html>
<html>
<head>
<script type="module">
import { AMQPWebSocketClient } from "./js/amqp-websocket-client.mjs"
const textarea = document.getElementById("textarea")
const input = document.getElementById("message")
const tls = window.location.protocol === "https:"
const url = `${tls ? "wss" : "ws"}://${window.location.host}`
const amqp = new AMQPWebSocketClient(url, "/", "guest", "guest")
async function start() {
try {
const conn = await amqp.connect()
const ch = await conn.channel()
attachPublish(ch)
const q = await ch.queueDeclare("")
await ch.queueBind(q.name, "amq.fanout", "")
const consumer = await ch.basicConsume(q.name, { noAck: false }, (msg) => {
console.log(msg)
textarea.value += msg.bodyToString() + "\n"
msg.ack()
})
} catch (err) {
console.error("Error", err, "reconnecting in 1s")
disablePublish()
setTimeout(start, 1000)
}
}
function attachPublish(ch) {
document.forms[0].onsubmit = async (e) => {
e.preventDefault()
try {
await ch.basicPublish("amq.fanout", "", input.value, {
contentType: "text/plain",
})
} catch (err) {
console.error("Error", err, "reconnecting in 1s")
disablePublish()
setTimeout(start, 1000)
}
input.value = ""
}
}
function disablePublish() {
document.forms[0].onsubmit = (e) => {
alert("Disconnected, waiting to be reconnected")
}
}
start()
</script>
</head>
<body>
<form>
<textarea id="textarea" rows="10"></textarea>
<br />
<input id="message" />
<button type="submit">Send</button>
</form>
</body>
</html>Messages with a 1-byte body, no properties:
| Client | Publish rate | Consume rate |
|---|---|---|
| amqp-client.js | 300.000 msgs/s | 512.000 msgs/s |
| amqplib | 172.000 msgs/s | 519.000 msgs/s |
Messages with a 1-byte body, and all properties, except headers:
| Client | Publish rate | Consume rate |
|---|---|---|
| amqp-client.js | 144.000 msgs/s | 202.000 msgs/s |
| amqplib | 110.000 msgs/s | 251.000 msgs/s |
Messages with a 1-byte body, and all properties, including headers:
| Client | Publish rate | Consume rate |
|---|---|---|
| amqp-client.js | 70.000 msgs/s | 89.000 msgs/s |
| amqplib | 60.000 msgs/s | 99.000 msgs/s |
The reason amqp-client is somewhat slower to consume is that to maintain browser compatibility for the websocket client, DataView are used for parsing the binary protocol instead of Buffer.
Module comparison
| Client | Runtime dependencies | Lines of code |
|---|---|---|
| amqp-client.js | 0 | 1743 |
| amqplib | 14 | 6720 (w/o dependencies) |
This project uses automated release scripts for version management.
The package.json includes several npm scripts for releasing:
npm run release- Performs a patch version bump (e.g., 3.2.1 → 3.2.2) and creates a releasenpm run release:minor- Performs a minor version bump (e.g., 3.2.1 → 3.3.0) and creates a releasenpm run release:major- Performs a major version bump (e.g., 3.2.1 → 4.0.0) and creates a release
- Tests: All tests are run to ensure everything passes (
preversion) - Version bump: npm automatically updates the version in
package.json - File updates: The version is updated in
src/amqp-base-client.ts, code is formatted, and changelog is updated (version) - Staging: All changes are staged for commit (
version) - Git commit: npm automatically commits all staged changes with a version message
- Git tag: An annotated tag is created with the full changelog content as the tag message (
postversion) - Push: Both the commit and tags are pushed to the remote repository (
postversion) - CI deployment: The GitHub Actions workflow automatically publishes the new version to npm
Technical Note: This release process leverages npm's built-in version lifecycle hooks (
preversion,version,postversion). Thenpm versioncommand automatically handles the git commit after running our customversionscript, which is why we stage changes withgit add -Arather than committing manually.
Before releasing:
- Add your changes to the
## [Unreleased]section inCHANGELOG.md - All tests should pass (
npm test) - The working directory should be clean (no uncommitted changes)
When a new tag is pushed (e.g., v3.3.0), the GitHub Actions workflow (.github/workflows/release.yml) automatically:
- Builds the project
- Publishes the package to npm with public access and provenance
- Creates a GitHub release with browser bundle artifacts
The git tag contains the complete changelog section for that version, including version header, all changes, and PR links. This makes it easy to see what changed in each release directly from the git tag.