Atomikos integration with spring parallel processing throwing error

Background :

I am upgrading my spring boot project from 2.1 to 2.7+ to avoid security vulnerabilities and existing project used traditional way of creating spring bean object for database and transaction manager. Atomikos was older version and it was working fine with distributed transaction in parallel mode.

In 2.7+ we thought to use spring’s suggested approach to configure atomikos and I configured data source configuration like below for all 3 MSSQL connections.

build.gradle dependency :

    implementation 'org.springframework.boot:spring-boot-starter-batch'
    implementation group: 'com.atomikos', name: 'transactions-spring-boot-starter', version: '5.0.9'
    implementation group: 'com.atomikos', name: 'transactions-hibernate4', version: '5.0.9'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.boot:spring-boot-starter-mail'
    implementation 'org.springframework:spring-context'
    implementation 'org.springframework:spring-aop'
    implementation 'org.springframework:spring-orm'
    implementation 'org.springframework:spring-tx'

application.properties –

Note d1 , d2, d3 are different database just a dummy name 
spring.jta.atomikos.datasource.d1.xaDataSourceClassName=com.microsoft.sqlserver.jdbc.SQLServerXADataSource
spring.jta.atomikos.datasource.d1.uniqueResourceName=d1-database
spring.jta.atomikos.datasource.d1.xaProperties.URL=xxxxxx
spring.jta.atomikos.datasource.d1.xaProperties.user=xxxx
spring.jta.atomikos.datasource.d1.xaProperties.password=xxxx
spring.jta.atomikos.datasource.d1.max-pool-size=10
spring.jta.atomikos.datasource.d1.pool-size=8

spring.jta.atomikos.datasource.d2.xaDataSourceClassName=com.microsoft.sqlserver.jdbc.SQLServerXADataSource
spring.jta.atomikos.datasource.d2.uniqueResourceName=d2-database
spring.jta.atomikos.datasource.d2.xaProperties.URL=xxxxxx
spring.jta.atomikos.datasource.d2.xaProperties.user=xxxx
spring.jta.atomikos.datasource.d2.xaProperties.password=xxxx
spring.jta.atomikos.datasource.d2.max-pool-size=10
spring.jta.atomikos.datasource.d2.pool-size=8

spring.jta.atomikos.datasource.xa-properties.AtomikosJtaPlatform.transactionManager=atomikosTransactionManager

jta.properties on classpath

com.atomikos.icatch.output_dir=/home/dnom-java/atomikos/ 
com.atomikos.icatch.log_base_dir=/home/dnom-java/atomikos/
com.atomikos.icatch.log_base_name=dental-policy-diary-atomikos
atomikos.properties.max-timeout=600000
com.atomikos.icatch.max_timeout=600000

Below is my java code configuration

public class AtomikosJtaPlatform extends AbstractJtaPlatform {

    private static final long serialVersionUID = 1L;

    static TransactionManager transactionManager;
    static UserTransaction transaction;

    @Override
    protected TransactionManager locateTransactionManager() {
        return transactionManager;
    }

    @Override
    protected UserTransaction locateUserTransaction() {
        return transaction;
    }
}

AtmokiosConfig

@Configuration
@EnableTransactionManagement
public class AtomikosConfig {
    
    @Bean(name = "txService", initMethod = "init", destroyMethod = "shutdownWait")
    public UserTransactionService userTransactionService() {
        return new UserTransactionServiceImp();
    }

    @Bean(name = "userTransaction")
    public UserTransaction userTransaction() throws Throwable {
        UserTransactionImp userTransactionImp = new UserTransactionImp();
        userTransactionImp.setTransactionTimeout(600000);
        return userTransactionImp;
    }

    @Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close")
    public TransactionManager atomikosTransactionManager() {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setForceShutdown(false);
        AtomikosJtaPlatform.transactionManager = userTransactionManager;
        return userTransactionManager;
    }

    @Bean(name = "atomikosTxMgr")
    @Primary
    @DependsOn({ "userTransaction", "atomikosTransactionManager" })
    public PlatformTransactionManager transactionManager( ) throws Throwable {
        UserTransaction userTransaction = userTransaction();
        AtomikosJtaPlatform.transaction = userTransaction;
        TransactionManager atomikosTransactionManager = atomikosTransactionManager();
        return new JtaTransactionManager(userTransaction, atomikosTransactionManager);
    }
    
