How to process logically related rows after ItemReader in SpringBatch?
Asked Answered
G

5

23

Scenario

To make it simple, let's suppose I have an ItemReader that returns me 25 rows.

  1. The first 10 rows belong to student A

  2. The next 5 belong to student B

  3. and the 10 remaining belong to student C

I want to aggregate them together logically say by studentId and flatten them to end up with one row per student.

Problem

If I understand correctly, setting the commit interval to 5 will do the following:

  1. Send 5 rows to the Processor (which will aggregate them or do any business logic I tell it to).
  2. After Processed will write 5 rows.
  3. Then it will do it again for the next 5 rows and so on.

If that is true, then for the next five I will have to check the already written ones, get them out aggregate them to the ones that I am currently processing and write them again.

I personally do no like that.

  1. What is the best practice to handle a situation like this in Spring Batch?

Alternative

Sometimes I feel that it is much easier to write a regular Spring JDBC main program and then I have full control of what I want to do. However, I wanted to take advantage of of the job repository state monitoring of the job, ability to restart, skip, job and step listeners....

My Spring Batch Code

My module-context.xml

   <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">

    <description>Example job to get you started. It provides a skeleton for a typical batch application.</description>

    <batch:job id="job1">
        <batch:step id="step1"  >           
            <batch:tasklet transaction-manager="transactionManager" start-limit="100" >             
                 <batch:chunk reader="attendanceItemReader"
                              processor="attendanceProcessor" 
                              writer="attendanceItemWriter" 
                              commit-interval="10" 
                 />

            </batch:tasklet>
        </batch:step>
    </batch:job> 

    <bean id="attendanceItemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader"> 
        <property name="dataSource">
            <ref bean="sourceDataSource"/>
        </property> 
        <property name="sql"                                                    
                  value="select s.student_name ,s.student_id ,fas.attendance_days ,fas.attendance_value from K12INTEL_DW.ftbl_attendance_stumonabssum fas inner join k12intel_dw.dtbl_students s on fas.student_key = s.student_key inner join K12INTEL_DW.dtbl_schools ds on fas.school_key = ds.school_key inner join k12intel_dw.dtbl_school_dates dsd on fas.school_dates_key = dsd.school_dates_key where dsd.rolling_local_school_yr_number = 0 and ds.school_code = ? and s.student_activity_indicator = 'Active' and fas.LOCAL_GRADING_PERIOD = 'G1' and s.student_current_grade_level = 'Gr 9' order by s.student_id"/>
        <property name="preparedStatementSetter" ref="attendanceStatementSetter"/>           
        <property name="rowMapper" ref="attendanceRowMapper"/> 
    </bean> 

    <bean id="attendanceStatementSetter" class="edu.kdc.visioncards.preparedstatements.AttendanceStatementSetter"/>

    <bean id="attendanceRowMapper" class="edu.kdc.visioncards.rowmapper.AttendanceRowMapper"/>

    <bean id="attendanceProcessor" class="edu.kdc.visioncards.AttendanceProcessor" />  

    <bean id="attendanceItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter"> 
        <property name="resource" value="file:target/outputs/passthrough.txt"/> 
        <property name="lineAggregator"> 
            <bean class="org.springframework.batch.item.file.transform.PassThroughLineAggregator" /> 
        </property> 
    </bean> 

</beans>

My supporting classes for the Reader.

A PreparedStatementSetter

package edu.kdc.visioncards.preparedstatements;

import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.springframework.jdbc.core.PreparedStatementSetter;

public class AttendanceStatementSetter implements PreparedStatementSetter {

    public void setValues(PreparedStatement ps) throws SQLException {

        ps.setInt(1, 7);

    }

}

and a RowMapper

package edu.kdc.visioncards.rowmapper;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

import edu.kdc.visioncards.dto.AttendanceDTO;

public class AttendanceRowMapper<T> implements RowMapper<AttendanceDTO> {

    public static final String STUDENT_NAME = "STUDENT_NAME";
    public static final String STUDENT_ID = "STUDENT_ID";
    public static final String ATTENDANCE_DAYS = "ATTENDANCE_DAYS";
    public static final String ATTENDANCE_VALUE = "ATTENDANCE_VALUE";

    public AttendanceDTO mapRow(ResultSet rs, int rowNum) throws SQLException {

        AttendanceDTO dto = new AttendanceDTO();
        dto.setStudentId(rs.getString(STUDENT_ID));
        dto.setStudentName(rs.getString(STUDENT_NAME));
        dto.setAttDays(rs.getInt(ATTENDANCE_DAYS));
        dto.setAttValue(rs.getInt(ATTENDANCE_VALUE));

        return dto;
    }
}

