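0. Overview
As introduced in the previous post, Quartz is a job scheduling system whose core components are Job, Trigger, Scheduler and so on. We already know how to use Quartz; this time we take a careful look at its internal code logic.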
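1. Core components
The relationship between Quartz's internal components is shown in the figure. One Job can have several Triggers, and each time a Trigger fires, the Job is executed once.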
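1. Scheduler: the task scheduler, the controller that actually performs scheduling. In Spring it is wrapped by SchedulerFactoryBean.
2. Trigger: defines the time rules for scheduling a task. Implementations include SimpleTrigger, CronTrigger, DateIntervalTrigger and NthIncludedDayTrigger, of which CronTrigger is the most widely used. In Spring, CronTrigger is wrapped by CronTriggerFactoryBean.
3. JobDetail: describes the Job implementation class and other static information about it, such as the job's name and its associated listeners. Spring offers two implementations, JobDetailFactoryBean and MethodInvokingJobDetailFactoryBean; if a scheduled task only needs to call one method of one class, MethodInvokingJobDetailFactoryBean can do the call for you.
4. Job: an interface with a single method, void execute(JobExecutionContext context). Developers implement it to define the work to run, and JobExecutionContext provides all kinds of information about the scheduling context. A Job's runtime data is kept in a JobDataMap. Jobs implementing the Job interface are stateless by default. To make a Job stateful in Quartz, annotate the implementation with @DisallowConcurrentExecution, together with @PersistJobDataAfterExecution if changes to the JobDataMap should be kept (previously you implemented the StatefulJob interface, which is now deprecated). When integrating with Spring, you can set the concurrent property of MethodInvokingJobDetailFactoryBean in the Spring configuration.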
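To make the one-job, many-triggers relationship concrete, here is a minimal, self-contained Java sketch. MiniJob, MiniJobDetail, MiniTrigger and RecordingJob are hypothetical stand-ins, not Quartz classes; the sketch only assumes what the list above describes: a JobDetail names the Job class and carries a data map, a Trigger points at exactly one JobDetail, and each firing creates a fresh Job instance and calls execute once.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for Quartz's Job / JobDetail / Trigger (NOT org.quartz types).
interface MiniJob {
    void execute(Map<String, Object> context);
}

// Static description of a job: which class to run, plus its data map.
final class MiniJobDetail {
    final String name;
    final Class<? extends MiniJob> jobClass;
    final Map<String, Object> jobDataMap = new HashMap<>();

    MiniJobDetail(String name, Class<? extends MiniJob> jobClass) {
        this.name = name;
        this.jobClass = jobClass;
    }
}

// A trigger points at exactly one job detail; several triggers may share one.
final class MiniTrigger {
    final String name;
    final MiniJobDetail job;

    MiniTrigger(String name, MiniJobDetail job) {
        this.name = name;
        this.job = job;
    }

    // Each firing creates a fresh job instance and runs it exactly once.
    void fire() {
        MiniJob instance;
        try {
            instance = job.jobClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + job.jobClass.getName(), e);
        }
        Map<String, Object> context = new HashMap<>(job.jobDataMap);
        context.put("trigger", name);
        instance.execute(context);
    }
}

// Example job that records which trigger fired it.
class RecordingJob implements MiniJob {
    static final List<String> FIRED_BY = new ArrayList<>();

    @Override
    public void execute(Map<String, Object> context) {
        FIRED_BY.add((String) context.get("trigger"));
    }
}
```

Two triggers sharing one MiniJobDetail each cause one execution, which is the "one Job, many Triggers" relationship in miniature.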
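2. Scheduler
Inside Quartz, every Scheduler has a dedicated scheduling thread, QuartzSchedulerThread. Once started, it continuously fetches the Triggers that are about to fire, wraps them, and hands them to the ThreadPool that executes the jobs. The overall logic is roughly as shown in the following sequence diagram: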
2.1 QuartzSchedulerThread, in the middle of the sequence diagram, is the core component that keeps all of Quartz running. Here is the thread's startup logic:
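(Code snippet 1)

```java
public void run() {
    while (!halted.get()) {
        try {
            // check whether we are supposed to pause
            synchronized (sigLock) {
                while (paused && !halted.get()) {
                    try {
                        // wait until togglePause(false) is called;
                        // the timed wait re-checks the flags at least once a second
                        sigLock.wait(1000L);
                    } catch (InterruptedException ignore) {
                    }
                }

                if (halted.get()) {
                    break;
                }
            }
            // ... (omitted) wait for free threads, acquire triggers, fire jobs
        } catch (RuntimeException re) {
            // ... (omitted) log and keep the loop alive
        }
    }
}
```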
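The scheduling thread is created and started at the very beginning, but it stays in this waiting state until QuartzScheduler calls schedThread.togglePause(false), which sets paused to false and lets the scheduling thread start working.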
2.2 The schedThread.togglePause(false) method looks like this:
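(Code snippet 2)

```java
void togglePause(boolean pause) {
    synchronized (sigLock) {
        paused = pause;

        if (paused) {
            // pausing: signal a scheduling change so the thread re-evaluates its state
            signalSchedulingChange(0);
        } else {
            // resuming: wake up the thread waiting on sigLock in run()
            sigLock.notifyAll();
        }
    }
}
```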
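The handshake in snippets 1 and 2 can be tried in isolation with the sketch below. PausableLoop, halt() and iterations() are hypothetical names; only the pattern of a flag guarded by a shared sigLock, a timed wait(1000L) while paused, and notifyAll() on resume mirrors the Quartz code. The counter increment stands in for the real work of acquiring and firing triggers.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of QuartzSchedulerThread's pause/resume handshake.
class PausableLoop implements Runnable {
    private final Object sigLock = new Object();
    private boolean paused = true;     // guarded by sigLock; starts paused like the real thread
    private boolean halted = false;    // guarded by sigLock
    private final AtomicInteger iterations = new AtomicInteger();

    void togglePause(boolean pause) {
        synchronized (sigLock) {
            paused = pause;
            if (!paused) {
                sigLock.notifyAll();   // wake the thread parked in run()
            }
        }
    }

    void halt() {
        synchronized (sigLock) {
            halted = true;
            sigLock.notifyAll();
        }
    }

    int iterations() {
        return iterations.get();
    }

    @Override
    public void run() {
        while (true) {
            synchronized (sigLock) {
                // park while paused; the timed wait re-checks the flags at least once a second
                while (paused && !halted) {
                    try {
                        sigLock.wait(1000L);
                    } catch (InterruptedException ignore) {
                    }
                }
                if (halted) {
                    break;
                }
            }
            iterations.incrementAndGet();   // stands in for "acquire triggers, fire jobs"
            try {
                Thread.sleep(5);
            } catch (InterruptedException ignore) {
            }
        }
    }
}
```

Because the thread is created paused, whoever owns it decides exactly when scheduling starts, which is what QuartzScheduler does by calling togglePause(false) from start().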
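2.3 Once the scheduling thread starts working, its first step is to check whether there is an idle worker thread available to fire a job, by calling ThreadPool.blockForAvailableThreads():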
(Code snippet 3)

```java
public int blockForAvailableThreads() {
    synchronized (nextRunnableLock) {
        // wait while no worker is idle, or while runInThread() is in the middle of a handoff
        while ((availWorkers.size() < 1 || handoffPending) && !isShutdown) {
            try {
                nextRunnableLock.wait(500);
            } catch (InterruptedException ignore) {
            }
        }

        return availWorkers.size();
    }
}
```
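2.4 Once it knows a worker thread is free, the scheduling thread acquires the next Trigger from the JobStore and waits until that Trigger's fire time arrives: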
(Code snippet 4)

```java
try {
    // ask the JobStore for the next trigger that fires within the idle-wait window
    trigger = qsRsrcs.getJobStore().acquireNextTrigger(ctxt, now + idleWaitTime);
    lastAcquireFailed = false;
} catch (JobPersistenceException jpe) {
    // ... (omitted) log and back off
} catch (RuntimeException e) {
    // ... (omitted) log and back off
}

if (trigger != null) {
    now = System.currentTimeMillis();
    long triggerTime = trigger.getNextFireTime().getTime();
    long timeUntilTrigger = triggerTime - now;

    // sleep until (roughly) the fire time, unless a new, earlier trigger is scheduled meanwhile
    while (timeUntilTrigger > 2) {
        synchronized (sigLock) {
            if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                try {
                    // we could have blocked a long while on 'synchronized', so recompute
                    now = System.currentTimeMillis();
                    timeUntilTrigger = triggerTime - now;
                    if (timeUntilTrigger >= 1) {
                        sigLock.wait(timeUntilTrigger);
                    }
                } catch (InterruptedException ignore) {
                }
            }
        }
        // ... (omitted) if the schedule changed significantly, release this trigger and break
        now = System.currentTimeMillis();
        timeUntilTrigger = triggerTime - now;
    }
}
```
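The waiting logic in snippet 4 (sleep until the fire time, but wake up early if an earlier trigger is scheduled) can be sketched on its own as below. FireTimeWaiter, awaitFireTime and signalSchedulingChange are hypothetical names modeled on the snippet; the "return false" path stands in for Quartz releasing the acquired trigger and going back to acquire a new one.

```java
// Hypothetical sketch of "wait for the fire time, wake early for an earlier trigger".
class FireTimeWaiter {
    private final Object sigLock = new Object();
    private long signaledNextFireTime = 0L;   // guarded by sigLock; 0 = no pending change

    // Called when a trigger is (re)scheduled; wakes any waiter so it can re-check.
    void signalSchedulingChange(long candidateNewNextFireTime) {
        synchronized (sigLock) {
            signaledNextFireTime = candidateNewNextFireTime;
            sigLock.notifyAll();
        }
    }

    // Caller must hold sigLock.
    private boolean isCandidateNewTimeEarlier(long currentTriggerTime) {
        return signaledNextFireTime != 0L && signaledNextFireTime < currentTriggerTime;
    }

    /**
     * Blocks until triggerTime (epoch millis) is at most 2 ms away, as in snippet 4.
     * Returns false if an earlier trigger was signalled meanwhile, meaning the
     * caller should release this trigger and acquire again.
     */
    boolean awaitFireTime(long triggerTime) throws InterruptedException {
        synchronized (sigLock) {
            long timeUntilTrigger = triggerTime - System.currentTimeMillis();
            while (timeUntilTrigger > 2) {
                if (isCandidateNewTimeEarlier(triggerTime)) {
                    signaledNextFireTime = 0L;   // consume the signal
                    return false;
                }
                sigLock.wait(timeUntilTrigger);  // woken by timeout or by signalSchedulingChange
                timeUntilTrigger = triggerTime - System.currentTimeMillis();
            }
            return true;
        }
    }
}
```

Waiting with a timeout equal to the remaining time, instead of polling, keeps the thread idle while still letting a newly scheduled, earlier trigger preempt the current one.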
2.5 When the fire time arrives, the scheduling thread first wraps the Trigger and the Job it points to in a JobRunShell (a "safety shell"):
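(Code snippet 5)

```java
JobRunShell shell = null;
try {
    shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
    // bndle is the TriggerFiredBundle produced when the trigger was fired
    shell.initialize(qs, bndle);
} catch (SchedulerException se) {
    // ... (omitted) error handling for this firing
}
```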
2.6 The shell is initialized as follows:
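(Code snippet 6)

```java
public void initialize(QuartzScheduler qs, TriggerFiredBundle firedBundle)
        throws SchedulerException {
    this.qs = qs;

    Job job = null;
    JobDetail jobDetail = firedBundle.getJobDetail();

    try {
        // a new Job instance is created for every firing, via the configured JobFactory
        job = qs.getJobFactory().newJob(firedBundle);
    } catch (SchedulerException se) {
        throw se;
    } catch (Throwable ncdfe) { // e.g. NoClassDefFoundError
        // wrap anything else in a SchedulerException before rethrowing
        SchedulerException se = new SchedulerException(
                "Problem instantiating class '" + jobDetail.getJobClass().getName() + "'", ncdfe);
        throw se;
    }

    this.jec = new JobExecutionContext(scheduler, firedBundle, job);
}
```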
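Snippet 6 relies on a JobFactory to turn a job class into a fresh instance and on wrapping any failure (including errors such as NoClassDefFoundError) into one checked exception. The sketch below shows that shape with plain reflection through the no-arg constructor. ReflectiveJobFactory, SimpleJob and JobInstantiationException are hypothetical names, not Quartz's JobFactory API.

```java
// Hypothetical sketch of a reflective job factory (not Quartz's JobFactory interface).
class JobInstantiationException extends Exception {
    JobInstantiationException(String message, Throwable cause) {
        super(message, cause);
    }
}

interface SimpleJob {
    void execute();
}

final class ReflectiveJobFactory {
    // Creates a new instance per call, so job objects never share state between firings.
    <T extends SimpleJob> T newJob(Class<T> jobClass) throws JobInstantiationException {
        try {
            return jobClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | LinkageError e) {
            // e.g. no no-arg constructor, abstract class, constructor threw, class not loadable
            throw new JobInstantiationException(
                    "Problem instantiating class '" + jobClass.getName() + "'", e);
        }
    }
}

class HelloJob implements SimpleJob {
    boolean ran;

    @Override
    public void execute() {
        ran = true;
    }
}

abstract class AbstractJob implements SimpleJob {
}
```

Catching LinkageError as well as reflective exceptions mirrors why snippet 6 has a `catch (Throwable ...)` branch: a missing class surfaces as an Error, not as an Exception.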
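2.7 The scheduling thread now holds a JobRunShell, which implements Runnable and can therefore be run by a thread. The next step is to put it into the thread pool and let the pool execute the Job, by calling ThreadPool.runInThread(shell):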
(Code snippet 7)

```java
public boolean runInThread(Runnable runnable) {
    if (runnable == null) {
        return false;
    }

    synchronized (nextRunnableLock) {

        // tell blockForAvailableThreads() that a handoff is in progress
        handoffPending = true;

        // wait until a worker thread is available
        while ((availWorkers.size() < 1) && !isShutdown) {
            try {
                nextRunnableLock.wait(500);
            } catch (InterruptedException ignore) {
            }
        }

        if (!isShutdown) {
            WorkerThread wt = (WorkerThread) availWorkers.removeFirst();
            busyWorkers.add(wt);
            // hands the job to the already-running idle worker
            wt.run(runnable);
        } else {
            // the pool is shutting down: run the job in a new extra worker thread
            WorkerThread wt = new WorkerThread(this, threadGroup,
                    "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
            busyWorkers.add(wt);
            workers.add(wt);
            wt.start();
        }
        nextRunnableLock.notifyAll();
        handoffPending = false;
    }

    return true;
}
```
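Snippets 3 and 7 together form a small handoff protocol: the scheduling thread blocks until a worker is idle and no handoff is pending, then hands exactly one job to one idle worker. Below is a self-contained sketch of that protocol. TinyPool and its inner Worker are hypothetical; one deliberate simplification is that after shutdown runInThread() returns false instead of starting the extra "WorkerThread-LastJob" thread Quartz uses.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of SimpleThreadPool-style handoff (snippets 3 and 7).
class TinyPool {
    private final Object nextRunnableLock = new Object();
    private final Deque<Worker> availWorkers = new ArrayDeque<>();  // guarded by nextRunnableLock
    private boolean handoffPending = false;                         // guarded by nextRunnableLock
    private volatile boolean isShutdown = false;

    TinyPool(int size) {
        for (int i = 0; i < size; i++) {
            Worker w = new Worker();
            availWorkers.add(w);
            Thread t = new Thread(w, "TinyPool-worker-" + i);
            t.setDaemon(true);
            t.start();
        }
    }

    // Blocks until at least one worker is idle and no handoff is in progress.
    int blockForAvailableThreads() {
        synchronized (nextRunnableLock) {
            while ((availWorkers.isEmpty() || handoffPending) && !isShutdown) {
                try { nextRunnableLock.wait(500); } catch (InterruptedException ignore) { }
            }
            return availWorkers.size();
        }
    }

    // Hands the job to an idle worker; returns false if the pool is shut down.
    boolean runInThread(Runnable job) {
        if (job == null) {
            return false;
        }
        synchronized (nextRunnableLock) {
            handoffPending = true;
            try {
                while (availWorkers.isEmpty() && !isShutdown) {
                    try { nextRunnableLock.wait(500); } catch (InterruptedException ignore) { }
                }
                if (isShutdown) {
                    return false;
                }
                availWorkers.removeFirst().assign(job);
                return true;
            } finally {
                handoffPending = false;
                nextRunnableLock.notifyAll();
            }
        }
    }

    void shutdown() {
        synchronized (nextRunnableLock) {
            isShutdown = true;
            nextRunnableLock.notifyAll();
        }
    }

    private void makeAvailable(Worker w) {
        synchronized (nextRunnableLock) {
            availWorkers.add(w);
            nextRunnableLock.notifyAll();
        }
    }

    private final class Worker implements Runnable {
        private final Object taskLock = new Object();
        private Runnable task;                        // guarded by taskLock

        void assign(Runnable r) {
            synchronized (taskLock) {
                task = r;
                taskLock.notifyAll();
            }
        }

        @Override
        public void run() {
            while (!isShutdown) {
                Runnable r;
                synchronized (taskLock) {
                    while (task == null && !isShutdown) {
                        try { taskLock.wait(500); } catch (InterruptedException ignore) { }
                    }
                    r = task;
                    task = null;
                }
                if (r != null) {
                    try {
                        r.run();
                    } catch (RuntimeException e) {
                        // a real pool would log this; the worker stays alive
                    }
                    makeAvailable(this);              // back into the idle list
                }
            }
        }
    }
}
```

The handoffPending flag is what stops blockForAvailableThreads() from reporting a worker as free while runInThread() is about to claim it.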
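2.8 JobRunShell implements Runnable. It triggers the Job's execution, manages the surrounding transaction (begin/complete), records the whole course of the execution, and notifies the trigger and the various listeners: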
(Code snippet 8)

```java
public void run() {
    try {
        scheduler.addSchedulerListener(this);
    } catch (SchedulerException ignore) {
        // ignore
    }

    try {
        Trigger trigger = jec.getTrigger();
        JobDetail jobDetail = jec.getJobDetail();

        do {
            JobExecutionException jobExEx = null;
            Job job = jec.getJobInstance();

            try {
                begin();
            } catch (SchedulerException se) {
                qs.notifySchedulerListenersError("Error executing Job ("
                        + jec.getJobDetail().getFullName()
                        + ": couldn't begin execution.", se);
                break;
            }

            // notify job and trigger listeners; a listener may veto this execution
            try {
                if (!notifyListenersBeginning(jec)) {
                    break;
                }
            } catch (VetoedException ve) {
                try {
                    int instCode = trigger.executionComplete(jec, null);
                    try {
                        qs.notifyJobStoreJobVetoed(schdCtxt, trigger, jobDetail, instCode);
                    } catch (JobPersistenceException jpe) {
                        vetoedJobRetryLoop(trigger, jobDetail, instCode);
                    }
                    if (jec.getTrigger().getNextFireTime() == null) {
                        qs.notifySchedulerListenersFinalized(jec.getTrigger());
                    }
                    complete(true);
                } catch (SchedulerException se) {
                    qs.notifySchedulerListenersError("Error during veto of Job ("
                            + jec.getJobDetail().getFullName()
                            + ": couldn't finalize execution.", se);
                }
                break;
            }

            long startTime = System.currentTimeMillis();
            long endTime = startTime;

            // execute the job
            try {
                log.debug("Calling execute on job " + jobDetail.getFullName());
                job.execute(jec);
                endTime = System.currentTimeMillis();
            } catch (JobExecutionException jee) {
                endTime = System.currentTimeMillis();
                jobExEx = jee;
                getLog().info("Job " + jobDetail.getFullName()
                        + " threw a JobExecutionException: ", jobExEx);
            } catch (Throwable e) {
                endTime = System.currentTimeMillis();
                getLog().error("Job " + jobDetail.getFullName()
                        + " threw an unhandled Exception: ", e);
                SchedulerException se = new SchedulerException(
                        "Job threw an unhandled exception.", e);
                se.setErrorCode(SchedulerException.ERR_JOB_EXECUTION_THREW_EXCEPTION);
                qs.notifySchedulerListenersError("Job ("
                        + jec.getJobDetail().getFullName()
                        + " threw an exception.", se);
                jobExEx = new JobExecutionException(se, false);
                jobExEx.setErrorCode(JobExecutionException.ERR_JOB_EXECUTION_THREW_EXCEPTION);
            }

            jec.setJobRunTime(endTime - startTime);

            // notify all job listeners
            if (!notifyJobListenersComplete(jec, jobExEx)) {
                break;
            }

            int instCode = Trigger.INSTRUCTION_NOOP;

            // let the trigger decide what happens next
            try {
                instCode = trigger.executionComplete(jec, jobExEx);
            } catch (Exception e) {
                // if this happens, there is a bug in the trigger
                SchedulerException se = new SchedulerException(
                        "Trigger threw an unhandled exception.", e);
                se.setErrorCode(SchedulerException.ERR_TRIGGER_THREW_EXCEPTION);
                qs.notifySchedulerListenersError(
                        "Please report this error to the Quartz developers.", se);
            }

            // notify all trigger listeners
            if (!notifyTriggerListenersComplete(jec, instCode)) {
                break;
            }

            // re-execute the job immediately if the trigger asked for it
            if (instCode == Trigger.INSTRUCTION_RE_EXECUTE_JOB) {
                jec.incrementRefireCount();
                try {
                    complete(false);
                } catch (SchedulerException se) {
                    qs.notifySchedulerListenersError("Error executing Job ("
                            + jec.getJobDetail().getFullName()
                            + ": couldn't finalize execution.", se);
                }
                continue;
            }

            try {
                complete(true);
            } catch (SchedulerException se) {
                qs.notifySchedulerListenersError("Error executing Job ("
                        + jec.getJobDetail().getFullName()
                        + ": couldn't finalize execution.", se);
                continue;
            }

            // persist the job/trigger state after a successful completion
            try {
                qs.notifyJobStoreJobComplete(schdCtxt, trigger, jobDetail, instCode);
            } catch (JobPersistenceException jpe) {
                qs.notifySchedulerListenersError(
                        "An error occured while marking executed job complete. job= '"
                                + jobDetail.getFullName() + "'", jpe);
                if (!completeTriggerRetryLoop(trigger, jobDetail, instCode)) {
                    return;
                }
            }

            break;
        } while (true);

    } finally {
        try {
            scheduler.removeSchedulerListener(this);
        } catch (SchedulerException e) {
            // ignore
        }

        jobRunShellFactory.returnJobRunShell(this);
    }
}
```
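Stripped of listeners and persistence, the control flow of snippet 8 is a do/while(true) loop: run the job, ask for an instruction code, and `continue` to run it again when the code says "re-execute", otherwise finish. The sketch below keeps only that loop. Instruction, RefireException, ShellJob and MiniRunShell are hypothetical names, and the maxRefires cap is an assumption added for the sketch (snippet 8 has no such cap).

```java
// Hypothetical instruction codes (not Quartz's Trigger.INSTRUCTION_* values).
final class Instruction {
    static final int NOOP = 0;
    static final int RE_EXECUTE_JOB = 1;

    private Instruction() {
    }
}

// A job failure that may ask to be refired immediately.
class RefireException extends Exception {
    final boolean refireImmediately;

    RefireException(String message, boolean refireImmediately) {
        super(message);
        this.refireImmediately = refireImmediately;
    }
}

interface ShellJob {
    void execute(int refireCount) throws RefireException;
}

final class MiniRunShell {
    private MiniRunShell() {
    }

    // Runs the job until the completion step stops asking for a re-execution.
    // Returns the number of attempts made.
    static int run(ShellJob job, int maxRefires) {
        int refireCount = 0;
        do {
            RefireException failure = null;
            try {
                job.execute(refireCount);
            } catch (RefireException e) {
                failure = e;
            } catch (RuntimeException e) {
                // unhandled exception: wrap it and never refire, like snippet 8's catch (Throwable)
                failure = new RefireException(e.toString(), false);
            }
            int instCode = (failure != null && failure.refireImmediately && refireCount < maxRefires)
                    ? Instruction.RE_EXECUTE_JOB
                    : Instruction.NOOP;
            if (instCode == Instruction.RE_EXECUTE_JOB) {
                refireCount++;          // plays the role of jec.incrementRefireCount()
                continue;               // back to the top of the loop
            }
            return refireCount + 1;     // complete(true) and the JobStore update would go here
        } while (true);
    }
}
```

For example, a job that fails with refireImmediately=true on its first two runs and succeeds on the third makes three attempts in total.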
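2. Job
2.1 Storing Jobs
From the user's point of view, once the Job class is written, every new JobDetail defines a new job. (A JobDetail is the job's definition; Quartz creates a fresh instance of the Job class for each execution, as shown in code snippet 6.) For example: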
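(Code snippet 10)

```java
// a job definition named "qyc_job_detail", backed by the QycJob class
JobDetail jobDetail = new JobDetail("qyc_job_detail", QycJob.class);
// a SimpleTrigger: repeat count 10000, repeat interval 10000 ms (10 seconds)
Trigger tenSecondTrigger = new SimpleTrigger("tenSecondTrigger", 10000, 10000);
scheduler.scheduleJob(jobDetail, tenSecondTrigger);
```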
The last line, scheduler.scheduleJob, goes on to call JobStore.storeJobAndTrigger, which saves the JobDetail and the Trigger in the JobStore.
This brings us to the JobStore. Its official definition reads: "The interface to be implemented by classes that want to provide a org.quartz.Job and org.quartz.Trigger storage mechanism for the org.quartz.core.QuartzScheduler's use." Concise and to the point: a JobStore stores Jobs and Triggers, and each scheduler has exactly one JobStore.
There are three JobStore implementations: RAMJobStore, JDBC-JobStoreTX and JDBC-JobStoreCMT.
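2.1.1 RAMJobStore
RAMJobStore keeps scheduling information (jobs, triggers and calendars) in memory. It is fast and lightweight, but all scheduling information is lost when the process terminates.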
RAMJobStore uses synchronized blocks when it stores and reads JobDetails and Triggers, as shown below:
(Code snippet 11)

```java
public void storeTrigger(SchedulingContext ctxt, Trigger newTrigger,
        boolean replaceExisting) throws JobPersistenceException {
    TriggerWrapper tw = new TriggerWrapper((Trigger) newTrigger.clone());

    synchronized (lock) {
        // ……

        triggersByFQN.put(tw.key, tw);

        if (pausedTriggerGroups.contains(newTrigger.getGroup())
                || pausedJobGroups.contains(newTrigger.getJobGroup())) {
            tw.state = TriggerWrapper.STATE_PAUSED;
            if (blockedJobs.contains(tw.jobKey)) {
                tw.state = TriggerWrapper.STATE_PAUSED_BLOCKED;
            }
        } else if (blockedJobs.contains(tw.jobKey)) {
            tw.state = TriggerWrapper.STATE_BLOCKED;
        } else {
            // only triggers in the normal WAITING state enter the fire-time-ordered set
            timeTriggers.add(tw);
        }
    }
}
```
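The essential data structures behind snippet 11 are a map from trigger key to trigger, plus a set ordered by next fire time that only holds triggers eligible to fire. Here is a simplified, self-contained sketch. TriggerEntry and MiniRamStore are hypothetical names; paused job groups and the replaceExisting check are left out (this sketch always replaces).

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified TriggerWrapper: fields mirror snippet 11, not Quartz's class.
final class TriggerEntry {
    enum State { WAITING, PAUSED, BLOCKED, PAUSED_BLOCKED }

    final String key;          // "group.name"
    final String group;
    final String jobKey;       // the one job this trigger points at
    final long nextFireTime;
    State state = State.WAITING;

    TriggerEntry(String group, String name, String jobKey, long nextFireTime) {
        this.key = group + "." + name;
        this.group = group;
        this.jobKey = jobKey;
        this.nextFireTime = nextFireTime;
    }
}

final class MiniRamStore {
    private final Object lock = new Object();
    private final Map<String, TriggerEntry> triggersByKey = new HashMap<>();
    // Ordered by next fire time (ties by key): first() is the next trigger to fire.
    final TreeSet<TriggerEntry> timeTriggers = new TreeSet<>(
            Comparator.comparingLong((TriggerEntry t) -> t.nextFireTime).thenComparing(t -> t.key));
    final Set<String> pausedTriggerGroups = new HashSet<>();
    final Set<String> blockedJobs = new HashSet<>();   // jobs running with concurrency disallowed

    void storeTrigger(TriggerEntry tw) {
        synchronized (lock) {
            TriggerEntry old = triggersByKey.put(tw.key, tw);
            if (old != null) {
                timeTriggers.remove(old);              // replacing an existing trigger
            }
            if (pausedTriggerGroups.contains(tw.group)) {
                tw.state = blockedJobs.contains(tw.jobKey)
                        ? TriggerEntry.State.PAUSED_BLOCKED
                        : TriggerEntry.State.PAUSED;
            } else if (blockedJobs.contains(tw.jobKey)) {
                tw.state = TriggerEntry.State.BLOCKED;
            } else {
                timeTriggers.add(tw);                  // only WAITING triggers can be acquired
            }
        }
    }
}
```

Keeping paused and blocked triggers out of the sorted set means the scheduler never has to skip over them when it looks for the next trigger to fire.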
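2.1.2 JDBC-JobStore
JDBCJobStore stores scheduling information (jobs, triggers and calendars) in a relational database, so all data is persisted and jobs and triggers can resume after the system crashes and restarts. In Quartz, the shared JDBC implementation is the abstract class JobStoreSupport, from which both JobStoreTX and JobStoreCMT inherit.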
JobStoreSupport holds the DataSource, the database table names, the database retry interval, the type of lock in use, and similar settings.
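JobStoreSupport can use database locks, and Quartz also provides a simple in-memory semaphore lock (the locks a thread holds are recorded in a ThreadLocal, and a thread that cannot obtain the lock keeps waiting).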
a) The semaphore lock (Code snippet 12)

```java
public synchronized boolean obtainLock(Connection conn, String lockName) {
    lockName = lockName.intern();
    Logger log = getLog();

    if (!isLockOwner(conn, lockName)) {
        // wait until no other thread holds this lock name
        while (locks.contains(lockName)) {
            try {
                this.wait();
            } catch (InterruptedException ie) {
                // ignored; keep waiting
            }
        }

        // record ownership both per thread and globally
        getThreadLocks().add(lockName);
        locks.add(lockName);
    } else if (log.isDebugEnabled()) {
        // (debug logging omitted)
    }

    return true;
}

public synchronized void releaseLock(Connection conn, String lockName) {
    lockName = lockName.intern();

    if (isLockOwner(conn, lockName)) {
        getThreadLocks().remove(lockName);
        locks.remove(lockName);
        // wake threads blocked in obtainLock()
        this.notifyAll();
    } else if (getLog().isDebugEnabled()) {
        // (debug logging omitted)
    }
}
```
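Snippet 12 can be reproduced without any Quartz types as a named, in-memory lock: a shared set of held lock names, a ThreadLocal set recording what the current thread owns, and wait/notifyAll on the semaphore's own monitor. InMemorySemaphore is a hypothetical name. Unlike snippet 12, this sketch propagates InterruptedException instead of swallowing it. Like snippet 12, it is re-entrant without counting: a second obtainLock by the owner succeeds, and a single releaseLock frees the lock.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of a named in-memory lock with ThreadLocal ownership.
final class InMemorySemaphore {
    private final Set<String> locks = new HashSet<>();           // names held by any thread
    private final ThreadLocal<Set<String>> threadLocks =
            ThreadLocal.withInitial(HashSet::new);               // names held by this thread

    boolean isLockOwner(String lockName) {
        return threadLocks.get().contains(lockName);
    }

    synchronized boolean obtainLock(String lockName) throws InterruptedException {
        if (!isLockOwner(lockName)) {
            while (locks.contains(lockName)) {
                wait();                                          // woken by releaseLock()
            }
            threadLocks.get().add(lockName);
            locks.add(lockName);
        }
        return true;                                             // owner asking again just succeeds
    }

    synchronized void releaseLock(String lockName) {
        if (isLockOwner(lockName)) {
            threadLocks.get().remove(lockName);
            locks.remove(lockName);
            notifyAll();
        }
    }
}
```

Because obtainLock and releaseLock synchronize on the same monitor, work done between them by one thread is visible to the next thread that obtains the same lock name.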
b) The database lock
Database locking works as follows:
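1. Before running some "critical logic" that needs a lock, the thread starts a transaction and executes a statement that makes the database take a lock (such as SELECT ... FOR UPDATE, or UPDATE), so the current thread now holds the database lock.
2. Holding the database lock, the thread runs the "critical logic".
3. When the "critical logic" finishes, the thread commits the transaction if everything went well and rolls it back otherwise; either way, the database lock is released.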
The code is as follows:
(Code snippet 13)

```java
public boolean obtainLock(Connection conn, String lockName) throws LockException {
    lockName = lockName.intern();
    Logger log = getLog();

    if (!isLockOwner(conn, lockName)) {
        // run the locking SQL; blocks until the database grants the row lock
        executeSQL(conn, lockName, expandedSQL);
        getThreadLocks().add(lockName);
    } else if (log.isDebugEnabled()) {
        // (debug logging omitted)
    }
    return true;
}

public void releaseLock(Connection conn, String lockName) {
    lockName = lockName.intern();

    if (isLockOwner(conn, lockName)) {
        // only the bookkeeping is cleared here; the database lock itself
        // is released when the transaction commits or rolls back
        getThreadLocks().remove(lockName);
    } else if (getLog().isDebugEnabled()) {
        getLog().warn(
                "Lock '" + lockName + "' attempt to return by: "
                        + Thread.currentThread().getName()
                        + " -- but not owner!",
                new Exception("stack-trace of wrongful returner"));
    }
}
```
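The SQL that actually acquires the lock is executed by one of two semaphores: the row-lock semaphore StdRowLockSemaphore or the update-lock semaphore UpdateLockRowSemaphore. For background on MySQL locking, see the article "Lock analysis of various SQL statements in MySQL InnoDB".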
StdRowLockSemaphore acquires the lock with a SELECT ... FOR UPDATE statement:
(Code snippet 14)

```java
public static final String SELECT_FOR_LOCK = "SELECT * FROM "
        + TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_LOCK_NAME
        + " = ? FOR UPDATE";

protected void executeSQL(Connection conn, String lockName, String expandedSQL)
        throws LockException {
    PreparedStatement ps = null;
    ResultSet rs = null;
    try {
        ps = conn.prepareStatement(expandedSQL);
        ps.setString(1, lockName);

        // blocks until the database grants the row lock
        rs = ps.executeQuery();
        if (!rs.next()) {
            // the lock row (e.g. TRIGGER_ACCESS) must already exist in the locks table
            throw new SQLException();
        }
    } catch (SQLException sqle) {
        throw new LockException("Failure obtaining db row lock: "
                + sqle.getMessage(), sqle);
    } finally {
        // ……
    }
}
```
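UpdateLockRowSemaphore acquires the database lock by executing an UPDATE statement instead. Quartz provides this update lock in addition to the row lock because some databases, such as MS SQL Server, do not support SELECT ... FOR UPDATE. The SQL executed by UpdateLockRowSemaphore is: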
(Code snippet 15)

```java
// a no-op update (LOCK_NAME = LOCK_NAME) whose only purpose is to take the row lock
public static final String UPDATE_FOR_LOCK =
        "UPDATE " + TABLE_PREFIX_SUBST + TABLE_LOCKS
        + " SET " + COL_LOCK_NAME + " = " + COL_LOCK_NAME
        + " WHERE " + COL_LOCK_NAME + " = ? ";
```
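2.1.3 JDBC-JobStoreTX
JobStoreTX manages database transactions itself and does not rely on a "container" such as Spring for transactions, so it is independent of any container. In practice, JobStoreTX's logic is the same as JobStoreSupport's: saving a Job and similar data happens inside a transaction, as follows: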
(Code snippet 16)

```java
protected Object executeInNonManagedTXLock(
        String lockName,
        TransactionCallback txCallback) throws JobPersistenceException {
    boolean transOwner = false;
    Connection conn = null;
    try {
        if (lockName != null) {
            // a database row lock needs a connection before it can be obtained
            if (getLockHandler().requiresConnection()) {
                conn = getNonManagedTXConnection();
            }

            transOwner = getLockHandler().obtainLock(conn, lockName);
        }

        if (conn == null) {
            conn = getNonManagedTXConnection();
        }

        // run the work, then commit the transaction ourselves
        Object result = txCallback.execute(conn);
        commitConnection(conn);
        return result;
    } catch (JobPersistenceException e) {
        rollbackConnection(conn);
        throw e;
    } catch (RuntimeException e) {
        rollbackConnection(conn);
        throw new JobPersistenceException("Unexpected runtime exception: "
                + e.getMessage(), e);
    } finally {
        try {
            releaseLock(conn, lockName, transOwner);
        } finally {
            cleanupConnection(conn);
        }
    }
}
```
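The control flow of snippet 16 is a reusable template: take the lock, run the callback, commit; on failure roll back and rethrow; in every case release the lock and then close the connection. The sketch below keeps exactly that ordering. TxConnection, TxCallback and NonManagedTx are hypothetical names, and a java.util.concurrent Lock stands in for Quartz's lock handler so the example runs without a database.

```java
import java.util.concurrent.locks.Lock;

// Hypothetical minimal connection abstraction so the sketch runs without a database.
interface TxConnection {
    void commit();
    void rollback();
    void close();
}

interface TxCallback<T> {
    T execute(TxConnection conn) throws Exception;
}

final class NonManagedTx {
    private NonManagedTx() {
    }

    // Lock, run, commit; roll back on failure; always unlock, then close.
    static <T> T executeInLock(Lock lock, TxConnection conn, TxCallback<T> callback) throws Exception {
        boolean lockOwner = false;
        try {
            lock.lock();
            lockOwner = true;
            T result = callback.execute(conn);
            conn.commit();
            return result;
        } catch (Exception e) {
            conn.rollback();
            throw e;
        } finally {
            try {
                if (lockOwner) {
                    lock.unlock();
                }
            } finally {
                conn.close();
            }
        }
    }
}
```

The nested try/finally guarantees the connection is closed even if releasing the lock throws. A JobStoreCMT-style variant would drop the commit and rollback calls and leave them to the container.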
The code that stores a Job:
(Code snippet 17)

```java
// called through a TransactionCallback, so it runs inside the transaction
// opened by executeInNonManagedTXLock (JobStoreTX) or executeInLock (JobStoreCMT)
protected void storeJob(Connection conn, SchedulingContext ctxt,
        JobDetail newJob, boolean replaceExisting)
        throws ObjectAlreadyExistsException, JobPersistenceException {

    boolean existingJob = jobExists(conn, newJob.getName(), newJob.getGroup());
    try {
        if (existingJob) {
            if (!replaceExisting) {
                throw new ObjectAlreadyExistsException(newJob);
            }
            getDelegate().updateJobDetail(conn, newJob);
        } else {
            getDelegate().insertJobDetail(conn, newJob);
        }
    } catch (IOException e) {
        throw new JobPersistenceException("Couldn't store job: "
                + e.getMessage(), e);
    } catch (SQLException e) {
        throw new JobPersistenceException("Couldn't store job: "
                + e.getMessage(), e);
    }
}
```
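2.1.4 JDBC-JobStoreCMT
Unlike JobStoreTX, JobStoreCMT uses the container's transaction manager, which means it must not call commit or rollback on the connection itself and leaves that to the container. This is visible in JobStoreCMT's locking method executeInLock: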
(Code snippet 18)

```java
protected Object executeInLock(
        String lockName,
        TransactionCallback txCallback) throws JobPersistenceException {
    boolean transOwner = false;
    Connection conn = null;
    try {
        if (lockName != null) {
            // a database row lock needs a connection before it can be obtained
            if (getLockHandler().requiresConnection()) {
                conn = getConnection();
            }

            transOwner = getLockHandler().obtainLock(conn, lockName);
        }

        if (conn == null) {
            conn = getConnection();
        }

        // no commitConnection/rollbackConnection here: the container's transaction manager decides
        return txCallback.execute(conn);
    } finally {
        try {
            releaseLock(conn, LOCK_TRIGGER_ACCESS, transOwner);
        } finally {
            cleanupConnection(conn);
        }
    }
}
```
2.2 How due jobs are found
A job is executed only when a Trigger pointing to it comes due. Simply put, when a job runs is decided by its Trigger.
2.3 How jobs are scheduled
Similarly, in Quartz the Scheduler schedules Triggers, not jobs: the scheduler does not track what state a job itself is in.
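2.4 How jobs are executed
When the Scheduler finds that a Trigger's fire time has arrived, it fires the Trigger and, along with it, executes the job the Trigger points to.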
2.6 The relationship between triggers and jobs
Triggers and Jobs are many-to-one: a Trigger belongs to exactly one Job, while a Job can have many Triggers.
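2.7 How a crashed job is recovered
If an exception is thrown while a job is executing, the job's execute method is interrupted. Quartz then passes the exception to trigger.executionComplete, which returns an instruction code deciding whether the job should be re-executed (for example, when the JobExecutionException asks to be refired immediately). The code: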
(Code snippet 19)

```java
try {
    job.execute(jec);
    endTime = System.currentTimeMillis();
} catch (JobExecutionException jee) {
    // the job reported a failure itself; it may request an immediate refire
    endTime = System.currentTimeMillis();
    jobExEx = jee;
    getLog().info("Job " + jobDetail.getFullName()
            + " threw a JobExecutionException: ", jobExEx);
} catch (Throwable e) {
    // anything else is wrapped in a JobExecutionException that does not refire
    endTime = System.currentTimeMillis();
    getLog().error("Job " + jobDetail.getFullName()
            + " threw an unhandled Exception: ", e);
    SchedulerException se = new SchedulerException(
            "Job threw an unhandled exception.", e);
    se.setErrorCode(SchedulerException.ERR_JOB_EXECUTION_THREW_EXCEPTION);
    qs.notifySchedulerListenersError("Job ("
            + jec.getJobDetail().getFullName()
            + " threw an exception.", se);
    jobExEx = new JobExecutionException(se, false);
    jobExEx.setErrorCode(JobExecutionException.ERR_JOB_EXECUTION_THREW_EXCEPTION);
}

int instCode = Trigger.INSTRUCTION_NOOP;
try {
    // the trigger turns the outcome into an instruction, e.g. INSTRUCTION_RE_EXECUTE_JOB
    instCode = trigger.executionComplete(jec, jobExEx);
} catch (Exception e) {
    // ... (omitted) report the trigger error
}
```
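What executionComplete decides can be illustrated with a small decision function. The order of checks below (refire request first, then unschedule requests, then "no future fire time means delete the trigger") follows our reading of Quartz 2.x's AbstractTrigger.executionComplete and should be treated as an assumption; CompletionInstruction, JobFailure and CompletionPolicy are hypothetical names, not Quartz types.

```java
// Hypothetical names; Quartz's real types are CompletedExecutionInstruction / JobExecutionException.
enum CompletionInstruction {
    NOOP, RE_EXECUTE_JOB, SET_TRIGGER_COMPLETE, SET_ALL_JOB_TRIGGERS_COMPLETE, DELETE_TRIGGER
}

// What a failed job reported (a null result means the job succeeded).
final class JobFailure {
    final boolean refireImmediately;
    final boolean unscheduleFiringTrigger;
    final boolean unscheduleAllTriggers;

    JobFailure(boolean refireImmediately, boolean unscheduleFiringTrigger, boolean unscheduleAllTriggers) {
        this.refireImmediately = refireImmediately;
        this.unscheduleFiringTrigger = unscheduleFiringTrigger;
        this.unscheduleAllTriggers = unscheduleAllTriggers;
    }
}

final class CompletionPolicy {
    private CompletionPolicy() {
    }

    // Checks are ordered: a refire request wins over the unscheduling requests.
    static CompletionInstruction executionComplete(JobFailure result, boolean mayFireAgain) {
        if (result != null && result.refireImmediately) {
            return CompletionInstruction.RE_EXECUTE_JOB;
        }
        if (result != null && result.unscheduleFiringTrigger) {
            return CompletionInstruction.SET_TRIGGER_COMPLETE;
        }
        if (result != null && result.unscheduleAllTriggers) {
            return CompletionInstruction.SET_ALL_JOB_TRIGGERS_COMPLETE;
        }
        if (!mayFireAgain) {
            return CompletionInstruction.DELETE_TRIGGER;     // the trigger has no future fire time
        }
        return CompletionInstruction.NOOP;
    }
}
```

In this model, a job that wants to be retried right away signals it through its exception, and everything else falls through to the trigger's own schedule.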
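3. Triggers
3.1 How due triggers are found
Like Jobs, Triggers are stored in the JobStore, and when the scheduler looks for the next trigger to fire it delegates the search to the JobStore. Taking RAMJobStore as an example, its acquireNextTrigger method is: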
(Code snippet 20)

```java
public Trigger acquireNextTrigger(SchedulingContext ctxt, long noLaterThan) {
    TriggerWrapper tw = null;

    synchronized (lock) {

        while (tw == null) {
            try {
                // timeTriggers is ordered by next fire time, so first() is the earliest trigger
                tw = (TriggerWrapper) timeTriggers.first();
            } catch (java.util.NoSuchElementException nsee) {
                return null;
            }

            if (tw == null) {
                return null;
            }

            // …

            tw.state = TriggerWrapper.STATE_ACQUIRED;

            tw.trigger.setFireInstanceId(getFiredTriggerRecordId());
            // hand out a copy so the caller cannot modify the stored trigger
            Trigger trig = (Trigger) tw.trigger.clone();
            return trig;
        }
    }

    return null;
}
```
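The part of acquireNextTrigger elided as "…" in snippet 20 includes checking the candidate against the noLaterThan window. The sketch below shows a simplified acquire step under that assumption: look at the earliest trigger, give up if it is not due within the window, otherwise remove it from the ordered set and mark it acquired so it cannot be handed out twice. TimedTrigger and TriggerQueue are hypothetical names, and misfire handling is omitted.

```java
import java.util.Comparator;
import java.util.TreeSet;

// Hypothetical, simplified trigger with just what the acquire step needs.
final class TimedTrigger {
    final String key;
    final long nextFireTime;
    boolean acquired;          // plays the role of STATE_ACQUIRED

    TimedTrigger(String key, long nextFireTime) {
        this.key = key;
        this.nextFireTime = nextFireTime;
    }
}

final class TriggerQueue {
    private final Object lock = new Object();
    // Sorted by next fire time, ties broken by key, so first() is the next trigger due.
    private final TreeSet<TimedTrigger> timeTriggers = new TreeSet<>(
            Comparator.comparingLong((TimedTrigger t) -> t.nextFireTime).thenComparing(t -> t.key));

    void add(TimedTrigger t) {
        synchronized (lock) {
            timeTriggers.add(t);
        }
    }

    // Returns the earliest trigger if it fires no later than noLaterThan, else null.
    TimedTrigger acquireNextTrigger(long noLaterThan) {
        synchronized (lock) {
            if (timeTriggers.isEmpty()) {
                return null;
            }
            TimedTrigger tw = timeTriggers.first();
            if (tw.nextFireTime > noLaterThan) {
                return null;                     // not due within the window: leave it queued
            }
            timeTriggers.remove(tw);             // no other caller can acquire it now
            tw.acquired = true;
            return tw;
        }
    }
}
```

With triggers due at 50 and 100, a window ending at 60 yields only the first one; the second stays queued until a later call with a wider window.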
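3.2 Pausing and resuming
Pausing a job really just pauses its Triggers, and resuming works the same way. Here is the JDBC JobStore's pauseTrigger: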
(Code snippet 21)

```java
public void pauseTrigger(Connection conn, SchedulingContext ctxt,
        String triggerName, String groupName) throws JobPersistenceException {

    try {
        String oldState = getDelegate().selectTriggerState(conn,
                triggerName, groupName);

        // WAITING/ACQUIRED -> PAUSED, BLOCKED -> PAUSED_BLOCKED; other states are left alone
        if (oldState.equals(STATE_WAITING)
                || oldState.equals(STATE_ACQUIRED)) {
            getDelegate().updateTriggerState(conn, triggerName,
                    groupName, STATE_PAUSED);
        } else if (oldState.equals(STATE_BLOCKED)) {
            getDelegate().updateTriggerState(conn, triggerName,
                    groupName, STATE_PAUSED_BLOCKED);
        }
    } catch (SQLException e) {
        throw new JobPersistenceException("Couldn't pause trigger '"
                + groupName + "." + triggerName + "': " + e.getMessage(), e);
    }
}
```
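Snippet 21 is essentially a state-transition table. The sketch below writes the pause transitions exactly as the snippet shows them, plus an assumed inverse for resume: a paused trigger returns to WAITING, or to BLOCKED if its non-concurrent job is still running. The resume rule is our assumption (misfire handling omitted), and TriggerState and PauseRules are hypothetical names.

```java
// Hypothetical state names mirroring the STATE_* constants in snippet 21.
enum TriggerState { WAITING, ACQUIRED, BLOCKED, PAUSED, PAUSED_BLOCKED, COMPLETE }

final class PauseRules {
    private PauseRules() {
    }

    // As in snippet 21: WAITING/ACQUIRED -> PAUSED, BLOCKED -> PAUSED_BLOCKED, others unchanged.
    static TriggerState pause(TriggerState old) {
        switch (old) {
            case WAITING:
            case ACQUIRED:
                return TriggerState.PAUSED;
            case BLOCKED:
                return TriggerState.PAUSED_BLOCKED;
            default:
                return old;
        }
    }

    // Assumed inverse: back to WAITING, or BLOCKED while the job is still running.
    static TriggerState resume(TriggerState old, boolean jobCurrentlyBlocked) {
        if (old != TriggerState.PAUSED && old != TriggerState.PAUSED_BLOCKED) {
            return old;
        }
        return jobCurrentlyBlocked ? TriggerState.BLOCKED : TriggerState.WAITING;
    }
}
```

Keeping the "blocked" bit through the pause (PAUSED_BLOCKED) is what lets a resumed trigger avoid firing a job that is still running.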
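4. Cluster deployment
4.1 How clustering works
In Quartz, nodes do not communicate with each other directly; each node learns about the others through data in the database. Every node has a ClusterManager, a separate thread that periodically "checks in" to the database. If it notices that another scheduler has not checked in for too long, it treats that scheduler as failed and recovers the failed scheduler's triggers.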
How a node checks in:
(Code snippet 22)

```java
protected List clusterCheckIn(Connection conn) throws JobPersistenceException {
    // look for failed instances before updating our own check-in time
    List failedInstances = findFailedInstances(conn);

    try {
        // update our own check-in record, inserting it on the very first check-in
        lastCheckin = System.currentTimeMillis();
        if (getDelegate().updateSchedulerState(conn, getInstanceId(), lastCheckin) == 0) {
            getDelegate().insertSchedulerState(conn, getInstanceId(),
                    lastCheckin, getClusterCheckinInterval());
        }
    } catch (Exception e) {
        // ... (omitted) error handling
    }

    return failedInstances;
}
```
How failed nodes are detected:
(Code snippet 23)

```java
for (Iterator itr = states.iterator(); itr.hasNext();) {
    SchedulerStateRecord rec = (SchedulerStateRecord) itr.next();

    if (rec.getSchedulerInstanceId().equals(getInstanceId())) {
        // our own record
        foundThisScheduler = true;
        if (firstCheckIn) {
            // on the first check-in, our own row is left over from a previous run: recover it
            failedInstances.add(rec);
        }
    } else {
        // another node that has missed its check-in deadline
        if (calcFailedIfAfter(rec) < timeNow) {
            failedInstances.add(rec);
        }
    }
}
```
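The detection rule in snippet 23 can be tried with plain data. In the sketch below, the failure deadline is simplified to "last check-in + check-in interval + a grace period"; Quartz's real calcFailedIfAfter may compute it differently, so treat the formula and the grace value as assumptions. CheckinRecord and FailureDetector are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical row of the scheduler-state table.
final class CheckinRecord {
    final String instanceId;
    final long lastCheckin;       // epoch millis of the node's last check-in
    final long checkinInterval;   // how often the node promised to check in

    CheckinRecord(String instanceId, long lastCheckin, long checkinInterval) {
        this.instanceId = instanceId;
        this.lastCheckin = lastCheckin;
        this.checkinInterval = checkinInterval;
    }
}

final class FailureDetector {
    private final String myInstanceId;
    private final long graceMillis;   // assumed slack for clock skew and slow check-ins

    FailureDetector(String myInstanceId, long graceMillis) {
        this.myInstanceId = myInstanceId;
        this.graceMillis = graceMillis;
    }

    // Simplified deadline after which a silent node counts as failed.
    long calcFailedIfAfter(CheckinRecord rec) {
        return rec.lastCheckin + rec.checkinInterval + graceMillis;
    }

    List<CheckinRecord> findFailedInstances(List<CheckinRecord> states, long timeNow, boolean firstCheckIn) {
        List<CheckinRecord> failed = new ArrayList<>();
        for (CheckinRecord rec : states) {
            if (rec.instanceId.equals(myInstanceId)) {
                if (firstCheckIn) {
                    failed.add(rec);       // our own row left over from before a restart
                }
            } else if (calcFailedIfAfter(rec) < timeNow) {
                failed.add(rec);           // another node missed its deadline
            }
        }
        return failed;
    }
}
```

For instance, at time 10 000 ms with a 1 000 ms interval and 500 ms grace, a node last seen at 0 is failed (deadline 1 500), while one last seen at 9 000 is not (deadline 10 500).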
How the triggers of failed nodes are recovered:
(Code snippet 24)

```java
protected void clusterRecover(Connection conn, List failedInstances)
        throws JobPersistenceException {

    if (failedInstances.size() > 0) {

        long recoverIds = System.currentTimeMillis();

        logWarnIfNonZero(failedInstances.size(),
                "ClusterManager: detected " + failedInstances.size()
                        + " failed or restarted instances.");
        try {
            Iterator itr = failedInstances.iterator();
            while (itr.hasNext()) {
                SchedulerStateRecord rec = (SchedulerStateRecord) itr.next();

                // all triggers the failed instance had acquired or was executing
                List firedTriggerRecs = getDelegate()
                        .selectInstancesFiredTriggerRecords(conn,
                                rec.getSchedulerInstanceId());

                int acquiredCount = 0;
                int recoveredCount = 0;
                int otherCount = 0;

                Set triggerKeys = new HashSet();

                Iterator ftItr = firedTriggerRecs.iterator();
                while (ftItr.hasNext()) {
                    FiredTriggerRecord ftRec = (FiredTriggerRecord) ftItr.next();

                    Key tKey = ftRec.getTriggerKey();
                    Key jKey = ftRec.getJobKey();

                    triggerKeys.add(tKey);

                    if (ftRec.getFireInstanceState().equals(STATE_ACQUIRED)) {
                        // acquired but never fired: simply put the trigger back to WAITING
                        getDelegate().updateTriggerStateFromOtherState(
                                conn, tKey.getName(), tKey.getGroup(),
                                STATE_WAITING, STATE_ACQUIRED);
                        acquiredCount++;
                    } else if (ftRec.isJobRequestsRecovery()) {
                        // the job was running and requests recovery: schedule a one-shot recovery trigger
                        if (jobExists(conn, jKey.getName(), jKey.getGroup())) {
                            SimpleTrigger rcvryTrig = new SimpleTrigger(
                                    "recover_" + rec.getSchedulerInstanceId() + "_"
                                            + String.valueOf(recoverIds++),
                                    Scheduler.DEFAULT_RECOVERY_GROUP,
                                    new Date(ftRec.getFireTimestamp()));
                            rcvryTrig.setVolatility(ftRec.isTriggerIsVolatile());
                            rcvryTrig.setJobName(jKey.getName());
                            rcvryTrig.setJobGroup(jKey.getGroup());
                            rcvryTrig.setMisfireInstruction(
                                    SimpleTrigger.MISFIRE_INSTRUCTION_FIRE_NOW);
                            rcvryTrig.setPriority(ftRec.getPriority());

                            // remember the original trigger in the recovery trigger's data map
                            JobDataMap jd = getDelegate().selectTriggerJobDataMap(
                                    conn, tKey.getName(), tKey.getGroup());
                            jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_NAME, tKey.getName());
                            jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_GROUP, tKey.getGroup());
                            jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_FIRETIME_IN_MILLISECONDS,
                                    String.valueOf(ftRec.getFireTimestamp()));
                            rcvryTrig.setJobDataMap(jd);

                            rcvryTrig.computeFirstFireTime(null);
                            storeTrigger(conn, null, rcvryTrig, null, false,
                                    STATE_WAITING, false, true);
                            recoveredCount++;
                        } else {
                            // ... (omitted) the job no longer exists
                        }
                    } else {
                        otherCount++;
                    }
                }

                // clean up the failed instance's fired-trigger records
                getDelegate().deleteFiredTriggers(conn, rec.getSchedulerInstanceId());
            }
        } catch (Exception e) {
            throw new JobPersistenceException("Failure recovering jobs: "
                    + e.getMessage(), e);
        }
    }
}
```
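The decision made for each fired-trigger record in snippet 24 can be summarized as a pure function that plans the recovery without touching a database: acquired-but-not-fired triggers go back to WAITING; executing jobs that request recovery get a one-shot trigger named "recover_<instanceId>_<id>" (if the job still exists); everything else is only counted. FiredRecord, RecoveryPlan and RecoveryPlanner are hypothetical names; the sketch omits the trigger's data map and misfire settings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical row of the fired-triggers table belonging to a failed node.
final class FiredRecord {
    final String triggerKey;
    final String jobKey;
    final boolean acquired;          // ACQUIRED: picked up but not yet executing
    final boolean requestsRecovery;  // the job was marked "requests recovery"

    FiredRecord(String triggerKey, String jobKey, boolean acquired, boolean requestsRecovery) {
        this.triggerKey = triggerKey;
        this.jobKey = jobKey;
        this.acquired = acquired;
        this.requestsRecovery = requestsRecovery;
    }
}

final class RecoveryPlan {
    final List<String> resetToWaiting = new ArrayList<>();
    final List<String> recoveryTriggerNames = new ArrayList<>();
    final List<String> recoveredJobs = new ArrayList<>();
    int other;
}

final class RecoveryPlanner {
    private RecoveryPlanner() {
    }

    static RecoveryPlan plan(String failedInstanceId, List<FiredRecord> records,
                             Set<String> existingJobs, long recoverIds) {
        RecoveryPlan plan = new RecoveryPlan();
        for (FiredRecord rec : records) {
            if (rec.acquired) {
                plan.resetToWaiting.add(rec.triggerKey);               // ACQUIRED -> WAITING
            } else if (rec.requestsRecovery) {
                if (existingJobs.contains(rec.jobKey)) {
                    // one-shot trigger, named like the recovery triggers in snippet 24
                    plan.recoveryTriggerNames.add("recover_" + failedInstanceId + "_" + (recoverIds++));
                    plan.recoveredJobs.add(rec.jobKey);
                }
                // otherwise the job was deleted meanwhile and there is nothing to recover
            } else {
                plan.other++;                                          // was executing, no recovery requested
            }
        }
        return plan;
    }
}
```

Separating the plan from its execution makes the three outcomes easy to see; Quartz performs the same classification but writes each result straight to the database inside the cluster lock.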