Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ export class Message {
getPartitionKey(): string;
getOrderingKey(): string;
getProducerName(): string;
isReplicated(): boolean;
getReplicatedFrom(): string;
getEncryptionContext(): EncryptionContext | null;
}

Expand Down
25 changes: 25 additions & 0 deletions src/Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ Napi::Object Message::Init(Napi::Env env, Napi::Object exports) {
InstanceMethod("getPartitionKey", &Message::GetPartitionKey),
InstanceMethod("getOrderingKey", &Message::GetOrderingKey),
InstanceMethod("getProducerName", &Message::GetProducerName),
InstanceMethod("isReplicated", &Message::IsReplicated),
InstanceMethod("getReplicatedFrom", &Message::GetReplicatedFrom),
InstanceMethod("getEncryptionContext", &Message::GetEncryptionContext)});

constructor = Napi::Persistent(func);
Expand Down Expand Up @@ -172,6 +174,29 @@ Napi::Value Message::GetProducerName(const Napi::CallbackInfo &info) {
return Napi::String::New(env, pulsar_message_get_producer_name(this->cMessage.get()));
}

Napi::Value Message::IsReplicated(const Napi::CallbackInfo &info) {
Napi::Env env = info.Env();
if (!ValidateCMessage(env)) {
return env.Null();
}

const char *replicatedFrom = pulsar_message_get_replicated_from(this->cMessage.get());
return Napi::Boolean::New(env, replicatedFrom && replicatedFrom[0] != '\0');
}

Napi::Value Message::GetReplicatedFrom(const Napi::CallbackInfo &info) {
Napi::Env env = info.Env();
if (!ValidateCMessage(env)) {
return env.Null();
}

const char *replicatedFrom = pulsar_message_get_replicated_from(this->cMessage.get());
if (!replicatedFrom) {
return Napi::String::New(env, "");
}
return Napi::String::New(env, replicatedFrom);
}

Napi::Value Message::GetEncryptionContext(const Napi::CallbackInfo &info) {
Napi::Env env = info.Env();
if (!ValidateCMessage(env)) {
Expand Down
2 changes: 2 additions & 0 deletions src/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class Message : public Napi::ObjectWrap<Message> {
Napi::Value GetPartitionKey(const Napi::CallbackInfo &info);
Napi::Value GetOrderingKey(const Napi::CallbackInfo &info);
Napi::Value GetProducerName(const Napi::CallbackInfo &info);
Napi::Value IsReplicated(const Napi::CallbackInfo &info);
Napi::Value GetReplicatedFrom(const Napi::CallbackInfo &info);
Napi::Value GetRedeliveryCount(const Napi::CallbackInfo &info);
Napi::Value GetEncryptionContext(const Napi::CallbackInfo &info);
bool ValidateCMessage(Napi::Env env);
Expand Down
2 changes: 2 additions & 0 deletions tests/end_to_end.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@
const msg = await consumer.receive();
consumer.acknowledge(msg);
expect(msg.getProducerName()).toBe(producerName);
expect(msg.isReplicated()).toBe(false);
expect(msg.getReplicatedFrom()).toBe('');
results.push(msg.getData().toString());
}
expect(lodash.difference(messages, results)).toEqual([]);
Expand Down Expand Up @@ -318,7 +320,7 @@
});

let consumer2Recv = 0;
while (true) {

Check warning on line 323 in tests/end_to_end.test.js

View workflow job for this annotation

GitHub Actions / Run unit tests (3.10)

Unexpected constant condition
try {
const msg = await consumer2.receive(3000);
await new Promise((resolve) => setTimeout(resolve, 10));
Expand Down Expand Up @@ -363,7 +365,7 @@
topic,
startMessageId: Pulsar.MessageId.earliest(),
receiverQueueSize: 10,
listener: async (message, reader) => {

Check warning on line 368 in tests/end_to_end.test.js

View workflow job for this annotation

GitHub Actions / Run unit tests (3.10)

'reader' is defined but never used

Check warning on line 368 in tests/end_to_end.test.js

View workflow job for this annotation

GitHub Actions / Run unit tests (3.10)

'message' is defined but never used
await new Promise((resolve) => setTimeout(resolve, 10));
reader1Recv += 1;
},
Expand Down Expand Up @@ -395,7 +397,7 @@
await client.close();
});

test('Message Listener error handling', async () => {

Check warning on line 400 in tests/end_to_end.test.js

View workflow job for this annotation

GitHub Actions / Run unit tests (3.10)

Test has no assertions
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});
Expand Down Expand Up @@ -431,7 +433,7 @@
subscription: 'sync',
subscriptionType: 'Shared',
subscriptionInitialPosition: 'Earliest',
listener: (message, messageConsumer) => {

Check warning on line 436 in tests/end_to_end.test.js

View workflow job for this annotation

GitHub Actions / Run unit tests (3.10)

'messageConsumer' is defined but never used

Check warning on line 436 in tests/end_to_end.test.js

View workflow job for this annotation

GitHub Actions / Run unit tests (3.10)

'message' is defined but never used
throw new Error('consumer1 callback expected error');
},
});
Expand All @@ -441,7 +443,7 @@
subscription: 'async',
subscriptionType: 'Shared',
subscriptionInitialPosition: 'Earliest',
listener: async (message, messageConsumer) => {

Check warning on line 446 in tests/end_to_end.test.js

View workflow job for this annotation

GitHub Actions / Run unit tests (3.10)

'messageConsumer' is defined but never used

Check warning on line 446 in tests/end_to_end.test.js

View workflow job for this annotation

GitHub Actions / Run unit tests (3.10)

'message' is defined but never used
throw new Error('consumer2 callback expected error');
},
});
Expand Down
2 changes: 2 additions & 0 deletions tstest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,8 @@ import Pulsar = require('./index');
const publishTime: number = message1.getPublishTimestamp();
const eventTime: number = message1.getEventTimestamp();
const redeliveryCount: number = message1.getRedeliveryCount();
const isReplicated: boolean = message1.isReplicated();
const replicatedFrom: string = message1.getReplicatedFrom();
const partitionKey: string = message1.getPartitionKey();

const message3: Pulsar.Message = await reader1.readNext();
Expand Down
Loading