Skip to content

Commit

Permalink
make hash does not need to be part of basic memoizer
Browse files Browse the repository at this point in the history
and is more reusable when it isn't

This isn't the only way to make a hash, though, and
hashing isn't the only way to compare checkpoint
entries for equality.
  • Loading branch information
benclifford committed Aug 22, 2024
1 parent 333c7eb commit e78e12d
Showing 1 changed file with 37 additions and 36 deletions.
73 changes: 37 additions & 36 deletions parsl/dataflow/memoization.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,42 @@ def id_for_memo_function(f: types.FunctionType, output_ref: bool = False) -> byt
return pickle.dumps(["types.FunctionType", f.__name__, f.__module__])


def make_hash(task: TaskRecord) -> str:
    """Create a hash of the task inputs.

    Args:
        - task (dict) : Task dictionary from dfk.tasks

    Returns:
        - hash (str) : A unique hash string
    """
    # Serialized components of the task; their concatenation is hashed.
    t: List[bytes] = []

    # if kwargs contains an outputs parameter, that parameter is removed
    # and normalised differently - with output_ref set to True.
    # kwargs listed in ignore_for_cache will also be removed

    # Copy so the task record's own kwargs dict is never mutated.
    filtered_kw = task['kwargs'].copy()

    ignore_list = task['ignore_for_cache']

    logger.debug("Ignoring these kwargs for checkpointing: %s", ignore_list)
    for k in ignore_list:
        logger.debug("Ignoring kwarg %s", k)
        del filtered_kw[k]

    # BUG FIX: inspect the *filtered* kwargs rather than the originals.
    # Previously this tested task['kwargs'] but deleted from filtered_kw,
    # so listing 'outputs' in ignore_for_cache raised a KeyError here
    # (the key had already been removed by the loop above).
    if 'outputs' in filtered_kw:
        outputs = filtered_kw.pop('outputs')
        t.append(id_for_memo(outputs, output_ref=True))

    t.extend(map(id_for_memo, (filtered_kw, task['func'], task['args'])))

    # md5 is a fast, stable fingerprint here, not a security measure.
    x = b''.join(t)
    return hashlib.md5(x).hexdigest()


class Memoizer:
def start(self, *, dfk: DataFlowKernel, memoize: bool = True, checkpoint_files: Sequence[str], run_dir: str) -> None:
raise NotImplementedError
Expand Down Expand Up @@ -200,41 +236,6 @@ def start(self, *, dfk: DataFlowKernel, memoize: bool = True, checkpoint_files:
logger.info("App caching disabled for all apps")
self.memo_lookup_table = {}

def make_hash(self, task: TaskRecord) -> str:
"""Create a hash of the task inputs.
Args:
- task (dict) : Task dictionary from dfk.tasks
Returns:
- hash (str) : A unique hash string
"""

t: List[bytes] = []

# if kwargs contains an outputs parameter, that parameter is removed
# and normalised differently - with output_ref set to True.
# kwargs listed in ignore_for_cache will also be removed

filtered_kw = task['kwargs'].copy()

ignore_list = task['ignore_for_cache']

logger.debug("Ignoring these kwargs for checkpointing: %s", ignore_list)
for k in ignore_list:
logger.debug("Ignoring kwarg %s", k)
del filtered_kw[k]

if 'outputs' in task['kwargs']:
outputs = task['kwargs']['outputs']
del filtered_kw['outputs']
t.append(id_for_memo(outputs, output_ref=True))

t.extend(map(id_for_memo, (filtered_kw, task['func'], task['args'])))

x = b''.join(t)
return hashlib.md5(x).hexdigest()

def check_memo(self, task: TaskRecord) -> Optional[Future[Any]]:
"""Create a hash of the task and its inputs and check the lookup table for this hash.
Expand All @@ -256,7 +257,7 @@ def check_memo(self, task: TaskRecord) -> Optional[Future[Any]]:
logger.debug("Task {} will not be memoized".format(task_id))
return None

hashsum = self.make_hash(task)
hashsum = make_hash(task)
logger.debug("Task {} has memoization hash {}".format(task_id, hashsum))
result = None
if hashsum in self.memo_lookup_table:
Expand Down

0 comments on commit e78e12d

Please sign in to comment.