Skip to content
This repository has been archived by the owner on Oct 20, 2022. It is now read-only.

Maxsum synchronous impl #13

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
527 changes: 527 additions & 0 deletions pydcop/algorithms/amaxsum.py

Large diffs are not rendered by default.

799 changes: 155 additions & 644 deletions pydcop/algorithms/maxsum.py

Large diffs are not rendered by default.

165 changes: 82 additions & 83 deletions pydcop/algorithms/maxsum_dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@
import logging

from pydcop.infrastructure.computations import Message, register
from pydcop.algorithms.maxsum import FactorAlgo, MaxSumMessage, VariableAlgo
from pydcop.algorithms.amaxsum import MaxSumFactorComputation, MaxSumVariableComputation
from pydcop.algorithms.maxsum import MaxSumMessage
from pydcop.dcop.relations import NeutralRelation


class DynamicFunctionFactorComputation(FactorAlgo):
class DynamicFunctionFactorComputation(MaxSumFactorComputation):

"""
This is a specialisation of the computation performed for factor in the
Expand Down Expand Up @@ -68,9 +69,8 @@ class DynamicFunctionFactorComputation(FactorAlgo):

"""

def __init__(self, factor, name=None, msg_sender=None, comp_def=None):
super().__init__(factor, name=name, msg_sender=msg_sender,
comp_def=comp_def)
def __init__(self, comp_def=None):
super().__init__(comp_def=comp_def)

def change_factor_function(self, fn):
"""
Expand All @@ -80,27 +80,29 @@ def change_factor_function(self, fn):
"""
# Make sure the new function has the same dimension as the
# previous one.
if len(self._factor.dimensions) != len(fn.dimensions):
raise ValueError('Dimensions must be the same when changing '
'function in DynamicFunctionFactorComputation')
diff1 = [v for v in self._factor.dimensions if v not in fn.dimensions]
diff2 = [v for v in fn.dimensions if v not in self._factor.dimensions]
if len(self.factor.dimensions) != len(fn.dimensions):
raise ValueError(
"Dimensions must be the same when changing "
"function in DynamicFunctionFactorComputation"
)
diff1 = [v for v in self.factor.dimensions if v not in fn.dimensions]
diff2 = [v for v in fn.dimensions if v not in self.factor.dimensions]
if diff1 or diff2:
raise ValueError('Dimensions must be the same when changing '
'function in DynamicFunctionFactorComputation')
raise ValueError(
"Dimensions must be the same when changing "
"function in DynamicFunctionFactorComputation"
)

# Dimensions are ok, change factor computation object and emit cost
# messages
self._factor = fn
self.factor = fn
return self._init_msg()

def __str__(self):
return 'Maxsum dynamic function Factor computation for ' + \
self._factor.name
return "Maxsum dynamic function Factor computation for " + self.factor.name

def __repr__(self):
return 'Maxsum dynamic function Factor computation for ' + \
self._factor.name
return "Maxsum dynamic function Factor computation for " + self.factor.name


class FactorWithReadOnlyVariableComputation(DynamicFunctionFactorComputation):
Expand All @@ -118,8 +120,7 @@ class FactorWithReadOnlyVariableComputation(DynamicFunctionFactorComputation):

"""

def __init__(self, relation, read_only_variables, name=None,
msg_sender=None):
def __init__(self, relation, read_only_variables, name=None, msg_sender=None):

self._relation = relation
self._read_only_variables = read_only_variables
Expand All @@ -129,23 +130,22 @@ def __init__(self, relation, read_only_variables, name=None,
writable_vars = relation.dimensions[:]
for v in read_only_variables:
if v not in relation.dimensions:
raise ValueError('Read only {} variable must be in relation '
'scope {}'.format(v.name, relation.dimensions))
raise ValueError(
"Read only {} variable must be in relation "
"scope {}".format(v.name, relation.dimensions)
)
writable_vars.remove(v)

# We start with a neutral relation until we have all values from
# the read-only variables the condition depends on:
self._sliced_relation = NeutralRelation(writable_vars,
name=self._relation.name)
super().__init__(self._sliced_relation, name=name,
msg_sender=msg_sender)
self._sliced_relation = NeutralRelation(writable_vars, name=self._relation.name)
super().__init__(self._sliced_relation, name=name, msg_sender=msg_sender)

def on_start(self):
# when starting, subscribe to all sensor variable used in the
# condition of the rule
for v in self._read_only_variables:
self._msg_sender.post_msg(self.name, v.name, Message('SUBSCRIBE',
None))
self._msg_sender.post_msg(self.name, v.name, Message("SUBSCRIBE", None))
super().on_start()

@register("VARIABLE_VALUE")
Expand All @@ -154,8 +154,8 @@ def _on_new_var_value_msg(self, var_name, msg, t):

value = msg.content
if var_name not in [v.name for v in self._read_only_variables]:
self.logger.error('Unexpected value from %s - %s ', var_name, value)
self.logger.debug('Received new value for %s - %s ', var_name, value)
self.logger.error("Unexpected value from %s - %s ", var_name, value)
self.logger.debug("Received new value for %s - %s ", var_name, value)

self._read_only_values[var_name] = value

Expand All @@ -164,25 +164,23 @@ def _on_new_var_value_msg(self, var_name, msg, t):
new_sliced = self._relation.slice(self._read_only_values)

if hash(new_sliced) != hash(self._sliced_relation):
self.logger.info('Changing factor function %s ', self.name)
self.logger.info("Changing factor function %s ", self.name)
msg_count, msg_size = self.change_factor_function(new_sliced)
self._sliced_relation = new_sliced
self._active = True
else:
self.logger.info('Equivalent relation, no change %s ',
self.name)
self.logger.info("Equivalent relation, no change %s ", self.name)

else:
self.logger.info('Still waiting for values to evaluate the '
'rule ', self._read_only_values)
self.logger.info(
"Still waiting for values to evaluate the " "rule ",
self._read_only_values,
)

return {
'num_msg_out': msg_count,
'size_msg_out': msg_size,
}
return {"num_msg_out": msg_count, "size_msg_out": msg_size}


class DynamicFactorComputation(FactorAlgo):
class DynamicFactorComputation(MaxSumFactorComputation):
"""
Factor Computation for dynamic Max-Sum.

