Pulsar integration
Apache Pulsar is an open-source, distributed messaging and streaming platform that can serve also the purpose as a queue.
Multiple processes of the same service
You've written your service that utilizes Hermes
🌿 and now you want to deploy it on production
. The problem you will encounter is the fact that you should have only one Hermes
instance working for single partition. If you run two Hermeses for a single partition, then you will bump into a race condition where the two Hermeses will be trying to send the same event.
It's the issue of accessing the same resource by multiple processes or resource synchronization. One of the solutions is a mutex (binary variation of a semaphore).
Mutex based on an Exclusive topic
We can use Apache Pulsar and an Exclusive subscription to simulate a mutex behaviour.
import { setTimeout } from 'node:timers/promises'
import { Client, Consumer } from 'pulsar-client'
const DEFAULT_MUTEX_NAME = 'mutex' as const
const CHECK_MUTEX_EVERY_N_MINUTES = 30 * 1000
export class PulsarMutex {
constructor(
private _client: Client,
private _mutexTopic: string = `public/default/mutex`,
private _waitAfterFailedSubscription = CHECK_MUTEX_EVERY_N_MINUTES,
) {}
private _mutex: Consumer | null = null
async lock(): Promise<void> {
while (true) {
try {
this._mutex = await this._client.subscribe({
topic: this._mutexTopic,
subscription: DEFAULT_MUTEX_NAME,
subscriptionType: 'Exclusive',
})
return
} catch {
await setTimeout(this._waitAfterFailedSubscription)
}
}
}
async unlock(): Promise<void> {
if (this._mutex && this._mutex.isConnected()) {
await this._mutex.unsubscribe()
}
}
}
Defining some events
export type DomainEvent<Name extends string, Data> = Readonly<{
name: Name
data: Data
}>
export type MedicineAssigned = DomainEvent<
'MedicineAssigned',
{
medicineId: string
patientId: string
}
>
export type MedicineFinished = DomainEvent<
'MedicineFinished',
{
medicineId: string
patientId: string
}
>
export type MedicineEvent = MedicineAssigned | MedicineFinished
One partition example
First, define a function that will continously send events based on Hermes
instance:
import { OutboxConsumer } from '@arturwojnar/hermes-mongodb'
import { ObjectId } from 'mongodb'
import { setTimeout } from 'node:timers/promises'
import { MedicineEvent } from '../common/events'
export const doPublishing = async (outbox: OutboxConsumer<MedicineEvent>) => {
while (true) {
const medicineId = new ObjectId().toString()
const patientId = new ObjectId().toString()
try {
await outbox.publish({
name: 'MedicineAssigned',
data: {
medicineId,
patientId,
},
})
} catch {
await setTimeout(1000)
continue
}
console.info(`Event published for medicine ${medicineId} nad patient ${patientId}.`)
await setTimeout(1000)
}
}
Secondly, we want to know whether the events are actually sent to the Pulsar:
import { setTimeout } from 'node:timers/promises'
import { Consumer } from 'pulsar-client'
export const doReceiving = async (subscription: Consumer) => {
while (true) {
try {
const message = await subscription.receive()
const event = JSON.parse(message.getData().toString('utf-8'))
console.info(`Consumed event for medicine ${event.data.medicineId} and patient ${event.data.patientId}`)
} catch {
await setTimeout(1000)
}
}
}
And the final wrap-up:
import { addDisposeOnSigterm } from '@arturwojnar/hermes'
import { createOutboxConsumer } from '@arturwojnar/hermes-mongodb'
import { swallow } from '../../packages/hermes/src/utils'
import { MedicineEvent } from '../common/events'
import { MongoClient } from '../node_modules/mongodb/mongodb'
import { Client } from '../node_modules/pulsar-client/index'
import { doPublishing } from './do-publishing'
import { doReceiving } from './do-receiving'
import { PulsarMutex } from './pulsar-mutex'
const MONGODB_URI = `mongodb://127.0.0.1:27017/?replicaSet=rs0&directConnection=true`
const PULSAR_URI = `pulsar://localhost:6650`
const start = async () => {
const pulsarClient = new Client({ serviceUrl: PULSAR_URI })
const mutex = new PulsarMutex(pulsarClient)
const producer = await pulsarClient.createProducer({ topic: `public/default/events` })
const subscription = await pulsarClient.subscribe({ topic: `public/default/events`, subscription: 'test' })
addDisposeOnSigterm(async () => {
await swallow(() => mutex.unlock())
await swallow(() => subscription.close())
await swallow(() => producer.close())
await swallow(() => pulsarClient.close())
})
await mutex.lock()
try {
const client = new MongoClient(MONGODB_URI)
const db = client.db('aid-kit')
await client.connect()
const outbox = createOutboxConsumer<MedicineEvent>({
client,
db,
publish: async (event) => {
// Normally, you should choose a corresponding topic for the given event.
await producer.send({
data: Buffer.from(JSON.stringify(event)),
})
},
})
// Hermes automatically registers the dispose on SIGTERM
await outbox.start()
doPublishing(outbox).catch(console.error)
doReceiving(subscription).catch(console.error)
} catch (error) {
console.error(error)
throw error
}
}
start()