<bean id="quartzExceptionSchedulerListener" class="com.***.crm.quartz.listener.QuartzExceptionSchedulerListener"></bean> <!-- Configuration: adds the listener bean declared above to the schedulerListeners list (the enclosing bean definition is not shown in this fragment) --> <property name="schedulerListeners"> <list> <ref bean="quartzExceptionSchedulerListener"/> </list> </property>
quartz与线程
主处理线程:QuartzSchedulerThread
启动 Scheduler 时,QuartzScheduler 被创建,并创建一个 org.quartz.core.QuartzSchedulerThread 类的实例。
QuartzSchedulerThread 包含有决定何时下一个Job将被触发的处理循环。QuartzSchedulerThread 是一个 Java 线程。它作为一个非守护线程运行在正常优先级下。
QuartzSchedulerThread 的主处理循环步骤:
1. 当 Scheduler 正在运行时:
A. 检查是否有转换为 standby 模式的请求。
1. 假如 standby 方法被调用,等待继续的信号
B. 询问 JobStore 下次要被触发的 Trigger.
1. 如果没有 Trigger 待触发,等候一小段时间后再次检查
2. 假如有一个可用的 Trigger,等待触发它的确切时间的到来
C. 时间到了,为 Trigger 获取到 triggerFiredBundle.
D. 使用Scheduler和triggerFiredBundle 为 Job 创建一个JobRunShell实例
E. 在ThreadPool 申请一个线程运行 JobRunShell 实例.
代码逻辑在QuartzSchedulerThread 的 run() 中,如下:
/**
* QuartzSchedulerThread.run
* <p>
* The main processing loop of the <code>QuartzSchedulerThread</code>.
* </p>
* <p>
* Each pass of the loop, until <code>halted</code> is set:
* </p>
* <ol>
* <li>blocks on the <code>si</code> monitor while <code>paused</code> is set;</li>
* <li>blocks until the thread pool reports an available thread;</li>
* <li>asks the job store for the next trigger to fire;</li>
* <li>if a trigger was acquired, waits until its fire time, marks it as
* fired, wraps the resulting bundle in a <code>JobRunShell</code> and hands
* the shell to the thread pool;</li>
* <li>if no trigger was acquired, waits on <code>si</code> for a randomized
* idle period before trying again.</li>
* </ol>
* <p>
* A <code>RuntimeException</code> escaping one pass is logged and the loop
* continues. When the loop exits, the references to the scheduler and its
* resources are cleared.
* </p>
*/
public void run() {
// Remembers whether the previous acquire attempt failed, so that a run of
// consecutive failures is reported only once.
boolean lastAcquireFailed = false;
while (!halted.get()) {
try {
// check if we're supposed to pause...
synchronized (si) {
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
// (wakes at least once per second to re-check paused/halted)
si.wait(1000L);
} catch (InterruptedException ignore) {
// interruption is ignored here; the enclosing while re-checks the flags
}
}
if (halted.get()) {
break;
}
}
// Blocks until the pool has at least one free worker thread.
int availTreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availTreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
Trigger trigger = null;
long now = System.currentTimeMillis();
clearSignaledSchengChange();
try {
// Ask the job store for the next trigger; the second argument is
// "now + idleWaitTime" (presumably the latest fire time to consider
// -- confirm against the JobStore contract).
trigger = qsRsrcs.getJobStore().acquireNextTrigger(
ctxt, now + idleWaitTime);
lastAcquireFailed = false;
} catch (JobPersistenceException jpe) {
// Notify listeners only on the first failure of a consecutive run.
if(!lastAcquireFailed) {
qs.notifySchedulerListenersError(
"An error occured while scanning for the next trigger to fire.",
jpe);
}
lastAcquireFailed = true;
} catch (RuntimeException e) {
// Same once-per-run suppression, but logged rather than sent to listeners.
if(!lastAcquireFailed) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
lastAcquireFailed = true;
}
if (trigger != null) {
now = System.currentTimeMillis();
long triggerTime = trigger.getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
// Wait until the fire time is at most 2 ms away.
while(timeUntilTrigger > 2) {
synchronized(si) {
// Wait only when this check returns false (presumably: no earlier
// candidate fire time has been signaled -- confirm in the helper).
if(!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
try {
// we could have blocked a long while
// on 'synchronize', so we must recompute
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
if(timeUntilTrigger >= 1)
si.wait(timeUntilTrigger);
} catch (InterruptedException ignore) {
// interruption is ignored; the remaining time is recomputed below
}
}
}
// When the helper returns true the acquired trigger is abandoned and
// the outer loop starts over (via the null check after this loop).
if(releaseIfScheduleChangedSignificantly(trigger, triggerTime)) {
trigger = null;
break;
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}
// The trigger was dropped inside the wait loop: start the next pass.
if(trigger == null)
continue;
// set trigger to 'executing'
TriggerFiredBundle bndle = null;
boolean goAhead = true;
// Read the halted flag while holding the si monitor.
synchronized(si) {
goAhead = !halted.get();
}
if(goAhead) {
try {
bndle = qsRsrcs.getJobStore().triggerFired(ctxt,
trigger);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occured while firing trigger '"
+ trigger.getFullName() + "'", se);
} catch (RuntimeException e) {
getLog().error(
"RuntimeException while firing trigger " +
trigger.getFullName(), e);
// db connection must have failed... keep
// retrying until it's up...
releaseTriggerRetryLoop(trigger);
}
}
// it's possible to get 'null' if the trigger was paused,
// blocked, or other similar occurrences that prevent it being
// fired at this time... or if the scheduler was shutdown (halted)
if (bndle == null) {
try {
qsRsrcs.getJobStore().releaseAcquiredTrigger(ctxt,
trigger);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occured while releasing trigger '"
+ trigger.getFullName() + "'", se);
// db connection must have failed... keep retrying
// until it's up...
releaseTriggerRetryLoop(trigger);
}
continue;
}
// TODO: improvements:
//
// 2- make sure we can get a job runshell before firing trigger, or
// don't let that throw an exception (right now it never does,
// but the signature says it can).
// 3- acquire more triggers at a time (based on num threads available?)
JobRunShell shell = null;
try {
// Obtain a run shell from the factory and bind it to the fired bundle.
shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
shell.initialize(qs, bndle);
} catch (SchedulerException se) {
// The shell could not be prepared: report the job as complete with the
// SET_ALL_JOB_TRIGGERS_ERROR instruction, then start the next pass.
try {
qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
} catch (SchedulerException se2) {
qs.notifySchedulerListenersError(
"An error occured while placing job's triggers in error state '"
+ trigger.getFullName() + "'", se2);
// db connection must have failed... keep retrying
// until it's up...
errorTriggerRetryLoop(bndle);
}
continue;
}
// Hand the shell to the thread pool; a false return is handled as an error.
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
try {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
} catch (SchedulerException se2) {
qs.notifySchedulerListenersError(
"An error occured while placing job's triggers in error state '"
+ trigger.getFullName() + "'", se2);
// db connection must have failed... keep retrying
// until it's up...
// NOTE(review): the shell-initialization failure path above retries the
// same triggeredJobComplete failure with errorTriggerRetryLoop(bndle),
// while this path uses releaseTriggerRetryLoop(trigger) -- confirm that
// the difference is intended.
releaseTriggerRetryLoop(trigger);
}
}
// A trigger was processed: skip the idle wait and start the next pass.
continue;
}
} else { // if(availTreadCount > 0)
continue; // should never happen, if threadPool.blockForAvailableThreads() follows contract
}
// Reached only when no trigger was acquired on this pass: idle on si.
// timeUntilContinue works out to exactly getRandomizedIdleWaitTime().
long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
long timeUntilContinue = waitTime - now;
synchronized(si) {
try {
si.wait(timeUntilContinue);
} catch (InterruptedException ignore) {
// interruption is ignored; the outer loop re-checks halted
}
}
} catch(RuntimeException re) {
// Log and keep looping so an unexpected runtime error does not end the thread.
getLog().error("Runtime error occured in main trigger firing loop.", re);
}
} // loop...
// drop references to scheduler stuff to aid garbage collection...
qs = null;
qsRsrcs = null;
}
本文来自电脑杂谈,转载请注明本文网址:
http://www.pc-fly.com/a/jisuanjixue/article-29777-12.html
轮番到南海岛礁周边区域作业