Expand Down Expand Up @@ -211,16 +209,16 @@ def __init__(self, relation, name=None, msg_sender=None):
# Check if the factor depends on external variables
self._external_variables = {}
for v in relation.dimensions:
if hasattr(v, 'value'):
if hasattr(v, "value"):
self._external_variables[v.name] = v

if self._external_variables:
external_values = {v.name: v.value for v
in self._external_variables.values()}
external_values = {
v.name: v.value for v in self._external_variables.values()
}
self._current_relation = self._relation.slice(external_values)

super().__init__(self._current_relation, name=name,
msg_sender=msg_sender)
super().__init__(self._current_relation, name=name, msg_sender=msg_sender)

def on_start(self):
# subscribe to external variable
Expand All @@ -231,20 +229,20 @@ def on_start(self):
def change_factor_function(self, fn):
msg_count, msg_size = 0, 0

var_removed = [v for v in self._factor.dimensions if v not in
fn.dimensions]
var_added = [v for v in fn.dimensions if v not in
self._factor.dimensions]
var_removed = [v for v in self._factor.dimensions if v not in fn.dimensions]
var_added = [v for v in fn.dimensions if v not in self._factor.dimensions]
if not var_removed and not var_added:
# Dimensions have not changed, simply change factor object and emit
# cost messages
self.logger.info('Function change with no change in '
'factor\'s dimension')
self.logger.info("Function change with no change in " "factor's dimension")
self._factor = fn
msg_count, msg_size = self._init_msg()
else:
self.logger.info('Function change with new variables %s and '
'removed variables %s', var_added, var_removed)
self.logger.info(
"Function change with new variables %s and " "removed variables %s",
var_added,
var_removed,
)
self._factor = fn
for v in var_removed:
if v.name in self._costs:
Expand All @@ -271,16 +269,15 @@ def _on_new_var_value_msg(self, var_name, msg, t):
msg_count, msg_size = 0, 0
value = msg.content
if var_name not in self._external_variables:
self.logger.error('Unexpected value from %s - %s ', var_name, value)
self.logger.debug('Received new value for %s - %s ', var_name, value)
self.logger.error("Unexpected value from %s - %s ", var_name, value)
self.logger.debug("Received new value for %s - %s ", var_name, value)

self._external_variables[var_name].value = value
external_values = {v.name: v.value for v
in self._external_variables.values()}
external_values = {v.name: v.value for v in self._external_variables.values()}
new_sliced = self._relation.slice(external_values)

if hash(new_sliced) != hash(self._current_relation):
self.logger.info('Changing factor function %s ', self.name)
self.logger.info("Changing factor function %s ", self.name)
msg_count, msg_size = self.change_factor_function(new_sliced)
self._current_relation = new_sliced
self._active = True
Expand All @@ -298,16 +295,16 @@ def _send_add_var_msg(self, var_added):
msg_debug = {}
for v in var_added:
costs_v = self._costs_for_var(v)
msg = MaxSumMessage('ADD', {'costs': costs_v})
msg = MaxSumMessage("ADD", {"costs": costs_v})
self._msg_sender.post_msg(self.name, v.name, msg)
msg_debug[v.name] = costs_v
msg_size += msg.size
msg_count += 1

debug = 'ADD VAR MSG {} \n'.format(self.name)
debug = "ADD VAR MSG {} \n".format(self.name)
for dest, msg in msg_debug.items():
debug += ' * {} -> {} : {}\n'.format(self.name, dest, msg)
self.logger.info(debug + '\n')
debug += " * {} -> {} : {}\n".format(self.name, dest, msg)
self.logger.info(debug + "\n")

return msg_count, msg_size

Expand All @@ -323,33 +320,31 @@ def _send_remove_var_msg(self, var_removed):
msg_count, msg_size = 0, 0

