Quartz Source Code Analysis

0. Overview

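As introduced in the previous article, Quartz is a job scheduling system whose core components include Job, Trigger and Scheduler. Since we already know how to use Quartz, this article takes a careful look at its internal code.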

1. Core Components

The relationships among Quartz's internal components are shown in the figure.

A Job can have several Triggers. Each time one of those Triggers fires, the Job runs once.

1. Scheduler: the task scheduler. It is the controller that actually carries out scheduling. In Spring it is wrapped by SchedulerFactoryBean.

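2. Trigger: defines the time rules for scheduling a task. Implementations include SimpleTrigger, CronTrigger, DateIntervalTrigger and NthIncludedDayTrigger, of which CronTrigger is the most widely used. In Spring, CronTrigger is wrapped by CronTriggerFactoryBean.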

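3. JobDetail: describes the Job implementation class and other static information, such as the job's name and its associated listeners. Spring provides two implementations, JobDetailFactoryBean and MethodInvokingJobDetailFactoryBean. If a scheduled task only needs to call a particular method of a particular class, MethodInvokingJobDetailFactoryBean can do that directly.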

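4. Job: an interface with a single method, void execute(JobExecutionContext context). Developers implement it to define the task to run, and JobExecutionContext provides all kinds of information about the scheduling context. A Job's runtime data is kept in a JobDataMap instance. A class implementing Job is stateless by default. To make a Job stateful, current Quartz versions have you annotate the Job class with @DisallowConcurrentExecution, which prevents concurrent runs. If changes to the JobDataMap should also be saved after each run, add @PersistJobDataAfterExecution. Earlier versions used the StatefulJob interface instead, which is now deprecated. When Quartz is used with Spring, you can instead set the concurrent property of the job detail in the Spring configuration (for example, on MethodInvokingJobDetailFactoryBean).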

2. Scheduler

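Inside Quartz, each Scheduler has a thread responsible for scheduling, QuartzSchedulerThread. Once started, this thread repeatedly acquires the Triggers that are about to fire and wraps each one. It then hands them to the ThreadPool, which executes them. The overall logic is shown in the following sequence diagram: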

2.1

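QuartzSchedulerThread, in the middle of the sequence diagram, is the most important part: it drives the whole of Quartz. Here is that thread's startup logic:

(Code snippet 1)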

public void run() {

// ……

while (!halted.get()) {
try {
synchronized (sigLock) {
while (paused && !halted.get()) {
try {

// Keep waiting until togglePause(false) un-pauses this scheduler thread
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
}

if (halted.get()) {
break;
}
}

} catch(RuntimeException re) {

}
} // loop...
}


The scheduler thread is created and started early on, and then stays in a waiting state. It only begins working when QuartzScheduler calls schedThread.togglePause(false), which sets paused to false.

2.2

The schedThread.togglePause(false) method is as follows:

(Code snippet 2)

void togglePause(boolean pause) {
synchronized (sigLock) {
// Setting paused to false signals that the scheduler thread can start working.
paused = pause;

if (paused) {
signalSchedulingChange(0);
} else {
sigLock.notifyAll();
}
}
}
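
Snippets 1 and 2 use the classic guarded wait. The thread loops on `paused` while holding `sigLock`, and togglePause flips the flag and calls notifyAll. The self-contained sketch below reproduces only that handshake. `MiniSchedulerThread`, `loops` and `halt()` are hypothetical names, and the loop body is a placeholder for the real work of acquiring and firing triggers.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for QuartzSchedulerThread: only the pause/halt handshake is modeled.
class MiniSchedulerThread extends Thread {
    private final Object sigLock = new Object();
    private final AtomicBoolean halted = new AtomicBoolean(false);
    private boolean paused = true;                     // starts paused, as in snippet 1
    final AtomicInteger loops = new AtomicInteger();   // counts placeholder "scheduling passes"

    MiniSchedulerThread() {
        setDaemon(true);
    }

    @Override
    public void run() {
        while (!halted.get()) {
            synchronized (sigLock) {
                // Wait in 1 s slices until togglePause(false) or halt() wakes us.
                while (paused && !halted.get()) {
                    try {
                        sigLock.wait(1000L);
                    } catch (InterruptedException ignore) {
                    }
                }
                if (halted.get()) {
                    break;
                }
            }
            loops.incrementAndGet(); // placeholder: a real scheduler acquires and fires triggers here
            try {
                Thread.sleep(5);
            } catch (InterruptedException ignore) {
            }
        }
    }

    void togglePause(boolean pause) {
        synchronized (sigLock) {
            paused = pause;
            if (!paused) {
                sigLock.notifyAll(); // wake the thread blocked in sigLock.wait()
            }
        }
    }

    void halt() {
        synchronized (sigLock) {
            halted.set(true);
            sigLock.notifyAll();
        }
    }
}
```

The flag is always read and written while holding `sigLock`, so a notifyAll cannot slip in between the check and the wait. The 1-second timeout on `wait` only bounds how long the thread sleeps if it is never notified.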

2.3

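Once the scheduler thread is running, its first step is to check whether the pool has an idle thread available to run a job. It does this by calling ThreadPool.blockForAvailableThreads():

(Code snippet 3)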

public int blockForAvailableThreads() {
synchronized(nextRunnableLock) {
// If every thread in the pool is busy, wait 0.5 s and check again for an idle thread
while((availWorkers.size() < 1 || handoffPending) && !isShutdown) {
try {
nextRunnableLock.wait(500);
} catch (InterruptedException ignore) {
}
}

return availWorkers.size();
}
}

2.4

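Once the scheduler thread knows a thread is free, it fetches the next Trigger from the JobStore and waits until that Trigger's fire time arrives:

(Code snippet 4)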

try {
// Fetch the next Trigger from the JobStore
trigger = qsRsrcs.getJobStore().acquireNextTrigger(
ctxt, now + idleWaitTime );
lastAcquireFailed = false;
} catch ( JobPersistenceException jpe ) {

} catch ( RuntimeException e ) {

}

if ( trigger != null )
{
now = System.currentTimeMillis();
long triggerTime = trigger.getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;

// While the fire time is more than 2 ms away, take the lock and wait()
while ( timeUntilTrigger > 2 )
{
synchronized (sigLock) {
if ( !isCandidateNewTimeEarlierWithinReason( triggerTime, false ) )
{
try {
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
if ( timeUntilTrigger >= 1 )
sigLock.wait( timeUntilTrigger );
} catch ( InterruptedException ignore ) {
}
}
}
}
}
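
Snippet 4 waits for the fire time in a loop. On each pass it rechecks whether a newly scheduled trigger should fire earlier than the one it is holding. The sketch below is a simplified, hypothetical version of that idea. `FireTimeWaiter`, `signalSchedulingChange` and `candidateNewNextFireTime` only approximate what QuartzSchedulerThread does with `sigLock` and `isCandidateNewTimeEarlierWithinReason`. Quartz's own "earlier" test also involves thresholds that are not modeled here.

```java
// Hypothetical simplification of the "wait until the trigger's fire time" step.
class FireTimeWaiter {
    private final Object sigLock = new Object();
    private long candidateNewNextFireTime = 0; // 0 = no new trigger announced

    /** Called when a new trigger is scheduled; wakes the waiting thread. */
    void signalSchedulingChange(long newFireTime) {
        synchronized (sigLock) {
            candidateNewNextFireTime = newFireTime;
            sigLock.notifyAll();
        }
    }

    /**
     * Waits until fireTime. Returns true when the fire time is reached, or false as soon
     * as a trigger with an earlier fire time is announced (the caller would then give
     * back the current trigger and acquire again).
     */
    boolean waitUntil(long fireTime) {
        while (true) {
            synchronized (sigLock) {
                if (candidateNewNextFireTime != 0 && candidateNewNextFireTime < fireTime) {
                    candidateNewNextFireTime = 0;
                    return false;
                }
                long timeUntilTrigger = fireTime - System.currentTimeMillis();
                if (timeUntilTrigger <= 2) { // same 2 ms tolerance as snippet 4
                    return true;
                }
                try {
                    sigLock.wait(timeUntilTrigger);
                } catch (InterruptedException ignore) {
                }
            }
        }
    }
}
```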

2.5

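When the fire time arrives, the scheduler thread first wraps the Trigger and the Job it points to in a JobRunShell, a protective "shell" around the job run:

(Code snippet 5)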

JobRunShell shell = null;
try {
// Borrow a shell from the JobRunShellFactory
shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
// Initialize the shell
shell.initialize( qs, bndle );
} catch ( SchedulerException se ) {
}

2.6

The shell is initialized as follows:

(Code snippet 6)

public void initialize(QuartzScheduler qs, TriggerFiredBundle firedBundle)
throws SchedulerException {
this.qs = qs;
Job job = null;
JobDetail jobDetail = firedBundle.getJobDetail();

try {

// The JobFactory creates a new Job instance via reflection
job = qs.getJobFactory().newJob(firedBundle);

} catch (SchedulerException se) {

throw se;
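} catch (Throwable ncdfe) { // such as NoClassDefFoundError
// Wrap the error so callers only have to handle SchedulerException
SchedulerException se = new SchedulerException(
"Problem instantiating class '" + jobDetail.getJobClass().getName() + "'", ncdfe);
throw se;
}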

this.jec = new JobExecutionContext(scheduler, firedBundle, job);
}
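
The comment in snippet 6 says the JobFactory builds the Job by reflection. Below is a minimal sketch of that step. It assumes the job class has a public no-argument constructor, which Quartz's default SimpleJobFactory also needs. `MiniJob`, `HelloJob` and `ReflectiveJobFactory` are hypothetical names.

```java
import java.lang.reflect.InvocationTargetException;

// Hypothetical minimal job contract, standing in for org.quartz.Job.
interface MiniJob {
    String execute();
}

// Example job with the public no-arg constructor the factory requires.
class HelloJob implements MiniJob {
    public HelloJob() { }

    public String execute() {
        return "hello";
    }
}

// Creates a fresh job instance per firing, like qs.getJobFactory().newJob(...) in snippet 6.
class ReflectiveJobFactory {
    MiniJob newJob(Class<? extends MiniJob> jobClass) {
        try {
            return jobClass.getDeclaredConstructor().newInstance();
        } catch (NoSuchMethodException | InstantiationException
                | IllegalAccessException | InvocationTargetException e) {
            // Quartz wraps such failures in a SchedulerException; an unchecked exception stands in here.
            throw new IllegalStateException(
                    "Problem instantiating class '" + jobClass.getName() + "'", e);
        }
    }
}
```

The factory creates a new instance for every execution, so fields on a Job class do not carry state from one run to the next. That is what the JobDataMap is for.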

2.7

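The scheduler thread now holds a JobRunShell. Because it implements Runnable, a thread can execute it. The next step is therefore to hand it to the thread pool to run the Job, by calling qsRsrcs.getThreadPool().runInThread(shell):

(Code snippet 7)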

public boolean runInThread(Runnable runnable) {
if (runnable == null) {
return false;
}

synchronized (nextRunnableLock) {

handoffPending = true;

// Wait until a worker thread is available
while ((availWorkers.size() < 1) && !isShutdown) {
try {
nextRunnableLock.wait(500);
} catch (InterruptedException ignore) {
}
}

if (!isShutdown) {
WorkerThread wt = (WorkerThread)availWorkers.removeFirst();

// Move the idle worker from the available list to the busy list (both are LinkedLists)
busyWorkers.add(wt);

// Hand the JobRunShell to the idle worker, which then runs it
wt.run(runnable);

} else {
// If the pool was shut down after the Trigger fired, create a new thread to run the JobRunShell
WorkerThread wt = new WorkerThread(this, threadGroup,
"WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
busyWorkers.add(wt);
workers.add(wt);
wt.start();
}
nextRunnableLock.notifyAll();
handoffPending = false;
}

return true;
}
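
Snippets 3 and 7 together form a small hand-off protocol between the scheduler thread and the pool:

1. Wait, in 500 ms slices, for an idle worker.
2. Move that worker from the idle list to the busy list and give it the task.
3. Move it back when the task ends.

The sketch below is a hypothetical, simplified SimpleThreadPool. `MiniThreadPool`, `Worker` and `idleCount` are illustrative names. It leaves out shutdown handling and the `handoffPending` flag.

```java
import java.util.LinkedList;

// Hypothetical simplification of SimpleThreadPool: idle and busy lists guarded by one lock.
class MiniThreadPool {
    private final Object nextRunnableLock = new Object();
    private final LinkedList<Worker> availWorkers = new LinkedList<>();
    private final LinkedList<Worker> busyWorkers = new LinkedList<>();

    MiniThreadPool(int size) {
        for (int i = 0; i < size; i++) {
            Worker w = new Worker();
            availWorkers.add(w);
            w.start();
        }
    }

    /** Blocks until at least one worker is idle, then returns the number of idle workers. */
    int blockForAvailableThreads() {
        synchronized (nextRunnableLock) {
            awaitIdleWorker();
            return availWorkers.size();
        }
    }

    /** Moves an idle worker to the busy list and hands it the task. */
    void runInThread(Runnable task) {
        synchronized (nextRunnableLock) {
            awaitIdleWorker();
            Worker w = availWorkers.removeFirst();
            busyWorkers.add(w);
            w.assign(task);
        }
    }

    int idleCount() {
        synchronized (nextRunnableLock) {
            return availWorkers.size();
        }
    }

    // Caller must hold nextRunnableLock; waits in 500 ms slices like snippets 3 and 7.
    private void awaitIdleWorker() {
        while (availWorkers.isEmpty()) {
            try {
                nextRunnableLock.wait(500);
            } catch (InterruptedException ignore) {
            }
        }
    }

    // Called by a worker when its task finishes: busy -> idle, then wake any waiters.
    private void makeAvailable(Worker w) {
        synchronized (nextRunnableLock) {
            busyWorkers.remove(w);
            availWorkers.add(w);
            nextRunnableLock.notifyAll();
        }
    }

    private class Worker extends Thread {
        private final Object lock = new Object();
        private Runnable runnable;

        Worker() {
            setDaemon(true);
        }

        void assign(Runnable r) {
            synchronized (lock) {
                runnable = r;
                lock.notifyAll();
            }
        }

        @Override
        public void run() {
            while (true) {
                Runnable r;
                synchronized (lock) {
                    while (runnable == null) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                    r = runnable;
                }
                try {
                    r.run();
                } finally {
                    synchronized (lock) {
                        runnable = null;
                    }
                    makeAvailable(this);
                }
            }
        }
    }
}
```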

2.8

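JobRunShell implements Runnable. It drives the Job's execution, handles the transaction, and records the whole course of the run. It also notifies the various listeners and the Trigger:

(Code snippet 8)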

public void run() {
try {
scheduler.addSchedulerListener(this);
} catch (SchedulerException ignore) {
}

try {
Trigger trigger = jec.getTrigger();
JobDetail jobDetail = jec.getJobDetail();

do {

JobExecutionException jobExEx = null;
Job job = jec.getJobInstance();

try {

// Pre-execution work: begin a transaction if one is configured
begin();

} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error executing Job ("
+ jec.getJobDetail().getFullName()
+ ": couldn't begin execution.", se);
break;
}

// notify job & trigger listeners...
try {
if (!notifyListenersBeginning(jec)) {
break;
}
} catch(VetoedException ve) {
try {
int instCode = trigger.executionComplete(jec, null);
try {
qs.notifyJobStoreJobVetoed(schdCtxt, trigger, jobDetail, instCode);
} catch(JobPersistenceException jpe) {
vetoedJobRetryLoop(trigger, jobDetail, instCode);
}

// QTZ-205
// Even if trigger got vetoed, we still needs to check to see if it's the trigger's finalized run or not.
if (jec.getTrigger().getNextFireTime() == null) {
qs.notifySchedulerListenersFinalized(jec.getTrigger());
}

complete(true);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error during veto of Job ("
+ jec.getJobDetail().getFullName()
+ ": couldn't finalize execution.", se);
}
break;
}

long startTime = System.currentTimeMillis();
long endTime = startTime;

// execute the job
try {
log.debug("Calling execute on job " + jobDetail.getFullName());

// This is where the Job is actually invoked
job.execute(jec);

endTime = System.currentTimeMillis();
} catch (JobExecutionException jee) {
endTime = System.currentTimeMillis();
jobExEx = jee;
getLog().info("Job " + jobDetail.getFullName() +
" threw a JobExecutionException: ", jobExEx);
} catch (Throwable e) {
endTime = System.currentTimeMillis();
getLog().error("Job " + jobDetail.getFullName() +
" threw an unhandled Exception: ", e);
SchedulerException se = new SchedulerException(
"Job threw an unhandled exception.", e);
se.setErrorCode(SchedulerException.ERR_JOB_EXECUTION_THREW_EXCEPTION);
qs.notifySchedulerListenersError("Job ("
+ jec.getJobDetail().getFullName()
+ " threw an exception.", se);
jobExEx = new JobExecutionException(se, false);
jobExEx.setErrorCode(JobExecutionException.ERR_JOB_EXECUTION_THREW_EXCEPTION);
}

jec.setJobRunTime(endTime - startTime);

// notify all job listeners
if (!notifyJobListenersComplete(jec, jobExEx)) {
break;
}

int instCode = Trigger.INSTRUCTION_NOOP;

try {

// Report the execution result to the Trigger
instCode = trigger.executionComplete(jec, jobExEx);

} catch (Exception e) {
SchedulerException se = new SchedulerException(
"Trigger threw an unhandled exception.", e);
se.setErrorCode(SchedulerException.ERR_TRIGGER_THREW_EXCEPTION);
qs.notifySchedulerListenersError(
"Please report this error to the Quartz developers.",
se);
}

// notify all trigger listeners
if (!notifyTriggerListenersComplete(jec, instCode)) {
break;
}

// update job/trigger or re-execute job
if (instCode == Trigger.INSTRUCTION_RE_EXECUTE_JOB) {
jec.incrementRefireCount();
try {
complete(false);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error executing Job ("
+ jec.getJobDetail().getFullName()
+ ": couldn't finalize execution.", se);
}
// The Trigger asked for immediate re-execution: go round the loop again and rerun the Job
continue;
}

try {
// Post-execution work: if a transaction is in use, commit it, or roll it back if required
complete(true);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error executing Job ("
+ jec.getJobDetail().getFullName()
+ ": couldn't finalize execution.", se);
continue;
}

try {
qs.notifyJobStoreJobComplete(schdCtxt, trigger, jobDetail,
instCode);
} catch (JobPersistenceException jpe) {
qs.notifySchedulerListenersError(
"An error occured while marking executed job complete. job= '"
+ jobDetail.getFullName() + "'", jpe);
if (!completeTriggerRetryLoop(trigger, jobDetail, instCode)) {
return;
}
}

break;
} while (true);

} finally {
try {
scheduler.removeSchedulerListener(this);
} catch (SchedulerException e) {
// can never happen on a local scheduler - which by definition this will be (since we are executing on it)
}
// Return the shell to the factory, clearing its context (and any transaction state)
jobRunShellFactory.returnJobRunShell(this);
}
}
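
The control flow of snippet 8 boils down to three steps. Run the job, ask the trigger for an instruction code, and go round the loop again if that code is INSTRUCTION_RE_EXECUTE_JOB. Below is a stripped-down sketch of just that loop. `MiniRunShell`, `Instruction` and the `decide` function (standing in for Trigger.executionComplete) are hypothetical. Listeners, transactions and the JobStore notification are left out.

```java
import java.util.function.Function;

// Hypothetical instruction codes, named after the Trigger.INSTRUCTION_* constants in snippet 8.
enum Instruction { NOOP, RE_EXECUTE_JOB }

class MiniRunShell {
    /**
     * Runs the job until the trigger stops asking for a re-run.
     *
     * @param job    returns normally on success, throws on failure
     * @param decide maps "did this attempt fail?" to an instruction
     * @return how many times the job was executed
     */
    static int run(Runnable job, Function<Boolean, Instruction> decide) {
        int executions = 0;
        do {
            boolean failed = false;
            try {
                executions++;
                job.run();                // job.execute(jec) in snippet 8
            } catch (RuntimeException e) {
                failed = true;            // snippet 8 turns this into a JobExecutionException
            }
            Instruction inst = decide.apply(failed);
            if (inst == Instruction.RE_EXECUTE_JOB) {
                continue;                 // as in snippet 8: start the next attempt
            }
            break;                        // NOOP: finished; complete(true) would run here
        } while (true);
        return executions;
    }
}
```

This loop, like the one in snippet 8, puts no cap on re-executions. A Quartz job that asks to be refired can check `context.getRefireCount()` itself to avoid looping forever.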

3. Job

3.1 Storing Jobs

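From the user's point of view, once a Job class exists, each new JobDetail defines one job. The Job object itself is created afresh by the JobFactory every time the job runs, as step 2.6 shows. For example:

(Code snippet 10)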

// Create a Job
JobDetail jobDetail = new JobDetail("qyc_job_detail", QycJob.class);

// Create the first Trigger (repeat count 10000, repeat interval 10000 ms)
Trigger tenSecondTrigger = new SimpleTrigger("tenSecondTrigger", 10000, 10000);
// Bind the every-10-seconds Trigger to the Job
scheduler.scheduleJob(jobDetail, tenSecondTrigger);

The scheduler.scheduleJob call on the last line in turn calls JobStore.storeJobAndTrigger, which stores the JobDetail and the Trigger in the JobStore.

This introduces the concept of a JobStore. Its official definition reads: The interface to be implemented by classes that want to provide a org.quartz.Job and org.quartz.Trigger storage mechanism for the org.quartz.core.QuartzScheduler’s use. The definition is concise: a JobStore stores Jobs and Triggers, and each scheduler has exactly one JobStore.

This article covers three JobStore implementations: RAMJobStore, JDBC-JobStoreTX and JDBC-JobStoreCMT.

3.1.1 RAMJobStore

RAMJobStore keeps scheduling data (jobs, Triggers and calendars) in memory. It is fast and lightweight, but all scheduling data is lost when the process terminates.

RAMJobStore takes a synchronized lock whenever it reads or writes JobDetails and Triggers, as shown below:

(Code snippet 11)

public void storeTrigger(SchedulingContext ctxt, Trigger newTrigger,
boolean replaceExisting) throws JobPersistenceException {
TriggerWrapper tw = new TriggerWrapper((Trigger)newTrigger.clone());

// Take the lock with synchronized
synchronized (lock) {

……

triggersByFQN.put(tw.key, tw);

if (pausedTriggerGroups.contains(newTrigger.getGroup())
|| pausedJobGroups.contains(newTrigger.getJobGroup())) {

// If the Trigger's group (or its Job's group) is paused, set the Trigger's state to PAUSED
tw.state = TriggerWrapper.STATE_PAUSED;

// If the Job the Trigger points to is also blocked, set the state to PAUSED_BLOCKED
if (blockedJobs.contains(tw.jobKey)) {
tw.state = TriggerWrapper.STATE_PAUSED_BLOCKED;
}
} else if (blockedJobs.contains(tw.jobKey)) {
tw.state = TriggerWrapper.STATE_BLOCKED;
} else {
timeTriggers.add(tw);
}
}
}
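
The state decision in snippet 11 can be pulled out into a small pure function. The following is a sketch: `TriggerState` and `TriggerStateRules` are hypothetical names. The three sets stand in for RAMJobStore's `pausedTriggerGroups`, `pausedJobGroups` and `blockedJobs`. In Quartz, a job is "blocked" while a non-concurrent (stateful) execution of it is running.

```java
import java.util.Set;

// Hypothetical state names mirroring TriggerWrapper's STATE_* constants.
enum TriggerState { WAITING, PAUSED, BLOCKED, PAUSED_BLOCKED }

final class TriggerStateRules {
    private TriggerStateRules() { }

    /** Decides the state of a newly stored trigger, following snippet 11. */
    static TriggerState initialState(String triggerGroup, String jobGroup, String jobKey,
                                     Set<String> pausedTriggerGroups,
                                     Set<String> pausedJobGroups,
                                     Set<String> blockedJobs) {
        boolean paused = pausedTriggerGroups.contains(triggerGroup)
                || pausedJobGroups.contains(jobGroup);
        boolean blocked = blockedJobs.contains(jobKey);
        if (paused) {
            return blocked ? TriggerState.PAUSED_BLOCKED : TriggerState.PAUSED;
        }
        return blocked ? TriggerState.BLOCKED : TriggerState.WAITING;
    }

    /** Only WAITING triggers go into timeTriggers, i.e. become candidates for firing. */
    static boolean isFireCandidate(TriggerState s) {
        return s == TriggerState.WAITING;
    }
}
```
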
3.1.2 JDBC-JobStore

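JDBCJobStore stores scheduling data (jobs, Triggers and calendars) in a relational database, so everything is persisted. After a crash, the system can restart and resume running its Jobs and Triggers. In Quartz, the JDBCJobStore logic is implemented in JobStoreSupport, from which both JobStoreTX and JobStoreCMT inherit.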

JobStoreSupport holds information such as the DataSource, the database table names, the database retry interval and the type of lock to use.

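JobStoreSupport supports database locks and also has a simple built-in semaphore lock. The lock names a thread holds are kept in a ThreadLocal, and a thread that cannot get a lock keeps waiting until it is released.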

a) Semaphore lock implementation:

(Code snippet 12)

public synchronized boolean obtainLock(Connection conn, String lockName) {
lockName = lockName.intern();
Logger log = getLog();

if (!isLockOwner(conn, lockName)) {

// locks.contains means another thread already holds the lock
while (locks.contains(lockName)) {
try {
this.wait();
} catch (InterruptedException ie) {
}
}

getThreadLocks().add(lockName);

// Put the lock name into locks so other threads cannot take it
locks.add(lockName);
} else if(log.isDebugEnabled()) {
}

return true;
}

public synchronized void releaseLock(Connection conn, String lockName) {
lockName = lockName.intern();
if (isLockOwner(conn, lockName)) {
getThreadLocks().remove(lockName);
locks.remove(lockName);

// Release the lock and wake up the other waiting threads
this.notifyAll();
} else if (getLog().isDebugEnabled()) {
}
}
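
The in-memory semaphore in snippet 12 never uses its Connection argument to decide who holds a lock, so it can be reproduced without JDBC. Below is a minimal sketch, assuming lock names are plain strings. `MiniSemaphore` is a hypothetical name. As in the snippet, a second obtainLock by the owning thread is a no-op, with no hold count.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, connection-free version of the semaphore in snippet 12.
class MiniSemaphore {
    // Lock names held by any thread; guarded by this object's monitor.
    private final Set<String> locks = new HashSet<>();
    // Lock names held by the current thread.
    private final ThreadLocal<Set<String>> threadLocks = ThreadLocal.withInitial(HashSet::new);

    synchronized boolean isLockOwner(String lockName) {
        return threadLocks.get().contains(lockName);
    }

    /** Blocks until lockName is free, then takes it; does nothing if this thread already owns it. */
    synchronized boolean obtainLock(String lockName) {
        if (!isLockOwner(lockName)) {
            while (locks.contains(lockName)) {
                try {
                    wait(); // releases the monitor while waiting
                } catch (InterruptedException ignore) {
                }
            }
            threadLocks.get().add(lockName);
            locks.add(lockName);
        }
        return true;
    }

    /** Releases lockName if the current thread owns it, then wakes waiting threads. */
    synchronized void releaseLock(String lockName) {
        if (isLockOwner(lockName)) {
            threadLocks.get().remove(lockName);
            locks.remove(lockName);
            notifyAll();
        }
    }
}
```
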
b) Database lock implementation

Database locks work like this:

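1. Before running some "critical logic" that must be locked, start a transaction and execute a statement that makes the database take a lock (such as SELECT ... FOR UPDATE or UPDATE). The current thread now holds the database lock.

2. Holding the database lock, the thread runs the critical logic.

3. When the thread has finished the critical logic, it commits the transaction if everything went well and rolls it back otherwise. Either way, it gives up the database lock.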

The code is as follows:

(Code snippet 13)

public boolean obtainLock(Connection conn, String lockName)
throws LockException {
lockName = lockName.intern();
Logger log = getLog();

if (!isLockOwner(conn, lockName)) {

// Execute the SQL statement that takes the lock
executeSQL(conn, lockName, expandedSQL);

getThreadLocks().add(lockName);
} else if(log.isDebugEnabled()) {
}

return true;
}

public void releaseLock(Connection conn, String lockName) {
lockName = lockName.intern();

if (isLockOwner(conn, lockName)) {
getThreadLocks().remove(lockName);
} else if (getLog().isDebugEnabled()) {
getLog().warn(
"Lock '" + lockName + "' attempt to return by: "
+ Thread.currentThread().getName()
+ " -- but not owner!",
new Exception("stack-trace of wrongful returner"));
}
}

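The SQL that actually takes the lock is executed by one of two semaphores: the row-lock semaphore StdRowLockSemaphore or the update-lock semaphore UpdateLockRowSemaphore. For an introduction to MySQL locking, see the article "Lock Analysis of SQL Statements in MySQL InnoDB".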

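The row-lock semaphore StdRowLockSemaphore takes a row lock in the database (in MySQL, for example) with a SELECT ... FOR UPDATE statement. The code is:

(Code snippet 14)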


// SELECT ... FOR UPDATE locks the row; InnoDB places an index record lock on it
public static final String SELECT_FOR_LOCK = "SELECT * FROM "
+ TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_LOCK_NAME
+ " = ? FOR UPDATE";

protected void executeSQL(Connection conn, String lockName, String expandedSQL) throws LockException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
ps = conn.prepareStatement(expandedSQL);
ps.setString(1, lockName);

// Run the query; until this transaction commits, other statements that try to lock the same row block
rs = ps.executeQuery();
if (!rs.next()) {
throw new SQLException();
}
} catch (SQLException sqle) {

throw new LockException("Failure obtaining db row lock: "
+ sqle.getMessage(), sqle);
} finally {
……
}
}

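The update-lock semaphore UpdateLockRowSemaphore takes the lock by executing an UPDATE statement instead. Quartz provides this update lock in addition to the row lock because some databases, such as MS SQL Server, do not support FOR UPDATE. The SQL that UpdateLockRowSemaphore executes is:

(Code snippet 15)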

public static final String UPDATE_FOR_LOCK = 
"UPDATE " + TABLE_PREFIX_SUBST + TABLE_LOCKS +
" SET " + COL_LOCK_NAME + " = " + COL_LOCK_NAME +
" WHERE " + COL_LOCK_NAME + " = ? ";
3.1.3 JDBC-JobStoreTX

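JobStoreTX manages database transactions itself instead of relying on transactions provided by a "container" such as Spring, so it is independent of any container.
In fact, JobStoreTX adds no logic of its own here. Its executeInLock simply delegates to JobStoreSupport's executeInNonManagedTXLock, so storing Jobs and other data always happens inside a transaction:

(Code snippet 16)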

protected Object executeInNonManagedTXLock(
String lockName,
TransactionCallback txCallback) throws JobPersistenceException {
boolean transOwner = false;
Connection conn = null;
try {
if (lockName != null) {
if (getLockHandler().requiresConnection()) {
conn = getNonManagedTXConnection();
}

// Obtain the database lock
transOwner = getLockHandler().obtainLock(conn, lockName);
}

if (conn == null) {
conn = getNonManagedTXConnection();
}

// Run the actual store/update logic
Object result = txCallback.execute(conn);

// Commit the transaction, which also releases a database row lock
commitConnection(conn);
return result;
} catch (JobPersistenceException e) {

// If an exception occurs, roll back the transaction
rollbackConnection(conn);
throw e;
} catch (RuntimeException e) {
rollbackConnection(conn);
throw new JobPersistenceException("Unexpected runtime exception: "
+ e.getMessage(), e);
} finally {
try {
releaseLock(conn, lockName, transOwner);
} finally {
cleanupConnection(conn);
}
}
}
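
Snippet 16 is a template. It takes the lock, runs the callback and commits. On failure it rolls back. In every case it releases the lock and cleans up the connection. The sketch below shows the same shape, written against a hypothetical `MiniConnection` class that stands in for java.sql.Connection so the example runs without a database. `TxTemplate` and `TxCallback` are also hypothetical. Snippet 16's two catch blocks are merged into one here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for java.sql.Connection that records what happened to it.
class MiniConnection {
    final List<String> log = new ArrayList<>();
    void commit()   { log.add("commit"); }
    void rollback() { log.add("rollback"); }
    void close()    { log.add("close"); }
}

interface TxCallback<T> {
    T execute(MiniConnection conn) throws Exception;
}

class TxTemplate {
    final List<String> lockLog = new ArrayList<>();

    <T> T executeInLock(String lockName, MiniConnection conn, TxCallback<T> callback) {
        boolean transOwner = false;
        try {
            lockLog.add("obtain " + lockName);  // getLockHandler().obtainLock(conn, lockName)
            transOwner = true;
            T result = callback.execute(conn);  // the actual store/update work
            conn.commit();                      // commitConnection(conn)
            return result;
        } catch (Exception e) {
            conn.rollback();                    // rollbackConnection(conn)
            throw new IllegalStateException("Unexpected exception: " + e.getMessage(), e);
        } finally {
            try {
                if (transOwner) {
                    lockLog.add("release " + lockName); // releaseLock(conn, lockName, transOwner)
                }
            } finally {
                conn.close();                   // cleanupConnection(conn)
            }
        }
    }
}
```

Nesting the two `finally` blocks means the connection is closed even if releasing the lock throws. Snippet 16 uses the same structure.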

The code that stores a Job:

(Code snippet 17)

protected void storeJob(Connection conn, SchedulingContext ctxt,
JobDetail newJob, boolean replaceExisting)
throws ObjectAlreadyExistsException, JobPersistenceException {

boolean existingJob = jobExists(conn, newJob.getName(), newJob
.getGroup());
try {
if (existingJob) {
if (!replaceExisting) {
throw new ObjectAlreadyExistsException(newJob);
}
// The job already exists: update it
getDelegate().updateJobDetail(conn, newJob);
} else {
// The job does not exist yet: insert it
getDelegate().insertJobDetail(conn, newJob);
}
} catch (IOException e) {
throw new JobPersistenceException("Couldn't store job: "
+ e.getMessage(), e);
} catch (SQLException e) {
throw new JobPersistenceException("Couldn't store job: "
+ e.getMessage(), e);
}
}
3.1.4 JDBC-JobStoreCMT

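Unlike JobStoreTX, JobStoreCMT uses the container's transaction manager (CMT stands for container-managed transactions). It therefore must not call commit or rollback on the connection itself; that is left to the container's transaction manager. JobStoreCMT's locking method executeInLock shows this:

(Code snippet 18)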

protected Object executeInLock(
String lockName,
TransactionCallback txCallback) throws JobPersistenceException {
boolean transOwner = false;
Connection conn = null;
try {
if (lockName != null) {
// If we aren't using db locks, then delay getting DB connection
// until after acquiring the lock since it isn't needed.
if (getLockHandler().requiresConnection()) {
conn = getConnection();
}

transOwner = getLockHandler().obtainLock(conn, lockName);
}

if (conn == null) {
conn = getConnection();
}

// After the critical logic runs, commit is not called here; the container's transaction manager commits
return txCallback.execute(conn);
} finally {
try {
releaseLock(conn, lockName, transOwner);
} finally {
cleanupConnection(conn);
}
}
}

3.2 How due jobs are found

A job runs only when a Trigger pointing to it comes due. Put simply, when a job executes is decided by its Trigger.

3.3 How jobs are scheduled

As with the previous point, what the Scheduler schedules in Quartz is Triggers. The scheduler does not care what state a job is in.

3.4 How jobs are executed

When the Scheduler finds that a Trigger's fire time has arrived, it fires the Trigger, which in turn executes the job the Trigger points to.

3.5 How Triggers relate to jobs

Triggers and Jobs have a many-to-one relationship: a Trigger belongs to exactly one Job, and a Job can have many Triggers.

3.6 How crashed jobs are recovered

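If an exception occurs while a job is running, the job's execute method is interrupted. Quartz then asks the Trigger's executionComplete method to turn the exception into an instruction code, and that code decides whether the job is run again. For example, a JobExecutionException with refireImmediately set leads to a re-run:

(Code snippet 19)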

try {
// Run the job
job.execute(jec);
endTime = System.currentTimeMillis();
} catch (JobExecutionException jee) {
endTime = System.currentTimeMillis();
jobExEx = jee;
getLog().info("Job " + jobDetail.getFullName() +
" threw a JobExecutionException: ", jobExEx);
} catch (Throwable e) {
endTime = System.currentTimeMillis();
getLog().error("Job " + jobDetail.getFullName() +
" threw an unhandled Exception: ", e);
SchedulerException se = new SchedulerException(
"Job threw an unhandled exception.", e);
se.setErrorCode(SchedulerException.ERR_JOB_EXECUTION_THREW_EXCEPTION);
qs.notifySchedulerListenersError("Job ("
+ jec.getJobDetail().getFullName()
+ " threw an exception.", se);
jobExEx = new JobExecutionException(se, false);
jobExEx.setErrorCode(JobExecutionException.ERR_JOB_EXECUTION_THREW_EXCEPTION);
}

int instCode = Trigger.INSTRUCTION_NOOP;
try {
// Derive the next instruction code from the exception; it says whether the job should be re-run
instCode = trigger.executionComplete(jec, jobExEx);
} catch (Exception e) {
}
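
The instruction code in snippet 19 comes from the trigger's executionComplete method. In Quartz, the decision goes as follows:

- A JobExecutionException with refireImmediately set yields INSTRUCTION_RE_EXECUTE_JOB.
- The unschedule flags yield the "set trigger(s) complete" instructions.
- A trigger that will not fire again yields INSTRUCTION_DELETE_TRIGGER.

Below is a sketch of that decision table. `JobOutcome`, `Instr` and `CompletionRules.decide` are hypothetical names, not the Quartz API.

```java
// Hypothetical model of the flags a JobExecutionException can carry.
final class JobOutcome {
    final boolean refireImmediately;
    final boolean unscheduleFiringTrigger;
    final boolean unscheduleAllTriggers;

    JobOutcome(boolean refire, boolean unscheduleThis, boolean unscheduleAll) {
        this.refireImmediately = refire;
        this.unscheduleFiringTrigger = unscheduleThis;
        this.unscheduleAllTriggers = unscheduleAll;
    }
}

// Hypothetical names for the Trigger.INSTRUCTION_* codes.
enum Instr { NOOP, RE_EXECUTE_JOB, SET_TRIGGER_COMPLETE, SET_ALL_JOB_TRIGGERS_COMPLETE, DELETE_TRIGGER }

final class CompletionRules {
    private CompletionRules() { }

    /**
     * Sketch of executionComplete. outcome is null when the job finished without a
     * JobExecutionException; mayFireAgain says whether the trigger has future fire times.
     */
    static Instr decide(JobOutcome outcome, boolean mayFireAgain) {
        if (outcome != null && outcome.refireImmediately) {
            return Instr.RE_EXECUTE_JOB;
        }
        if (outcome != null && outcome.unscheduleFiringTrigger) {
            return Instr.SET_TRIGGER_COMPLETE;
        }
        if (outcome != null && outcome.unscheduleAllTriggers) {
            return Instr.SET_ALL_JOB_TRIGGERS_COMPLETE;
        }
        if (!mayFireAgain) {
            return Instr.DELETE_TRIGGER;
        }
        return Instr.NOOP;
    }
}
```

Snippet 19 wraps an unexpected Throwable in `new JobExecutionException(se, false)`, where `false` is refireImmediately. An unhandled exception therefore does not cause a retry. Only a job that deliberately throws a JobExecutionException with refireImmediately set is re-run.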

4. Triggers

4.1 How due Triggers are found

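Triggers, like Jobs, are stored in the JobStore. When the scheduler looks for the next Trigger to fire, it delegates the search to the JobStore. Taking RAMJobStore as an example, its acquireNextTrigger method looks like this:

(Code snippet 20)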

public Trigger acquireNextTrigger(SchedulingContext ctxt, long noLaterThan) {
TriggerWrapper tw = null;

synchronized (lock) {

while (tw == null) {
try {
// Take the first element of the TreeSet (sorted by fire time)
tw = (TriggerWrapper) timeTriggers.first();
} catch (java.util.NoSuchElementException nsee) {
return null;
}

if (tw == null) {
return null;
}



tw.state = TriggerWrapper.STATE_ACQUIRED;

tw.trigger.setFireInstanceId(getFiredTriggerRecordId());
Trigger trig = (Trigger) tw.trigger.clone();
return trig;
}
}

return null;
}
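
RAMJobStore keeps candidate triggers in a TreeSet ordered by next fire time, so the nearest trigger is simply `first()`. The simplified sketch below assumes a trigger is just a name, a fire time and a priority. `MiniTrigger` and `MiniRamStore` are hypothetical, and misfire handling is left out. In this sketch, ties on fire time are broken by priority (higher first) and then by name. Unlike the excerpt above, the sketch also applies the `noLaterThan` bound and removes the acquired trigger from the set.

```java
import java.util.Comparator;
import java.util.TreeSet;

// Hypothetical minimal trigger record.
final class MiniTrigger {
    final String name;
    final long nextFireTime;
    final int priority;

    MiniTrigger(String name, long nextFireTime, int priority) {
        this.name = name;
        this.nextFireTime = nextFireTime;
        this.priority = priority;
    }
}

class MiniRamStore {
    private final Object lock = new Object();
    // Earliest fire time first; ties: higher priority first, then name, so no two triggers compare equal.
    private final TreeSet<MiniTrigger> timeTriggers = new TreeSet<>(
            Comparator.comparingLong((MiniTrigger t) -> t.nextFireTime)
                    .thenComparing(Comparator.comparingInt((MiniTrigger t) -> t.priority).reversed())
                    .thenComparing(t -> t.name));

    void store(MiniTrigger t) {
        synchronized (lock) {
            timeTriggers.add(t);
        }
    }

    /** Returns the earliest trigger due no later than noLaterThan, or null if there is none. */
    MiniTrigger acquireNextTrigger(long noLaterThan) {
        synchronized (lock) {
            if (timeTriggers.isEmpty()) {
                return null;
            }
            MiniTrigger first = timeTriggers.first();
            if (first.nextFireTime > noLaterThan) {
                return null;            // nothing due within the window; leave it in the set
            }
            timeTriggers.remove(first); // acquired: no longer a firing candidate
            return first;
        }
    }
}
```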

4.2 Pausing and resuming

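Pausing a job really just pauses its Triggers, and resuming works the same way. In JobStoreSupport, pausing a Trigger is a state update in the database:

(Code snippet 21)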

public void pauseTrigger(Connection conn, SchedulingContext ctxt,
String triggerName, String groupName)
throws JobPersistenceException {

try {
String oldState = getDelegate().selectTriggerState(conn,
triggerName, groupName);

if (oldState.equals(STATE_WAITING)
|| oldState.equals(STATE_ACQUIRED)) {

// Change the Trigger's state to PAUSED
getDelegate().updateTriggerState(conn, triggerName,
groupName, STATE_PAUSED);
} else if (oldState.equals(STATE_BLOCKED)) {
getDelegate().updateTriggerState(conn, triggerName,
groupName, STATE_PAUSED_BLOCKED);
}
} catch (SQLException e) {
throw new JobPersistenceException("Couldn't pause trigger '"
+ groupName + "." + triggerName + "': " + e.getMessage(), e);
}
}
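
Snippet 21 is effectively a state-transition table. The sketch below expresses pause and a matching resume as pure functions on state strings. `TriggerStates` is a hypothetical class. The resume rule (PAUSED back to WAITING, PAUSED_BLOCKED back to BLOCKED) is an assumption about JobStoreSupport's resume logic, which is not shown here. The real resume also has to deal with misfires.

```java
// Hypothetical constants mirroring the STATE_* strings used in snippet 21.
final class TriggerStates {
    static final String WAITING = "WAITING";
    static final String ACQUIRED = "ACQUIRED";
    static final String BLOCKED = "BLOCKED";
    static final String PAUSED = "PAUSED";
    static final String PAUSED_BLOCKED = "PAUSED_BLOCKED";

    private TriggerStates() { }

    /** Pause, as in snippet 21: WAITING/ACQUIRED -> PAUSED, BLOCKED -> PAUSED_BLOCKED, others unchanged. */
    static String pause(String old) {
        if (old.equals(WAITING) || old.equals(ACQUIRED)) {
            return PAUSED;
        }
        if (old.equals(BLOCKED)) {
            return PAUSED_BLOCKED;
        }
        return old;
    }

    /** Assumed inverse: PAUSED -> WAITING, PAUSED_BLOCKED -> BLOCKED, others unchanged. */
    static String resume(String old) {
        if (old.equals(PAUSED)) {
            return WAITING;
        }
        if (old.equals(PAUSED_BLOCKED)) {
            return BLOCKED;
        }
        return old;
    }
}
```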

5. Cluster Deployment

5.1 How clustering works

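In Quartz, individual nodes do not communicate with each other directly; each node learns about the others through the data in the database. Every node runs a ClusterManager, a separate thread that periodically "checks in" to the database. If it finds that another scheduler has not checked in for too long, it treats that scheduler as failed and recovers the failed scheduler's Triggers.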

How a node checks in:

(Code snippet 22)

protected List clusterCheckIn(Connection conn)
throws JobPersistenceException {

List failedInstances = findFailedInstances(conn);

try {
lastCheckin = System.currentTimeMillis();
if(getDelegate().updateSchedulerState(conn, getInstanceId(), lastCheckin) == 0) {

// First time joining the cluster (no row was updated): insert our own check-in record
getDelegate().insertSchedulerState(conn, getInstanceId(),
lastCheckin, getClusterCheckinInterval());
}

} catch (Exception e) {
}

return failedInstances;
}

How failed nodes are detected:

(Code snippet 23)


for (Iterator itr = states.iterator(); itr.hasNext();) {
SchedulerStateRecord rec = (SchedulerStateRecord) itr.next();

// find own record...
if (rec.getSchedulerInstanceId().equals(getInstanceId())) {
foundThisScheduler = true;
if (firstCheckIn) {
failedInstances.add(rec);
}
} else {
// The latest allowed check-in time for that node, computed by calcFailedIfAfter, has already passed
if (calcFailedIfAfter(rec) < timeNow) {
failedInstances.add(rec);
}
}
}
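
Snippet 23 compares each node's check-in deadline with the current time. Below is a minimal sketch of that comparison. It assumes the deadline is the node's last check-in time plus its check-in interval plus a fixed grace period. `CheckIn`, `ClusterHealth` and the 7500 ms `GRACE_MS` value are assumptions for illustration, not necessarily what calcFailedIfAfter computes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical check-in record: one row per scheduler instance in the cluster.
final class CheckIn {
    final String instanceId;
    final long lastCheckin;      // ms since epoch
    final long checkinInterval;  // ms

    CheckIn(String instanceId, long lastCheckin, long checkinInterval) {
        this.instanceId = instanceId;
        this.lastCheckin = lastCheckin;
        this.checkinInterval = checkinInterval;
    }
}

final class ClusterHealth {
    static final long GRACE_MS = 7500L; // assumed tolerance for clock skew and slow check-ins

    private ClusterHealth() { }

    /** Latest time by which the instance should have checked in again (assumed formula). */
    static long failedIfAfter(CheckIn rec) {
        return rec.lastCheckin + rec.checkinInterval + GRACE_MS;
    }

    /** Failed instances: others past their deadline, plus our own record on our first check-in. */
    static List<String> findFailed(List<CheckIn> states, String selfId, boolean firstCheckIn, long now) {
        List<String> failed = new ArrayList<>();
        for (CheckIn rec : states) {
            if (rec.instanceId.equals(selfId)) {
                if (firstCheckIn) {
                    failed.add(rec.instanceId); // leftovers from our own previous run
                }
            } else if (failedIfAfter(rec) < now) {
                failed.add(rec.instanceId);
            }
        }
        return failed;
    }
}
```

Why treat a node's own record as failed on its first check-in? If the node crashed and restarted under the same instance ID, its old fired-trigger rows are still in the database. Those rows need the same recovery as any other failed node's, which is why the recovery log in snippet 24 talks about "failed or restarted instances".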

How the Triggers of failed nodes are recovered:

(Code snippet 24)

protected void clusterRecover(Connection conn, List failedInstances)
throws JobPersistenceException {

if (failedInstances.size() > 0) {

long recoverIds = System.currentTimeMillis();

logWarnIfNonZero(failedInstances.size(),
"ClusterManager: detected " + failedInstances.size()
+ " failed or restarted instances.");
try {
Iterator itr = failedInstances.iterator();

// Iterate over all failed schedulers
while (itr.hasNext()) {
SchedulerStateRecord rec = (SchedulerStateRecord) itr
.next();

// Find all fired-trigger records belonging to that scheduler
List firedTriggerRecs = getDelegate()
.selectInstancesFiredTriggerRecords(conn,
rec.getSchedulerInstanceId());
Set triggerKeys = new HashSet();
Iterator ftItr = firedTriggerRecs.iterator();
while (ftItr.hasNext()) {
FiredTriggerRecord ftRec = (FiredTriggerRecord) ftItr
.next();

Key tKey = ftRec.getTriggerKey();
Key jKey = ftRec.getJobKey();

triggerKeys.add(tKey);

// Release Triggers that had been acquired but not yet run (ACQUIRED -> WAITING)
if (ftRec.getFireInstanceState().equals(STATE_ACQUIRED)) {
getDelegate().updateTriggerStateFromOtherState(
conn, tKey.getName(), tKey.getGroup(),
STATE_WAITING, STATE_ACQUIRED);
acquiredCount++;

} else if (ftRec.isJobRequestsRecovery()) {
// The job requests recovery: create a new one-shot recovery Trigger and store it in the database
if (jobExists(conn, jKey.getName(), jKey.getGroup())) {
SimpleTrigger rcvryTrig = new SimpleTrigger(
"recover_"
+ rec.getSchedulerInstanceId()
+ "_"
+ String.valueOf(recoverIds++),
Scheduler.DEFAULT_RECOVERY_GROUP,
new Date(ftRec.getFireTimestamp()));
rcvryTrig.setVolatility(ftRec.isTriggerIsVolatile());
rcvryTrig.setJobName(jKey.getName());
rcvryTrig.setJobGroup(jKey.getGroup());
rcvryTrig.setMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_FIRE_NOW);
rcvryTrig.setPriority(ftRec.getPriority());
JobDataMap jd = getDelegate().selectTriggerJobDataMap(conn, tKey.getName(), tKey.getGroup());
jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_NAME, tKey.getName());
jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_GROUP, tKey.getGroup());
jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getFireTimestamp()));
rcvryTrig.setJobDataMap(jd);

rcvryTrig.computeFirstFireTime(null);
storeTrigger(conn, null, rcvryTrig, null, false,
STATE_WAITING, false, true);
recoveredCount++;
} else {

}
} else {
otherCount++;
}

}

getDelegate().deleteFiredTriggers(conn,
rec.getSchedulerInstanceId());

}
} catch (Exception e) {
throw new JobPersistenceException("Failure recovering jobs: "
+ e.getMessage(), e);
}
}
}
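
For each fired-trigger record of a failed instance, snippet 24 makes one of three decisions. The condensed sketch below returns a description of the action instead of touching a database. `FiredRecord` and `RecoveryPlanner` are hypothetical stand-ins for FiredTriggerRecord and the delegate calls. The job-existence check is omitted. The recovery trigger name follows the `recover_<instanceId>_<counter>` pattern visible in the snippet.

```java
// Hypothetical, simplified fired-trigger row.
final class FiredRecord {
    final String triggerName;
    final String state;            // e.g. "ACQUIRED" or "EXECUTING"
    final boolean requestsRecovery;

    FiredRecord(String triggerName, String state, boolean requestsRecovery) {
        this.triggerName = triggerName;
        this.state = state;
        this.requestsRecovery = requestsRecovery;
    }
}

final class RecoveryPlanner {
    private RecoveryPlanner() { }

    /**
     * Describes what clusterRecover does with one record of a failed instance:
     * put an acquired trigger back, schedule a one-shot recovery trigger, or just drop the row.
     */
    static String plan(FiredRecord rec, String failedInstanceId, long recoverId) {
        if (rec.state.equals("ACQUIRED")) {
            return "RESET " + rec.triggerName + " ACQUIRED->WAITING";
        }
        if (rec.requestsRecovery) {
            return "SCHEDULE recover_" + failedInstanceId + "_" + recoverId + " (fire now)";
        }
        return "DROP " + rec.triggerName;
    }
}
```

The ACQUIRED case is checked first. A trigger that was acquired but never started is simply put back. Only jobs flagged requestsRecovery get their interrupted execution repeated. For every other record, the fired-trigger row is deleted and the interrupted run is not repeated.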