My processor

package edu.kdc.visioncards;

import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.item.ItemProcessor;

import edu.kdc.visioncards.dto.AttendanceDTO;

public class AttendanceProcessor implements ItemProcessor<AttendanceDTO, Map<Integer, AttendanceDTO>> {

    private Map<Integer, AttendanceDTO> map = new HashMap<Integer, AttendanceDTO>();

    public Map<Integer, AttendanceDTO> process(AttendanceDTO dto) throws Exception {

        if(map.containsKey(new Integer(dto.getStudentId()))){

            AttendanceDTO attDto = (AttendanceDTO)map.get(new Integer(dto.getStudentId()));
            attDto.setAttDays(attDto.getAttDays() + dto.getAttDays());
            attDto.setAttValue(attDto.getAttValue() + dto.getAttValue());

        }else{
            map.put(new Integer(dto.getStudentId()), dto);
        }
        return map;
    }

}

My concerns from code above

In the Processor, I create a HashMap and as I process the rows I check whether I already have that Student in the Map, if it's not there I add it. If it's already there I grab the it get the values that I am interested in and add them with the row that I am currently processing.

After that, Spring Batch Framework writes to a File according to my configuration

My question is as follows:

  1. I do not want it to go to the writer. I want to process all the remaining rows. How do I keep this Map that I have created in memory for the next set of rows that need to go through this same Processor? Everytime, a row is processed through AttendanceProcessor the Map is initialized. Should I put the Map initialization in a static block?
Grouse answered 12/1, 2012 at 15:22 Comment(0)
P
7

In my application I created a CollectingJdbcCursorItemReader that extends the standard JdbcCursorItemReader and performs exactly what you need. Internally it uses my CollectingRowMapper: an extension of the standard RowMapper that maps multiple related rows to one object.

Here is the code of the ItemReader, the code of CollectingRowMapper interface, and an abstract implementation of it, is available in another answer of mine.

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.batch.item.ReaderNotOpenException;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.jdbc.core.RowMapper;

/**
 * A JdbcCursorItemReader that uses a {@link CollectingRowMapper}.
 * Like the superclass this reader is not thread-safe.
 * 
 * @author Pino Navato
 **/
public class CollectingJdbcCursorItemReader<T> extends JdbcCursorItemReader<T> {

    private CollectingRowMapper<T> rowMapper;
    private boolean firstRead = true;


    /**
     * Accepts a {@link CollectingRowMapper} only.
     **/
    @Override
    public void setRowMapper(RowMapper<T> rowMapper) {
        this.rowMapper = (CollectingRowMapper<T>)rowMapper;
        super.setRowMapper(rowMapper);
     }


    /**
     * Read next row and map it to item.
     **/
    @Override
    protected T doRead() throws Exception {
        if (rs == null) {
            throw new ReaderNotOpenException("Reader must be open before it can be read.");
        }

        try {
            if (firstRead) {
                if (!rs.next()) {  //Subsequent calls to next() will be executed by rowMapper
                    return null;
                }
                firstRead = false;
            } else if (!rowMapper.hasNext()) {
                return null;
            }
            T item = readCursor(rs, getCurrentItemCount());
            return item;
        }
        catch (SQLException se) {
            throw getExceptionTranslator().translate("Attempt to process next row failed", getSql(), se);
        }
    }

    @Override
    protected T readCursor(ResultSet rs, int currentRow) throws SQLException {
        T result = super.readCursor(rs, currentRow);
        setCurrentItemCount(rs.getRow());
        return result;
    }

}

You can use it just like the classic JdbcCursorItemReader: the only requirement is that you provide it a CollectingRowMapper instead of the classic RowMapper.

Pump answered 21/9, 2017 at 14:21 Comment(5)
Clever solution. I ended up using this for my spring batch app.Nickola
@Pump I've some doubts regarding the above solution. How can I get in touch with you?Unitary
@hbamithkumara, only through this comments section.Pump
Fine.. How can I process the entire chunk at once? Basically I need something like this MyItemProcessor<List<ObjectDao>, List<ObjectDao>> and MyItemWriter<List<ObjectDao> Is this possible?Unitary
@hbamithkumara, do you mean how to process all input data at once? Spring-batch works in chunks because, generally speaking, loading all input data could take too much memory. Furthermore, in Spring-batch design, ItemProcessor and ItemWriter work on single beans loaded by ItemReader, not on DAOs. In my solution each bean contains a list of logically-related items but they are single beans as required by Spring-batch. I think that reading a good tutorial of Spring-batch could help you.Pump
Y
2

