From 6901de764de92a399c4175f87f401ec02ecd158d Mon Sep 17 00:00:00 2001 From: Yuto Furuta Date: Tue, 11 May 2021 11:06:55 +0900 Subject: [PATCH 1/3] support reader listener --- examples/reader_listener.js | 37 +++++++++++++ src/Reader.cc | 106 ++++++++++++++++++++++++++++++------ src/Reader.h | 9 ++- src/ReaderConfig.cc | 37 ++++++++++++- src/ReaderConfig.h | 6 +- src/ReaderListener.h | 39 +++++++++++++ tests/end_to_end.test.js | 50 +++++++++++++++++ 7 files changed, 261 insertions(+), 23 deletions(-) create mode 100644 examples/reader_listener.js create mode 100644 src/ReaderListener.h diff --git a/examples/reader_listener.js b/examples/reader_listener.js new file mode 100644 index 00000000..63809a60 --- /dev/null +++ b/examples/reader_listener.js @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +const Pulsar = require('pulsar-client'); + +(async () => { + // Create a client + const client = new Pulsar.Client({ + serviceUrl: 'pulsar://localhost:6650', + operationTimeoutSeconds: 30, + }); + + // Create a reader + const reader = await client.createReader({ + topic: 'persistent://public/default/my-topic', + startMessageId: Pulsar.MessageId.latest(), + listener: (msg, reader) => { + console.log(msg.getData().toString()); + }, + }); +})(); diff --git a/src/Reader.cc b/src/Reader.cc index 81186280..8e39314e 100644 --- a/src/Reader.cc +++ b/src/Reader.cc @@ -39,18 +39,57 @@ void Reader::Init(Napi::Env env, Napi::Object exports) { constructor.SuppressDestruct(); } -void Reader::SetCReader(pulsar_reader_t *cReader) { this->cReader = cReader; } +struct ReaderListenerProxyData { + pulsar_message_t *cMessage; + Reader *reader; + + ReaderListenerProxyData(pulsar_message_t *cMessage, Reader *reader) : cMessage(cMessage), reader(reader) {} +}; + +void ReaderListenerProxy(Napi::Env env, Napi::Function jsCallback, ReaderListenerProxyData *data) { + Napi::Object msg = Message::NewInstance({}, data->cMessage); + Reader *reader = data->reader; + delete data; + + jsCallback.Call({msg, reader->Value()}); +} + +void ReaderListener(pulsar_reader_t *cReader, pulsar_message_t *cMessage, void *ctx) { + ReaderListenerCallback *readerListenerCallback = (ReaderListenerCallback *)ctx; + Reader *reader = (Reader *)readerListenerCallback->reader; + if (readerListenerCallback->callback.Acquire() != napi_ok) { + return; + } + ReaderListenerProxyData *dataPtr = new ReaderListenerProxyData(cMessage, reader); + readerListenerCallback->callback.BlockingCall(dataPtr, ReaderListenerProxy); + readerListenerCallback->callback.Release(); +} + +void Reader::SetCReader(std::shared_ptr cReader) { this->wrapper = cReader; } +void Reader::SetListenerCallback(ReaderListenerCallback *listener) { + if (listener) { + // Maintain reference to reader, so it won't get garbage collected + // since, when we have a listener, we don't have to maintain reference to reader (in js code) + this->Ref(); + + // Pass reader as argument + listener->reader = this; + } -Reader::Reader(const Napi::CallbackInfo &info) : Napi::ObjectWrap(info) {} + this->listener = listener; +} + +Reader::Reader(const Napi::CallbackInfo &info) : Napi::ObjectWrap(info), listener(nullptr) {} class ReaderNewInstanceWorker : public Napi::AsyncWorker { public: ReaderNewInstanceWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t *cClient, - ReaderConfig *readerConfig) + ReaderConfig *readerConfig, std::shared_ptr readerWrapper) : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})), deferred(deferred), cClient(cClient), - readerConfig(readerConfig) {} + readerConfig(readerConfig), + readerWrapper(readerWrapper) {} ~ReaderNewInstanceWorker() {} void Execute() { const std::string &topic = this->readerConfig->GetTopic(); @@ -66,17 +105,20 @@ class ReaderNewInstanceWorker : public Napi::AsyncWorker { pulsar_result result = pulsar_client_create_reader(this->cClient, topic.c_str(), this->readerConfig->GetCStartMessageId(), - this->readerConfig->GetCReaderConfig(), &(this->cReader)); - delete this->readerConfig; + this->readerConfig->GetCReaderConfig(), &this->readerWrapper->cReader); if (result != pulsar_result_Ok) { SetError(std::string("Failed to create reader: ") + pulsar_result_str(result)); return; + } else { + this->listener = this->readerConfig->GetListenerCallback(); } + delete this->readerConfig; } void OnOK() { Napi::Object obj = Reader::constructor.New({}); Reader *reader = Reader::Unwrap(obj); - reader->SetCReader(this->cReader); + reader->SetCReader(this->readerWrapper); + reader->SetListenerCallback(this->listener); this->deferred.Resolve(obj); } void OnError(const Napi::Error &e) { this->deferred.Reject(Napi::Error::New(Env(), e.Message()).Value()); } @@ -84,15 +126,20 @@ class ReaderNewInstanceWorker : public Napi::AsyncWorker { private: Napi::Promise::Deferred deferred; pulsar_client_t *cClient; - ReaderConfig *readerConfig; pulsar_reader_t *cReader; + ReaderConfig *readerConfig; + ReaderListenerCallback *listener; + std::shared_ptr readerWrapper; }; Napi::Value Reader::NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient) { Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env()); Napi::Object config = info[0].As(); - ReaderConfig *readerConfig = new ReaderConfig(config); - ReaderNewInstanceWorker *wk = new ReaderNewInstanceWorker(deferred, cClient, readerConfig); + + std::shared_ptr readerWrapper = std::make_shared(); + + ReaderConfig *readerConfig = new ReaderConfig(config, readerWrapper, &ReaderListener); + ReaderNewInstanceWorker *wk = new ReaderNewInstanceWorker(deferred, cClient, readerConfig, readerWrapper); wk->Queue(); return deferred.Promise(); } @@ -133,11 +180,12 @@ class ReaderReadNextWorker : public Napi::AsyncWorker { Napi::Value Reader::ReadNext(const Napi::CallbackInfo &info) { Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env()); if (info[0].IsUndefined()) { - ReaderReadNextWorker *wk = new ReaderReadNextWorker(deferred, this->cReader); + ReaderReadNextWorker *wk = new ReaderReadNextWorker(deferred, this->wrapper->cReader); wk->Queue(); } else { Napi::Number timeout = info[0].As().ToNumber(); - ReaderReadNextWorker *wk = new ReaderReadNextWorker(deferred, this->cReader, timeout.Int64Value()); + ReaderReadNextWorker *wk = + new ReaderReadNextWorker(deferred, this->wrapper->cReader, timeout.Int64Value()); wk->Queue(); } return deferred.Promise(); @@ -145,7 +193,7 @@ Napi::Value Reader::ReadNext(const Napi::CallbackInfo &info) { Napi::Value Reader::HasNext(const Napi::CallbackInfo &info) { int value = 0; - pulsar_result result = pulsar_reader_has_message_available(this->cReader, &value); + pulsar_result result = pulsar_reader_has_message_available(this->wrapper->cReader, &value); if (result != pulsar_result_Ok) { Napi::Error::New(info.Env(), "Failed to check if next message is available").ThrowAsJavaScriptException(); return Napi::Boolean::New(info.Env(), false); @@ -158,16 +206,20 @@ Napi::Value Reader::HasNext(const Napi::CallbackInfo &info) { class ReaderCloseWorker : public Napi::AsyncWorker { public: - ReaderCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_reader_t *cReader) + ReaderCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_reader_t *cReader, Reader *reader) : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})), deferred(deferred), - cReader(cReader) {} + cReader(cReader), + reader(reader) {} ~ReaderCloseWorker() {} void Execute() { pulsar_result result = pulsar_reader_close(this->cReader); if (result != pulsar_result_Ok) SetError(pulsar_result_str(result)); } - void OnOK() { this->deferred.Resolve(Env().Null()); } + void OnOK() { + this->reader->Cleanup(); + this->deferred.Resolve(Env().Null()); + } void OnError(const Napi::Error &e) { this->deferred.Reject( Napi::Error::New(Env(), std::string("Failed to close reader: ") + e.Message()).Value()); @@ -176,13 +228,31 @@ class ReaderCloseWorker : public Napi::AsyncWorker { private: Napi::Promise::Deferred deferred; pulsar_reader_t *cReader; + Reader *reader; }; Napi::Value Reader::Close(const Napi::CallbackInfo &info) { Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env()); - ReaderCloseWorker *wk = new ReaderCloseWorker(deferred, this->cReader); + ReaderCloseWorker *wk = new ReaderCloseWorker(deferred, this->wrapper->cReader, this); wk->Queue(); return deferred.Promise(); } -Reader::~Reader() { pulsar_reader_free(this->cReader); } +void Reader::Cleanup() { + if (this->listener) { + this->CleanupListener(); + } +} + +void Reader::CleanupListener() { + this->Unref(); + this->listener->callback.Release(); + this->listener = nullptr; +} + +Reader::~Reader() { + if (this->listener) { + this->CleanupListener(); + } + pulsar_reader_free(this->wrapper->cReader); +} diff --git a/src/Reader.h b/src/Reader.h index 787b732d..8242fa91 100644 --- a/src/Reader.h +++ b/src/Reader.h @@ -22,6 +22,7 @@ #include #include +#include "ReaderConfig.h" class Reader : public Napi::ObjectWrap { public: @@ -30,14 +31,18 @@ class Reader : public Napi::ObjectWrap { static Napi::FunctionReference constructor; Reader(const Napi::CallbackInfo &info); ~Reader(); - void SetCReader(pulsar_reader_t *cReader); + void SetCReader(std::shared_ptr cReader); + void SetListenerCallback(ReaderListenerCallback *listener); + void Cleanup(); private: - pulsar_reader_t *cReader; + std::shared_ptr wrapper; + ReaderListenerCallback *listener; Napi::Value ReadNext(const Napi::CallbackInfo &info); Napi::Value HasNext(const Napi::CallbackInfo &info); Napi::Value Close(const Napi::CallbackInfo &info); + void CleanupListener(); }; #endif diff --git a/src/ReaderConfig.cc b/src/ReaderConfig.cc index 040d98f7..dd027b83 100644 --- a/src/ReaderConfig.cc +++ b/src/ReaderConfig.cc @@ -27,8 +27,13 @@ static const std::string CFG_RECV_QUEUE = "receiverQueueSize"; static const std::string CFG_READER_NAME = "readerName"; static const std::string CFG_SUBSCRIPTION_ROLE_PREFIX = "subscriptionRolePrefix"; static const std::string CFG_READ_COMPACTED = "readCompacted"; +static const std::string CFG_LISTENER = "listener"; -ReaderConfig::ReaderConfig(const Napi::Object &readerConfig) : topic(""), cStartMessageId(NULL) { +void FinalizeListenerCallback(Napi::Env env, ReaderListenerCallback *cb, void *) { delete cb; } + +ReaderConfig::ReaderConfig(const Napi::Object &readerConfig, std::shared_ptr readerWrapper, + pulsar_reader_listener readerListener) + : topic(""), cStartMessageId(NULL), listener(nullptr) { this->cReaderConfig = pulsar_reader_configuration_create(); if (readerConfig.Has(CFG_TOPIC) && readerConfig.Get(CFG_TOPIC).IsString()) { @@ -67,12 +72,40 @@ ReaderConfig::ReaderConfig(const Napi::Object &readerConfig) : topic(""), cStart pulsar_reader_configuration_set_read_compacted(this->cReaderConfig, 1); } } + + if (readerConfig.Has(CFG_LISTENER) && readerConfig.Get(CFG_LISTENER).IsFunction()) { + this->listener = new ReaderListenerCallback(); + Napi::ThreadSafeFunction callback = Napi::ThreadSafeFunction::New( + readerConfig.Env(), readerConfig.Get(CFG_LISTENER).As(), "Reader Listener Callback", + 1, 1, (void *)NULL, FinalizeListenerCallback, listener); + this->listener->callback = std::move(callback); + pulsar_reader_configuration_set_reader_listener(this->cReaderConfig, readerListener, this->listener); + } } -ReaderConfig::~ReaderConfig() { pulsar_reader_configuration_free(this->cReaderConfig); } +ReaderConfig::~ReaderConfig() { + pulsar_reader_configuration_free(this->cReaderConfig); + if (this->listener) { + this->listener->callback.Release(); + } +} pulsar_reader_configuration_t *ReaderConfig::GetCReaderConfig() { return this->cReaderConfig; } std::string ReaderConfig::GetTopic() { return this->topic; } pulsar_message_id_t *ReaderConfig::GetCStartMessageId() { return this->cStartMessageId; } + +ReaderListenerCallback *ReaderConfig::GetListenerCallback() { + ReaderListenerCallback *cb = this->listener; + this->listener = nullptr; + return cb; +} + +CReaderWrapper::CReaderWrapper() : cReader(nullptr) {} + +CReaderWrapper::~CReaderWrapper() { + if (this->cReader) { + pulsar_reader_free(this->cReader); + } +} diff --git a/src/ReaderConfig.h b/src/ReaderConfig.h index 69fc634d..0dacd35e 100644 --- a/src/ReaderConfig.h +++ b/src/ReaderConfig.h @@ -24,19 +24,23 @@ #include #include #include +#include "ReaderListener.h" class ReaderConfig { public: - ReaderConfig(const Napi::Object &readerConfig); + ReaderConfig(const Napi::Object &readerConfig, std::shared_ptr readerWrapper, + pulsar_reader_listener readerListener); ~ReaderConfig(); pulsar_reader_configuration_t *GetCReaderConfig(); pulsar_message_id_t *GetCStartMessageId(); std::string GetTopic(); + ReaderListenerCallback *GetListenerCallback(); private: std::string topic; pulsar_message_id_t *cStartMessageId; pulsar_reader_configuration_t *cReaderConfig; + ReaderListenerCallback *listener; }; #endif diff --git a/src/ReaderListener.h b/src/ReaderListener.h new file mode 100644 index 00000000..a8ff255c --- /dev/null +++ b/src/ReaderListener.h @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef READER_LISTENER_H +#define READER_LISTENER_H + +#include +#include + +struct CReaderWrapper { + pulsar_reader_t *cReader; + CReaderWrapper(); + ~CReaderWrapper(); +}; + +struct ReaderListenerCallback { + Napi::ThreadSafeFunction callback; + + // Using reader as void* since the ReaderListenerCallback is shared between Config and Reader. + void *reader; +}; + +#endif diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js index 4acf808e..46364e00 100644 --- a/tests/end_to_end.test.js +++ b/tests/end_to_end.test.js @@ -213,6 +213,56 @@ const Pulsar = require('../index.js'); await client.close(); }); + test('Produce/Read Listener', async () => { + const client = new Pulsar.Client({ + serviceUrl: 'pulsar://localhost:6650', + operationTimeoutSeconds: 30, + }); + + const topic = 'persistent://public/default/produce-read-listener'; + const producer = await client.createProducer({ + topic, + sendTimeoutMs: 30000, + batchingEnabled: true, + }); + expect(producer).not.toBeNull(); + + let finish; + const results = []; + const finishPromise = new Promise((resolve) => { + finish = resolve; + }); + + const reader = await client.createReader({ + topic, + startMessageId: Pulsar.MessageId.latest(), + listener: (message) => { + const data = message.getData().toString(); + results.push(data); + if (results.length === 10) finish(); + }, + }); + + expect(reader).not.toBeNull(); + + const messages = []; + for (let i = 0; i < 10; i += 1) { + const msg = `my-message-${i}`; + producer.send({ + data: Buffer.from(msg), + }); + messages.push(msg); + } + await producer.flush(); + + await finishPromise; + expect(lodash.difference(messages, results)).toEqual([]); + + await producer.close(); + await reader.close(); + await client.close(); + }); + test('acknowledgeCumulative', async () => { const client = new Pulsar.Client({ serviceUrl: 'pulsar://localhost:6650', From a28aade0b2041e7d5c08cfdbf9777bb695e87f09 Mon Sep 17 00:00:00 2001 From: Yuto Furuta Date: Tue, 18 May 2021 13:40:27 +0900 Subject: [PATCH 2/3] fix reader --- src/Reader.cc | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/Reader.cc b/src/Reader.cc index 8e39314e..67a86b43 100644 --- a/src/Reader.cc +++ b/src/Reader.cc @@ -108,11 +108,16 @@ class ReaderNewInstanceWorker : public Napi::AsyncWorker { this->readerConfig->GetCReaderConfig(), &this->readerWrapper->cReader); if (result != pulsar_result_Ok) { SetError(std::string("Failed to create reader: ") + pulsar_result_str(result)); + if (this->readerConfig) { + delete this->readerConfig; + } return; } else { this->listener = this->readerConfig->GetListenerCallback(); } - delete this->readerConfig; + if (this->readerConfig) { + delete this->readerConfig; + } } void OnOK() { Napi::Object obj = Reader::constructor.New({}); @@ -254,5 +259,4 @@ Reader::~Reader() { if (this->listener) { this->CleanupListener(); } - pulsar_reader_free(this->wrapper->cReader); } From e4c7840824ab8c61f4f8effb8a7ef6ae5a4716c9 Mon Sep 17 00:00:00 2001 From: Yuto Furuta Date: Tue, 18 May 2021 14:53:44 +0900 Subject: [PATCH 3/3] use createReaderCallback --- src/Reader.cc | 46 ++++++++++++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/src/Reader.cc b/src/Reader.cc index 67a86b43..885431fa 100644 --- a/src/Reader.cc +++ b/src/Reader.cc @@ -22,6 +22,8 @@ #include "ReaderConfig.h" #include #include +#include +#include Napi::FunctionReference Reader::constructor; @@ -89,34 +91,29 @@ class ReaderNewInstanceWorker : public Napi::AsyncWorker { deferred(deferred), cClient(cClient), readerConfig(readerConfig), - readerWrapper(readerWrapper) {} + readerWrapper(readerWrapper), + done(false) {} ~ReaderNewInstanceWorker() {} void Execute() { const std::string &topic = this->readerConfig->GetTopic(); if (topic.empty()) { - SetError(std::string("Topic is required and must be specified as a string when creating reader")); + std::string msg("Topic is required and must be specified as a string when creating reader"); + SetError(msg); return; } if (this->readerConfig->GetCStartMessageId() == nullptr) { - SetError(std::string( - "StartMessageId is required and must be specified as a MessageId object when creating reader")); + std::string msg( + "StartMessageId is required and must be specified as a MessageId object when creating reader"); + SetError(msg); return; } - pulsar_result result = - pulsar_client_create_reader(this->cClient, topic.c_str(), this->readerConfig->GetCStartMessageId(), - this->readerConfig->GetCReaderConfig(), &this->readerWrapper->cReader); - if (result != pulsar_result_Ok) { - SetError(std::string("Failed to create reader: ") + pulsar_result_str(result)); - if (this->readerConfig) { - delete this->readerConfig; - } - return; - } else { - this->listener = this->readerConfig->GetListenerCallback(); - } - if (this->readerConfig) { - delete this->readerConfig; + pulsar_client_create_reader_async(this->cClient, topic.c_str(), this->readerConfig->GetCStartMessageId(), + this->readerConfig->GetCReaderConfig(), + &ReaderNewInstanceWorker::createReaderCallback, (void *)this); + + while (!done) { + std::this_thread::yield(); } } void OnOK() { @@ -135,6 +132,19 @@ class ReaderNewInstanceWorker : public Napi::AsyncWorker { ReaderConfig *readerConfig; ReaderListenerCallback *listener; std::shared_ptr readerWrapper; + std::atomic done; + static void createReaderCallback(pulsar_result result, pulsar_reader_t *reader, void *ctx) { + ReaderNewInstanceWorker *worker = (ReaderNewInstanceWorker *)ctx; + if (result != pulsar_result_Ok) { + worker->SetError(std::string("Failed to create reader: ") + pulsar_result_str(result)); + } else { + worker->readerWrapper->cReader = reader; + worker->listener = worker->readerConfig->GetListenerCallback(); + } + + delete worker->readerConfig; + worker->done = true; + } }; Napi::Value Reader::NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient) {