Je travaille sur une application Spring Batch qui lira les données non traitées du tableau A, traitera les données, insérera les données traitées dans le tableau B, puis mettra à jour la ligne du tableau A sur PROCESSED. Cependant, bien que l'insertion des données dans la table B fonctionne correctement, je continue à recevoir une exception DeadlockLoserDataAccessException chaque fois que j'essaye de mettre à jour la table A. Je pense que cela est dû au curseur du JDBCCursorItemReader qui a été utilisé pour lire la table A est en train de gêner Mise à jour de la table. Comment pourrais-je résoudre ce problème?

J'utilise un JDBCCursorItemReader et un CompositeItemWriter dans le Spring Batch. La taille du morceau est 1.

0
Karson074 20 nov. 2018 à 23:42

3 réponses

Meilleure réponse

Je pense que cela est dû au fait que le curseur du JDBCCursorItemReader qui a été utilisé pour lire la table A empêche la mise à jour de la table. Comment pourrais-je résoudre ce problème?

Cela ne devrait pas poser de problème si la lecture, l'insertion et la mise à jour se trouvent dans la même transaction (ce qui est le cas lorsque vous utilisez une étape orientée bloc).

J'utilise un JDBCCursorItemReader et un CompositeItemWriter dans le Spring Batch. La taille du morceau est 1.

Voici un exemple rapide (autonome) avec la même configuration que celle que vous avez mentionnée:

import java.util.Arrays;
import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.support.builder.CompositeItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;

@Configuration
@EnableBatchProcessing
public class MyJob {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public JdbcCursorItemReader<Person> itemReader() {
        return new JdbcCursorItemReaderBuilder<Person>()
                .name("personItemReader")
                .dataSource(dataSource())
                .sql("select id, name from person where processed = false")
                .beanRowMapper(Person.class)
                .saveState(false) // process indicator pattern, no need to save state (see https://docs.spring.io/spring-batch/4.1.x/reference/html/readersAndWriters.html#process-indicator)
                .build();
    }

    @Bean
    public ItemProcessor<Person, Person> itemProcessor() {
        return item -> new Person(item.getId(), item.getName().toUpperCase());
    }

    @Bean
    public CompositeItemWriter<Person> itemWriter() {
        return new CompositeItemWriterBuilder<Person>()
                .delegates(Arrays.asList(peopleItemWriter(), personItemUpdater()))
                .ignoreItemStream(true)
                .build();
    }

    @Bean
    public JdbcBatchItemWriter<Person> peopleItemWriter() {
        return new JdbcBatchItemWriterBuilder<Person>()
                .dataSource(dataSource())
                .beanMapped()
                .sql("insert into people (name) values (:name)")
                .build();
    }

    @Bean
    public JdbcBatchItemWriter<Person> personItemUpdater() {
        return new JdbcBatchItemWriterBuilder<Person>()
                .dataSource(dataSource())
                .beanMapped()
                .sql("update person set processed = true where id = :id")
                .build();
    }

    @Bean
    public Step step() {
        return steps.get("step")
                .<Person, Person>chunk(1)
                .reader(itemReader())
                .processor(itemProcessor())
                .writer(itemWriter())
                .build();
    }

    @Bean
    public Job job() {
        return jobs.get("job")
                .start(step())
                .build();
    }

    @Bean
    public DataSource dataSource() {
        return new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.H2)
                .addScript("/org/springframework/batch/core/schema-drop-h2.sql")
                .addScript("/org/springframework/batch/core/schema-h2.sql")
                .build();
    }

    @Bean
    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    public static void main(String[] args) throws Exception {

        ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
        JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class);
        jdbcTemplate.execute("CREATE TABLE person (id int IDENTITY PRIMARY KEY, name VARCHAR(10), processed boolean);");
        jdbcTemplate.execute("CREATE TABLE people (id int IDENTITY PRIMARY KEY, name VARCHAR(10));");
        jdbcTemplate.execute("insert into person (id, name, processed) values (1, 'foo', false);");
        jdbcTemplate.execute("insert into person (id, name, processed) values (2, 'bar', false);");

        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());

        Integer nbInsertedFoos = jdbcTemplate.queryForObject("select count(id) from people where name = 'FOO'", Integer.class);
        Integer nbInsertedBars = jdbcTemplate.queryForObject("select count(id) from people where name = 'BAR'", Integer.class);
        System.out.println("nbInsertedFoos in people table = " + nbInsertedFoos);
        System.out.println("nbInsertedBars in people table = " + nbInsertedBars);
        Integer nbUpdatedPersons = jdbcTemplate.queryForObject("select count(*) from person where processed = true", Integer.class);
        System.out.println("nbUpdatedPersons in person table = " + nbUpdatedPersons);
    }

    public static class Person {

        private int id;

        private String name;

        public Person() {
        }

        public Person(int id, String name) {
            this.id = id;
            this.name = name;
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "Person{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    '}';
        }
    }

}

Il lit les personnes dans une table Person (TableA dans votre cas), met leur nom en majuscule et écrit le résultat dans une table People (TableB dans votre cas). Ensuite, il met à jour l'indicateur processed sur la table Person.

Si vous exécutez l'exemple, vous devriez voir:

nbInsertedFoos in people table = 1
nbInsertedBars in people table = 1
nbUpdatedPersons in person table = 2

Sans aucune exception de verrouillage.

J'espère que cela t'aides.

0
Mahmoud Ben Hassine 21 nov. 2018 à 10:09

L'architecture est ETL comme lire des données à partir d'une source, les traiter et les écrire sur une cible. J'essaie d'éviter cette logique de mise à jour dans mes processus car elle introduit une surcharge importante et les problèmes que vous avez décrits. Alors peut-être pourriez-vous repenser l'architecture ...

Sinon, je recommande vraiment d'avoir un index approprié pour la mise à jour - en fonction de la condition de recherche que vous utilisez. Cela rendra la mise à jour non seulement moins chère, mais le SQL n'aura qu'à accéder à une seule ligne - évitant ainsi des analyses de table supplémentaires pour la mise à jour.

-1
MichaelTiefenbacher 20 nov. 2018 à 20:55

Je vous suggère de reconcevoir la logique de transaction pour «verrouiller» les lignes nécessaires de TABLEA en les marquant comme «TRAITÉES» au tout début de votre transaction, et ne pas les mettre à jour une fois de plus à la fin de la transaction. Voir l'exemple ci-dessous.

-- *** Example of queue processing in DB2 ***
-- The following registry variables must be set:
-- DB2_SKIPINSERTED=YES
-- DB2_SKIPDELETED=YES
-- DB2_EVALUNCOMMITTED=YES
-- Don't forget to db2stop/db2start after their setting to make the changes take an effect.

create table test(id int not null, not_processed dec(1) default 1, constraint np check (not_processed=1));
-- 'exclude null keys' is avaiable starting from V10.5
create index test1 on test(not_processed) exclude null keys; 
alter table test volatile; -- easy way to force ixscan discregarding the table statistics collected
insert into test (id) values 1,2;

-- Every session starts its transaction with locking its own set of rows (only one in the example), 
-- which becomes invisible for the same statement issued by other concurrent transactions 
-- due to setting registry variables above.
-- No lock waits expected on such an update.
update (select not_processed from test where not_processed=1 fetch first 1 row only) set not_processed=null;
-- work with other tables comes below
-- ...
-- transaction end
0
Mark Barinstein 21 nov. 2018 à 14:33