    @Bean
    PersistenceAnnotationBeanPostProcessor persistenceAnnotationBeanPostProcessor() {
        return new PersistenceAnnotationBeanPostProcessor();
    }

}

Here is my datasource configuration and it is same for d1, d2, d3. They all are similar just different config property

@Configuration
@DependsOn("atomikosTxMgr")
@EnableJpaRepositories(entityManagerFactoryRef = "d1DSFactory", transactionManagerRef = "atomikosTxMgr")
public class D1DataSourceConfig {

    // Atomikos not accepting default spring jpa properties. These are work around using same default properties. 
    @Value("${spring.jpa.show-sql}")
    String showSql;

    @Value("${spring.jpa.properties.hibernate.format_sql}")
    String formatSql;

    @Bean
    @ConfigurationProperties("spring.jta.atomikos.datasource.d1")
    @Primary
    public DataSource d1DS() {
        return new AtomikosDataSourceBean();
    }

    @Bean(name = "d1DSFactory")
    @DependsOn("atomikosTxMgr")
    @Primary
    public LocalContainerEntityManagerFactoryBean drsDSFactory(DataSource d1DS) {
        LocalContainerEntityManagerFactoryBean emf = new LocalContainerEntityManagerFactoryBean();
        emf.setPersistenceUnitName("D1");
        emf.setJpaVendorAdapter(new HibernateJpaVendorAdapter());
        emf.setPackagesToScan("com.xyz.pqr.entity");
        emf.setJtaDataSource(d1DS);
        emf.setJpaPropertyMap(getJpaProperties());
        return emf;
    }

    private Map<String, Object> getJpaProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put("hibernate.dialect", "org.hibernate.dialect.SQLServerDialect");
        props.put("hibernate.current_session_context_class", "jta");
        props.put("javax.persistence.transactionType", "jta");
                props.put("hibernate.show_sql", showSql);
        props.put("hibernate.format_sql", formatSql);
        props.put("hibernate.transaction.jta.platform",  AtomikosJtaPlatform.class.getName());
        props.put("hibernate.search.autoregister_listeners", false);
        props.put("hibernate.connection.release_mode", "after_transaction");
        return props;
    }
}

Service class


@Service
public class CustomEventService { 
@PersistenceContext(unitName="d1-database")
    private EntityManager d1EntityManager;
@PersistenceContext(unitName="d2-database")
    private EntityManager d2EntityManager;
@PersistenceContext(unitName="d3-database")
    private EntityManager d3EntityManager;

@Transactional(readOnly = false, propagation = Propagation.REQUIRES_NEW)
public void processIssueEvent(CustomEvent event) {
TypedQuery<CustomChangeEvent> query = d1EntityManager.createNamedQuery("getUnprocessedChangeEvents",
                CustomChangeEvent.class);
        query.setParameter("company", company);
        query.setParameter("product", product)
        List<CustomChangeEvent> unprocessedChangeEvents = query.getResultList();
}
}
        // more methods having call to d2, and d3 databse. 

when I start my application below it property that gets printed on console


USING: com.atomikos.icatch.tm_unique_name = 127.0.0.1.tm
USING: com.atomikos.icatch.default_jta_timeout = 10000
USING: com.atomikos.icatch.serial_jta_transactions = true
USING: java.naming.factory.initial = com.sun.jndi.rmi.registry.RegistryContextFactory
USING: com.atomikos.icatch.oltp_retry_interval = 10000
USING: java.naming.provider.url = rmi://localhost:1099
USING: com.atomikos.icatch.checkpoint_interval = 500
USING: com.atomikos.icatch.client_demarcation = false
USING: com.atomikos.icatch.trust_client_tm = false
USING: com.atomikos.icatch.force_shutdown_on_vm_exit = false
USING: com.atomikos.icatch.rmi_export_class = none
USING: com.atomikos.icatch.enable_logging = true
USING: atomikos.properties.max-timeout = 600000
USING: com.atomikos.icatch.max_timeout = 600000
USING: com.atomikos.icatch.oltp_max_retries = 5
USING: com.atomikos.icatch.log_base_dir = /home/dnom-java/atomikos/
USING: com.atomikos.icatch.allow_subtransactions = true
USING: com.atomikos.icatch.automatic_resource_registration = true
USING: com.atomikos.icatch.log_base_name = dental-policy-diary-atomikos
USING: com.atomikos.icatch.default_max_wait_time_on_shutdown = 9223372036854775807
USING: com.atomikos.icatch.forget_orphaned_log_entries_delay = 86400000
USING: com.atomikos.icatch.output_dir = /home/dnom-java/atomikos/
USING: com.atomikos.icatch.threaded_2pc = false
USING: com.atomikos.icatch.recovery_delay = 10000
USING: com.atomikos.icatch.max_actives = 50

