Skip to content

Commit d90a02d

Browse files
committed
Add football job sample with parallel steps
1 parent 02524d7 commit d90a02d

File tree

2 files changed

+179
-0
lines changed

2 files changed

+179
-0
lines changed
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.samples.football;
17+
18+
import javax.sql.DataSource;
19+
20+
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
21+
import org.springframework.batch.core.configuration.annotation.EnableJdbcJobRepository;
22+
import org.springframework.batch.core.job.Job;
23+
import org.springframework.batch.core.job.builder.FlowBuilder;
24+
import org.springframework.batch.core.job.builder.JobBuilder;
25+
import org.springframework.batch.core.job.flow.Flow;
26+
import org.springframework.batch.core.job.flow.support.SimpleFlow;
27+
import org.springframework.batch.core.repository.JobRepository;
28+
import org.springframework.batch.core.step.Step;
29+
import org.springframework.batch.core.step.builder.StepBuilder;
30+
import org.springframework.batch.infrastructure.item.database.JdbcCursorItemReader;
31+
import org.springframework.batch.infrastructure.item.database.builder.JdbcCursorItemReaderBuilder;
32+
import org.springframework.batch.infrastructure.item.file.FlatFileItemReader;
33+
import org.springframework.batch.infrastructure.item.file.builder.FlatFileItemReaderBuilder;
34+
import org.springframework.batch.samples.common.DataSourceConfiguration;
35+
import org.springframework.batch.samples.football.internal.*;
36+
import org.springframework.context.annotation.Bean;
37+
import org.springframework.context.annotation.Configuration;
38+
import org.springframework.context.annotation.Import;
39+
import org.springframework.core.io.ClassPathResource;
40+
import org.springframework.core.task.SimpleAsyncTaskExecutor;
41+
import org.springframework.jdbc.support.JdbcTransactionManager;
42+
43+
@Configuration
44+
@EnableBatchProcessing
45+
@EnableJdbcJobRepository
46+
@Import(DataSourceConfiguration.class)
47+
public class FootballJobParallelStepsConfiguration {
48+
49+
// step 1 configuration
50+
51+
@Bean
52+
public FlatFileItemReader<Player> playerFileItemReader() {
53+
return new FlatFileItemReaderBuilder<Player>().name("playerFileItemReader")
54+
.resource(new ClassPathResource("org/springframework/batch/samples/football/data/player-small1.csv"))
55+
.delimited()
56+
.names("ID", "lastName", "firstName", "position", "birthYear", "debutYear")
57+
.fieldSetMapper(new PlayerFieldSetMapper())
58+
.build();
59+
}
60+
61+
@Bean
62+
public PlayerItemWriter playerWriter(DataSource dataSource) {
63+
PlayerItemWriter playerItemWriter = new PlayerItemWriter();
64+
JdbcPlayerDao playerDao = new JdbcPlayerDao();
65+
playerDao.setDataSource(dataSource);
66+
playerItemWriter.setPlayerDao(playerDao);
67+
return playerItemWriter;
68+
}
69+
70+
@Bean
71+
public Step playerLoad(JobRepository jobRepository, JdbcTransactionManager transactionManager,
72+
FlatFileItemReader<Player> playerFileItemReader, PlayerItemWriter playerWriter) {
73+
return new StepBuilder("playerLoad", jobRepository).<Player, Player>chunk(2)
74+
.transactionManager(transactionManager)
75+
.reader(playerFileItemReader)
76+
.writer(playerWriter)
77+
.build();
78+
}
79+
80+
// step 2 configuration
81+
82+
@Bean
83+
public FlatFileItemReader<Game> gameFileItemReader() {
84+
return new FlatFileItemReaderBuilder<Game>().name("gameFileItemReader")
85+
.resource(new ClassPathResource("org/springframework/batch/samples/football/data/games-small.csv"))
86+
.delimited()
87+
.names("id", "year", "team", "week", "opponent", "completes", "attempts", "passingYards", "passingTd",
88+
"interceptions", "rushes", "rushYards", "receptions", "receptionYards", "totalTd")
89+
.fieldSetMapper(new GameFieldSetMapper())
90+
.build();
91+
}
92+
93+
@Bean
94+
public JdbcGameDao gameWriter(DataSource dataSource) {
95+
JdbcGameDao jdbcGameDao = new JdbcGameDao();
96+
jdbcGameDao.setDataSource(dataSource);
97+
return jdbcGameDao;
98+
}
99+
100+
@Bean
101+
public Step gameLoad(JobRepository jobRepository, JdbcTransactionManager transactionManager,
102+
FlatFileItemReader<Game> gameFileItemReader, JdbcGameDao gameWriter) {
103+
return new StepBuilder("gameLoad", jobRepository).<Game, Game>chunk(2)
104+
.transactionManager(transactionManager)
105+
.reader(gameFileItemReader)
106+
.writer(gameWriter)
107+
.build();
108+
}
109+
110+
// step 3 configuration
111+
112+
@Bean
113+
public JdbcCursorItemReader<PlayerSummary> playerSummarizationSource(DataSource dataSource) {
114+
String sql = """
115+
SELECT GAMES.player_id, GAMES.year_no, SUM(COMPLETES),
116+
SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),
117+
SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),
118+
SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)
119+
from GAMES, PLAYERS where PLAYERS.player_id =
120+
GAMES.player_id group by GAMES.player_id, GAMES.year_no
121+
""";
122+
return new JdbcCursorItemReaderBuilder<PlayerSummary>().name("playerSummarizationSource")
123+
.ignoreWarnings(true)
124+
.sql(sql)
125+
.dataSource(dataSource)
126+
.rowMapper(new PlayerSummaryMapper())
127+
.build();
128+
}
129+
130+
@Bean
131+
public JdbcPlayerSummaryDao summaryWriter(DataSource dataSource) {
132+
JdbcPlayerSummaryDao jdbcPlayerSummaryDao = new JdbcPlayerSummaryDao();
133+
jdbcPlayerSummaryDao.setDataSource(dataSource);
134+
return jdbcPlayerSummaryDao;
135+
}
136+
137+
@Bean
138+
public Step summarizationStep(JobRepository jobRepository, JdbcTransactionManager transactionManager,
139+
JdbcCursorItemReader<PlayerSummary> playerSummarizationSource, JdbcPlayerSummaryDao summaryWriter) {
140+
return new StepBuilder("summarizationStep", jobRepository).<PlayerSummary, PlayerSummary>chunk(2)
141+
.transactionManager(transactionManager)
142+
.reader(playerSummarizationSource)
143+
.writer(summaryWriter)
144+
.build();
145+
}
146+
147+
// job configuration
148+
149+
@Bean
150+
public Job job(JobRepository jobRepository, Step playerLoad, Step gameLoad, Step summarizationStep) {
151+
Flow playerLoadFlow = new FlowBuilder<SimpleFlow>("playerLoadFlow").start(playerLoad).build();
152+
Flow gameLoadFlow = new FlowBuilder<SimpleFlow>("gameLoadFlow").start(gameLoad).build();
153+
Flow parallelFlow = new FlowBuilder<SimpleFlow>("parallelFlow").start(playerLoadFlow)
154+
.split(new SimpleAsyncTaskExecutor())
155+
.add(gameLoadFlow)
156+
.build();
157+
158+
Flow mainFlow = new FlowBuilder<SimpleFlow>("mainFlow").start(parallelFlow).next(summarizationStep).build();
159+
return new JobBuilder("footballJob", jobRepository).start(mainFlow).build().build();
160+
}
161+
162+
}

spring-batch-samples/src/test/java/org/springframework/batch/samples/football/FootballJobFunctionalTests.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,21 @@ void testLaunchJobWithJavaConfiguration() throws Exception {
7272
assertTrue(count > 0);
7373
}
7474

75+
@Test
76+
void testLaunchJobWithParallelStepsWithJavaConfiguration() throws Exception {
77+
// given
78+
ApplicationContext context = new AnnotationConfigApplicationContext(
79+
FootballJobParallelStepsConfiguration.class);
80+
JobOperator jobOperator = context.getBean(JobOperator.class);
81+
Job job = context.getBean(Job.class);
82+
83+
// when
84+
jobOperator.start(job, new JobParameters());
85+
86+
// then
87+
int count = JdbcTestUtils.countRowsInTable(new JdbcTemplate(context.getBean(DataSource.class)),
88+
"PLAYER_SUMMARY");
89+
assertTrue(count > 0);
90+
}
91+
7592
}

0 commit comments

Comments
 (0)