Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions examples/reader_listener.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

const Pulsar = require('pulsar-client');

(async () => {
// Create a client
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
operationTimeoutSeconds: 30,
});

// Create a reader
const reader = await client.createReader({
topic: 'persistent://public/default/my-topic',
startMessageId: Pulsar.MessageId.latest(),
listener: (msg, reader) => {
console.log(msg.getData().toString());
},
});
})();
136 changes: 110 additions & 26 deletions src/Reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include "ReaderConfig.h"
#include <pulsar/c/result.h>
#include <pulsar/c/reader.h>
#include <atomic>
#include <thread>

Napi::FunctionReference Reader::constructor;

Expand All @@ -39,60 +41,120 @@ void Reader::Init(Napi::Env env, Napi::Object exports) {
constructor.SuppressDestruct();
}

void Reader::SetCReader(pulsar_reader_t *cReader) { this->cReader = cReader; }
struct ReaderListenerProxyData {
pulsar_message_t *cMessage;
Reader *reader;

ReaderListenerProxyData(pulsar_message_t *cMessage, Reader *reader) : cMessage(cMessage), reader(reader) {}
};

void ReaderListenerProxy(Napi::Env env, Napi::Function jsCallback, ReaderListenerProxyData *data) {
Napi::Object msg = Message::NewInstance({}, data->cMessage);
Reader *reader = data->reader;
delete data;

jsCallback.Call({msg, reader->Value()});
}

void ReaderListener(pulsar_reader_t *cReader, pulsar_message_t *cMessage, void *ctx) {
ReaderListenerCallback *readerListenerCallback = (ReaderListenerCallback *)ctx;
Reader *reader = (Reader *)readerListenerCallback->reader;
if (readerListenerCallback->callback.Acquire() != napi_ok) {
return;
}
ReaderListenerProxyData *dataPtr = new ReaderListenerProxyData(cMessage, reader);
readerListenerCallback->callback.BlockingCall(dataPtr, ReaderListenerProxy);
readerListenerCallback->callback.Release();
}

