Testing reactor flux chain that uses combineLatest doesn’t work as expected

I had a class that performed a creation of Flux from many Mono’s and then performed processing on that Flux. The first step in that process was

Flux.combineLatest(flux1, flux2, (obj1, obj2) -> Tuples.of(obj1, obj2)
    .map(...)
    .flatMap(...)

In the unit test where I was testing this class, since I was assembling a Flux in the first place, I wasn’t providing a Flux but a series of Mono’s.

But, this is how I was assembling the flux:

public Flux<RecordB> fluxOfRecordsB() {
    return Flux.<Record>create(recordBFluxSink ->
            this.<Record>fetchAndEmitRecord(recordBFluxSink, 
            recordProvider::getRecordFromB));
}

private void fetchAndEmitRecord(
    FluxSink<Record> fluxSink,
    Supplier<Mono<Record>> getterFunction) {
    getterFunction.get()
        .delayElement(java.time.Duration.ofMillis(1000))
        .subscribeOn(Schedulers.single())
        .subscribe(...);
}

so when I was testing I then supplied Mono’s like this:

when(recordProvider.getRecord()).thenAnswer(new Answer<Mono<Record>>() {
    @Override
    public Mono<RecordA> answer(InvocationOnMock invocationOnMock) throws Throwable {
        return monoList.get(callCounter.getAndIncrement());
    }
});

And this worked! But, then I thought, ok let’s refactor, so I took the Flux assembling to another class, so that the class that processes the Flux has less things it is doing. So that meant that in my unit test I would not be providing a series of Mono’s but a Flux!

In the above code you can see the line return monoList.get(callCounter.getAndIncrement()); where callCounter is AtomicInteger that I increment every time so that I do get a different Mono, which is later verified by StepVerifier.

But onwards! After refactoring, I was going to “just” provide a Flux of same values that I previously supplied as Mono’s but that doesn’t work anymore!!

Here’s the test code:

@Test
void givenThis_whenThat_validateResult() {
    // given
    var recordA1 = RecordA.builder().id("1").build();
    var recordA2 = RecordA.builder().id("2").build();
    var recordA3 = RecordA.builder().id("3").build();
    List<RecordA> recordAList = List.of(recA1);
    var fluxOfA = Flux.fromIterable(recordAList);

    var recordB1 = RecordB.builder().id("1").build();
    var recordB2 = RecordB.builder().id("2").build();
    var recordB3 = RecordB.builder().id("3").build();
    List<RecordB> recordBList = List.of(recB1);
    var fluxOfB = Flux.fromIterable(recordBList);

    // when
    when(dataStreamer.fluxOfRecordsA()).then(new Answer<Flux<RecordA>>() {
        @Override
        public Flux<RecordA> answer(InvocationOnMock invocationOnMock) throws Throwable {
            return Flux.just(recA1);
        }
    });
    // There is also identical code here as above but for fluxOfRecordsB

    when(resultSender.sendResultToApi(any(ResultRecord.class))).thenReturn(Mono.empty());
    recordProcessor.processRecords();

    // then
    verify(resultSender, times(1)).sendResultToApi(result1);
}

To explain but probably not needed, resultSender is a mocked class that is interacted with from within the processRecords() method. Below I will provide code for this method. Anyway, I also tried different approach to providing Flux, instead of .then I tried .thenAnswer and .thenReturn. None worked, unit test is still not working. But it is also weird how is it not working.

When I provide a Flux of three records, for both fluxA and fluxB the test fails because I see that the Tuple that is passed down the reactor chain is such that the following tuples are processed:

Tuple(recordA3, recordB1)
Tuple(recordA3, recordB2)
Tuple(recordA3, recordB3)

and that is weird. At first I thought it is my mistake, maybe still is, because if I provide a Flux of 3 items, of course it will take the last as it is combineLatest but when I did a separate test, like the one below, I really got Tuples I expected:

Tuple(recordA1, recordB1)
Tuple(recordA2, recordB2)
Tuple(recordA3, recordB3)

Here is this separate test below:

@Test
void testCombineLatest() {
    // given
    var recA1 = RecordA.builder().id("1").done(false).build();
    var recA2 = RecordA.builder().id("2").done(false).build();
    var recA3 = RecordA.builder().id("3").done(false).build();
    List<RecordA> recordAList = List.of(
            recA1,
            recA2,
            recA3);
    var fluxOfA = Flux.fromIterable(recordAList);

    var recB1 = RecordB.builder().id("1").done(false).build();
    var recB2 = RecordB.builder().id("2").done(false).build();
    var recB3 = RecordB.builder().id("3").done(false).build();
    List<RecordB> recordBList = List.of(
            recB1,
            recB2,
            recB3);
    var fluxOfB = Flux.fromIterable(recordBList);

    var latestCombo = Flux.combineLatest(
            fluxOfA,
            fluxOfB,
            (recordA, recordB) -> Tuples.of(recordA, recordB));

    StepVerifier.create(latestCombo)
            .expectNext(Tuples.of(recA1, recB1))
            .expectNext(Tuples.of(recA2, recB2))
            .expectNext(Tuples.of(recA3, recB3))
            .verifyComplete();
}

So I am wondering what did I do wrong that in the first test I never get the expected pairs?

Leave a Comment