Skip to content

[Bug]: Pulsar tests occasionally fail (TopicNotFoundException : Namespace not found) #1504

@HofmeisterAn

Description

@HofmeisterAn

Testcontainers version

develop

Using the latest Testcontainers version?

Yes

Host OS

Ubuntu 22.04

Host arch

x86

.NET version

9.0.300

Docker version

-

Docker info

-

What happened?

Occasionally, the Pulsar tests fail with the following error in our CI. I'm not a Pulsar expert and haven't found the root cause yet. It would be great if someone with more Pulsar knowledge could either share some insights or take a look at it. The issue is somewhat reproducible using GitHub Codespaces:

cd tests/Testcontainers.Pulsar.Tests/
dotnet build
until ! dotnet test --no-build; do :; done

Note: There are 4 tests in total, and if the CI fails, it's always just the one below.

public async Task ConsumerReceivesSendMessage()
{
// Given
const string helloPulsar = "Hello, Pulsar!";
var topic = $"persistent://public/default/{Guid.NewGuid():D}";
var name = Guid.NewGuid().ToString("D");
var clientBuilder = PulsarClient.Builder().ServiceUrl(new Uri(_pulsarContainer.GetBrokerAddress()));
if (_authenticationEnabled)
{
var authToken = await _pulsarContainer.CreateAuthenticationTokenAsync(Timeout.InfiniteTimeSpan, TestContext.Current.CancellationToken);
_ = clientBuilder.Authentication(new TokenAuthentication(authToken));
}
await using var client = clientBuilder.Build();
await using var producer = client.NewProducer(Schema.String)
.Topic(topic)
.Create();
await using var consumer = client.NewConsumer(Schema.String)
.Topic(topic)
.SubscriptionName(name)
.Create();
// When
_ = await consumer.OnStateChangeTo(ConsumerState.Active, TimeSpan.FromSeconds(10), TestContext.Current.CancellationToken)
.ConfigureAwait(true);
_ = await producer.OnStateChangeTo(ProducerState.Connected, TimeSpan.FromSeconds(10), TestContext.Current.CancellationToken)
.ConfigureAwait(true);
_ = await producer.Send(helloPulsar, cancellationToken: TestContext.Current.CancellationToken)
.ConfigureAwait(true);
var message = await consumer.Receive(TestContext.Current.CancellationToken)
.ConfigureAwait(true);
// Then
Assert.Equal(helloPulsar, Encoding.Default.GetString(message.Data));
}

Relevant log output

[xUnit.net 00:00:40.86]     Testcontainers.Pulsar.PulsarContainerTest+PulsarDefaultConfiguration.ConsumerReceivesSendMessage [FAIL]
  Failed Testcontainers.Pulsar.PulsarContainerTest+PulsarDefaultConfiguration.ConsumerReceivesSendMessage [40 s]
  Error Message:
   DotPulsar.Exceptions.ProducerFaultedException : Producer has faulted
---- DotPulsar.Exceptions.TopicNotFoundException : Namespace not found
  Stack Trace:
     at DotPulsar.Internal.Producer`1.ChoosePartitions(MessageMetadata metadata, CancellationToken cancellationToken)
   at DotPulsar.Internal.Producer`1.InternalSend(MessageMetadata metadata, TMessage message, Boolean sendOpCancelable, TaskCompletionSource`1 tcs, Func`2 onMessageSent, Action`1 onFailed, CancellationToken cancellationToken)
   at DotPulsar.Internal.Producer`1.Send(MessageMetadata metadata, TMessage message, CancellationToken cancellationToken)
   at DotPulsar.Extensions.SendExtensions.Send[TMessage](ISend`1 sender, TMessage message, CancellationToken cancellationToken)
   at Testcontainers.Pulsar.PulsarContainerTest.ConsumerReceivesSendMessage() in /home/runner/work/testcontainers-dotnet/testcontainers-dotnet/tests/Testcontainers.Pulsar.Tests/PulsarContainerTest.cs:line 69
   at Testcontainers.Pulsar.PulsarContainerTest.ConsumerReceivesSendMessage() in /home/runner/work/testcontainers-dotnet/testcontainers-dotnet/tests/Testcontainers.Pulsar.Tests/PulsarContainerTest.cs:line 76
   at Testcontainers.Pulsar.PulsarContainerTest.ConsumerReceivesSendMessage() in /home/runner/work/testcontainers-dotnet/testcontainers-dotnet/tests/Testcontainers.Pulsar.Tests/PulsarContainerTest.cs:line 76
   at Testcontainers.Pulsar.PulsarContainerTest.ConsumerReceivesSendMessage() in /home/runner/work/testcontainers-dotnet/testcontainers-dotnet/tests/Testcontainers.Pulsar.Tests/PulsarContainerTest.cs:line 76
--- End of stack trace from previous location ---
----- Inner Stack Trace -----
   at DotPulsar.Internal.Extensions.CommandExtensions.Throw(ServerError error, String message)
   at DotPulsar.Internal.Extensions.CommandExtensions.Throw(CommandLookupTopicResponse command)
   at DotPulsar.Internal.ConnectionPool.FindConnectionForTopic(String topic, CancellationToken cancellationToken)
   at DotPulsar.Internal.ConnectionPool.GetNumberOfPartitions(String topic, CancellationToken cancellationToken)
   at DotPulsar.Internal.Producer`1.Monitor()
   at DotPulsar.Internal.Executor.TryExecuteOnce(Func`1 func, CancellationToken cancellationToken)
   at DotPulsar.Internal.Executor.TryExecuteOnce(Func`1 func, CancellationToken cancellationToken)
   at DotPulsar.Internal.Executor.Execute(Func`1 func, CancellationToken cancellationToken)
   at DotPulsar.Internal.Producer`1.Setup()

Additional information

No response

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't workinghelp wantedExtra attention is neededmoduleAn official Testcontainers module

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions