Interface
droolsjbpm-knowledge/kie-api/src/main/java/org/kie/api$ vi runtime/KieSession.java
/**
 * Declaration excerpt (body not shown): KieSession declares no parent contract of its own here
 * beyond the four interfaces it extends, so a KieSession can be used wherever a
 * StatefulRuleSession, StatefulProcessSession, CommandExecutor or KieRuntime is expected.
 */
public interface KieSession
extends
StatefulRuleSession,
StatefulProcessSession,
CommandExecutor,
KieRuntime
droolsjbpm-knowledge/kie-api/src/main/java/org/kie/api$ vi runtime/rule/StatefulRuleSession.java
/**
 * Excerpt of the rule-firing operations exposed by a stateful rule session.
 * Only the two <code>fireAllRules</code> overloads are shown here.
 */
public interface StatefulRuleSession {
/**
 * Fire all Matches on the Agenda.
 *
 * @return the number of rules fired
 */
int fireAllRules();
/**
 * Fire Matches on the Agenda up to the given maximum number of Matches, before returning
 * control to the application.
 * If the application wants to continue firing rules later, from the point where it stopped,
 * it just needs to call <code>fireAllRules()</code> again.
 *
 * @param max the maximum number of rules that should be fired
 * @return the number of rules fired
 */
int fireAllRules(int max);
Implements
drools/drools-core/src/main/java/org/drools/core/impl$ vi StatefulKnowledgeSessionImpl.java
/**
 * Declaration excerpt (body not shown in full): the session implementation extends
 * AbstractRuntime and implements, among the interfaces listed below, KieSession.
 */
public class StatefulKnowledgeSessionImpl extends AbstractRuntime
implements
StatefulKnowledgeSession,
WorkingMemoryEntryPoint,
InternalKnowledgeRuntime,
KieSession,
KieRuntimeEventManager,
InternalWorkingMemoryActions,
EventSupport,
RuleEventManager,
ProcessEventManager,
CorrelationAwareProcessRuntime,
Externalizable
// Agenda that internalFireAllRules(AgendaFilter, int) delegates the actual rule firing to.
protected InternalAgenda agenda;
/**
 * Delegates rule firing to the agenda and keeps firing for as long as the knowledge base
 * reports that it flushed modifications.
 *
 * @param agendaFilter filter handed unchanged to the agenda
 * @param fireLimit    firing limit handed unchanged to the agenda on every pass
 * @return the number of rules fired, summed over the initial pass and every follow-up pass
 */
private int internalFireAllRules(AgendaFilter agendaFilter, int fireLimit) {
    int totalFired = 0;
    try {
        totalFired = this.agenda.fireAllRules( agendaFilter, fireLimit );
    } finally {
        // The finally block runs whether or not the agenda call above threw.
        final boolean modificationsFlushed = kBase.flushModifications();
        if (modificationsFlushed) {
            // A flush happened, so run another full pass with the same filter and limit.
            final int firedAfterFlush = internalFireAllRules(agendaFilter, fireLimit);
            totalFired += firedAfterFlush;
        }
    }
    return totalFired;
}
drools/drools-core/src/main/java/org/drools/core$ vi common/CompositeDefaultAgenda.java
public class CompositeDefaultAgenda implements Externalizable, InternalAgenda {
...
/**
 * Fires rules on all partition agendas, repeating parallel passes while the previous pass
 * fired something, the limit has not been reached and propagations are still pending.
 *
 * @param agendaFilter filter passed to every partition agenda
 * @param fireLimit    a positive value caps the accumulated fire count; the limit check
 *                     ignores non-positive values
 * @return the accumulated number of rules fired, or 0 when the state machine refuses to
 *         enter the fire-all-rules state
 */
@Override
public int fireAllRules( AgendaFilter agendaFilter, int fireLimit ) {
    final boolean mayFire = executionStateMachine.toFireAllRules();
    if ( !mayFire ) {
        return 0;
    }

    if ( log.isTraceEnabled() ) {
        log.trace("Starting Fire All Rules");
    }

    int totalFired = 0;
    try {
        int firedThisPass;
        boolean limitReached;
        do {
            // On the first pass totalFired is 0, so the whole fireLimit is handed down.
            firedThisPass = parallelFire( agendaFilter, fireLimit - totalFired );
            totalFired += firedThisPass;
            limitReached = fireLimit > 0 && totalFired >= fireLimit;
        } while ( firedThisPass > 0 && !limitReached && hasPendingPropagations() );
    } finally {
        // Always halt the state machine, including when a pass throws.
        executionStateMachine.immediateHalt(propagationList);
    }

    if ( log.isTraceEnabled() ) {
        log.trace("Ending Fire All Rules");
    }
    return totalFired;
}
/**
 * Runs one firing pass over every partition agenda: all but the last are submitted to
 * EXECUTOR, the last one runs on the calling thread, then the async results are joined.
 *
 * @param agendaFilter filter passed to each partition agenda
 * @param fireLimit    limit passed unchanged to each partition agenda
 * @return the sum of the fire counts reported by all partition agendas
 */
private int parallelFire( AgendaFilter agendaFilter, int fireLimit ) {
    final int asyncCount = agendas.length - 1;
    CompletableFuture<Integer>[] pending = new CompletableFuture[asyncCount];
    for (int i = 0; i < asyncCount; i++) {
        final int partition = i;
        pending[partition] = supplyAsync(
                () -> agendas[partition].internalFireAllRules( agendaFilter, fireLimit, false ),
                EXECUTOR );
    }

    // The last partition is fired synchronously by the caller.
    int fired = agendas[agendas.length - 1].internalFireAllRules( agendaFilter, fireLimit, false );

    // Join in submission order and add up the per-partition counts.
    for (CompletableFuture<Integer> future : pending) {
        fired += future.join();
    }
    return fired;
}
drools/drools-core/src/main/java/org/drools/core$ vi common/DefaultAgenda.java
/**
 * Runs the fire loop using the RestHandler.FIRE_ALL_RULES rest handler.
 *
 * @param agendaFilter   filter forwarded to the fire loop
 * @param fireLimit      firing limit forwarded to the fire loop
 * @param isInternalFire forwarded unchanged to the fire loop
 * @return the fire count reported by the fire loop
 */
int internalFireAllRules( AgendaFilter agendaFilter, int fireLimit, boolean isInternalFire ) {
    final int firedRules = fireLoop( agendaFilter, fireLimit, RestHandler.FIRE_ALL_RULES, isInternalFire );
    return firedRules;
}
/**
 * Core firing loop: alternates between flushing the propagation queue and firing rules of the
 * currently focused agenda group until the engine stops firing or comes to rest.
 *
 * @param agendaFilter   filter passed to the rule evaluator
 * @param fireLimit      0 means the limit is reached immediately (no rule is fired); a positive
 *                       value stops rule firing once that many rules have fired; any other value
 *                       (e.g. -1) never trips the limit
 * @param restHandler    decides what happens when the engine is potentially at rest; it may return
 *                       a new queue head to continue with
 * @param isInternalFire when true, the loop does not exit on a rest with an empty queue, and the
 *                       state machine is halted in the finally block
 * @return the number of rules fired by this invocation
 */
private int fireLoop(AgendaFilter agendaFilter, int fireLimit, RestHandler restHandler, boolean isInternalFire) {
int fireCount = 0;
try {
// grab everything currently queued for propagation before entering the loop
PropagationEntry head = propagationList.takeAll();
int returnedFireCount;
boolean limitReached = fireLimit == 0; // -1 or > 0 will return false. No reason for user to give 0, just handled for completeness.
// The engine comes to potential rest (inside the loop) when there are no propagations and no rule firings.
// It's potentially at rest, because we cannot guarantee it is at rest.
// This is because external async actions (timer rules) can populate the queue that must be executed immediately.
// A final takeAll within the sync point determines if it can safely come to rest.
// If takeAll returns null, the engine is now safely at rest. If it returns something,
// the engine is not at rest and the loop continues.
//
// When FireUntilHalt comes to a safe rest, the thread is put into a wait state;
// when the queue is populated the thread is notified and the loop begins again.
//
// When FireAllRules comes to a safe rest it will put the engine into an INACTIVE state
// and the loop can exit.
//
// When a halt() command is added to the propagation queue and that queue is flushed,
// the engine is put into an INACTIVE state. At this point isFiring returns false and
// no more rules can fire. However the loop will continue until the rest point has been safely
// entered, i.e. the queue returns null within that sync point.
//
// The loop is susceptible to never returning under extremely greedy behaviour.
//
// Note that if a halt() command is given, the engine is changed to INACTIVE,
// and isFiring returns false, allowing it to exit before all rules are fired.
//
while ( isFiring() ) {
if ( head != null ) {
// it is possible that there are no action propagations, but there are rules to fire.
propagationList.flush(head);
head = null;
}
// a halt may have occurred during the flushPropagations,
// which changes the isFiring state. So a second isFiring guard is needed
if (!isFiring()) {
break;
}
evaluateEagerList();
InternalAgendaGroup group = getNextFocus();
if ( group != null && !limitReached ) {
// only fire rules while the limit has not been reached.
// if halt is called, then isFiring will be false.
// The while loop may continue to loop, to keep flushing the action propagation queue
returnedFireCount = ruleEvaluator.evaluateAndFire( agendaFilter, fireCount, fireLimit, group );
fireCount += returnedFireCount;
limitReached = ( fireLimit > 0 && fireCount >= fireLimit );
// pick up whatever was queued for propagation while the rules fired
head = propagationList.takeAll();
} else {
returnedFireCount = 0; // no rules fired this iteration, so we know this is 0
group = null; // set the group to null in case the fire limit has been reached
}
// rest check: nothing fired, nothing queued, no auto-deactivating group in focus, and no expirations flushed
if ( returnedFireCount == 0 && head == null && ( group == null || !group.isAutoDeactivate() ) && !flushExpirations() ) {
// if true, the engine is now considered potentially at rest
head = restHandler.handleRest( this, isInternalFire );
// for a non-internal fire, an empty queue after the rest handler ends the loop
if (!isInternalFire && head == null) {
break;
}
}
}
if ( this.focusStack.size() == 1 && this.mainAgendaGroup.isEmpty() ) {
// the root MAIN agenda group is empty, reset active to false, so it can receive more activations.
this.mainAgendaGroup.setActive( false );
}
} finally {
// makes sure the engine is inactive, if an exception is thrown.
// if it safely returns, then the engine should already be inactive
if (isInternalFire) {
executionStateMachine.immediateHalt(propagationList);
}
}
return fireCount;
}
drools/drools-core/src/main/java/org/drools/core$ vi concurrent/SequentialRuleEvaluator.java
/**
 * Rule evaluator that handles one agenda item of the given group per call.
 * Whether the item is removed from the group or only peeked depends on the
 * "sequential" setting of the knowledge base configuration, read once at construction.
 */
public class SequentialRuleEvaluator extends AbstractRuleEvaluator implements RuleEvaluator {

    private final boolean sequential;
    private final KnowledgeHelper knowledgeHelper;

    /**
     * @param agenda agenda whose knowledge base configuration supplies the sequential flag
     */
    public SequentialRuleEvaluator( DefaultAgenda agenda ) {
        super(agenda);
        this.sequential = agenda.getWorkingMemory().getKnowledgeBase().getConfiguration().isSequential();
        this.knowledgeHelper = newKnowledgeHelper();
    }

    /**
     * Takes the next item of the group and evaluates/fires it.
     *
     * @return the count reported for that item, or 0 when the group yields no item
     */
    @Override
    public int evaluateAndFire( AgendaFilter filter,
                                int fireCount,
                                int fireLimit,
                                InternalAgendaGroup group ) {
        final RuleAgendaItem next;
        if ( sequential ) {
            // sequential mode takes the item out of the group
            next = (RuleAgendaItem) group.remove();
        } else {
            // otherwise the item stays in the group and is only looked at
            next = (RuleAgendaItem) group.peek();
        }

        if ( next == null ) {
            return 0;
        }
        return internalEvaluateAndFire( filter, fireCount, fireLimit, next );
    }

    /**
     * @return the knowledge helper created when this evaluator was constructed
     */
    public KnowledgeHelper getKnowledgeHelper() {
        return this.knowledgeHelper;
    }
}
drools/drools-core/src/main/java/org/drools/core$ vi concurrent/AbstractRuleEvaluator.java
/**
 * Base class for rule evaluators; holds the agenda and offers the shared
 * evaluate-and-fire step plus a factory method for knowledge helpers.
 */
public class AbstractRuleEvaluator {

    private final DefaultAgenda owningAgenda;

    /**
     * @param agenda the agenda this evaluator works on
     */
    public AbstractRuleEvaluator( DefaultAgenda agenda ) {
        this.owningAgenda = agenda;
    }

    /**
     * Evaluates the queries for the item's rule on the agenda, then asks the item's rule
     * executor to evaluate the network and fire.
     *
     * @return the value returned by the rule executor
     */
    protected int internalEvaluateAndFire( AgendaFilter filter, int fireCount, int fireLimit, RuleAgendaItem item ) {
        owningAgenda.evaluateQueriesForRule( item );
        final int fired = item.getRuleExecutor().evaluateNetworkAndFire( owningAgenda, filter, fireCount, fireLimit );
        return fired;
    }

    /**
     * Creates a stateful knowledge helper for the agenda's working memory, using the
     * knowledge helper factory of the knowledge base configuration's component factory.
     */
    protected KnowledgeHelper newKnowledgeHelper() {
        final RuleBaseConfiguration configuration =
                owningAgenda.getWorkingMemory().getKnowledgeBase().getConfiguration();
        final KnowledgeHelper helper = configuration.getComponentFactory()
                                                    .getKnowledgeHelperFactory()
                                                    .newStatefulKnowledgeHelper( owningAgenda.getWorkingMemory() );
        return helper;
    }
}
drools/drools-core/src/main/java/org/drools/core$ vi phreak/RuleExecutor.java
drools/drools-core/src/main/java/org/drools/core$ vi phreak/RuleNetworkEvaluator.java --> reteoo