Q: Fetch data from 20 related tables (through id), combine them into a JSON file, and leverage Spring Batch for this

I have a Person database in SQL Server with tables like address, license, relatives, etc., about 20 of them. All the tables have an id parameter that is unique per person. There are millions of records in these tables. I need to combine these records for each person using their common id parameter and convert them to a JSON file with some column-name changes. This JSON file then gets pushed to Kafka through a producer. If I can get the example with the Kafka producer as the item writer, fine, but the real problem is understanding the strategy and the specifics of how to use the Spring Batch item reader, processor, and item writer to create the composite JSON file. This is my first Spring Batch application, so I am relatively new to this.

I am hoping for suggestions on an implementation strategy that uses a composite reader or processor with the person id as the cursor, queries each table using that id, converts the resulting records to JSON, and aggregates them into a composite, relational JSON document with the root object PersonData that feeds the Kafka cluster.

Basically I have one data source, the same database, for the reader. I plan to use the Person table to fetch the id and the other records unique to the person, and to use the id in the where clause for the 19 other tables. I will convert each result set from those tables to JSON, compose the JSON object at the end, and write it to Kafka.
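
For illustration, the composite root could look something like the following. This is a hypothetical sketch: all class and field names are placeholders, and it assumes Jackson annotations for the column-name changes.

    import java.util.List;

    import com.fasterxml.jackson.annotation.JsonProperty;

    // Hypothetical shape of the composite root: one instance per person id,
    // aggregating the child rows from the related tables.
    public class PersonData {

        @JsonProperty("personId")   // example of a column-name change
        private long id;

        private List<Address> addresses; // rows from the address table
        private List<License> licenses;  // rows from the license table
        // ... one collection per remaining table

        // placeholder row types; the real ones carry the mapped columns
        public static class Address { }
        public static class License { }
    }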

Answer 1:

We had such a requirement in a project and solved it with the following approach.

  1. In a split flow that runs in parallel, we had a step for every table that loaded the table's data into a file, sorted by the common id (this is optional, but it makes testing easier if you have the data in files). A configuration sketch follows the list.

  2. Then we implemented our own "MergeReader". This MergeReader had a FlatFileItemReader for every file/table (let's call them dataReaders). All these FlatFileItemReaders were wrapped in a SingleItemPeekableItemReader. The logic for the read method of the MergeReader is as follows:

    public MyContainerPerId read() throws Exception {

        // peek through all "dataReaders" to find the lowest pending key;
        // null means every reader is exhausted, which ends the step
        Integer lowestId = searchLowestKey();
        if (lowestId == null) {
            return null;
        }

        // you need a container to store the items that belong together
        MyContainerPerId container = new MyContainerPerId();

        // PersonRecord is a placeholder for the row types, each exposing getId()
        for (SingleItemPeekableItemReader<PersonRecord> dataReader : dataReaders) {
            // more than one entry in a table can belong to the same person id
            while (dataReader.peek() != null
                    && dataReader.peek().getId() == lowestId) {
                container.add(dataReader.read());
            }
        }

        // the container now holds all entries from all tables
        // belonging to the same person id
        return container;
    }

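Back to step 1: a minimal configuration sketch of the split flow, assuming Spring Batch's Java DSL. The bean names (exportJob, addressStep, licenseStep, mergeStep) are placeholders; in the real job there is one flow per table, each step reading its table ordered by the common id and writing a flat file.

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.job.builder.FlowBuilder;
    import org.springframework.batch.core.job.flow.Flow;
    import org.springframework.context.annotation.Bean;
    import org.springframework.core.task.SimpleAsyncTaskExecutor;

    // inside a @Configuration class
    @Bean
    public Job exportJob(JobBuilderFactory jobs, Step addressStep,
                         Step licenseStep, Step mergeStep) {

        // one flow per table; the split executes them in parallel
        Flow splitFlow = new FlowBuilder<Flow>("splitFlow")
                .split(new SimpleAsyncTaskExecutor())
                .add(new FlowBuilder<Flow>("addressFlow").start(addressStep).build(),
                     new FlowBuilder<Flow>("licenseFlow").start(licenseStep).build()
                     /* ... one flow per remaining table */)
                .build();

        return jobs.get("exportJob")
                .start(splitFlow)
                .next(mergeStep)   // the step that uses the MergeReader above
                .end()
                .build();
    }
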
If you need restart capability, you have to implement ItemStream in a way that keeps track of the current read position for every dataReader; a sketch of this follows.
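
A minimal sketch of that restartable variant, under some assumptions: PersonRecord is a hypothetical interface that every row type implements to expose the common id, MyContainerPerId is the container type from the snippet above, and every delegate reader has a distinct name so its position keys in the ExecutionContext do not collide.

    import java.util.List;

    import org.springframework.batch.item.ExecutionContext;
    import org.springframework.batch.item.ItemStreamException;
    import org.springframework.batch.item.ItemStreamReader;
    import org.springframework.batch.item.support.SingleItemPeekableItemReader;

    public class MergeReader implements ItemStreamReader<MyContainerPerId> {

        // hypothetical marker interface exposing the common person id
        public interface PersonRecord {
            int getId();
        }

        private final List<SingleItemPeekableItemReader<PersonRecord>> dataReaders;

        public MergeReader(List<SingleItemPeekableItemReader<PersonRecord>> dataReaders) {
            this.dataReaders = dataReaders;
        }

        @Override
        public MyContainerPerId read() throws Exception {
            // merge logic from the snippet above, compacted
            Integer lowestId = searchLowestKey();
            if (lowestId == null) {
                return null; // all readers exhausted
            }
            MyContainerPerId container = new MyContainerPerId();
            for (SingleItemPeekableItemReader<PersonRecord> r : dataReaders) {
                while (r.peek() != null && r.peek().getId() == lowestId) {
                    container.add(r.read());
                }
            }
            return container;
        }

        private Integer searchLowestKey() throws Exception {
            Integer lowest = null;
            for (SingleItemPeekableItemReader<PersonRecord> r : dataReaders) {
                PersonRecord next = r.peek();
                if (next != null && (lowest == null || next.getId() < lowest)) {
                    lowest = next.getId();
                }
            }
            return lowest;
        }

        // SingleItemPeekableItemReader is itself an ItemStream, so delegating
        // these callbacks persists each reader's position (including the
        // currently peeked item) in the ExecutionContext across restarts.
        @Override
        public void open(ExecutionContext ctx) throws ItemStreamException {
            for (SingleItemPeekableItemReader<PersonRecord> r : dataReaders) {
                r.open(ctx);
            }
        }

        @Override
        public void update(ExecutionContext ctx) throws ItemStreamException {
            for (SingleItemPeekableItemReader<PersonRecord> r : dataReaders) {
                r.update(ctx);
            }
        }

        @Override
        public void close() throws ItemStreamException {
            for (SingleItemPeekableItemReader<PersonRecord> r : dataReaders) {
                r.close();
            }
        }
    }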

Answer 2:

I used the Driving Query Based ItemReaders usage pattern described here to solve this issue.

Reader: just a default implementation of JdbcCursorItemReader with SQL to fetch the unique relational id (e.g. select id from person)
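
A minimal sketch of that reader, assuming Spring Batch 4's builder API and a person table whose id column drives the job:

    import javax.sql.DataSource;

    import org.springframework.batch.item.database.JdbcCursorItemReader;
    import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
    import org.springframework.context.annotation.Bean;

    // inside a @Configuration class
    @Bean
    public JdbcCursorItemReader<Long> personIdReader(DataSource dataSource) {
        return new JdbcCursorItemReaderBuilder<Long>()
                .name("personIdReader")
                .dataSource(dataSource)
                .sql("select id from person order by id")
                .rowMapper((rs, rowNum) -> rs.getLong("id"))
                .build();
    }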

Processor: takes this long id as input; a DAO I implemented using Spring's JdbcTemplate fetches data through queries against each of the tables for the specific id (e.g. select * from license where id = ?), maps the results in list form onto a Person POJO, then converts it to a JSON object (using Jackson) and finally to a string
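
A minimal sketch of such a processor. PersonDao and Person are hypothetical placeholders for the JdbcTemplate-backed DAO and the mapped POJO described above:

    import com.fasterxml.jackson.databind.ObjectMapper;

    import org.springframework.batch.item.ItemProcessor;

    public class PersonToJsonProcessor implements ItemProcessor<Long, String> {

        // hypothetical placeholders for the DAO and the mapped POJO
        public interface PersonDao {
            Person loadPerson(long id);
        }
        public static class Person {
            public long id;
        }

        private final PersonDao dao;
        private final ObjectMapper mapper = new ObjectMapper();

        public PersonToJsonProcessor(PersonDao dao) {
            this.dao = dao;
        }

        @Override
        public String process(Long id) throws Exception {
            // the DAO runs one query per table, e.g. "select * from license where id = ?"
            Person person = dao.loadPerson(id);
            // column-name changes are handled by @JsonProperty on the POJO
            return mapper.writeValueAsString(person);
        }
    }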

Writer: either write the file out with the JSON string, or publish the JSON string to a topic in the Kafka case
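
A minimal sketch of the Kafka variant, assuming spring-kafka's KafkaTemplate and a hypothetical topic name:

    import java.util.List;

    import org.springframework.batch.item.ItemWriter;
    import org.springframework.kafka.core.KafkaTemplate;

    public class PersonJsonKafkaWriter implements ItemWriter<String> {

        private static final String TOPIC = "person-data"; // hypothetical topic name

        private final KafkaTemplate<String, String> kafkaTemplate;

        public PersonJsonKafkaWriter(KafkaTemplate<String, String> kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
        }

        @Override
        public void write(List<? extends String> items) {
            // each item is one composite JSON string produced by the processor
            for (String json : items) {
                kafkaTemplate.send(TOPIC, json);
            }
        }
    }

Newer Spring Batch releases (4.2+) also ship a KafkaItemWriter that wraps essentially this pattern around a KafkaTemplate.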

sql-server  json  join  spring-batch  apache-kafka