Context
Publishing an event is typically done on a per object basis. However, this makes the listener of an application event only process those events one by one. This may be an issue when performance matters, and it would be more beneficial to process a set of objects in batches rather than one after the other.
An example would be to have a batch-update of multiple objects for which all of the affected objects individually will be published as an application event.
Current Workaround
A quick hack that I came up with is to collect those events into a list and then process them in batches before the transaction commits:
static class FooListener {
// DB Repository that supports batch processing
private final FooRepository fooRepository;
// Dedicated TransactionSynchronization to keep track of collected events. Save all the events to the db through a batch
@RequiredArgsConstructor
static class FooTransactionSynchronization implements TransactionSynchronization {
private final FooRepository fooRepository;
private final List<Foo> foos = new ArrayList<>();
public void add(Foo foo) {
this.foos.add(foo);
}
@Override
public void beforeCommit(boolean readOnly) {
fooRepository.saveAll(foos);
}
}
// Either create a new Synchronization object for registration of use the existing one to collect an additional event
@EventListener
public void on(Foo foo) {
getTransactionSynchronization().ifPresentOrElse(
fooTransactionSynchronization -> fooTransactionSynchronization.add(foo),
() -> {
FooTransactionSynchronization fooTransactionSynchronization = new FooTransactionSynchronization(fooRepository);
fooTransactionSynchronization.add(foo);
TransactionSynchronizationManager.registerSynchronization(fooTransactionSynchronization);
});
}
// Try find the FooTransactionSynchronization from all currently registered transaction synchronizations
private Optional<FooTransactionSynchronization> getTransactionSynchronization() {
return TransactionSynchronizationManager.getSynchronizations().stream()
.filter(transactionSynchronization -> transactionSynchronization instanceof FooTransactionSynchronization)
.map(FooTransactionSynchronization.class::cast)
.findFirst();
}
}
Why I call it a hack is because it is coupled to the implicit knowledge of the producer, so that I can use an EventListener
for collection and then register a TransactionSynchronization
to have the objects processed in batches. Needless to say that dealing with synchronizations, thread-safety etc. is always tricky and should be outsourced to the framework.
Proposal (TBD)
Introduce an additional attribute in the @EventListener
and @TransactionEventListener
annotations such as batchSize
that would allow one to define the number of events that would be collected prior to the invocation of the consumer method. Of course some details would still be needed to be looked into and further discussed.
// This is only getting invoked after the hundreds time with a collection of 100 Foo objects
@EventListener(batchSize = 100)
void consume(List<Foo> foos) {
...
}
// Still works the same as before
@EventListener
void consume(Foo foo) {
...
}
Expected Benefits
Introducing the support for batch processing events would allow for an increase in performance when processing consumed events in certain circumstances.
That said, I would be open to creating a PR.
Comment From: snicoll
Thanks for the suggestion and we see how that can be useful for certain event architectures there, but this is not within scope of our core application event model.
Comment From: DreamStar92
How is this going now?
What are the plans for the future?
Will it be solved by other spring projects in the future, or will it be planned not to provide such a function and instead be implemented at the user application level?