When I create an IntegrationFlow, the sendTimeout value set on SourcePollingChannelAdapterSpec is ignored, which causes the polling thread to block. This does not happen if I create a SourcePollingChannelAdapter programmatically. For example, here is the programmatic setup in a @Configuration class:
@Configuration
public class JdbcPollingConfig {

    @Bean
    protected MessageChannel jdbcInboundChannel() {
        return new QueueChannel(1);
    }

    @Bean
    public ThreadPoolTaskScheduler jdbcTaskExecutor() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(1);
        taskScheduler.initialize();
        return taskScheduler;
    }

    @Bean
    protected JdbcPollingChannelAdapter jdbcPollingChannelAdapter(final DataSource dataSource) {
        final JdbcPollingChannelAdapter adapter =
                new JdbcPollingChannelAdapter(dataSource, "select dp_id, dp_payload from data_packets");
        return adapter;
    }

    @Bean
    protected SourcePollingChannelAdapter sourcePollingChannelAdapter(
            final JdbcPollingChannelAdapter adapter,
            final MessageChannel jdbcInboundChannel,
            final ThreadPoolTaskScheduler jdbcTaskExecutor) {
        final SourcePollingChannelAdapter spcAdapter = new SourcePollingChannelAdapter();
        spcAdapter.setSource(adapter);
        spcAdapter.setOutputChannel(jdbcInboundChannel);
        spcAdapter.setSendTimeout(500); // Sets the send-to-channel timeout - throws exception on timeout though.
        spcAdapter.setTaskScheduler(jdbcTaskExecutor);
        spcAdapter.setTrigger(new PeriodicTrigger(Duration.ofSeconds(2L)));
        return spcAdapter;
    }
}
With the queue channel configured with a capacity of 1, this configuration throws an exception as anticipated, because nothing consumes from the channel jdbcInboundChannel. Contrast this with an IntegrationFlow configuration:
@Configuration
public class JdbcDSLConfig {

    @Bean
    public QueueChannelSpec jdbcInboundChannel() {
        return MessageChannels.queue(1);
    }

    @Bean
    public MessageSource<Object> jdbcMessageSource(final DataSource dataSource) {
        return new CustomJdbcPollingChannelAdapter(dataSource, "select dp_id, dp_payload from data_packets");
    }

    @Bean
    public ThreadPoolTaskExecutor jdbcExecutor() {
        final ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(1);
        taskExecutor.setMaxPoolSize(1);
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Bean
    public IntegrationFlow jdbcInboundFlow(final MessageSource<Object> jdbcMessageSource,
            final QueueChannelSpec jdbcInboundChannel,
            @Qualifier("jdbcExecutor") final ThreadPoolTaskExecutor jdbcExecutor) {
        return IntegrationFlow.from(jdbcMessageSource,
                        c -> c.poller(Pollers
                                .fixedDelay(2000)
                                .sendTimeout(1) // this has no effect.
                                .taskExecutor(jdbcExecutor)))
                .channel(jdbcInboundChannel)
                .get();
    }
}
This configuration blocks on the second poll of the database, even though a sendTimeout is set on the poller.
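The difference between the two outcomes can be illustrated without Spring at all. The following is a minimal sketch that uses a plain JDK ArrayBlockingQueue as a stand-in for the capacity-1 queue channel; the class SendTimeoutSketch and its send method are hypothetical names made up for this illustration, and the mapping of timeout values (positive means a timed offer, zero means an immediate offer, negative means block until space is available) is an assumption about how a queue-backed channel treats its send timeout, not code taken from Spring Integration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only: a bounded queue standing in for QueueChannel(1).
public class SendTimeoutSketch {

    // Assumed semantics: > 0 waits up to timeoutMs, 0 does not wait, < 0 blocks until space frees up.
    static boolean send(BlockingQueue<String> queue, String message, long timeoutMs)
            throws InterruptedException {
        if (timeoutMs > 0) {
            return queue.offer(message, timeoutMs, TimeUnit.MILLISECONDS);
        }
        if (timeoutMs == 0) {
            return queue.offer(message);
        }
        queue.put(message); // blocks the calling (polling) thread while the queue is full
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> channel = new ArrayBlockingQueue<>(1);

        // First send fits into the single free slot.
        System.out.println("first accepted:  " + send(channel, "first", 500));

        // Nothing consumes the queue, so the second timed send gives up after the timeout
        // instead of blocking. A caller can then report the failure as an error.
        System.out.println("second accepted: " + send(channel, "second", 500));

        // send(channel, "third", -1) would never return here, which matches the
        // blocked polling thread described above when the timeout is not applied.
    }
}
```

If the timeout configured in the DSL never reaches the send operation, the second poll would behave like the commented-out negative-timeout call rather than like the timed one, which would explain the hang.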
Comment From: wilkinsona
The code in question is part of Spring Integration.