Spring Batch 如何健壮可重启可追溯 SKIP/RETRY/RESTART策略的应用

前提:你已经有了一定的Spring基础

你已经可以跑动一个简单的Spring batch 的实例


参考:http://www.cnblogs.com/gulvzhe/archive/2011/10/25/2224249.html

http://www.cnblogs.com/cdutedu/p/3789396.html

先盗几个图




JobLauncher 指定一个 JobRepository

JobRepository包含了一些传入JOB的参数,主要有六个表去存储


每个JOB可以对应多个Step


		 ...<batch:step id="aStep" next="bStep">
			<batch:tasklet>
				<batch:chunk reader="aReader" writer="aWriter" processor="aProcessor" commit-interval="1000" />
			</batch:tasklet>
		</batch:step>...

tesklet里面的工作块为chunk,每个chunk执行完commit-interval,即提交一次,可以同时开多个chunk


每个STEP 这样执行


(以下一段摘抄)

从DB或是文件中取出数据的时候,read()操作每次只读取一条记录,之后将读取的这条数据传递给processor(item)处理,框架将重复做这两步操作,直到读取记录的件数达到batch配置信息中”commin-interval”设定值的时候,就会调用一次write操作。然后再重复上图的处理,直到处理完所有的数据。当这个Step的工作完成以后,或是跳到其他Step,或是结束处理。


那么问题来了?读取数据到处理的时候发生了异常如何处理?会不会导致整个批处理中断呢?

有异常没有捕获,到最上层也没有的话整个进程会挂掉,导致整个批处理中断。


显然,为了不影响后面的处理,要么捕获异常,打出日志,跳过。要么,重试(在IO超时或者表锁定的情况下是很有效的-瞬态情况)。

即使有中断,我们也需要重启JOB


以上几种正好对应Spring-Batch中的 SKIP\RETYR\RESTART

一、SKIP

<job id="importProductsJob">
<step id="importProductsStep">
<tasklet>
<chunk reader="reader" writer="writer" commit-interval="100"
skip-limit="10">
<skippable-exception-classes>
<include class="org.springframework.batch
? .item.file.FlatFileParseException" />
</skippable-exception-classes>
</chunk>
</tasklet>
</step>
</job>

skippable-exception-classes 里面配需要SKIP的异常类型

skip-limit 最多可以容错次数,超过这个数,该STEP中断





也可以添加listener去打日志

<bean id="skipListener" class="com.manning
? .sbia.ch08.skip.DatabaseSkipListener">
<constructor-arg ref="dataSource" />
</bean>
<job id="importProductsJob"
xmlns="http://www.springframework.org/schema/batch">
<step id="importProductsStep">
<tasklet>
<chunk reader="reader" writer="writer"
commit-interval="100" skip-limit="10">
<skippable-exception-classes>
<include class="org.springframework.batch.item.file
? .FlatFileParseException" />
</skippable-exception-classes>
</chunk>
<listeners>
<listener ref="skipListener" />
</listeners>
</tasklet>
</step>
</job>

二、Retrying on error

主要针对于IO操作的、并发等,瞬态的错误

类似于SKIP的配置

<job id="importProducsJob">
<step id="importProductsStep">
<tasklet>
<chunk reader="reader" writer="writer" commit-interval="100"
retry-limit="3">
<retryable-exception-classes>
<include class="org.springframework.dao
?.OptimisticLockingFailureException" />
</retryable-exception-classes>
</chunk>
</tasklet>
</step>
</job>

如果你不想重试次数达到后,由于这些错误导致STEP的中断退出,可以混合RETRY和SKIP两者。

<job id="job">
<step id="step">
<tasklet>
<chunk reader="reader" writer="writer" commit-interval="100"
retry-limit="3" skip-limit="10">
<retryable-exception-classes>
<include class="org.springframework.dao
? .DeadlockLoserDataAccessException" />
</retryable-exception-classes>
<skippable-exception-classes>
<include class="org.springframework.dao
? .DeadlockLoserDataAccessException" />
</skippable-exception-classes>
</chunk>
</tasklet>
</step>
</job>


你也可以通过自定义的策略来控制重试

<job id="retryPolicyJob"
xmlns="http://www.springframework.org/schema/batch">
<step id="retryPolicyStep">
<tasklet>
<chunk reader="reader" writer="writer" commit-interval="100"
retry-policy="retryPolicy" />
</tasklet>
</step>
</job>
<bean id="retryPolicy" class="org.springframework
?.batch.retry.policy.ExceptionClassifierRetryPolicy">
<property name="policyMap">
<map>
<entry key="org.springframework.dao.ConcurrencyFailureException">
<bean class="org.springframework.batch.retry
?.policy.SimpleRetryPolicy">
<property name="maxAttempts" value="3" />
</bean>
</entry>
<entry key="org.springframework.dao
? .DeadlockLoserDataAccessException">
<bean class="org.springframework.batch.retry
? .policy.SimpleRetryPolicy">
<property name="maxAttempts" value="5" />
</bean>
</entry>
</map>
</property>
</bean>

你也可以添加Listener

配置方法类似同SKIP,继承RetryListenerSupport,配置在retry-listeners


你还可以通过RetryTemplate来重试。RetryTemplate配置属性retryPolicy


三、重启

重启的重点是能够保存之前的job repository,自带的reader,都可以,自己写的Reader需要继承接口

Spring batch 默认是会重启的

allow-start-if-complete配置在tasklet上,决定了tasklet会不会在JOB重试的时候,重试该STEP(可能是下个STEP发生了异常)。

start-limit用来控制STEP级别的重试次数,重试次数结束后,JOB中断退出。




处理在STEP中的状态,已经处理了很多ITEM,失败了?
middle of a chunk-oriented step,item级别的重启。首先是reader的重启。
如果你需要你自己写的ITEM的reader,也可以实现重启的话,需要实现ItemStram的接口,将对于item 计数的值,保存到executionContext,重启的时候去读取。





郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。