Skip to content

Pulsar integration

Apache Pulsar is an open-source, distributed messaging and streaming platform that can serve also the purpose as a queue.

Multiple processes of the same service

You've written your service that utilizes Hermes🌿 and now you want to deploy it on production. The problem you will encounter is the fact that you should have only one Hermes instance working for single partition. If you run two Hermeses for a single partition, then you will bump into a race condition where the two Hermeses will be trying to send the same event.

It's the issue of accessing the same resource by multiple processes or resource synchronization. One of the solutions is a mutex (binary variation of a semaphore).

Mutex based on an Exclusive topic

We can use Apache Pulsar and an Exclusive subscription to simulate a mutex behaviour.

ts
import { setTimeout } from 'node:timers/promises'
import { Client, Consumer } from 'pulsar-client'

const DEFAULT_MUTEX_NAME = 'mutex' as const
const CHECK_MUTEX_EVERY_N_MINUTES = 30 * 1000

export class PulsarMutex {
  constructor(
    private _client: Client,
    private _mutexTopic: string = `public/default/mutex`,
    private _waitAfterFailedSubscription = CHECK_MUTEX_EVERY_N_MINUTES,
  ) {}

  private _mutex: Consumer | null = null

  async lock(): Promise<void> {
    while (true) {
      try {
        this._mutex = await this._client.subscribe({
          topic: this._mutexTopic,
          subscription: DEFAULT_MUTEX_NAME,
          subscriptionType: 'Exclusive',
        })

        return
      } catch {
        await setTimeout(this._waitAfterFailedSubscription)
      }
    }
  }

  async unlock(): Promise<void> {
    if (this._mutex && this._mutex.isConnected()) {
      await this._mutex.unsubscribe()
    }
  }
}

Defining some events

ts
export type DomainEvent<Name extends string, Data> = Readonly<{
  name: Name
  data: Data
}>
export type MedicineAssigned = DomainEvent<
  'MedicineAssigned',
  {
    medicineId: string
    patientId: string
  }
>
export type MedicineFinished = DomainEvent<
  'MedicineFinished',
  {
    medicineId: string
    patientId: string
  }
>
export type MedicineEvent = MedicineAssigned | MedicineFinished

One partition example

First, define a function that will continously send events based on Hermes instance:

ts
import { OutboxConsumer } from '@arturwojnar/hermes-mongodb'
import { ObjectId } from 'mongodb'
import { setTimeout } from 'node:timers/promises'
import { MedicineEvent } from '../common/events'

export const doPublishing = async (outbox: OutboxConsumer<MedicineEvent>) => {
  while (true) {
    const medicineId = new ObjectId().toString()
    const patientId = new ObjectId().toString()

    try {
      await outbox.publish({
        name: 'MedicineAssigned',
        data: {
          medicineId,
          patientId,
        },
      })
    } catch {
      await setTimeout(1000)
      continue
    }

    console.info(`Event published for medicine ${medicineId} nad patient ${patientId}.`)

    await setTimeout(1000)
  }
}

Secondly, we want to know whether the events are actually sent to the Pulsar:

ts
import { setTimeout } from 'node:timers/promises'
import { Consumer } from 'pulsar-client'

export const doReceiving = async (subscription: Consumer) => {
  while (true) {
    try {
      const message = await subscription.receive()
      const event = JSON.parse(message.getData().toString('utf-8'))

      console.info(`Consumed event for medicine ${event.data.medicineId} and patient ${event.data.patientId}`)
    } catch {
      await setTimeout(1000)
    }
  }
}

And the final wrap-up:

ts
import { addDisposeOnSigterm } from '@arturwojnar/hermes'
import { createOutboxConsumer } from '@arturwojnar/hermes-mongodb'
import { swallow } from '../../packages/hermes/src/utils'
import { MedicineEvent } from '../common/events'
import { MongoClient } from '../node_modules/mongodb/mongodb'
import { Client } from '../node_modules/pulsar-client/index'
import { doPublishing } from './do-publishing'
import { doReceiving } from './do-receiving'
import { PulsarMutex } from './pulsar-mutex'

const MONGODB_URI = `mongodb://127.0.0.1:27017/?replicaSet=rs0&directConnection=true`
const PULSAR_URI = `pulsar://localhost:6650`

const start = async () => {
  const pulsarClient = new Client({ serviceUrl: PULSAR_URI })
  const mutex = new PulsarMutex(pulsarClient)
  const producer = await pulsarClient.createProducer({ topic: `public/default/events` })
  const subscription = await pulsarClient.subscribe({ topic: `public/default/events`, subscription: 'test' })

  addDisposeOnSigterm(async () => {
    await swallow(() => mutex.unlock())
    await swallow(() => subscription.close())
    await swallow(() => producer.close())
    await swallow(() => pulsarClient.close())
  })

  await mutex.lock()

  try {
    const client = new MongoClient(MONGODB_URI)
    const db = client.db('aid-kit')

    await client.connect()

    const outbox = createOutboxConsumer<MedicineEvent>({
      client,
      db,
      publish: async (event) => {
        // Normally, you should choose a corresponding topic for the given event.
        await producer.send({
          data: Buffer.from(JSON.stringify(event)),
        })
      },
    })

    // Hermes automatically registers the dispose on SIGTERM
    await outbox.start()

    doPublishing(outbox).catch(console.error)
    doReceiving(subscription).catch(console.error)
  } catch (error) {
    console.error(error)
    throw error
  }
}

start()