Interface

droolsjbpm-knowledge/kie-api/src/main/java/org/kie/api$ vi runtime/KieSession.java

public interface KieSession
        extends
        StatefulRuleSession,
        StatefulProcessSession,
        CommandExecutor,
        KieRuntime

droolsjbpm-knowledge/kie-api/src/main/java/org/kie/api$ vi runtime/rule/StatefulRuleSession.java

public interface StatefulRuleSession {

    /**
     * Fire all Matches on the Agenda.
     *
     * @return the number of rules fired
     */
    int fireAllRules();

    /**
     * Fire Matches on the Agenda up to the given maximum number of Matches, before returning
     * control to the application.
     * If the application wants to continue firing rules later, from the point where it stopped,
     * it just needs to call {@code fireAllRules()} again.
     *
     * @param max the maximum number of rules that should be fired
     * @return the number of rules fired
     */
    int fireAllRules(int max);

Implements

drools/drools-core/src/main/java/org/drools/core/impl$ vi StatefulKnowledgeSessionImpl.java

public class StatefulKnowledgeSessionImpl extends AbstractRuntime
        implements
        StatefulKnowledgeSession,
        WorkingMemoryEntryPoint,
        InternalKnowledgeRuntime,
        KieSession,
        KieRuntimeEventManager,
        InternalWorkingMemoryActions,
        EventSupport,
        RuleEventManager,
        ProcessEventManager,
        CorrelationAwareProcessRuntime,
        Externalizable

// Agenda that internalFireAllRules(AgendaFilter, int) delegates the actual rule firing to.
protected InternalAgenda agenda;


/**
 * Fires rules through the agenda and, whenever the knowledge base reports that it
 * flushed modifications afterwards, fires again so those changes are taken into account.
 * The flush check runs in a {@code finally} block, so it also happens when the agenda
 * throws while firing.
 *
 * <p>A positive {@code fireLimit} is treated as a budget for the whole call: each
 * re-fire only receives what is left of it, so the accumulated count cannot exceed
 * the limit requested by the caller. Non-positive limits are passed through unchanged.
 *
 * @param agendaFilter filter handed to the agenda for every firing round
 * @param fireLimit    maximum number of rules to fire when positive; other values are
 *                     forwarded to the agenda as-is
 * @return the total number of rules fired across all rounds
 */
private int internalFireAllRules(AgendaFilter agendaFilter, int fireLimit) {
        int fireCount = 0;
        try {
            fireCount = this.agenda.fireAllRules( agendaFilter, fireLimit );
        } finally {
            if (kBase.flushModifications()) {
                // Only hand the unused part of a positive limit to the next round; passing the
                // full limit again would let the total exceed what the caller asked for.
                // Clamp at 0 so an overshoot never turns into a negative limit.
                int remainingLimit = fireLimit > 0 ? Math.max( 0, fireLimit - fireCount ) : fireLimit;
                fireCount += internalFireAllRules(agendaFilter, remainingLimit);
            }
        }
        return fireCount;
    }

drools/drools-core/src/main/java/org/drools/core$ vi common/CompositeDefaultAgenda.java

public class CompositeDefaultAgenda implements Externalizable, InternalAgenda {

...


/**
     * Fires rules on all partition agendas, repeating rounds of {@link #parallelFire}
     * for as long as the previous round fired something, a positive {@code fireLimit}
     * has not been reached and propagations are still pending.
     *
     * @param agendaFilter filter forwarded to every round
     * @param fireLimit    cap on the accumulated fire count when positive
     * @return the accumulated number of rules fired, or 0 when the execution state
     *         machine does not allow entering the fire-all-rules state
     */
    @Override
    public int fireAllRules( AgendaFilter agendaFilter, int fireLimit ) {
        if ( !executionStateMachine.toFireAllRules() ) {
            return 0;
        }

        if ( log.isTraceEnabled() ) {
            log.trace("Starting Fire All Rules");
        }

        int totalFired = 0;

        try {
            int firedThisRound;
            boolean anotherRound;
            do {
                // on the first pass totalFired is 0, so the full limit is handed over
                firedThisRound = parallelFire( agendaFilter, fireLimit - totalFired );
                totalFired += firedThisRound;

                boolean capHit = fireLimit > 0 && totalFired >= fireLimit;
                // pending propagations are only queried when the cheaper checks pass
                anotherRound = firedThisRound > 0 && !capHit && hasPendingPropagations();
            } while ( anotherRound );
        } finally {
            // always leave the firing state, even if a round threw
            executionStateMachine.immediateHalt(propagationList);
        }

        if ( log.isTraceEnabled() ) {
            log.trace("Ending Fire All Rules");
        }

        return totalFired;
    }


    /**
     * Runs one firing round over every agenda in {@code agendas}: all but the last are
     * submitted to {@code EXECUTOR}, the last one runs on the calling thread, and the
     * individual fire counts are summed once every submitted task has been joined.
     *
     * @param agendaFilter filter forwarded to each agenda
     * @param fireLimit    limit forwarded unchanged to each agenda
     * @return the sum of the fire counts reported by all agendas
     */
    private int parallelFire( AgendaFilter agendaFilter, int fireLimit ) {
        final int lastIndex = agendas.length - 1;
        CompletableFuture<Integer>[] pending = new CompletableFuture[lastIndex];

        int next = 0;
        while ( next < lastIndex ) {
            // the lambda needs an effectively final copy of the index
            final int partition = next++;
            pending[partition] = supplyAsync( () -> agendas[partition].internalFireAllRules( agendaFilter, fireLimit, false ), EXECUTOR );
        }

        // the calling thread takes the last agenda itself instead of idling
        int total = agendas[lastIndex].internalFireAllRules( agendaFilter, fireLimit, false );
        for ( CompletableFuture<Integer> task : pending ) {
            total += task.join();
        }
        return total;
    }

drools/drools-core/src/main/java/org/drools/core$ vi common/DefaultAgenda.java

/**
 * Runs the fire loop with the {@code RestHandler.FIRE_ALL_RULES} rest handler.
 *
 * @param agendaFilter   filter forwarded to the fire loop
 * @param fireLimit      fire limit forwarded to the fire loop
 * @param isInternalFire flag forwarded to the fire loop
 * @return the number of rules fired as reported by the fire loop
 */
int internalFireAllRules( AgendaFilter agendaFilter, int fireLimit, boolean isInternalFire ) {
        final RestHandler fireAllRulesRest = RestHandler.FIRE_ALL_RULES;
        final int firedRules = fireLoop( agendaFilter, fireLimit, fireAllRulesRest, isInternalFire );
        return firedRules;
    }

    /**
     * Core firing loop: while the agenda is in a firing state it flushes pending
     * propagations, picks the agenda group that has focus and lets the rule evaluator
     * fire from it, accumulating the number of fired rules.
     *
     * @param agendaFilter   filter passed through to the rule evaluator
     * @param fireLimit      0 means the limit counts as reached from the start, so no rule
     *                       is fired; a positive value stops rule firing once that many
     *                       rules have fired; a negative value never trips the limit
     * @param restHandler    consulted when an iteration fired nothing and nothing is
     *                       pending; whatever it returns becomes the next propagation
     *                       head to flush
     * @param isInternalFire when {@code false}, a {@code null} result from the rest handler
     *                       ends the loop; when {@code true}, the loop keeps going while
     *                       {@code isFiring()} holds and the engine is halted in the
     *                       {@code finally} block
     * @return the number of rules fired by this invocation
     */
    private int fireLoop(AgendaFilter agendaFilter, int fireLimit, RestHandler restHandler, boolean isInternalFire) {
        int fireCount = 0;
        try {
            PropagationEntry head = propagationList.takeAll();
            int returnedFireCount;

            boolean limitReached = fireLimit == 0; // -1 or > 0 will return false. No reason for user to give 0, just handled for completeness.

            // The engine comes to potential rest (inside the loop) when there are no propagations and no rule firings.
            // It's potentially at rest, because we cannot guarantee it is at rest.
            // This is because external async actions (timer rules) can populate the queue that must be executed immediately.
            // A final takeAll within the sync point determines if it can safely come to rest.
            // If takeAll returns null, the engine is now safely at rest. If it returns something,
            // the engine is not at rest and the loop continues.
            //
            // When FireUntilHalt comes to a safe rest, the thread is put into a wait state;
            // when the queue is populated the thread is notified and the loop begins again.
            //
            // When FireAllRules comes to a safe rest it will put the engine into an INACTIVE state
            // and the loop can exit.
            //
            // When a halt() command is added to the propagation queue and that queue is flushed,
            // the engine is put into an INACTIVE state. At this point isFiring returns false and
            // no more rules can fire. However the loop will continue until the rest point has been safely
            // entered, i.e. the queue returns null within that sync point.
            //
            // The loop is susceptible to never returning on extremely greedy behaviour.
            //
            // Note that if a halt() command is given, the engine is changed to INACTIVE,
            // and isFiring returns false allowing it to exit before all rules are fired.
            //
            while ( isFiring()  )  {
                if ( head != null ) {
                    // it is possible that there are no action propagations, but there are rules to fire.
                    propagationList.flush(head);
                    head = null;
                }

                // a halt may have occurred during the flushPropagations,
                // which changes the isFiring state. So a second isFiring guard is needed
                if (!isFiring()) {
                    break;
                }

                evaluateEagerList();
                InternalAgendaGroup group = getNextFocus();
                if ( group != null && !limitReached ) {
                    // only fire rules while the limit has not been reached.
                    // if halt is called, then isFiring will be false.
                    // The while loop may continue to loop, to keep flushing the action propagation queue
                    returnedFireCount = ruleEvaluator.evaluateAndFire( agendaFilter, fireCount, fireLimit, group );
                    fireCount += returnedFireCount;

                    limitReached = ( fireLimit > 0 && fireCount >= fireLimit );
                    // pick up any propagations queued while the rules were firing
                    head = propagationList.takeAll();
                } else {
                    returnedFireCount = 0; // no rules fired this iteration, so we know this is 0
                    group = null; // set the group to null in case the fire limit has been reached
                }

                if ( returnedFireCount == 0 && head == null && ( group == null || !group.isAutoDeactivate() ) && !flushExpirations() ) {
                    // if true, the engine is now considered potentially at rest
                    head = restHandler.handleRest( this, isInternalFire );
                    if (!isInternalFire && head == null) {
                        break;
                    }
                }
            }

            if ( this.focusStack.size() == 1 && this.mainAgendaGroup.isEmpty() ) {
                // the root MAIN agenda group is empty, reset active to false, so it can receive more activations.
                this.mainAgendaGroup.setActive( false );
            }
        } finally {
            // makes sure the engine is inactive, if an exception is thrown.
            // if it safely returns, then the engine should already be inactive
            if (isInternalFire) {
                executionStateMachine.immediateHalt(propagationList);
            }
        }
        return fireCount;
    }

drools/drools-core/src/main/java/org/drools/core$ vi concurrent/SequentialRuleEvaluator.java

/**
 * Rule evaluator that takes a single agenda item from the given agenda group and
 * evaluates and fires it on the calling thread.
 */
public class SequentialRuleEvaluator extends AbstractRuleEvaluator implements RuleEvaluator {

    /** Snapshot of the knowledge base configuration's sequential flag, read at construction. */
    private final boolean sequential;

    /** Helper created once at construction and exposed through {@link #getKnowledgeHelper()}. */
    private final KnowledgeHelper knowledgeHelper;

    /**
     * @param agenda agenda this evaluator fires rules for
     */
    public SequentialRuleEvaluator( DefaultAgenda agenda ) {
        super(agenda);
        this.sequential = agenda.getWorkingMemory().getKnowledgeBase().getConfiguration().isSequential();
        this.knowledgeHelper = newKnowledgeHelper();
    }

    /**
     * Takes the next item of {@code group} (removing it in sequential mode, peeking
     * otherwise) and evaluates and fires it.
     *
     * @return the fire count reported for that item, or 0 when the group yields no item
     */
    @Override
    public int evaluateAndFire( AgendaFilter filter,
                                int fireCount,
                                int fireLimit,
                                InternalAgendaGroup group ) {
        final RuleAgendaItem nextItem;
        if ( sequential ) {
            nextItem = (RuleAgendaItem) group.remove();
        } else {
            nextItem = (RuleAgendaItem) group.peek();
        }

        if ( nextItem == null ) {
            return 0;
        }
        return internalEvaluateAndFire( filter, fireCount, fireLimit, nextItem );
    }

    public KnowledgeHelper getKnowledgeHelper() {
        return knowledgeHelper;
    }
}

drools/drools-core/src/main/java/org/drools/core$ vi concurrent/AbstractRuleEvaluator.java

/**
 * Base class for rule evaluators: holds the agenda and provides the shared
 * evaluate-and-fire step plus a factory method for knowledge helpers.
 */
public class AbstractRuleEvaluator {
    private final DefaultAgenda agenda;

    /**
     * @param agenda agenda used for query evaluation, firing and helper creation
     */
    public AbstractRuleEvaluator( DefaultAgenda agenda ) {
        this.agenda = agenda;
    }

    /**
     * Evaluates the queries for the rule of {@code item} and then lets the item's rule
     * executor evaluate the network and fire.
     *
     * @return the value returned by the rule executor
     */
    protected int internalEvaluateAndFire( AgendaFilter filter, int fireCount, int fireLimit, RuleAgendaItem item ) {
        // queries must be evaluated before the rule executor runs
        this.agenda.evaluateQueriesForRule( item );
        final int fired = item.getRuleExecutor().evaluateNetworkAndFire( this.agenda, filter, fireCount, fireLimit );
        return fired;
    }

    /**
     * Creates a stateful knowledge helper for the agenda's working memory, using the
     * helper factory from the knowledge base configuration's component factory.
     */
    protected KnowledgeHelper newKnowledgeHelper() {
        final RuleBaseConfiguration configuration = this.agenda.getWorkingMemory().getKnowledgeBase().getConfiguration();
        return configuration.getComponentFactory()
                            .getKnowledgeHelperFactory()
                            .newStatefulKnowledgeHelper( this.agenda.getWorkingMemory() );
    }
}

drools/drools-core/src/main/java/org/drools/core$ vi phreak/RuleExecutor.java

drools/drools-core/src/main/java/org/drools/core$ vi phreak/RuleNetworkEvaluator.java --> reteoo

results matching ""

    No results matching ""