for v in var_removed:
msg = MaxSumMessage('REMOVE', {})
msg = MaxSumMessage("REMOVE", {})
self._msg_sender.post_msg(self.name, v.name, msg)
msg_size += msg.size
msg_count += 1
debug = 'REMOVE VAR INIT MSG {} \n'.format(self.name)
debug = "REMOVE VAR INIT MSG {} \n".format(self.name)
for dest in var_removed:
debug += ' * {} -> {} \n'.format(self.name, dest)
self.logger.info(debug + '\n')
debug += " * {} -> {} \n".format(self.name, dest)
self.logger.info(debug + "\n")

return msg_count, msg_size

def subscribe(self, variable):
self._msg_sender.post_msg(self.name, variable.name,
Message('SUBSCRIBE', None))
self._msg_sender.post_msg(self.name, variable.name, Message("SUBSCRIBE", None))

def unsubscribe(self, variable):
self._msg_sender.post_msg(self.name, variable.name,
Message('SUBSCRIBE', None))
self._msg_sender.post_msg(self.name, variable.name, Message("SUBSCRIBE", None))

def __str__(self):
return 'Maxsum dynamic Factor computation for ' + self._factor.name
return "Maxsum dynamic Factor computation for " + self._factor.name

def __repr__(self):
return 'Maxsum dynamic Factor computation for ' + self._factor.name
return "Maxsum dynamic Factor computation for " + self._factor.name


class DynamicFactorVariableComputation(VariableAlgo):
class DynamicFactorVariableComputation(MaxSumVariableComputation):
"""
Variable computation for dynamic Max-Sum.

Expand All @@ -359,23 +354,25 @@ class DynamicFactorVariableComputation(VariableAlgo):


"""

def __init__(self, variable, factor_names, msg_sender=None):

super().__init__(variable, factor_names=factor_names,
msg_sender=msg_sender)
super().__init__(variable, factor_names=factor_names, msg_sender=msg_sender)

@register("REMOVE")
def _on_remove_msg(self, factor_name, msg, t):
self.logger.debug("Received REMOVE msg from %s on var %s",
factor_name, self.name)
self.logger.debug(
"Received REMOVE msg from %s on var %s", factor_name, self.name
)

# The removed factor should always be in the list of our factors but we
# might have not received any costs from him yet.
try:
self._factors.remove(factor_name)
except ValueError:
msg = 'CANNOT remove factor {} from variable {}, not in {}'\
.format(factor_name, self.name, self._factors)
msg = "CANNOT remove factor {} from variable {}, not in {}".format(
factor_name, self.name, self._factors
)
self.logger.error(msg)
raise ValueError(msg)

Expand All @@ -386,16 +383,18 @@ def _on_remove_msg(self, factor_name, msg, t):

# Select a new value.
self._current_value, self._current_cost = self._select_value()
self.logger.debug('On Remove msg, Variable %s select value %s with '
'cost %s', self.name, self._current_value,
self._current_cost)
self.logger.debug(
"On Remove msg, Variable %s select value %s with " "cost %s",
self.name,
self._current_value,
self._current_cost,
)

# Do not send init cost, we may still have costs from other factors !
msg_count, msg_size = self._compute_and_send_costs(self.factors)

@register("ADD")
def _on_add_msg(self, factor_name, msg, t):
self.logger.debug("Received ADD msg from %s : %s ", factor_name,
msg.content)
self.logger.debug("Received ADD msg from %s : %s ", factor_name, msg.content)
self._factors.append(factor_name)
return self._on_cost_msg(factor_name, msg)
6 changes: 3 additions & 3 deletions tests/api/test_api_distribute_adhoc.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@
def test_api_distribute_maxsum_adhoc():
from pydcop.computations_graph import factor_graph
from pydcop.distribution import adhoc
from pydcop.algorithms import maxsum
from pydcop.algorithms import amaxsum

dcop = dcop_graphcoloring_3()
agents = create_agents('a', [1, 2, 3], capacity=100)
dcop._agents_def = agents

cg = factor_graph.build_computation_graph(dcop)
dist = adhoc.distribute(cg, dcop.agents.values(),
computation_memory=maxsum.computation_memory,
communication_load=maxsum.communication_load)
computation_memory=amaxsum.computation_memory,
communication_load=amaxsum.communication_load)

assert dist.is_hosted(['v1', 'v2', 'v3'])

Expand Down
6 changes: 3 additions & 3 deletions tests/api/test_api_distribute_ilp_compref.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@
def test_api_distribute_maxsum_ilp_compref():
from pydcop.computations_graph import factor_graph
from pydcop.distribution import ilp_compref
from pydcop.algorithms import maxsum
from pydcop.algorithms import amaxsum

dcop = dcop_graphcoloring_3()
agents = create_agents('a', range(1, 4), capacity=100)
dcop._agents_def = agents

cg = factor_graph.build_computation_graph(dcop)
dist = ilp_compref.distribute(cg, dcop.agents.values(),
computation_memory=maxsum.computation_memory,
communication_load=maxsum.communication_load)
computation_memory=amaxsum.computation_memory,
communication_load=amaxsum.communication_load)

assert dist.is_hosted(['v1', 'v2', 'v3'])

Expand Down
Loading