I always follow this pattern:

  1. I make my reader scope to be "step", and in @PostConstruct I fetch the results, and put them in a Map
  2. In processor, I convert the associatedCollection into writable list, and send the writable list
  3. In ItemWriter, I persist the writable item(s) depending on the case
Yokum answered 22/7, 2013 at 13:5 Comment(0)
M
1

because you changed your question i add a new answer

if the students are ordered then there is no need for list/map, you could use exactly one studentObject on the processor to keep the "current" and aggregate on it until there is a new one (read: id change)

if the students are not ordered you will never know when a specific student is "finished" and you'd have to keep all students in a map which can't be written until the end of the complete read sequence

beware:

  • the processor needs to know when the reader is exhausted
  • its hard to get it working with any commit-rate and "id" concept if you aggregate items that are somehow identical the processor just can't know if the currently processed item is the last one
  • basically the usecase is either solved at reader level completely or at writer level (see other answer)
private SimpleItem currentItem;
private StepExecution stepExecution;

@Override
public SimpleItem process(SimpleItem newItem) throws Exception {
    SimpleItem returnItem = null;

    if (currentItem == null) {
        currentItem = new SimpleItem(newItem.getId(), newItem.getValue());
    } else if (currentItem.getId() == newItem.getId()) {
        // aggregate somehow
        String value = currentItem.getValue() + newItem.getValue();
        currentItem.setValue(value);
    } else {
        // "clone"/copy currentItem
        returnItem = new SimpleItem(currentItem.getId(), currentItem.getValue());
        // replace currentItem
        currentItem = newItem;
    }

    // reader exhausted?
    if(stepExecution.getExecutionContext().containsKey("readerExhausted")
            && (Boolean)stepExecution.getExecutionContext().get("readerExhausted")
            && currentItem.getId() == stepExecution.getExecutionContext().getInt("lastItemId")) {
        returnItem = new SimpleItem(currentItem.getId(), currentItem.getValue());
    }

    return returnItem;
}
Marillin answered 17/1, 2012 at 14:54 Comment(2)
where do you get "readerExhausted" from? Is that a key from SpringBatch? I do not find it anywhere in the docs. Also does StepExecution need a set or SpringBatch just picks it up the way you have it above?Grouse
no you'd have to implement a reader by yourself wich does that, for working with a StepExecutionListener see static.springsource.org/spring-batch/reference/html/…Marillin
M
0

basically you talk about batch processing with changing IDs(1), where the batch has to keep track of the change

for spring/spring-batch we talk about:

  • ItemWriter which checks the list of items for an id change
  • before the change the items are stored in a temporary datastore(2) (List, Map, whatever), and are not written out
  • when the id changes, the aggregating/flattening business code runs on the items in the datastore and one item should be written, now the datastore can be used for the next items with the next id
  • this concept needs a reader which tells the step "i'm exhausted" to properly flush the temporary datastore on end of items (file/database)

here a rough and simple code example

@Override
public void write(List<? extends SimpleItem> items) throws Exception {

    // setup with first sharedId at startup
    if (currentId == null){
        currentId = items.get(0).getSharedId();
    }

    // check for change of sharedId in input
    // keep items in temporary dataStore until id change of input
    // call delegate if there is an id change or if the reader is exhausted
    for (SimpleItem item : items) {
        // already known sharedId, add to tempData
        if (item.getSharedId() == currentId) {
            tempData.add(item);
        } else {
            // or new sharedId, write tempData, empty it, keep new id
            // the delegate does the flattening/aggregating
            delegate.write(tempData);
            tempData.clear();
            currentId = item.getSharedId();
            tempData.add(item);
        }
    }

    // check if reader is exhausted, flush tempData
    if ((Boolean) stepExecution.getExecutionContext().get("readerExhausted")
            && tempData.size() > 0) {
        delegate.write(tempData);
        // optional delegate.clear(); 
    }
}

(1)assuming the items are ordered by an ID (can be composite too)

(2)a hashmap spring bean for thread safety

Marillin answered 12/1, 2012 at 23:40 Comment(2)
so in the processor is where I do you 2nd and 3rd bullet points. What I am not understanding is that when that logic has completed, it will go to a Writer right? In that Writer I do not want to write to any db or file because the next group of rows might still belong to an item in the list that its in memory. How can I keep that list of objects (temporary datastore) that I have created during processing step still accessible when going through the processing step again for the next 5 rows?Grouse
Thanks for the help, I have added my source code and a question after the source code see if it adds clarification to my concernGrouse
C
0

Use Step Execution Listener and store the records as map to the StepExecutionContext , you can then group them in the writer or writer listener and write it at a time

Chanteuse answered 24/2, 2016 at 5:32 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.