diff --git a/.github/workflows/docker-validate.yml b/.github/workflows/docker-validate.yml index 68ce475d..7a89d352 100644 --- a/.github/workflows/docker-validate.yml +++ b/.github/workflows/docker-validate.yml @@ -11,7 +11,7 @@ permissions: env: # Pinned commit for cpp-example-collection smoke build (https://github.com/livekit-examples/cpp-example-collection) - CPP_EXAMPLE_COLLECTION_REF: f231c0c75028d1dcf13edcecd369d030d2c7c8d4 + CPP_EXAMPLE_COLLECTION_REF: 46083ea4e5d3def8e44a53148c2c7800131efca0 jobs: validate-x64: diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 55ff58df..b998be8f 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -63,13 +63,11 @@ jobs: - os: macos-26-xlarge name: macos-arm64 build_cmd: ./build.sh release-tests - # E2E not possible on GHA Mac runner currently - e2e-testing: false + e2e-testing: true - os: macos-26-large name: macos-x64 build_cmd: ./build.sh release-tests --macos-arch x86_64 - # E2E not possible on GHA Mac runner currently - e2e-testing: false + e2e-testing: true # Pinned to Windows 2022 for current VS 17 implementation - os: windows-2022 name: windows-x64 diff --git a/cpp-example-collection b/cpp-example-collection index 46083ea4..839e38fa 160000 --- a/cpp-example-collection +++ b/cpp-example-collection @@ -1 +1 @@ -Subproject commit 46083ea4e5d3def8e44a53148c2c7800131efca0 +Subproject commit 839e38fa3079c1c63efeeb73ca476bd1c42afc51 diff --git a/src/tests/integration/test_data_track.cpp b/src/tests/integration/test_data_track.cpp index f0d3dc63..54fcbc3c 100644 --- a/src/tests/integration/test_data_track.cpp +++ b/src/tests/integration/test_data_track.cpp @@ -25,15 +25,12 @@ #include #include -#include #include #include #include -#include #include "../common/test_common.h" #include "ffi_client.h" -#include "lk_log.h" namespace livekit::test { @@ -45,9 +42,12 @@ constexpr char kTrackNamePrefix[] = "data_track_e2e"; constexpr auto kTrackWaitTimeout = 10s; constexpr auto kPollingInterval = 10ms; constexpr int kResubscribeIterations = 10; +constexpr std::size_t kSinglePacketPayloadBytes = 8192; constexpr int kPublishManyTrackCount = 256; constexpr auto kPublishManyTimeout = 5s; constexpr std::size_t kLargeFramePayloadBytes = 196608; +constexpr auto kTransportFrameTimeout = 15s; +constexpr std::uint8_t kTransportPayloadValue = 0xFA; constexpr char kE2EESharedSecret[] = "password"; constexpr int kE2EEFrameCount = 5; constexpr int kTimestampFrameAttempts = 200; @@ -287,24 +287,15 @@ void runEncryptedDataTrackRoundTrip(KeyDerivationFunction key_derivation_functio class DataTrackE2ETest : public LiveKitTestBase {}; -class DataTrackTransportTest : public DataTrackE2ETest, - public ::testing::WithParamInterface> {}; +class DataTrackTransportTest : public DataTrackE2ETest, public ::testing::WithParamInterface {}; class DataTrackKeyDerivationTest : public DataTrackE2ETest, public ::testing::WithParamInterface {}; TEST_P(DataTrackTransportTest, PublishesAndReceivesFramesEndToEnd) { - const auto publish_fps = std::get<0>(GetParam()); - const auto payload_len = std::get<1>(GetParam()); + const auto payload_len = GetParam(); const auto track_name = makeTrackName("transport"); - // How long to publish frames for. - constexpr auto PUBLISH_DURATION = 10s; - - // Percentage of total frames that must be received on the subscriber end in - // order for the test to pass. - constexpr float MIN_PERCENTAGE = 0.90f; - std::vector room_configs(2); room_configs[0].room_options.single_peer_connection = false; room_configs[1].room_options.single_peer_connection = false; @@ -316,104 +307,57 @@ TEST_P(DataTrackTransportTest, PublishesAndReceivesFramesEndToEnd) { auto& publisher_room = rooms[0]; const auto publisher_identity = publisher_room->localParticipant()->identity(); - auto track = requirePublishedTrack(publisher_room->localParticipant(), track_name); - std::cerr << "Track published\n"; + auto local_track = requirePublishedTrack(publisher_room->localParticipant(), track_name); + ASSERT_TRUE(local_track->isPublished()); + EXPECT_FALSE(local_track->info().uses_e2ee); + EXPECT_EQ(local_track->info().name, track_name); auto remote_track = subscriber_delegate.waitForTrack(kTrackWaitTimeout); - std::cerr << "Got remote track: " << remote_track->info().sid << "\n"; - ASSERT_NE(remote_track, nullptr) << "Timed out waiting for remote data track"; EXPECT_TRUE(remote_track->isPublished()); EXPECT_FALSE(remote_track->info().uses_e2ee); EXPECT_EQ(remote_track->info().name, track_name); EXPECT_EQ(remote_track->publisherIdentity(), publisher_identity); - const auto frame_count = - static_cast(std::llround(std::chrono::duration(PUBLISH_DURATION).count() * publish_fps)); - - auto publish = [&]() { - if (!track->isPublished()) { - throw std::runtime_error("Publisher failed to publish data track"); - } - if (track->info().uses_e2ee) { - throw std::runtime_error("Unexpected E2EE on test data track"); - } - if (track->info().name != track_name) { - throw std::runtime_error("Published track name mismatch"); - } - - const auto frame_interval = std::chrono::duration_cast( - std::chrono::duration(1.0 / publish_fps)); - auto next_send = std::chrono::steady_clock::now(); - - std::cout << "Publishing " << frame_count << " frames with payload length " << payload_len << '\n'; - for (size_t index = 0; index < frame_count; ++index) { - std::vector payload(payload_len, static_cast(index)); - requirePushSuccess(track->tryPush(std::move(payload)), "Failed to push data frame"); - - next_send += frame_interval; - std::this_thread::sleep_until(next_send); - } - - track->unpublishDataTrack(); - }; - auto subscribe_result = remote_track->subscribe(); if (!subscribe_result) { FAIL() << describeDataTrackError(subscribe_result.error()); } auto subscription = subscribe_result.value(); - std::promise receive_count_promise; - auto receive_count_future = receive_count_promise.get_future(); - - auto subscribe = [&]() { - size_t received_count = 0; + std::atomic keep_publishing{true}; + auto publisher = std::async(std::launch::async, [&]() { DataTrackFrame frame; - while (subscription->read(frame) && received_count < frame_count) { - if (frame.payload.empty()) { - throw std::runtime_error("Received empty data frame"); - } - - const auto first_byte = frame.payload.front(); - if (!std::all_of(frame.payload.begin(), frame.payload.end(), - [first_byte](std::uint8_t byte) { return byte == first_byte; })) { - throw std::runtime_error("Received frame with inconsistent payload"); - } - if (frame.user_timestamp.has_value()) { - throw std::runtime_error("Received unexpected user timestamp in transport test"); - } - - ++received_count; + frame.payload.assign(payload_len, kTransportPayloadValue); + while (keep_publishing.load()) { + requirePushSuccess(local_track->tryPush(frame), "Failed to push data frame"); + std::this_thread::sleep_for(50ms); } + }); - receive_count_promise.set_value(received_count); - }; - - // Launch both publisher and subscriber - auto pub_fut = std::async(std::launch::async, publish); - auto sub_fut = std::async(std::launch::async, subscribe); - - // Wait for both, with a combined deadline (the timeout(...) wrapper). - const auto deadline = std::chrono::steady_clock::now() + PUBLISH_DURATION + 25s; - - const bool pub_ok = pub_fut.wait_until(deadline) == std::future_status::ready; - const bool sub_ok = sub_fut.wait_until(deadline) == std::future_status::ready; - - if (!pub_ok || !sub_ok) { - ADD_FAILURE() << "Timed out waiting for data frames"; + DataTrackFrame frame; + std::exception_ptr read_error; + try { + frame = readFrameWithTimeout(subscription, kTransportFrameTimeout); + } catch (...) { + read_error = std::current_exception(); } - // Equivalent of `try_join!`'s ? — re-throws any exception from either task - pub_fut.get(); - sub_fut.get(); + const bool remote_track_published_after_read = remote_track->isPublished(); + keep_publishing.store(false); + subscription->close(); + local_track->unpublishDataTrack(); - const auto received_count = receive_count_future.get(); - const auto received_percent = static_cast(received_count) / static_cast(frame_count); - std::cout << "Received " << received_count << "/" << frame_count << " frames (" << received_percent * 100.0f << "%)" - << '\n'; + publisher.get(); + if (read_error) { + std::rethrow_exception(read_error); + } - EXPECT_GE(received_percent, MIN_PERCENTAGE) << "Received " << received_count << "/" << frame_count << " frames"; + ASSERT_EQ(frame.payload.size(), payload_len); + EXPECT_TRUE(std::all_of(frame.payload.begin(), frame.payload.end(), + [](std::uint8_t byte) { return byte == kTransportPayloadValue; })); + EXPECT_FALSE(frame.user_timestamp.has_value()); + EXPECT_TRUE(remote_track_published_after_read); } TEST_F(DataTrackE2ETest, UnpublishUpdatesPublishedStateEndToEnd) { @@ -853,20 +797,23 @@ TEST_F(DataTrackE2ETest, PreservesUserTimestampOnEncryptedDataTrack) { local_track->unpublishDataTrack(); } -std::string dataTrackParamName(const ::testing::TestParamInfo>& info) { - if (std::get<0>(info.param) > 100.0) { - return "HighFpsSinglePacket"; +std::string dataTrackPayloadParamName(const ::testing::TestParamInfo& info) { + if (info.param == kSinglePacketPayloadBytes) { + return "SinglePacket"; + } + if (info.param == kLargeFramePayloadBytes) { + return "MultiPacket"; } - return "LowFpsMultiPacket"; + return "Payload" + std::to_string(info.param); } std::string keyDerivationParamName(const ::testing::TestParamInfo& info) { return keyDerivationFunctionName(info.param); } -INSTANTIATE_TEST_SUITE_P(DataTrackScenarios, DataTrackTransportTest, - ::testing::Values(std::make_tuple(120.0, size_t{8192}), std::make_tuple(10.0, size_t{196608})), - dataTrackParamName); +INSTANTIATE_TEST_SUITE_P(DataTrackPayloads, DataTrackTransportTest, + ::testing::Values(kSinglePacketPayloadBytes, kLargeFramePayloadBytes), + dataTrackPayloadParamName); INSTANTIATE_TEST_SUITE_P(KeyDerivationFunctions, DataTrackKeyDerivationTest, ::testing::Values(KeyDerivationFunction::PBKDF2, KeyDerivationFunction::HKDF),