Constraint streams score calculation
Constraint streams are a Functional Programming form of incremental score calculation in plain Python that is easy to read, write and debug. The API should feel familiar if you’re familiar with SQL.
1. Introduction
Using Python’s iterable operations, we could implement an easy score calculator that uses a functional approach:
def do_not_assign_ann(schedule):
    soft_score = 0
    for shift in schedule.shift_list:
        if shift.employee == 'Ann':
            soft_score -= 1
    return soft_score
However, that scales poorly because it doesn't do an incremental calculation: when the planning variable of a single Shift changes, the function has to execute the entire stream from scratch to recalculate the score.
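To make the difference concrete, here is a minimal plain-Python sketch of the idea behind incremental calculation. It does not use OptaPy and does not show how OptaPy works internally; the `Shift` dataclass, `full_score` and `IncrementalScore` are hypothetical names introduced only for illustration.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Shift:  # hypothetical stand-in for a planning entity
    employee: Optional[str] = None


def full_score(shifts: List[Shift]) -> int:
    """Recalculate from scratch: visits every shift on every call."""
    return -sum(1 for shift in shifts if shift.employee == "Ann")


class IncrementalScore:
    """Keeps a running score and applies only the delta of a single change."""

    def __init__(self, shifts: List[Shift]) -> None:
        self.score = full_score(shifts)

    def assign(self, shift: Shift, new_employee: Optional[str]) -> None:
        # Retract the old contribution of this one shift, then add the new one.
        if shift.employee == "Ann":
            self.score += 1
        shift.employee = new_employee
        if shift.employee == "Ann":
            self.score -= 1


shifts = [Shift("Ann"), Shift("Beth"), Shift("Ann"), Shift(None)]
tracker = IncrementalScore(shifts)
tracker.assign(shifts[1], "Ann")   # Beth's shift goes to Ann: one more match
tracker.assign(shifts[0], "Carl")  # Ann loses a shift: one match fewer
```

Each `assign` call touches one shift only, yet `tracker.score` stays equal to what `full_score(shifts)` would return.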
The ConstraintStreams API enables you to write similar code in pure Python, while reaping the performance benefits of
incremental score calculation and having built-in constraint justifications.
This is an example of the same code, using the Constraint Streams API:
from optapy.score import HardSoftScore
from optapy.constraint import ConstraintFactory

def do_not_assign_ann(factory: ConstraintFactory):
    return factory.for_each(Shift) \
        .filter(lambda shift: shift.employee == 'Ann') \
        .penalize("Don't assign Ann", HardSoftScore.ONE_SOFT)
This constraint stream iterates over all instances of class Shift in the problem facts and planning entities in the planning problem. It finds every Shift which is assigned to employee "Ann" and for every such instance (also called a match), it adds a soft penalty of 1 to the overall score.
The following figure illustrates this process on a problem with 4 different shifts:
If any of the instances change during solving, the constraint stream automatically detects the change and only recalculates the minimum necessary portion of the problem that is affected by the change. The following figure illustrates this incremental score calculation:
If constraint matches are enabled, such as when explaining the score, it also generates a constraint match for each tuple penalized or rewarded in the stream:
2. Creating a constraint stream
To use the ConstraintStreams API in your project, first write a pure Python function that takes a ConstraintFactory as its only argument and returns a list of Constraint instances generated from that ConstraintFactory, similar to the following example, and decorate it with @constraint_provider.
from optapy import constraint_provider
from optapy.score import HardSoftScore
from optapy.constraint import ConstraintFactory

@constraint_provider
def define_constraints(factory: ConstraintFactory):
    return [
        penalize_every_shift(factory)
    ]

def penalize_every_shift(factory: ConstraintFactory):
    return factory.for_each(Shift) \
        .penalize("Penalize a shift", HardSoftScore.ONE_SOFT)
This example contains one constraint, penalize_every_shift.
Add the following code to your solver configuration:
<solver xmlns="https://www.optaplanner.org/xsd/solver" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="https://www.optaplanner.org/xsd/solver https://www.optaplanner.org/xsd/solver/solver.xsd">
  <scoreDirectorFactory>
    <constraintProviderClass>define_constraints</constraintProviderClass>
  </scoreDirectorFactory>
  ...
</solver>
3. Constraint stream cardinality
Constraint stream cardinality is a measure of how many objects a single constraint match consists of.
The simplest constraint stream has a cardinality of 1, meaning each constraint match only consists of 1 object.
Therefore, it is called a UniConstraintStream:
def do_not_assign_ann(factory: ConstraintFactory):
    return factory.for_each(Shift)  # Returns UniStream[Shift]
    ...
Some constraint stream building blocks can increase stream cardinality, such as join or group_by:
def do_not_assign_ann(factory: ConstraintFactory):
    return (factory.for_each(Shift)  # Returns Uni[Shift].
            .join(Employee)          # Returns Bi[Shift, Employee].
            .join(DayOff)            # Returns Tri[Shift, Employee, DayOff].
            .join(Country)           # Returns Quad[Shift, Employee, DayOff, Country].
    )
    ...
The latter can also decrease stream cardinality:
def do_not_assign_ann(factory: ConstraintFactory):
    return (factory.for_each(Shift)                          # Returns UniStream[Shift].
            .join(Employee)                                  # Returns BiStream[Shift, Employee].
            .group_by(lambda shift, employee: employee)      # Returns UniStream[Employee].
    )
    ...
The following constraint stream cardinalities are currently supported:
Cardinality | Prefix | Defining interface
1 | Uni | UniConstraintStream
2 | Bi | BiConstraintStream
3 | Tri | TriConstraintStream
4 | Quad | QuadConstraintStream
3.1. Achieving higher cardinalities
OptaPy currently does not support constraint stream cardinalities higher than 4. However, with tuple mapping, effectively unlimited cardinality is possible:
def penta_stream_example(factory: ConstraintFactory):
    return (factory.for_each(Shift)  # UniConstraintStream[Shift]
            .join(Shift)             # BiConstraintStream[Shift, Shift]
            .join(Shift)             # TriConstraintStream[Shift, Shift, Shift]
            .join(Shift)             # QuadConstraintStream[Shift, Shift, Shift, Shift]
            .map(tuple)              # UniConstraintStream[Tuple[Shift, Shift, Shift, Shift]]
            .join(Shift)             # BiConstraintStream[Tuple[Shift, Shift, Shift, Shift], Shift]
            # This BiConstraintStream carries 5 Shift elements.
    )
    ...
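The effect of tuple mapping on cardinality can be pictured with ordinary Python collections. The sketch below is not OptaPy code: it models joins without joiners as cartesian products, and the string values are hypothetical stand-ins for Shift instances.

```python
from itertools import product

shifts = ["A", "B"]  # hypothetical stand-ins for Shift instances

# Three joins without joiners behave like a four-way cartesian product.
quads = list(product(shifts, repeat=4))

# Mapping each 4-element match to one tuple brings the cardinality back to 1 ...
unis = [tuple(quad) for quad in quads]

# ... so one more join yields pairs of (tuple of 4 shifts, shift).
bis = [(four, fifth) for four in unis for fifth in shifts]

first_four, fifth = bis[0]
```

Every element of `bis` has cardinality 2, yet it carries five shifts: four inside the tuple and one beside it.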
OptaPy does not provide any |
4. Building blocks
Constraint streams are chains of different operations, called building blocks.
Each constraint stream starts with a for_each(…) building block and is terminated by either a penalty or a reward.
The following example shows the simplest possible constraint stream:
def penalize_initialized_shifts(factory: ConstraintFactory):
    return factory.for_each(Shift) \
        .penalize("Initialized shift", HardSoftScore.ONE_SOFT)
This constraint stream penalizes each known and initialized instance of Shift.
4.1. ForEach
The .for_each(T) building block selects every T instance that is in a problem fact collection or a planning entity collection and has no genuine planning variable set to None.
To include instances with a None genuine planning variable, replace the for_each() building block by for_each_including_null_vars():
def penalize_all_shifts(factory: ConstraintFactory):
    return factory.for_each_including_null_vars(Shift) \
        .penalize("A shift", HardSoftScore.ONE_SOFT)
The |
4.2. Penalties and rewards
The purpose of constraint streams is to build up a score for a solution.
To do this, every constraint stream must be terminated by a call to either a penalize() or a reward() building block.
The penalize() building block makes the score worse and the reward() building block improves the score.
Penalties and rewards have several components:
- Constraint package is the Python module that contains the constraint. The default value is the module that contains the @constraint_provider implementation.
- Constraint name is the human-readable descriptive name for the constraint, which (together with the constraint package) must be unique within the entire @constraint_provider implementation.
- Constraint weight is a constant score value indicating how much every breach of the constraint affects the score. Valid examples include SimpleScore.ONE, HardSoftScore.ONE_HARD and HardMediumSoftScore.of(1, 2, 3).
- Constraint match weigher is an optional function indicating how many times the constraint weight should be applied in the score. The penalty or reward score impact is the constraint weight multiplied by the match weight. The default value is 1.
The ConstraintStreams API supports many different types of penalties. Browse the API in your IDE for the full list of method overloads. Here are some examples:
- Simple penalty (penalize("Constraint name", SimpleScore.ONE)) makes the score worse by 1 for every match in the constraint stream. The score type must be the same type as used on the @planning_score decorated member on the planning solution.
- Dynamic penalty (penalize("Constraint name", SimpleScore.ONE, lambda shift: shift.hours)) makes the score worse by the number of hours in every matching Shift in the constraint stream. This is an example of using a constraint match weigher.
By replacing the keyword penalize by reward in the name of these building blocks, you get operations that affect the score in the opposite direction.
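The arithmetic behind constraint weight and match weight can be written out in a few lines of plain Python. This is only a model of the rule "impact = constraint weight multiplied by match weight"; `total_impact` and the `Shift` dataclass are hypothetical and not part of OptaPy.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Shift:  # hypothetical fact class with an hours attribute
    employee: str
    hours: int


def total_impact(matches, constraint_weight, match_weigher=lambda match: 1, reward=False):
    """Sum constraint_weight * match_weight over all matches.

    A penalty lowers the score; a reward raises it by the same amount.
    """
    magnitude = sum(constraint_weight * match_weigher(match) for match in matches)
    return magnitude if reward else -magnitude


shifts = [Shift("Ann", 8), Shift("Ann", 4), Shift("Beth", 6)]

simple_penalty = total_impact(shifts, 1)                       # default match weight of 1
dynamic_penalty = total_impact(shifts, 1, lambda s: s.hours)   # match weight = hours
dynamic_reward = total_impact(shifts, 1, lambda s: s.hours, reward=True)
```

With three matches and a weight of 1, the simple penalty is one point per match, while the dynamic variants scale with the hours of each shift.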
4.3. Filtering
Filtering enables you to reduce the number of constraint matches in your stream.
It first enumerates all constraint matches and then applies a predicate to filter some matches out.
The predicate is a function that returns True only if the match is to continue in the stream.
The following constraint stream keeps only Ann's shifts and removes every other employee's shifts (such as Beth's) from the Shift matches:
def penalize_ann_shifts(factory: ConstraintFactory):
    return factory.for_each(Shift) \
        .filter(lambda shift: shift.employee.name == "Ann") \
        .penalize("Ann's shift", SimpleScore.ONE)
The following example retrieves a list of shifts where an employee has asked for a day off from a bi-constraint match of Shift and DayOff:
def penalize_shifts_on_off_days(factory: ConstraintFactory):
    return factory.for_each(Shift) \
        .join(DayOff) \
        .filter(lambda shift, day_off: shift.date == day_off.date and shift.employee == day_off.employee) \
        .penalize("Shift on an off-day", SimpleScore.ONE)
The following figure illustrates both these examples:
For performance reasons, using the join building block with the appropriate |
The following functions are required for filtering constraint streams of different cardinality:
Cardinality | Filtering Predicate
1 | Callable[[A], bool]
2 | Callable[[A, B], bool]
3 | Callable[[A, B, C], bool]
4 | Callable[[A, B, C, D], bool]
4.4. Joining
Joining is a way to increase stream cardinality and it is similar to the inner join operation in SQL. As the following figure illustrates, a join() creates a cartesian product of the streams being joined:
Doing this is inefficient if the resulting stream contains a lot of constraint matches that need to be filtered out immediately.
Instead, use a Joiner condition to restrict the joined matches only to those that are interesting:
For example:
from optapy.constraint import Joiners

def shift_on_day_off(constraint_factory: ConstraintFactory):
    return constraint_factory.for_each(Shift) \
        .join(DayOff,
              Joiners.equal(lambda shift: shift.date,
                            lambda day_off: day_off.date),
              Joiners.equal(lambda shift: shift.employee,
                            lambda day_off: day_off.employee)) \
        .penalize("Shift on an off-day",
                  HardSoftScore.ONE_HARD)
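One way to picture why an equality joiner is cheaper than a join followed by a filter is the plain-Python comparison below. It is a sketch under the assumption that the joiner can be served by a key index; it is not a description of OptaPy internals, and all names in it are hypothetical.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class Shift:
    employee: str
    date: str


@dataclass(frozen=True)
class DayOff:
    employee: str
    date: str


def join_then_filter(shifts, days_off):
    """Visit the full cartesian product, then discard non-matching pairs."""
    return [(s, d) for s in shifts for d in days_off
            if s.date == d.date and s.employee == d.employee]


def join_with_equal_keys(shifts, days_off):
    """Index the right side by the join key so only matching pairs are created."""
    index = defaultdict(list)
    for d in days_off:
        index[(d.date, d.employee)].append(d)
    return [(s, d) for s in shifts for d in index.get((s.date, s.employee), [])]


shifts = [Shift("Ann", "2030-01-01"), Shift("Beth", "2030-01-01"), Shift("Ann", "2030-01-02")]
days_off = [DayOff("Ann", "2030-01-02"), DayOff("Beth", "2030-01-03")]

filtered = join_then_filter(shifts, days_off)
indexed = join_with_equal_keys(shifts, days_off)
```

Both functions return the same pairs, but the first one evaluates the predicate for every combination of shift and day off, while the second one only looks up candidates that share the key.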
Through the Joiners class, the following Joiner conditions are supported to join two streams, pairing a match from each side:
- equal(): the paired matches have a property that is equal on both sides. This relies on __hash__(self) and __eq__(self, other).
- greater_than(), greater_than_or_equal(), less_than() and less_than_or_equal(): the paired matches have a Comparable property following the prescribed ordering.
- overlapping(): the paired matches have two properties (a start and an end property) of the same Comparable type that together represent an interval, and the two intervals overlap.
All Joiners methods have an overloaded method to use the same property of the same class on both stream sides.
For example, calling Joiners.equal(lambda shift: shift.employee) is the same as calling Joiners.equal(lambda shift: shift.employee, lambda shift: shift.employee).
If the other stream might match multiple times, but it must only impact the score once (for each element of the original stream), use if_exists() instead. It does not create cartesian products and therefore generally performs better.
4.5. Grouping and collectors
Grouping collects items in a stream according to user-provided criteria (also called the "group key"), similar to what a GROUP BY SQL clause does. Additionally, some grouping operations also accept one or more Collector instances, which provide various aggregation functions. The following figure illustrates a simple group_by() operation:
For example, the following code snippet first groups all processes by the computer they run on, sums up all the power
required by the processes on that computer using the ConstraintCollectors.sum(…)
collector, and finally penalizes
every computer whose processes consume more power than is available.
from optapy.constraint import ConstraintCollectors

def required_cpu_power_total(constraint_factory: ConstraintFactory):
    return constraint_factory.for_each(CloudProcess) \
        .group_by(lambda process: process.computer,
                  ConstraintCollectors.sum(lambda process: process.required_cpu_power)) \
        .filter(lambda computer, required_cpu_power: required_cpu_power > computer.cpu_power) \
        .penalize("required_cpu_power_total",
                  HardSoftScore.ONE_HARD,
                  lambda computer, required_cpu_power: required_cpu_power - computer.cpu_power)
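The same group, sum, filter and weigh steps can be traced by hand in plain Python. The sketch below is only a model of what the stream computes, not OptaPy code; the dataclasses and the `required_cpu_power_penalty` helper are hypothetical.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class CloudComputer:
    name: str
    cpu_power: int


@dataclass(frozen=True)
class CloudProcess:
    computer: CloudComputer
    required_cpu_power: int


def required_cpu_power_penalty(processes):
    # group_by(key, sum(...)): one running total per computer.
    totals = defaultdict(int)
    for process in processes:
        totals[process.computer] += process.required_cpu_power
    # filter(...) keeps overloaded computers; the match weight is the excess.
    return {computer.name: total - computer.cpu_power
            for computer, total in totals.items()
            if total > computer.cpu_power}


small = CloudComputer("small", cpu_power=4)
large = CloudComputer("large", cpu_power=16)
processes = [CloudProcess(small, 3), CloudProcess(small, 5), CloudProcess(large, 10)]

excess = required_cpu_power_penalty(processes)
hard_score = -sum(excess.values())
```

Only the overloaded computer produces a match, and its match weight is the amount by which the demand exceeds the available power.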
Information might be lost during grouping.
In the previous example, |
There are several collectors available out of the box.
4.5.1. Out-of-the-box collectors
The following collectors are provided out of the box:
count() collector
The ConstraintCollectors.count(…) counts all elements per group. For example, the following use of the collector gives a number of items for two separate groups: one where the talks have unavailable speakers, and one where they don't.
def speaker_availability(factory: ConstraintFactory):
    return factory.for_each(Talk) \
        .group_by(lambda talk: talk.has_any_unavailable_speaker(),
                  ConstraintCollectors.count()) \
        .penalize("Speaker Availability",
                  HardSoftScore.ONE_HARD,
                  lambda has_unavailable_speaker, count: ...)
The count is collected in an int.
To count a bi, tri or quad stream, use countBi(), countTri() or countQuad() respectively, because, unlike the other built-in collectors, they aren't overloaded methods due to Java's generics erasure.
count_distinct() collector
The ConstraintCollectors.count_distinct(…) counts any element per group once, regardless of how many times it occurs. For example, the following use of the collector gives a number of talks in each unique room.
def room_count(factory: ConstraintFactory):
    return factory.for_each(Talk) \
        .group_by(lambda talk: talk.room,
                  ConstraintCollectors.count_distinct()) \
        .penalize("room_count",
                  HardSoftScore.ONE_SOFT,
                  lambda room, count: ...)
The distinct count is collected in an int.
sum() collector
To sum the values of a particular property of all elements per group, use the ConstraintCollectors.sum(…) collector. The following code snippet first groups all processes by the computer they run on and sums up all the power required by the processes on that computer using the ConstraintCollectors.sum(…) collector.
def required_cpu_power_total(constraint_factory: ConstraintFactory):
    return constraint_factory.for_each(CloudProcess) \
        .group_by(lambda process: process.computer,
                  ConstraintCollectors.sum(lambda process: process.required_cpu_power)) \
        .penalize("required_cpu_power_total",
                  HardSoftScore.ONE_SOFT,
                  lambda computer, required_cpu_power: required_cpu_power)
The sum is collected in an int.
average() collector
To calculate the average of a particular property of all elements per group, use the ConstraintCollectors.average(…) collector.
The following code snippet first groups all processes by the computer they run on and averages all the power required by the processes on that computer using the ConstraintCollectors.average(…) collector.
def average_cpu_power(constraint_factory: ConstraintFactory):
    return constraint_factory.for_each(CloudProcess) \
        .group_by(lambda process: process.computer,
                  ConstraintCollectors.average(lambda process: process.required_cpu_power)) \
        .penalize("average_cpu_power",
                  HardSoftScore.ONE_SOFT,
                  lambda computer, average_cpu_power: average_cpu_power)
The average is collected as a float, and the average of no elements is None.
min() and max() collectors
To extract the minimum or maximum per group, use the ConstraintCollectors.min(…) and ConstraintCollectors.max(…) collectors respectively.
These collectors operate on values of properties which are Comparable (such as int, str or float), although there are also variants of these collectors which allow you to provide your own Comparator.
The following example finds a computer which runs the most power-demanding process:
def computer_with_biggest_process(constraint_factory: ConstraintFactory):
    return constraint_factory.for_each(CloudProcess) \
        .group_by(lambda process: process.computer,
                  ConstraintCollectors.max(lambda process: process.required_cpu_power)) \
        .penalize("computer_with_biggest_process",
                  HardSoftScore.ONE_HARD,
                  lambda computer, biggest_process: ...)
to_list(), to_set() and to_map() collectors
To extract all elements per group into a collection, use the ConstraintCollectors.to_list(…) collector.
The following example retrieves all processes running on a computer in a list:
def computer_and_its_processes(constraint_factory: ConstraintFactory):
    return constraint_factory.for_each(CloudProcess) \
        .group_by(lambda process: process.computer,
                  ConstraintCollectors.to_list()) \
        .penalize("computer_and_its_processes",
                  HardSoftScore.ONE_HARD,
                  lambda computer, process_list: ...)
Variants of this collector:
- to_list() collects a list value.
- to_set() collects a set value.
- to_sorted_set() collects a SortedSet value.
- to_map() collects a dict value.
- to_sorted_map() collects a SortedMap value.
The iteration order of elements in the resulting collection is not guaranteed to be stable,
unless it is a sorted collector such as |
4.5.2. Conditional collectors
The constraint collector framework enables you to create constraint collectors which will only collect in certain circumstances.
This is achieved using the ConstraintCollectors.conditionally(…) constraint collector.
This collector accepts a predicate, and another collector to which it will delegate if the predicate is true. The following example returns a count of long-running processes assigned to a given computer, excluding processes which are not long-running:
def computer_with_long_running_processes(constraint_factory: ConstraintFactory):
    return constraint_factory.for_each(CloudProcess) \
        .group_by(lambda process: process.computer,
                  ConstraintCollectors.conditionally(
                      lambda process: process.is_long_running(),
                      ConstraintCollectors.count()
                  )
        ) \
        .penalize("long_running_processes",
                  HardSoftScore.ONE_HARD,
                  lambda computer, long_running_process_count: ...)
This is useful in situations where multiple collectors are used and only some of them need to be restricted.
If all of them need to be restricted in the same way, then applying a filter() before the grouping is preferable.
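The difference between restricting a single collector and filtering before the grouping can be modeled in plain Python. This sketch is not OptaPy code and makes no claim about OptaPy's behavior for empty groups; the helper names and the tuple-based data are hypothetical.

```python
from collections import defaultdict

# Hypothetical (computer name, is_long_running) pairs standing in for CloudProcess facts.
processes = [("c1", True), ("c1", False), ("c2", False), ("c3", True), ("c3", True)]


def count_conditionally(items, key, predicate):
    """Group every item by key, but only count the items passing the predicate."""
    counts = defaultdict(int)
    for item in items:
        counts[key(item)] += 1 if predicate(item) else 0
    return dict(counts)


def filter_then_count(items, key, predicate):
    """Drop non-matching items first, then group and count what is left."""
    counts = defaultdict(int)
    for item in items:
        if predicate(item):
            counts[key(item)] += 1
    return dict(counts)


conditional = count_conditionally(processes, lambda p: p[0], lambda p: p[1])
prefiltered = filter_then_count(processes, lambda p: p[0], lambda p: p[1])
```

In this model, the conditional variant still sees every item, so a second, unrestricted aggregation over the same group would remain possible, whereas the pre-filtered variant never creates a group for `c2` at all.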
4.5.3. Composing collectors
The constraint collector framework enables you to create complex collectors utilizing simpler ones.
This is achieved using the ConstraintCollectors.compose(…) constraint collector.
This collector accepts 2 to 4 other constraint collectors, and a function to merge their results into one.
The following example builds an average() constraint collector using the count() constraint collector and the sum() constraint collector:
from typing import Callable, TypeVar

A = TypeVar('A')

def average(group_value_mapping: Callable[[A], int]):
    return ConstraintCollectors.compose(
        ConstraintCollectors.count(),
        ConstraintCollectors.sum(group_value_mapping),
        calculate_average)

def calculate_average(count: int, group_sum: int):
    if count == 0:
        return None
    else:
        return group_sum / count
Similarly, the compose() collector enables you to work around the limitation of Constraint Stream cardinality and use as many as 4 collectors in your group_by() statements:
collector = ConstraintCollectors.compose(
    ConstraintCollectors.count(),
    ConstraintCollectors.min(),
    ConstraintCollectors.max(),
    tuple)
Such a composite collector returns a tuple instance which allows you to access the result of each of the sub-collectors individually.
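The composition idea itself is independent of OptaPy and can be sketched with ordinary functions. In the model below, a "collector" is simply a function over the items of one group; `compose` here is a hypothetical helper and not the OptaPy method of the same name, and it recomputes from scratch rather than incrementally.

```python
def compose(collectors, merge):
    """Run several per-group aggregations over the same items and merge their results."""
    def composed(items):
        items = list(items)
        return merge(*(collector(items) for collector in collectors))
    return composed


def calculate_average(count, group_sum):
    return None if count == 0 else group_sum / count


# An average built from a count and a sum.
average = compose([len, sum], calculate_average)

# Three aggregations packed into one tuple result.
stats = compose([len, min, max], lambda *results: tuple(results))

powers = [2, 4, 9]
average_power = average(powers)
count_min_max = stats(powers)
```

The merged result behaves like a single collector result, so a stream that would otherwise need several extra elements per group only needs one.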
4.6. Conditional propagation
Conditional propagation enables you to exclude constraint matches from the constraint stream based on the presence or absence of some other object.
The following example penalizes computers which have at least one process running:
def running_computer(constraint_factory: ConstraintFactory):
    return constraint_factory.for_each(CloudComputer) \
        .if_exists(CloudProcess,
                   Joiners.equal(lambda computer: computer,
                                 lambda process: process.computer)) \
        .penalize("running_computer",
                  HardSoftScore.ONE_SOFT,
                  lambda computer: ...)
Note the use of the if_exists() building block.
On UniConstraintStream, the if_exists_other() building block is also available, which is useful in situations where the for_each() constraint match type is the same as the if_exists() type.
Conversely, if the if_not_exists() building block is used (as well as the if_not_exists_other() building block on UniConstraintStream), you can achieve the opposite effect:
def unused_computer(constraint_factory: ConstraintFactory):
    return constraint_factory.for_each(CloudComputer) \
        .if_not_exists(CloudProcess,
                       Joiners.equal(lambda computer: computer,
                                     lambda process: process.computer)) \
        .penalize("unused_computer",
                  HardSoftScore.ONE_HARD,
                  lambda computer: ...)
Here, only the computers without processes running are penalized.
Also note the use of the Joiners class to limit the constraint matches.
For a description of available joiners, see joining.
Conditional propagation operates much like joining, with the exception of not increasing the
stream cardinality.
Matches from these building blocks are not available further down the stream.
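The contrast between joining and conditional propagation can be shown with list comprehensions. This is a plain-Python model, not OptaPy code, and the computer and process names are hypothetical.

```python
computers = ["c1", "c2", "c3"]
# Hypothetical (process name, computer name) pairs.
processes = [("p1", "c1"), ("p2", "c1"), ("p3", "c3")]

# join: one match per (computer, process) pair, so cardinality becomes 2.
joined = [(c, p) for c in computers for p, host in processes if host == c]

# if_exists: each computer passes at most once, and cardinality stays 1.
used = [c for c in computers if any(host == c for _, host in processes)]

# if_not_exists: the complement of the above.
unused = [c for c in computers if not any(host == c for _, host in processes)]
```

Computer `c1` appears twice in `joined` because it hosts two processes, but only once in `used`, and the processes themselves are no longer part of the result.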
For performance reasons, using conditional propagation with the appropriate |
4.7. Mapping tuples
Mapping enables you to transform each tuple in a constraint stream by applying a mapping function to it.
The result of such mapping is a UniConstraintStream of the mapped tuples.
def computer_with_biggest_process(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(CloudProcess)       # UniConstraintStream[CloudProcess]
            .map(lambda process: process.computer)      # UniConstraintStream[CloudComputer]
    )
    ...
In the example above, the mapping function produces duplicate tuples if two different |
4.7.1. Designing the mapping function
When designing the mapping function, follow these guidelines for optimal performance:
- Keep the function pure. The mapping function should only depend on its input. That is, given the same input, it always returns the same output.
- Keep the function injective (one-to-one). No two input tuples should map to the same output tuple, or to tuples that are equal. Not following this recommendation creates a constraint stream with duplicate tuples, and may force you to use distinct() later.
- Use immutable data carriers. The tuples returned by the mapping function should be immutable and identified by their contents and nothing else. If two tuples carry objects which equal one another, those two tuples should likewise be equal and preferably be the same instance.
4.7.2. Dealing with duplicate tuples using distinct()
As a general rule, tuples in constraint streams are distinct. That is, no two tuples equal one another. However, certain operations such as tuple mapping may produce constraint streams where that is not true.
If a constraint stream produces duplicate tuples, you can use the distinct() building block to have the duplicate copies eliminated.
def computer_with_biggest_process(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(CloudProcess)       # UniConstraintStream[CloudProcess]
            .map(lambda process: process.computer)      # UniConstraintStream[CloudComputer]
            .distinct()                                 # The same, each CloudComputer just once.
    )
    ...
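How a non-injective mapping creates duplicates, and what de-duplication does to them, can be seen in a plain-Python model. This is not OptaPy code; the frozen dataclasses are hypothetical and chosen because they are immutable and compared by content, in line with the guideline on data carriers.

```python
from dataclasses import dataclass


@dataclass(frozen=True)  # frozen: immutable, hashable, compared by content
class CloudComputer:
    name: str


@dataclass(frozen=True)
class CloudProcess:
    name: str
    computer: CloudComputer


c1, c2 = CloudComputer("c1"), CloudComputer("c2")
processes = [CloudProcess("p1", c1), CloudProcess("p2", c1), CloudProcess("p3", c2)]

# Two processes share c1, so the mapped result contains c1 twice.
mapped = [process.computer for process in processes]

# Order-preserving de-duplication, relying on __hash__ and __eq__.
distinct = list(dict.fromkeys(mapped))
```

De-duplication only works here because equal computers hash and compare equal; a mutable carrier without content-based equality would not collapse.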
There is a performance cost to |
4.8. Flattening
Flattening enables you to transform any Iterable (such as list or set) into a set of tuples, which are sent downstream. (Similar to Java Stream's flatMap(…).)
This is done by applying a mapping function to the final element in the source tuple.
def required_job_roles(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(Person)                      # UniConstraintStream[Person]
            .join(Job,
                  Joiners.equal(lambda person: person,
                                lambda job: job.assignee))       # BiConstraintStream[Person, Job]
            .flatten_last(lambda job: job.required_roles)        # BiConstraintStream[Person, Role]
            .filter(lambda person, required_role: ...)
    )
    ...
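The effect of flattening the last element can be modeled with a short helper over tuples. This is a plain-Python sketch, not the OptaPy implementation; the `flatten_last` function below and its data are hypothetical.

```python
def flatten_last(matches, mapping):
    """Replace the last element of each match by every item the mapping yields for it."""
    return [match[:-1] + (item,) for match in matches for item in mapping(match[-1])]


# Hypothetical data: job name -> roles required by that job.
required_roles = {"deploy": ["dev", "ops"], "audit": ["ops"]}
person_jobs = [("Ann", "deploy"), ("Ann", "audit")]  # (person, job) matches

person_roles = flatten_last(person_jobs, lambda job: required_roles[job])
```

Because both of Ann's jobs require the `ops` role, the pair `("Ann", "ops")` appears twice in `person_roles`.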
In the example above, the mapping function produces duplicate tuples
if |
5. Testing a constraint stream
Constraint streams include the Constraint Verifier unit testing harness.
5.1. Testing constraints in isolation
Consider the following constraint stream:
def horizontal_conflict(factory: ConstraintFactory):
    return factory \
        .for_each_unique_pair(Queen,
                              Joiners.equal(lambda queen: queen.row)) \
        .penalize("Horizontal conflict", SimpleScore.ONE)
The following example uses the Constraint Verifier API to create a simple unit test for the preceding constraint stream:
from optapy.test import constraint_verifier_build

constraint_verifier = constraint_verifier_build(n_queens_constraint_provider, NQueens, Queen)

def test_horizontal_conflict_with_two_queens():
    row1 = Row(0)
    column1 = Column(0)
    column2 = Column(1)
    queen1 = Queen(0, row1, column1)
    queen2 = Queen(1, row1, column2)
    constraint_verifier.verify_that(horizontal_conflict) \
        .given(queen1, queen2) \
        .penalizes_by(1)
This test ensures that the horizontal conflict constraint assigns a penalty of 1 when there are two queens on the same row.
The following line creates a shared ConstraintVerifier instance and initializes the instance with the n_queens_constraint_provider function:
constraint_verifier = constraint_verifier_build(n_queens_constraint_provider, NQueens, Queen)
The test_horizontal_conflict_with_two_queens function runs the unit test, and can be adapted to many test frameworks such as pytest and unittest.
The first part of the test prepares the test data.
In this case, the test data includes two instances of the Queen planning entity and their dependencies (Row, Column):
    row1 = Row(0)
    column1 = Column(0)
    column2 = Column(1)
    queen1 = Queen(0, row1, column1)
    queen2 = Queen(1, row1, column2)
Further down, the following code tests the constraint:
    constraint_verifier.verify_that(horizontal_conflict) \
        .given(queen1, queen2) \
        .penalizes_by(1)
The verify_that(…) call is used to specify a function that provides the constraint.
The given(…) call is used to enumerate all the facts that the constraint stream operates on. In this case, the given(…) call takes the queen1 and queen2 instances previously created. Alternatively, you can use a given_solution(…) method here and provide a planning solution instead.
Finally, the penalizes_by(…) call completes the test, making sure that the horizontal conflict constraint, given the two Queen instances, results in a penalty of 1. This number is a product of multiplying the match weight, as defined in the constraint stream, by the number of matches.
Alternatively, you can use a rewards_with(…) call to check for rewards instead of penalties. The method to use here depends on whether the constraint stream in question is terminated with a penalize or a reward building block.
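When choosing the number to pass to penalizes_by(…), it can help to work out the expected matches by hand. The plain-Python sketch below does that for a unique-pair constraint on equal rows; it is not OptaPy code, and the tuples are hypothetical stand-ins for Queen instances.

```python
from itertools import combinations

# Hypothetical (id, row, column) triples standing in for Queen instances.
queens = [(0, 0, 0), (1, 0, 1), (2, 0, 2), (3, 1, 3)]

# A unique-pair stream with an equal-row joiner matches each unordered pair once.
matches = [(a, b) for a, b in combinations(queens, 2) if a[1] == b[1]]

match_weight = 1  # default match weight when no weigher is given
expected_penalty = match_weight * len(matches)
```

Three queens share row 0, which gives three unordered pairs, so a test over these four queens would expect a penalty of 3 rather than 1.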
5.2. Testing all constraints together
In addition to testing individual constraints, you can test the entire @constraint_provider instance.
Consider the following test:
def given_facts_multiple_constraints():
    queen1 = Queen(0, row1, column1)
    queen2 = Queen(1, row2, column2)
    queen3 = Queen(2, row3, column3)
    constraint_verifier.verify_that() \
        .given(queen1, queen2, queen3) \
        .scores(SimpleScore.of(-3))
There are only two notable differences to the previous example.
First, the verify_that() call takes no argument here, signifying that the entire @constraint_provider function is being tested.
Second, instead of either a penalizes_by() or rewards_with() call, the scores(…) method is used.
This runs the ConstraintProvider on the given facts and returns a sum of the Scores of all constraint matches resulting from the given facts.
Using this method, you ensure that the constraint provider does not miss any constraints and that the scoring function remains consistent as your code base evolves.