From 09db24bd9d244ac7b88645f947660dc1d35ab9b0 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Tue, 23 Jun 2026 11:31:15 +0800 Subject: [PATCH 1/2] Expose message replication metadata --- index.d.ts | 2 ++ src/Message.cc | 25 +++++++++++++++++++++++++ src/Message.h | 2 ++ tests/end_to_end.test.js | 2 ++ tstest.ts | 2 ++ 5 files changed, 33 insertions(+) diff --git a/index.d.ts b/index.d.ts index 61ade63..3cf4a1b 100644 --- a/index.d.ts +++ b/index.d.ts @@ -192,6 +192,8 @@ export class Message { getPartitionKey(): string; getOrderingKey(): string; getProducerName(): string; + isReplicated(): boolean; + getReplicatedFrom(): string; getEncryptionContext(): EncryptionContext | null; } diff --git a/src/Message.cc b/src/Message.cc index 58909bd..97c8a87 100644 --- a/src/Message.cc +++ b/src/Message.cc @@ -63,6 +63,8 @@ Napi::Object Message::Init(Napi::Env env, Napi::Object exports) { InstanceMethod("getPartitionKey", &Message::GetPartitionKey), InstanceMethod("getOrderingKey", &Message::GetOrderingKey), InstanceMethod("getProducerName", &Message::GetProducerName), + InstanceMethod("isReplicated", &Message::GetIsReplicated), + InstanceMethod("getReplicatedFrom", &Message::GetReplicatedFrom), InstanceMethod("getEncryptionContext", &Message::GetEncryptionContext)}); constructor = Napi::Persistent(func); @@ -172,6 +174,29 @@ Napi::Value Message::GetProducerName(const Napi::CallbackInfo &info) { return Napi::String::New(env, pulsar_message_get_producer_name(this->cMessage.get())); } +Napi::Value Message::GetIsReplicated(const Napi::CallbackInfo &info) { + Napi::Env env = info.Env(); + if (!ValidateCMessage(env)) { + return env.Null(); + } + + const char *replicatedFrom = pulsar_message_get_replicated_from(this->cMessage.get()); + return Napi::Boolean::New(env, replicatedFrom && replicatedFrom[0] != '\0'); +} + +Napi::Value Message::GetReplicatedFrom(const Napi::CallbackInfo &info) { + Napi::Env env = info.Env(); + if (!ValidateCMessage(env)) { + return env.Null(); + } + + const char *replicatedFrom = pulsar_message_get_replicated_from(this->cMessage.get()); + if (!replicatedFrom) { + return Napi::String::New(env, ""); + } + return Napi::String::New(env, replicatedFrom); +} + Napi::Value Message::GetEncryptionContext(const Napi::CallbackInfo &info) { Napi::Env env = info.Env(); if (!ValidateCMessage(env)) { diff --git a/src/Message.h b/src/Message.h index 4d8c4aa..102107d 100644 --- a/src/Message.h +++ b/src/Message.h @@ -46,6 +46,8 @@ class Message : public Napi::ObjectWrap { Napi::Value GetPartitionKey(const Napi::CallbackInfo &info); Napi::Value GetOrderingKey(const Napi::CallbackInfo &info); Napi::Value GetProducerName(const Napi::CallbackInfo &info); + Napi::Value GetIsReplicated(const Napi::CallbackInfo &info); + Napi::Value GetReplicatedFrom(const Napi::CallbackInfo &info); Napi::Value GetRedeliveryCount(const Napi::CallbackInfo &info); Napi::Value GetEncryptionContext(const Napi::CallbackInfo &info); bool ValidateCMessage(Napi::Env env); diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js index 53c503d..d60be82 100644 --- a/tests/end_to_end.test.js +++ b/tests/end_to_end.test.js @@ -73,6 +73,8 @@ const Pulsar = require('../index'); const msg = await consumer.receive(); consumer.acknowledge(msg); expect(msg.getProducerName()).toBe(producerName); + expect(msg.isReplicated()).toBe(false); + expect(msg.getReplicatedFrom()).toBe(''); results.push(msg.getData().toString()); } expect(lodash.difference(messages, results)).toEqual([]); diff --git a/tstest.ts b/tstest.ts index b1d81f0..5b1a85f 100644 --- a/tstest.ts +++ b/tstest.ts @@ -297,6 +297,8 @@ import Pulsar = require('./index'); const publishTime: number = message1.getPublishTimestamp(); const eventTime: number = message1.getEventTimestamp(); const redeliveryCount: number = message1.getRedeliveryCount(); + const isReplicated: boolean = message1.isReplicated(); + const replicatedFrom: string = message1.getReplicatedFrom(); const partitionKey: string = message1.getPartitionKey(); const message3: Pulsar.Message = await reader1.readNext(); From 5d65ea12e392d27d18ef43d1fdc7e8da5c6c0c3d Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Tue, 23 Jun 2026 12:13:36 +0800 Subject: [PATCH 2/2] Address replication metadata review feedback --- src/Message.cc | 4 ++-- src/Message.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Message.cc b/src/Message.cc index 97c8a87..55fa7e3 100644 --- a/src/Message.cc +++ b/src/Message.cc @@ -63,7 +63,7 @@ Napi::Object Message::Init(Napi::Env env, Napi::Object exports) { InstanceMethod("getPartitionKey", &Message::GetPartitionKey), InstanceMethod("getOrderingKey", &Message::GetOrderingKey), InstanceMethod("getProducerName", &Message::GetProducerName), - InstanceMethod("isReplicated", &Message::GetIsReplicated), + InstanceMethod("isReplicated", &Message::IsReplicated), InstanceMethod("getReplicatedFrom", &Message::GetReplicatedFrom), InstanceMethod("getEncryptionContext", &Message::GetEncryptionContext)}); @@ -174,7 +174,7 @@ Napi::Value Message::GetProducerName(const Napi::CallbackInfo &info) { return Napi::String::New(env, pulsar_message_get_producer_name(this->cMessage.get())); } -Napi::Value Message::GetIsReplicated(const Napi::CallbackInfo &info) { +Napi::Value Message::IsReplicated(const Napi::CallbackInfo &info) { Napi::Env env = info.Env(); if (!ValidateCMessage(env)) { return env.Null(); diff --git a/src/Message.h b/src/Message.h index 102107d..c84126c 100644 --- a/src/Message.h +++ b/src/Message.h @@ -46,7 +46,7 @@ class Message : public Napi::ObjectWrap { Napi::Value GetPartitionKey(const Napi::CallbackInfo &info); Napi::Value GetOrderingKey(const Napi::CallbackInfo &info); Napi::Value GetProducerName(const Napi::CallbackInfo &info); - Napi::Value GetIsReplicated(const Napi::CallbackInfo &info); + Napi::Value IsReplicated(const Napi::CallbackInfo &info); Napi::Value GetReplicatedFrom(const Napi::CallbackInfo &info); Napi::Value GetRedeliveryCount(const Napi::CallbackInfo &info); Napi::Value GetEncryptionContext(const Napi::CallbackInfo &info);