Reader::Reader(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Reader>(info) {}
void Reader::SetCReader(std::shared_ptr<CReaderWrapper> cReader) { this->wrapper = cReader; }
void Reader::SetListenerCallback(ReaderListenerCallback *listener) {
if (listener) {
// Maintain reference to reader, so it won't get garbage collected
// since, when we have a listener, we don't have to maintain reference to reader (in js code)
this->Ref();

// Pass reader as argument
listener->reader = this;
}

this->listener = listener;
}

Reader::Reader(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Reader>(info), listener(nullptr) {}

class ReaderNewInstanceWorker : public Napi::AsyncWorker {
public:
ReaderNewInstanceWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t *cClient,
ReaderConfig *readerConfig)
ReaderConfig *readerConfig, std::shared_ptr<CReaderWrapper> readerWrapper)
: AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
deferred(deferred),
cClient(cClient),
readerConfig(readerConfig) {}
readerConfig(readerConfig),
readerWrapper(readerWrapper),
done(false) {}
~ReaderNewInstanceWorker() {}
void Execute() {
const std::string &topic = this->readerConfig->GetTopic();
if (topic.empty()) {
SetError(std::string("Topic is required and must be specified as a string when creating reader"));
std::string msg("Topic is required and must be specified as a string when creating reader");
SetError(msg);
return;
}
if (this->readerConfig->GetCStartMessageId() == nullptr) {
SetError(std::string(
"StartMessageId is required and must be specified as a MessageId object when creating reader"));
std::string msg(
"StartMessageId is required and must be specified as a MessageId object when creating reader");
SetError(msg);
return;
}

pulsar_result result =
pulsar_client_create_reader(this->cClient, topic.c_str(), this->readerConfig->GetCStartMessageId(),
this->readerConfig->GetCReaderConfig(), &(this->cReader));
delete this->readerConfig;
if (result != pulsar_result_Ok) {
SetError(std::string("Failed to create reader: ") + pulsar_result_str(result));
return;
pulsar_client_create_reader_async(this->cClient, topic.c_str(), this->readerConfig->GetCStartMessageId(),
this->readerConfig->GetCReaderConfig(),
&ReaderNewInstanceWorker::createReaderCallback, (void *)this);

while (!done) {
std::this_thread::yield();
}
}
void OnOK() {
Napi::Object obj = Reader::constructor.New({});
Reader *reader = Reader::Unwrap(obj);
reader->SetCReader(this->cReader);
reader->SetCReader(this->readerWrapper);
reader->SetListenerCallback(this->listener);
this->deferred.Resolve(obj);
}
void OnError(const Napi::Error &e) { this->deferred.Reject(Napi::Error::New(Env(), e.Message()).Value()); }

private:
Napi::Promise::Deferred deferred;
pulsar_client_t *cClient;
ReaderConfig *readerConfig;
pulsar_reader_t *cReader;
ReaderConfig *readerConfig;
ReaderListenerCallback *listener;
std::shared_ptr<CReaderWrapper> readerWrapper;
std::atomic<bool> done;
static void createReaderCallback(pulsar_result result, pulsar_reader_t *reader, void *ctx) {
ReaderNewInstanceWorker *worker = (ReaderNewInstanceWorker *)ctx;
if (result != pulsar_result_Ok) {
worker->SetError(std::string("Failed to create reader: ") + pulsar_result_str(result));
} else {
worker->readerWrapper->cReader = reader;
worker->listener = worker->readerConfig->GetListenerCallback();
}

delete worker->readerConfig;
worker->done = true;
}
};

Napi::Value Reader::NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient) {
Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
Napi::Object config = info[0].As<Napi::Object>();
ReaderConfig *readerConfig = new ReaderConfig(config);
ReaderNewInstanceWorker *wk = new ReaderNewInstanceWorker(deferred, cClient, readerConfig);

std::shared_ptr<CReaderWrapper> readerWrapper = std::make_shared<CReaderWrapper>();

ReaderConfig *readerConfig = new ReaderConfig(config, readerWrapper, &ReaderListener);
ReaderNewInstanceWorker *wk = new ReaderNewInstanceWorker(deferred, cClient, readerConfig, readerWrapper);
wk->Queue();
return deferred.Promise();
}
Expand Down Expand Up @@ -133,19 +195,20 @@ class ReaderReadNextWorker : public Napi::AsyncWorker {
Napi::Value Reader::ReadNext(const Napi::CallbackInfo &info) {
Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
if (info[0].IsUndefined()) {
ReaderReadNextWorker *wk = new ReaderReadNextWorker(deferred, this->cReader);
ReaderReadNextWorker *wk = new ReaderReadNextWorker(deferred, this->wrapper->cReader);
wk->Queue();
} else {
Napi::Number timeout = info[0].As<Napi::Object>().ToNumber();
ReaderReadNextWorker *wk = new ReaderReadNextWorker(deferred, this->cReader, timeout.Int64Value());
ReaderReadNextWorker *wk =
new ReaderReadNextWorker(deferred, this->wrapper->cReader, timeout.Int64Value());
wk->Queue();
}
return deferred.Promise();
}

Napi::Value Reader::HasNext(const Napi::CallbackInfo &info) {
int value = 0;
pulsar_result result = pulsar_reader_has_message_available(this->cReader, &value);
pulsar_result result = pulsar_reader_has_message_available(this->wrapper->cReader, &value);
if (result != pulsar_result_Ok) {
Napi::Error::New(info.Env(), "Failed to check if next message is available").ThrowAsJavaScriptException();
return Napi::Boolean::New(info.Env(), false);
Expand All @@ -158,16 +221,20 @@ Napi::Value Reader::HasNext(const Napi::CallbackInfo &info) {

class ReaderCloseWorker : public Napi::AsyncWorker {
public:
ReaderCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_reader_t *cReader)
ReaderCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_reader_t *cReader, Reader *reader)
: AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
deferred(deferred),
cReader(cReader) {}
cReader(cReader),
reader(reader) {}
~ReaderCloseWorker() {}
void Execute() {
pulsar_result result = pulsar_reader_close(this->cReader);
if (result != pulsar_result_Ok) SetError(pulsar_result_str(result));
}
void OnOK() { this->deferred.Resolve(Env().Null()); }
void OnOK() {
this->reader->Cleanup();
this->deferred.Resolve(Env().Null());
}
void OnError(const Napi::Error &e) {
this->deferred.Reject(
Napi::Error::New(Env(), std::string("Failed to close reader: ") + e.Message()).Value());
Expand All @@ -176,13 +243,30 @@ class ReaderCloseWorker : public Napi::AsyncWorker {
private:
Napi::Promise::Deferred deferred;
pulsar_reader_t *cReader;
Reader *reader;
};

Napi::Value Reader::Close(const Napi::CallbackInfo &info) {
Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
ReaderCloseWorker *wk = new ReaderCloseWorker(deferred, this->cReader);
ReaderCloseWorker *wk = new ReaderCloseWorker(deferred, this->wrapper->cReader, this);
wk->Queue();
return deferred.Promise();
}

Reader::~Reader() { pulsar_reader_free(this->cReader); }
void Reader::Cleanup() {
if (this->listener) {
this->CleanupListener();
}
}

void Reader::CleanupListener() {
this->Unref();
this->listener->callback.Release();
this->listener = nullptr;
}

Reader::~Reader() {
if (this->listener) {
this->CleanupListener();
}
}
9 changes: 7 additions & 2 deletions src/Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <napi.h>
#include <pulsar/c/client.h>
#include "ReaderConfig.h"

class Reader : public Napi::ObjectWrap<Reader> {
public:
Expand All @@ -30,14 +31,18 @@ class Reader : public Napi::ObjectWrap<Reader> {
static Napi::FunctionReference constructor;
Reader(const Napi::CallbackInfo &info);
~Reader();
void SetCReader(pulsar_reader_t *cReader);
void SetCReader(std::shared_ptr<CReaderWrapper> cReader);
void SetListenerCallback(ReaderListenerCallback *listener);
void Cleanup();

private:
pulsar_reader_t *cReader;
std::shared_ptr<CReaderWrapper> wrapper;
ReaderListenerCallback *listener;

Napi::Value ReadNext(const Napi::CallbackInfo &info);
Napi::Value HasNext(const Napi::CallbackInfo &info);
Napi::Value Close(const Napi::CallbackInfo &info);
void CleanupListener();
};

#endif
37 changes: 35 additions & 2 deletions src/ReaderConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@ static const std::string CFG_RECV_QUEUE = "receiverQueueSize";
static const std::string CFG_READER_NAME = "readerName";
static const std::string CFG_SUBSCRIPTION_ROLE_PREFIX = "subscriptionRolePrefix";
static const std::string CFG_READ_COMPACTED = "readCompacted";
static const std::string CFG_LISTENER = "listener";

ReaderConfig::ReaderConfig(const Napi::Object &readerConfig) : topic(""), cStartMessageId(NULL) {
void FinalizeListenerCallback(Napi::Env env, ReaderListenerCallback *cb, void *) { delete cb; }

ReaderConfig::ReaderConfig(const Napi::Object &readerConfig, std::shared_ptr<CReaderWrapper> readerWrapper,
pulsar_reader_listener readerListener)
: topic(""), cStartMessageId(NULL), listener(nullptr) {
this->cReaderConfig = pulsar_reader_configuration_create();

if (readerConfig.Has(CFG_TOPIC) && readerConfig.Get(CFG_TOPIC).IsString()) {
Expand Down Expand Up @@ -67,12 +72,40 @@ ReaderConfig::ReaderConfig(const Napi::Object &readerConfig) : topic(""), cStart
pulsar_reader_configuration_set_read_compacted(this->cReaderConfig, 1);
}
}

if (readerConfig.Has(CFG_LISTENER) && readerConfig.Get(CFG_LISTENER).IsFunction()) {
this->listener = new ReaderListenerCallback();
Napi::ThreadSafeFunction callback = Napi::ThreadSafeFunction::New(
readerConfig.Env(), readerConfig.Get(CFG_LISTENER).As<Napi::Function>(), "Reader Listener Callback",
1, 1, (void *)NULL, FinalizeListenerCallback, listener);
this->listener->callback = std::move(callback);
pulsar_reader_configuration_set_reader_listener(this->cReaderConfig, readerListener, this->listener);
}
}

ReaderConfig::~ReaderConfig() { pulsar_reader_configuration_free(this->cReaderConfig); }
ReaderConfig::~ReaderConfig() {
pulsar_reader_configuration_free(this->cReaderConfig);
if (this->listener) {
this->listener->callback.Release();
}
}

pulsar_reader_configuration_t *ReaderConfig::GetCReaderConfig() { return this->cReaderConfig; }

std::string ReaderConfig::GetTopic() { return this->topic; }

pulsar_message_id_t *ReaderConfig::GetCStartMessageId() { return this->cStartMessageId; }

ReaderListenerCallback *ReaderConfig::GetListenerCallback() {
ReaderListenerCallback *cb = this->listener;
this->listener = nullptr;
return cb;
}

CReaderWrapper::CReaderWrapper() : cReader(nullptr) {}

CReaderWrapper::~CReaderWrapper() {
if (this->cReader) {
pulsar_reader_free(this->cReader);
}
}
6 changes: 5 additions & 1 deletion src/ReaderConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,23 @@
#include <pulsar/c/reader.h>
#include <pulsar/c/reader_configuration.h>
#include <pulsar/c/message_id.h>
#include "ReaderListener.h"

class ReaderConfig {
public:
ReaderConfig(const Napi::Object &readerConfig);
ReaderConfig(const Napi::Object &readerConfig, std::shared_ptr<CReaderWrapper> readerWrapper,
pulsar_reader_listener readerListener);
~ReaderConfig();
pulsar_reader_configuration_t *GetCReaderConfig();
pulsar_message_id_t *GetCStartMessageId();
std::string GetTopic();
ReaderListenerCallback *GetListenerCallback();

private:
std::string topic;
pulsar_message_id_t *cStartMessageId;
pulsar_reader_configuration_t *cReaderConfig;
ReaderListenerCallback *listener;
};

#endif
Loading