Background:
I am upgrading my Spring Boot project from 2.1 to 2.7+ to address security vulnerabilities. The existing project created the Spring beans for the data sources and the transaction manager manually. It used an older Atomikos version, and distributed transactions worked fine in parallel mode.
With 2.7+ we decided to use Spring's suggested approach to configuring Atomikos, and I configured the data sources as shown below for all 3 MSSQL connections.
build.gradle dependencies:
implementation 'org.springframework.boot:spring-boot-starter-batch'
implementation group: 'com.atomikos', name: 'transactions-spring-boot-starter', version: '5.0.9'
implementation group: 'com.atomikos', name: 'transactions-hibernate4', version: '5.0.9'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation 'org.springframework.boot:spring-boot-starter-mail'
implementation 'org.springframework:spring-context'
implementation 'org.springframework:spring-aop'
implementation 'org.springframework:spring-orm'
implementation 'org.springframework:spring-tx'
application.properties:
Note: d1, d2, and d3 are placeholder names for three different databases.
spring.jta.atomikos.datasource.d1.xaDataSourceClassName=com.microsoft.sqlserver.jdbc.SQLServerXADataSource
spring.jta.atomikos.datasource.d1.uniqueResourceName=d1-database
spring.jta.atomikos.datasource.d1.xaProperties.URL=xxxxxx
spring.jta.atomikos.datasource.d1.xaProperties.user=xxxx
spring.jta.atomikos.datasource.d1.xaProperties.password=xxxx
spring.jta.atomikos.datasource.d1.max-pool-size=10
spring.jta.atomikos.datasource.d1.pool-size=8
spring.jta.atomikos.datasource.d2.xaDataSourceClassName=com.microsoft.sqlserver.jdbc.SQLServerXADataSource
spring.jta.atomikos.datasource.d2.uniqueResourceName=d2-database
spring.jta.atomikos.datasource.d2.xaProperties.URL=xxxxxx
spring.jta.atomikos.datasource.d2.xaProperties.user=xxxx
spring.jta.atomikos.datasource.d2.xaProperties.password=xxxx
spring.jta.atomikos.datasource.d2.max-pool-size=10
spring.jta.atomikos.datasource.d2.pool-size=8
spring.jta.atomikos.datasource.xa-properties.AtomikosJtaPlatform.transactionManager=atomikosTransactionManager
jta.properties (on the classpath):
com.atomikos.icatch.output_dir=/home/dnom-java/atomikos/
com.atomikos.icatch.log_base_dir=/home/dnom-java/atomikos/
com.atomikos.icatch.log_base_name=dental-policy-diary-atomikos
atomikos.properties.max-timeout=600000
com.atomikos.icatch.max_timeout=600000
Below is my Java configuration:
public class AtomikosJtaPlatform extends AbstractJtaPlatform {
private static final long serialVersionUID = 1L;
static TransactionManager transactionManager;
static UserTransaction transaction;
@Override
protected TransactionManager locateTransactionManager() {
return transactionManager;
}
@Override
protected UserTransaction locateUserTransaction() {
return transaction;
}
}
AtomikosConfig:
@Configuration
@EnableTransactionManagement
public class AtomikosConfig {
@Bean(name = "txService", initMethod = "init", destroyMethod = "shutdownWait")
public UserTransactionService userTransactionService() {
return new UserTransactionServiceImp();
}
@Bean(name = "userTransaction")
public UserTransaction userTransaction() throws Throwable {
UserTransactionImp userTransactionImp = new UserTransactionImp();
userTransactionImp.setTransactionTimeout(600000);
return userTransactionImp;
}
@Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close")
public TransactionManager atomikosTransactionManager() {
UserTransactionManager userTransactionManager = new UserTransactionManager();
userTransactionManager.setForceShutdown(false);
AtomikosJtaPlatform.transactionManager = userTransactionManager;
return userTransactionManager;
}
@Bean(name = "atomikosTxMgr")
@Primary
@DependsOn({ "userTransaction", "atomikosTransactionManager" })
public PlatformTransactionManager transactionManager( ) throws Throwable {
UserTransaction userTransaction = userTransaction();
AtomikosJtaPlatform.transaction = userTransaction;
TransactionManager atomikosTransactionManager = atomikosTransactionManager();
return new JtaTransactionManager(userTransaction, atomikosTransactionManager);
}
@Bean
PersistenceAnnotationBeanPostProcessor persistenceAnnotationBeanPostProcessor() {
return new PersistenceAnnotationBeanPostProcessor();
}
}
Here is my data source configuration. It is the same for d1, d2, and d3; only the configuration properties differ.
@Configuration
@DependsOn("atomikosTxMgr")
@EnableJpaRepositories(entityManagerFactoryRef = "d1DSFactory", transactionManagerRef = "atomikosTxMgr")
public class D1DataSourceConfig {
// Atomikos does not pick up the default Spring JPA properties. As a workaround, read the same properties here and pass them in manually.
@Value("${spring.jpa.show-sql}")
String showSql;
@Value("${spring.jpa.properties.hibernate.format_sql}")
String formatSql;
@Bean
@ConfigurationProperties("spring.jta.atomikos.datasource.d1")
@Primary
public DataSource d1DS() {
return new AtomikosDataSourceBean();
}
@Bean(name = "d1DSFactory")
@DependsOn("atomikosTxMgr")
@Primary
public LocalContainerEntityManagerFactoryBean drsDSFactory(DataSource d1DS) {
LocalContainerEntityManagerFactoryBean emf = new LocalContainerEntityManagerFactoryBean();
emf.setPersistenceUnitName("D1");
emf.setJpaVendorAdapter(new HibernateJpaVendorAdapter());
emf.setPackagesToScan("com.xyz.pqr.entity");
emf.setJtaDataSource(d1DS);
emf.setJpaPropertyMap(getJpaProperties());
return emf;
}
private Map<String, Object> getJpaProperties() {
Map<String, Object> props = new HashMap<>();
props.put("hibernate.dialect", "org.hibernate.dialect.SQLServerDialect");
props.put("hibernate.current_session_context_class", "jta");
props.put("javax.persistence.transactionType", "jta");
props.put("hibernate.show_sql", showSql);
props.put("hibernate.format_sql", formatSql);
props.put("hibernate.transaction.jta.platform", AtomikosJtaPlatform.class.getName());
props.put("hibernate.search.autoregister_listeners", false);
props.put("hibernate.connection.release_mode", "after_transaction");
return props;
}
}
Service class
@Service
public class CustomEventService {
@PersistenceContext(unitName="d1-database")
private EntityManager d1EntityManager;
@PersistenceContext(unitName="d2-database")
private EntityManager d2EntityManager;
@PersistenceContext(unitName="d3-database")
private EntityManager d3EntityManager;
@Transactional(readOnly = false, propagation = Propagation.REQUIRES_NEW)
public void processIssueEvent(CustomEvent event) {
TypedQuery<CustomChangeEvent> query = d1EntityManager.createNamedQuery("getUnprocessedChangeEvents",
CustomChangeEvent.class);
query.setParameter("company", company);
query.setParameter("product", product);
List<CustomChangeEvent> unprocessedChangeEvents = query.getResultList();
}
}
// More methods with calls to the d2 and d3 databases.
When I start my application, the following properties are printed on the console:
USING: com.atomikos.icatch.tm_unique_name = 127.0.0.1.tm
USING: com.atomikos.icatch.default_jta_timeout = 10000
USING: com.atomikos.icatch.serial_jta_transactions = true
USING: java.naming.factory.initial = com.sun.jndi.rmi.registry.RegistryContextFactory
USING: com.atomikos.icatch.oltp_retry_interval = 10000
USING: java.naming.provider.url = rmi://localhost:1099
USING: com.atomikos.icatch.checkpoint_interval = 500
USING: com.atomikos.icatch.client_demarcation = false
USING: com.atomikos.icatch.trust_client_tm = false
USING: com.atomikos.icatch.force_shutdown_on_vm_exit = false
USING: com.atomikos.icatch.rmi_export_class = none
USING: com.atomikos.icatch.enable_logging = true
USING: atomikos.properties.max-timeout = 600000
USING: com.atomikos.icatch.max_timeout = 600000
USING: com.atomikos.icatch.oltp_max_retries = 5
USING: com.atomikos.icatch.log_base_dir = /home/dnom-java/atomikos/
USING: com.atomikos.icatch.allow_subtransactions = true
USING: com.atomikos.icatch.automatic_resource_registration = true
USING: com.atomikos.icatch.log_base_name = dental-policy-diary-atomikos
USING: com.atomikos.icatch.default_max_wait_time_on_shutdown = 9223372036854775807
USING: com.atomikos.icatch.forget_orphaned_log_entries_delay = 86400000
USING: com.atomikos.icatch.output_dir = /home/dnom-java/atomikos/
USING: com.atomikos.icatch.threaded_2pc = false
USING: com.atomikos.icatch.recovery_delay = 10000
USING: com.atomikos.icatch.max_actives = 50
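A side note on the timeout values above, since the units are easy to mix up: the `com.atomikos.icatch.*` timeouts are expressed in milliseconds, while JTA's `UserTransaction.setTransactionTimeout(int)` takes seconds. The sketch below only does the unit conversion for the numbers that appear in this post; the class and constant names are made up for illustration, and whether Atomikos caps the larger request at `max_timeout` is an assumption worth checking against its documentation.

```java
import java.util.concurrent.TimeUnit;

public class TimeoutUnits {
    // Values copied from the startup output above; Atomikos expresses these in milliseconds.
    static final long DEFAULT_JTA_TIMEOUT_MS = 10_000L;
    static final long MAX_TIMEOUT_MS = 600_000L;

    // Value passed to setTransactionTimeout(...) in AtomikosConfig.
    // JTA defines that argument in seconds, not milliseconds.
    static final int USER_TX_TIMEOUT_SECONDS = 600_000;

    // Converts a millisecond setting to whole seconds for comparison.
    static long toSeconds(long millis) {
        return TimeUnit.MILLISECONDS.toSeconds(millis);
    }

    public static void main(String[] args) {
        System.out.println("default_jta_timeout = " + toSeconds(DEFAULT_JTA_TIMEOUT_MS) + " s");
        System.out.println("max_timeout         = " + toSeconds(MAX_TIMEOUT_MS) + " s");
        System.out.println("setTransactionTimeout argument = " + USER_TX_TIMEOUT_SECONDS + " s");
    }
}
```

Read this way, the default timeout in the log is 10 seconds and the maximum is 10 minutes, while the value set on the `UserTransactionImp` bean asks for far more than that maximum.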
Problem:
When I run the code below from a different service class with parallel(), it throws an error. Without parallel(), everything works fine. We need parallel processing, either via parallel() or Spring's @Async, because the job takes too long to finish when it runs sequentially.
I tried multiple approaches, but none of them worked.
unprocessedEvents.stream().parallel().forEach(unprocessedEvent -> customEventService.processIssueEvent(unprocessedEvent ));
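For context on what the line above does at the thread level: a parallel stream runs the `forEach` body on `ForkJoinPool` worker threads (and usually the calling thread as well), which matches the `ForkJoinWorkerThread` frames in the stack trace. Since a JTA transaction is associated with the thread that started it, each `REQUIRES_NEW` call begins its own transaction on whichever thread executes it. The following is a minimal sketch with no Spring or Atomikos involved; `collectWorkerThreads` is a hypothetical stand-in for the real service call and only records thread names.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelThreadsDemo {
    // Hypothetical stand-in for customEventService.processIssueEvent(...):
    // instead of touching a database, it records which thread ran each element.
    static Set<String> collectWorkerThreads(List<Integer> events) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        events.stream().parallel()
              .forEach(event -> threadNames.add(Thread.currentThread().getName()));
        return threadNames;
    }

    public static void main(String[] args) {
        List<Integer> events = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
        // The exact names and count depend on the machine; the common pool is
        // sized from the number of available processors by default.
        System.out.println(collectWorkerThreads(events));
    }
}
```

The number of distinct threads printed gives a rough idea of how many transactions can be open at the same time, which is not something `parallel()` lets you set directly.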
Error when I add @Async or parallel():
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) ~[na:na]
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) ~[na:na]
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) ~[na:na]
Caused by: org.hibernate.exception.SQLGrammarException: could not execute query
at org.hibernate.exception.internal.SQLStateConversionDelegate.convert(SQLStateConversionDelegate.java:103) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:37) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:113) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
at org.hibernate.loader.Loader.doList(Loader.java:2871) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
at org.hibernate.loader.Loader.doList(Loader.java:2850) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
at org.hibernate.loader.Loader.listIgnoreQueryCache(Loader.java:2682) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
at org.hibernate.loader.Loader.list(Loader.java:2677) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
at org.hibernate.loader.hql.QueryLoader.list(QueryLoader.java:540) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
at org.hibernate.hql.internal.ast.QueryTranslatorImpl.list(QueryTranslatorImpl.java:400) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
at org.hibernate.engine.query.spi.HQLQueryPlan.performList(HQLQueryPlan.java:219) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
at org.hibernate.internal.SessionImpl.list(SessionImpl.java:1459) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
at org.hibernate.query.internal.AbstractProducedQuery.doList(AbstractProducedQuery.java:1649) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
at org.hibernate.query.internal.AbstractProducedQuery.list(AbstractProducedQuery.java:1617) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
at org.hibernate.query.Query.getResultList(Query.java:165) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
at com.pm.policydiaries.jobs.service.dao.impl.InsproDaoImpl.getActivePolicyIssueDentalPolicy(InsproDaoImpl.java:166) ~[main/:na]
at com.pm.policydiaries.jobs.service.dao.impl.InsproDaoImpl$$FastClassBySpringCGLIB$$9524048f.invoke(<generated>) ~[main/:na]
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.3.23.jar:5.3.23]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:793) ~[spring-aop-5.3.23.jar:5.3.23]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.23.jar:5.3.23]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763) ~[spring-aop-5.3.23.jar:5.3.23]
at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:137) ~[spring-tx-5.3.23.jar:5.3.23]
... 30 common frames omitted
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: The Microsoft Distributed Transaction Coordinator (MS DTC) has cancelled the distributed transaction.
at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262) ~[mssql-jdbc-8.2.0.jre11.jar:na]
at com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:5431) ~[mssql-jdbc-8.2.0.jre11.jar:na]
at com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1770) ~[mssql-jdbc-8.2.0.jre11.jar:na]
at com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:1028) ~[mssql-jdbc-8.2.0.jre11.jar:na]
at org.hibernate.loader.Loader.getRowsFromResultSet(Loader.java:1043) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
I also tried enabling @Async with a custom thread pool of 5 threads and annotating the service method with @Async. That gave me the same error.
Any help is appreciated.
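To make the 5-thread attempt concrete, here is a minimal sketch of the same shape using only `java.util.concurrent`, so the degree of parallelism is explicit and can be varied (for example 1, 2, 5) while narrowing down when the MS DTC cancellation starts to appear. It is not a fix for the error, and `BoundedParallelRunner`, `processAll`, and the `handler` parameter are illustrative names; in the real application the handler would be the call to `customEventService.processIssueEvent(...)` through the Spring proxy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class BoundedParallelRunner {
    // Runs handler once per event on a fixed pool of `threads` workers and
    // returns how many events completed without throwing.
    static <T> int processAll(List<T> events, int threads, Consumer<T> handler) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger processed = new AtomicInteger();
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (T event : events) {
                futures.add(pool.submit(() -> {
                    handler.accept(event); // stand-in for the transactional service call
                    processed.incrementAndGet();
                }));
            }
            for (Future<?> future : futures) {
                future.get(); // surfaces the first failure as an ExecutionException
            }
        } finally {
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
        }
        return processed.get();
    }

    public static void main(String[] args) throws Exception {
        List<String> events = List.of("e1", "e2", "e3", "e4", "e5", "e6");
        int done = processAll(events, 5,
                e -> System.out.println(Thread.currentThread().getName() + " -> " + e));
        System.out.println("processed " + done);
    }
}
```

Running the same batch with `threads` set to 1 reproduces the sequential case that works, which gives a baseline to compare against.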