Problem:
When I run below code into a different service class with parallel() then it throws error. However without parallel() Everything just works fine. We need to bring parallel() processing either using parallel() or spring @Async at job is taking outs to finish it is is sequential.
Tried multiple approaches but none of them worked.

unprocessedEvents.stream().parallel().forEach(unprocessedEvent -> customEventService.processIssueEvent(unprocessedEvent )); 

Error: when I added @Async or parallel()

    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) ~[na:na]
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) ~[na:na]
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) ~[na:na]
Caused by: org.hibernate.exception.SQLGrammarException: could not execute query
    at org.hibernate.exception.internal.SQLStateConversionDelegate.convert(SQLStateConversionDelegate.java:103) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
    at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:37) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
    at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:113) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
    at org.hibernate.loader.Loader.doList(Loader.java:2871) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
    at org.hibernate.loader.Loader.doList(Loader.java:2850) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
    at org.hibernate.loader.Loader.listIgnoreQueryCache(Loader.java:2682) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
    at org.hibernate.loader.Loader.list(Loader.java:2677) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
    at org.hibernate.loader.hql.QueryLoader.list(QueryLoader.java:540) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
    at org.hibernate.hql.internal.ast.QueryTranslatorImpl.list(QueryTranslatorImpl.java:400) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
    at org.hibernate.engine.query.spi.HQLQueryPlan.performList(HQLQueryPlan.java:219) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
    at org.hibernate.internal.SessionImpl.list(SessionImpl.java:1459) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
    at org.hibernate.query.internal.AbstractProducedQuery.doList(AbstractProducedQuery.java:1649) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
    at org.hibernate.query.internal.AbstractProducedQuery.list(AbstractProducedQuery.java:1617) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
    at org.hibernate.query.Query.getResultList(Query.java:165) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]
    at com.pm.policydiaries.jobs.service.dao.impl.InsproDaoImpl.getActivePolicyIssueDentalPolicy(InsproDaoImpl.java:166) ~[main/:na]
    at com.pm.policydiaries.jobs.service.dao.impl.InsproDaoImpl$$FastClassBySpringCGLIB$$9524048f.invoke(<generated>) ~[main/:na]
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.3.23.jar:5.3.23]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:793) ~[spring-aop-5.3.23.jar:5.3.23]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.23.jar:5.3.23]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763) ~[spring-aop-5.3.23.jar:5.3.23]
    at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:137) ~[spring-tx-5.3.23.jar:5.3.23]
    ... 30 common frames omitted
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: The Microsoft Distributed Transaction Coordinator (MS DTC) has cancelled the distributed transaction.
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262) ~[mssql-jdbc-8.2.0.jre11.jar:na]
    at com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:5431) ~[mssql-jdbc-8.2.0.jre11.jar:na]
    at com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1770) ~[mssql-jdbc-8.2.0.jre11.jar:na]
    at com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:1028) ~[mssql-jdbc-8.2.0.jre11.jar:na]
    at org.hibernate.loader.Loader.getRowsFromResultSet(Loader.java:1043) ~[hibernate-core-5.6.12.Final.jar:5.6.12.Final]

I tried enabling @Async and customer thread of 5 thread run the service method of @Async on top of it. That also gave me the same error.

Any help is appreciated.

Leave a Comment