1. 概述
Spring Cloud Task的目標(biāo)是為Spring Boot應(yīng)用程序提供創(chuàng)建短運(yùn)行期微服務(wù)的功能。在Spring Cloud Task中,我們可以靈活地動(dòng)態(tài)運(yùn)行任何任務(wù),按需分配資源并在任務(wù)完成后檢索結(jié)果。Tasks是Spring Cloud Data Flow中的一個(gè)基礎(chǔ)項(xiàng)目,允許用戶將幾乎任何Spring Boot應(yīng)用程序作為一個(gè)短期任務(wù)執(zhí)行。
2. 一個(gè)簡(jiǎn)單的任務(wù)應(yīng)用程序
2.1 添加相關(guān)依賴項(xiàng)
首先,我們可以添加具有spring-cloud-task-dependencies的依賴關(guān)系管理部分:
org.springframework.cloud spring-cloud-task-dependencies 1.2.2.RELEASE pom import
此依賴關(guān)系管理通過(guò)import范圍管理具有依賴關(guān)系的版本。
我們需要添加以下依賴項(xiàng):
org.springframework.cloud spring-cloud-starter-task org.springframework.cloud spring-cloud-task-core
現(xiàn)在,要啟動(dòng)我們的Spring Boot應(yīng)用程序,我們需要與相關(guān)父級(jí)一起使用spring-boot-starter。
我們將使用Spring Data JPA作為ORM工具,因此我們還需要為其添加依賴項(xiàng):
org.springframework.boot spring-boot-starter-data-jpa
2.2 @EnableTask注解
要引導(dǎo)Spring Cloud Task的功能,我們需要添加@EnableTask注解:
@SpringBootApplication@EnableTaskpublic class TaskDemo { // …}
該注解在程序中引入了SimpleTaskConfiguration類,該類依次注冊(cè)TaskRepository及其基礎(chǔ)結(jié)構(gòu)。默認(rèn)情況下,內(nèi)存映射用于存儲(chǔ)TaskRepository的狀態(tài)。
TaskRepository的主要信息在TaskExecution類中建模。該類的包含的字段有taskName,startTime,endTime,exitMessage。exitMessage在退出的時(shí)候存儲(chǔ)一些有用信息。
如果退出是由應(yīng)用程序的任何事件中的失敗引起的,則完整的異常堆棧跟蹤將存儲(chǔ)在此處。
Spring Boot提供了一個(gè)接口ExitCodeExceptionMapper,它將未捕獲的異常映射到允許經(jīng)過(guò)詳細(xì)調(diào)試的退出代碼。Cloud Task將信息存儲(chǔ)在數(shù)據(jù)源中以供將來(lái)分析。
2.3 為T(mén)askRepository配置DataSource
一旦任務(wù)結(jié)束,存儲(chǔ)TaskRepository的內(nèi)存映射將消失,我們將丟失與Task事件相關(guān)的數(shù)據(jù)。要想永久存儲(chǔ),我們將使用MySQL作為Spring Data JPA的數(shù)據(jù)源。
數(shù)據(jù)源 在application.yml文件中配置。要配置Spring Cloud Task以使用提供的數(shù)據(jù)源作為T(mén)askRepository的存儲(chǔ),我們需要?jiǎng)?chuàng)建一個(gè)擴(kuò)展DefaultTaskConfigurer的類。
現(xiàn)在,我們可以將配置的Datasource作為構(gòu)造函數(shù)參數(shù)發(fā)送到超類的構(gòu)造函數(shù):
public class HelloWorldTaskConfigurer extends DefaultTaskConfigurer{ public HelloWorldTaskConfigurer(DataSource dataSource){ super(dataSource); }}
為了實(shí)現(xiàn)上述配置,我們需要使用@Autowired批注注釋DataSource的實(shí)例,并將實(shí)例注入上面定義的HelloWorldTaskConfigurer bean的構(gòu)造參數(shù)中 :
@Autowiredprivate DataSource dataSource;@Beanpublic HelloWorldTaskConfigurer getTaskConfigurer() { return new HelloWorldTaskConfigurer(dataSource);}
這樣就完成了將TaskRepository存儲(chǔ)到MySQL數(shù)據(jù)庫(kù)的配置。
2.4 實(shí)現(xiàn)
在Spring Boot中,我們可以在應(yīng)用程序完成啟動(dòng)之前執(zhí)行任何任務(wù)。我們可以使用ApplicationRunner或CommandLineRunner接口來(lái)創(chuàng)建一個(gè)簡(jiǎn)單的Task。
我們需要實(shí)現(xiàn)這些接口的run方法,并將實(shí)現(xiàn)類聲明為bean:
@Component public static class HelloWorldApplicationRunner implements ApplicationRunner { @Override public void run(ApplicationArguments arg0) throws Exception { // TODO Auto-generated method stub LOGGER.info(“Hello World from Spring Cloud Task!”); } }
3. Spring Cloud Task的生命周期
首先,我們?cè)赥askRepository中創(chuàng)建一個(gè)條目。這表明所有bean都已準(zhǔn)備好在Application中使用,并且Runner接口的run方法已準(zhǔn)備好執(zhí)行。
完成run方法的執(zhí)行或ApplicationContext事件的任何失敗后,TaskRepository將使用另一個(gè)條目進(jìn)行更新。
在任務(wù)生命周期中,我們可以在TaskExecutionListener接口中注冊(cè)偵聽(tīng)器。我們需要一個(gè)實(shí)現(xiàn)接口的類,它有三個(gè)方法 – 在Task的各個(gè)事件中觸發(fā)onTaskEnd,onTaksFailed和onTaskStartup。
public class TaskListener implements TaskExecutionListener { private final static Logger LOGGER = Logger.getLogger(TaskListener.class.getName()); @Override public void onTaskEnd(TaskExecution arg0) { // TODO Auto-generated method stub LOGGER.info(“End of Task”); } @Override public void onTaskFailed(TaskExecution arg0, Throwable arg1) { // TODO Auto-generated method stub } @Override public void onTaskStartup(TaskExecution arg0) { // TODO Auto-generated method stub LOGGER.info(“Task Startup”); }}
我們需要在TaskDemo類中聲明實(shí)現(xiàn)類的bean :
@Beanpublic TaskListener taskListener() { return new TaskListener();}
運(yùn)行TaskDemo類,在控制臺(tái)可看到任務(wù)被執(zhí)行:
14:23:29.974 [main] INFO o.s.j.e.a.AnnotationMBeanExporter – Registering beans for JMX exposure on startup14:23:29.978 [main] INFO o.s.c.s.DefaultLifecycleProcessor – Starting beans in phase 014:23:30.103 [main] INFO c.p.spring.cloud.task.TaskListener – Task Startup14:23:30.109 [main] INFO c.p.spring.cloud.task.TaskDemo – Hello World from Spring Cloud Task!14:23:30.113 [main] INFO c.p.spring.cloud.task.TaskListener – End of Task14:23:30.126 [main] INFO o.s.c.a.AnnotationConfigApplicationContext – Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@2a798d51: startup date [Fri Oct 12 14:23:28 CST 2018]; root of context hierarchy14:23:30.127 [main] INFO o.s.c.s.DefaultLifecycleProcessor – Stopping beans in phase 014:23:30.128 [main] INFO o.s.j.e.a.AnnotationMBeanExporter – Unregistering JMX-exposed beans on shutdown14:23:30.128 [main] INFO o.s.o.j.LocalContainerEntityManagerFactoryBean – Closing JPA EntityManagerFactory for persistence unit ‘default’14:23:30.129 [main] INFO o.h.tool.hbm2ddl.SchemaExport – HHH000227: Running hbm2ddl schema export14:23:30.129 [main] INFO o.h.tool.hbm2ddl.SchemaExport – HHH000230: Schema export complete14:23:30.132 [main] INFO c.p.spring.cloud.task.TaskDemo – Started TaskDemo in 2.405 seconds (JVM running for 2.771)
4. 與Spring Batch集成
我們可以將Spring Batch Job作為T(mén)ask執(zhí)行,并使用Spring Cloud Task記錄Job執(zhí)行的事件。要啟用此功能,我們需要添加與Boot和Cloud相關(guān)的Batch依賴項(xiàng):
org.springframework.boot spring-boot-starter-batch org.springframework.cloud spring-cloud-task-batch
要將作業(yè)配置為任務(wù),我們需要在JobConfiguration類中注冊(cè)Job Bean :
@Bean public Job job2() { return jobBuilderFactory.get(“job2”).start(stepBuilderFactory.get(“job2step1”).tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { LOGGER.info(“This job is from PeterWanghao”); return RepeatStatus.FINISHED; } }).build()).build(); }
我們需要使用@EnableBatchProcessing注解來(lái)裝飾TaskDemo類:
@SpringBootApplication@EnableTask@EnableBatchProcessingpublic class TaskDemo { // …}
該@EnableBatchProcessing注解集成了Spring Batch的功能,并設(shè)置批處理作業(yè)所需的基本配置。
現(xiàn)在,如果我們運(yùn)行應(yīng)用程序,@ EnableBatchProcessing注釋將觸發(fā)Spring Batch Job執(zhí)行,Spring Cloud Task將使用springcloud數(shù)據(jù)庫(kù)記錄所有批處理作業(yè)的執(zhí)行事件。
運(yùn)行TaskDemo類,在控制臺(tái)可看到任務(wù)被執(zhí)行:
14:30:26.003 [main] INFO o.s.j.e.a.AnnotationMBeanExporter – Registering beans for JMX exposure on startup14:30:26.008 [main] INFO o.s.c.s.DefaultLifecycleProcessor – Starting beans in phase 014:30:26.047 [main] INFO c.p.spring.cloud.task.TaskListener – Task Startup14:30:26.053 [main] INFO c.p.spring.cloud.task.TaskDemo – Hello World from Spring Cloud Task!14:30:26.054 [main] INFO o.s.b.a.b.JobLauncherCommandLineRunner – Running default command line with: []14:30:26.257 [main] INFO o.s.b.c.l.support.SimpleJobLauncher – Job: [SimpleJob: [name=job1]] launched with the following parameters: [{}]14:30:26.266 [main] INFO o.s.c.t.b.l.TaskBatchExecutionListener – The job execution id 1 was run within the task execution 414:30:26.292 [main] INFO o.s.batch.core.job.SimpleStepHandler – Executing step: [job1step1]14:30:26.312 [main] INFO c.p.s.cloud.task.JobConfiguration – Tasklet has run14:30:26.342 [main] INFO o.s.batch.core.job.SimpleStepHandler – Executing step: [job1step2]14:30:26.353 [main] INFO c.p.s.cloud.task.JobConfiguration – Processing of chunks14:30:26.353 [main] INFO c.p.s.cloud.task.JobConfiguration – Processing of chunks14:30:26.353 [main] INFO c.p.s.cloud.task.JobConfiguration – Processing of chunks14:30:26.354 [main] INFO c.p.s.cloud.task.JobConfiguration – >> -714:30:26.354 [main] INFO c.p.s.cloud.task.JobConfiguration – >> -214:30:26.354 [main] INFO c.p.s.cloud.task.JobConfiguration – >> -314:30:26.359 [main] INFO c.p.s.cloud.task.JobConfiguration – Processing of chunks14:30:26.359 [main] INFO c.p.s.cloud.task.JobConfiguration – Processing of chunks14:30:26.359 [main] INFO c.p.s.cloud.task.JobConfiguration – Processing of chunks14:30:26.359 [main] INFO c.p.s.cloud.task.JobConfiguration – >> -1014:30:26.359 [main] INFO c.p.s.cloud.task.JobConfiguration – >> -514:30:26.359 [main] INFO c.p.s.cloud.task.JobConfiguration – >> -614:30:26.381 [main] INFO o.s.b.c.l.support.SimpleJobLauncher – Job: [SimpleJob: [name=job1]] completed with the following parameters: [{}] and the following status: [COMPLETED]14:30:26.398 [main] INFO o.s.b.c.l.support.SimpleJobLauncher – Job: [SimpleJob: [name=job2]] launched with the following parameters: [{}]14:30:26.404 [main] INFO o.s.c.t.b.l.TaskBatchExecutionListener – The job execution id 2 was run within the task execution 414:30:26.420 [main] INFO o.s.batch.core.job.SimpleStepHandler – Executing step: [job2step1]14:30:26.428 [main] INFO c.p.s.cloud.task.JobConfiguration – This job is from PeterWanghao14:30:26.441 [main] INFO o.s.b.c.l.support.SimpleJobLauncher – Job: [SimpleJob: [name=job2]] completed with the following parameters: [{}] and the following status: [COMPLETED]14:30:26.442 [main] INFO c.p.spring.cloud.task.TaskListener – End of Task14:30:26.448 [main] INFO o.s.c.a.AnnotationConfigApplicationContext – Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@399f45b1: startup date [Fri Oct 12 14:30:23 CST 2018]; root of context hierarchy14:30:26.450 [main] INFO o.s.c.s.DefaultLifecycleProcessor – Stopping beans in phase 014:30:26.450 [main] INFO o.s.j.e.a.AnnotationMBeanExporter – Unregistering JMX-exposed beans on shutdown14:30:26.451 [main] INFO o.s.o.j.LocalContainerEntityManagerFactoryBean – Closing JPA EntityManagerFactory for persistence unit ‘default’14:30:26.451 [main] INFO o.h.tool.hbm2ddl.SchemaExport – HHH000227: Running hbm2ddl schema export14:30:26.451 [main] INFO o.h.tool.hbm2ddl.SchemaExport – HHH000230: Schema export complete14:30:26.455 [main] INFO c.p.spring.cloud.task.TaskDemo – Started TaskDemo in 3.746 seconds (JVM running for 4.093)
5. 從Stream啟動(dòng)任務(wù)
我們可以從Spring Cloud Stream觸發(fā)任務(wù)。為了達(dá)到這個(gè)目的,我們使用@EnableTaskLaucnher注解。這一次,我們使用Spring Boot應(yīng)用程序添加注釋,TaskSink將可用:
@SpringBootApplication@EnableTaskLauncherpublic class StreamTaskSinkApplication { public static void main(String[] args) { SpringApplication.run(TaskSinkApplication.class, args); }}
TaskSink從一個(gè)流中接收消息,信息中包含GenericMessage含有TaskLaunchRequest作為有效負(fù)載。然后它觸發(fā)任務(wù)啟動(dòng)請(qǐng)求中提供的基于任務(wù)的坐標(biāo)。
要使TaskSink起作用,我們需要配置一個(gè)實(shí)現(xiàn)TaskLauncher接口的bean。出于測(cè)試目的,我們?cè)谶@里Mock實(shí)現(xiàn):
@Beanpublic TaskLauncher taskLauncher() { return mock(TaskLauncher.class);}
我們需要注意的是,TaskLauncher接口僅在添加spring-cloud-deployer-local依賴項(xiàng)后才可用:
org.springframework.cloud spring-cloud-deployer-local 1.3.1.RELEASE
我們可以測(cè)試通過(guò)調(diào)用發(fā)起的任務(wù)是否輸入了Sink:
public class SpringCloudTaskSinkApplicationIntegrationTest{ @Autowired private Sink sink; //}
現(xiàn)在,我們創(chuàng)建一個(gè)TaskLaunchRequest實(shí)例,并將其作為GenericMessage 對(duì)象的有效負(fù)載發(fā)送。然后我們可以調(diào)用Sink的輸入通道,將GenericMessage對(duì)象保留在通道中。
六,結(jié)論
在本文中,我們探討了Spring Cloud Task的執(zhí)行方式以及如何配置它以在數(shù)據(jù)庫(kù)中記錄其事件。我們還觀察了如何定義Spring Batch作業(yè)并將其存儲(chǔ)在TaskRepository中。最后,我們解釋了如何從Spring Cloud Stream中觸發(fā)Task。