I had a class that assembled a Flux from many Monos and then processed that Flux. The first step of the processing was:
Flux.combineLatest(flux1, flux2, (obj1, obj2) -> Tuples.of(obj1, obj2))
    .map(...)
    .flatMap(...)
Since the class assembled the Flux itself, the unit test for it did not provide a Flux but a series of Monos.
This is how I was assembling the Flux:
public Flux<RecordB> fluxOfRecordsB() {
    return Flux.<Record>create(recordBFluxSink ->
            this.<Record>fetchAndEmitRecord(recordBFluxSink,
                    recordProvider::getRecordFromB));
}

private void fetchAndEmitRecord(
        FluxSink<Record> fluxSink,
        Supplier<Mono<Record>> getterFunction) {
    getterFunction.get()
            .delayElement(java.time.Duration.ofMillis(1000))
            .subscribeOn(Schedulers.single())
            .subscribe(...);
}
So in the test I supplied the Monos like this:
when(recordProvider.getRecord()).thenAnswer(new Answer<Mono<Record>>() {
    @Override
    public Mono<Record> answer(InvocationOnMock invocationOnMock) throws Throwable {
        return monoList.get(callCounter.getAndIncrement());
    }
});
And this worked! But then I thought, OK, let's refactor. I moved the Flux assembly into another class so that the class that processes the Flux has fewer responsibilities. That meant that in my unit test I would no longer be providing a series of Monos but a Flux!
In the above code you can see the line return monoList.get(callCounter.getAndIncrement()); where callCounter is an AtomicInteger that I increment on every call so that I get a different Mono each time, which is later verified by StepVerifier.
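The counter trick can be shown in isolation. This is a minimal plain-Java sketch without Mockito or Reactor; the class name SequentialSupplierSketch and the helper sequentialSupplier are hypothetical names used only for illustration, and plain strings stand in for the Monos.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class SequentialSupplierSketch {

    // Returns a supplier that hands out the next list element on each call,
    // mirroring monoList.get(callCounter.getAndIncrement()) in the stub.
    static <T> Supplier<T> sequentialSupplier(List<T> values) {
        AtomicInteger callCounter = new AtomicInteger();
        return () -> values.get(callCounter.getAndIncrement());
    }

    public static void main(String[] args) {
        Supplier<String> supplier =
                sequentialSupplier(List.of("record1", "record2", "record3"));
        // Each call returns a different element, in list order.
        System.out.println(supplier.get());
        System.out.println(supplier.get());
        System.out.println(supplier.get());
        // One more call would throw an IndexOutOfBoundsException,
        // because the list is exhausted.
    }
}
```

The point of the counter is that the stub is invoked once per fetch, so every invocation has to yield the next prepared value rather than the same one.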
But onwards! After refactoring, I was going to "just" provide a Flux of the same values that I previously supplied as Monos, but that doesn't work anymore!
Here’s the test code:
@Test
void givenThis_whenThat_validateResult() {
    // given
    var recordA1 = RecordA.builder().id("1").build();
    var recordA2 = RecordA.builder().id("2").build();
    var recordA3 = RecordA.builder().id("3").build();
    List<RecordA> recordAList = List.of(recordA1);
    var fluxOfA = Flux.fromIterable(recordAList);
    var recordB1 = RecordB.builder().id("1").build();
    var recordB2 = RecordB.builder().id("2").build();
    var recordB3 = RecordB.builder().id("3").build();
    List<RecordB> recordBList = List.of(recordB1);
    var fluxOfB = Flux.fromIterable(recordBList);
    // when
    when(dataStreamer.fluxOfRecordsA()).then(new Answer<Flux<RecordA>>() {
        @Override
        public Flux<RecordA> answer(InvocationOnMock invocationOnMock) throws Throwable {
            return Flux.just(recordA1);
        }
    });
    // There is also identical code here as above but for fluxOfRecordsB
    when(resultSender.sendResultToApi(any(ResultRecord.class))).thenReturn(Mono.empty());
    recordProcessor.processRecords();
    // then
    verify(resultSender, times(1)).sendResultToApi(result1);
}
To explain, although it is probably not needed: resultSender is a mocked class that is called from within the processRecords() method. Below I will provide code for this method. Anyway, I also tried different approaches to providing the Flux: instead of .then I tried .thenAnswer and .thenReturn. None of them worked; the unit test still fails. But the way it fails is also weird.
When I provide a Flux of three records for both fluxA and fluxB, the test fails because the tuples passed down the reactor chain are:
Tuple(recordA3, recordB1)
Tuple(recordA3, recordB2)
Tuple(recordA3, recordB3)
That is weird. At first I thought it was my mistake (and maybe it still is): if I provide a Flux of 3 items, of course it will take the last one, since it is combineLatest. But when I wrote a separate test, like the one below, I really got the tuples I expected:
Tuple(recordA1, recordB1)
Tuple(recordA2, recordB2)
Tuple(recordA3, recordB3)
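To make the "it will take the last" reasoning concrete, here is a plain-Java simulation of latest-value combination. It is only a sketch and not Reactor code: the class CombineLatestSketch, the nested LatestPairCombiner, and both helper methods are hypothetical names. The first helper assumes source A emits all of its items before source B emits anything; the second assumes the emissions alternate.

```java
import java.util.ArrayList;
import java.util.List;

public class CombineLatestSketch {

    // Every emission updates that source's latest value; a pair is produced
    // only once both sources have emitted at least one value.
    static final class LatestPairCombiner {
        private String latestA;
        private String latestB;
        private final List<String> pairs = new ArrayList<>();

        void emitA(String value) { latestA = value; emitPairIfReady(); }

        void emitB(String value) { latestB = value; emitPairIfReady(); }

        private void emitPairIfReady() {
            if (latestA != null && latestB != null) {
                pairs.add("(" + latestA + ", " + latestB + ")");
            }
        }

        List<String> pairs() { return pairs; }
    }

    // Assumption: source A is fully drained before source B starts emitting.
    static List<String> emitSequentially(List<String> a, List<String> b) {
        LatestPairCombiner combiner = new LatestPairCombiner();
        a.forEach(combiner::emitA);
        b.forEach(combiner::emitB);
        return combiner.pairs();
    }

    // Assumption: emissions alternate A, B, A, B, ...
    static List<String> emitInterleaved(List<String> a, List<String> b) {
        LatestPairCombiner combiner = new LatestPairCombiner();
        for (int i = 0; i < Math.max(a.size(), b.size()); i++) {
            if (i < a.size()) combiner.emitA(a.get(i));
            if (i < b.size()) combiner.emitB(b.get(i));
        }
        return combiner.pairs();
    }

    public static void main(String[] args) {
        List<String> a = List.of("A1", "A2", "A3");
        List<String> b = List.of("B1", "B2", "B3");
        // prints [(A3, B1), (A3, B2), (A3, B3)]
        System.out.println(emitSequentially(a, b));
        // prints [(A1, B1), (A2, B1), (A2, B2), (A3, B2), (A3, B3)]
        System.out.println(emitInterleaved(a, b));
    }
}
```

In this simulation the pairs depend entirely on emission timing, and neither ordering yields exactly the index-aligned pairs (A1, B1), (A2, B2), (A3, B3). Pairing by position is a different operation from combining latest values.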
Here is that separate test:
@Test
void testCombineLatest() {
    // given
    var recA1 = RecordA.builder().id("1").done(false).build();
    var recA2 = RecordA.builder().id("2").done(false).build();
    var recA3 = RecordA.builder().id("3").done(false).build();
    List<RecordA> recordAList = List.of(
            recA1,
            recA2,
            recA3);
    var fluxOfA = Flux.fromIterable(recordAList);
    var recB1 = RecordB.builder().id("1").done(false).build();
    var recB2 = RecordB.builder().id("2").done(false).build();
    var recB3 = RecordB.builder().id("3").done(false).build();
    List<RecordB> recordBList = List.of(
            recB1,
            recB2,
            recB3);
    var fluxOfB = Flux.fromIterable(recordBList);
    var latestCombo = Flux.combineLatest(
            fluxOfA,
            fluxOfB,
            (recordA, recordB) -> Tuples.of(recordA, recordB));
    StepVerifier.create(latestCombo)
            .expectNext(Tuples.of(recA1, recB1))
            .expectNext(Tuples.of(recA2, recB2))
            .expectNext(Tuples.of(recA3, recB3))
            .verifyComplete();
}
So I am wondering: what did I do wrong, such that the first test never produces the expected pairs?