Commit

Merge pull request #588 from ICB-DCM/develop
* Ignore B027 empty method in an abstract base class (#587)

* Ignore B027 empty method in an abstract base class

* Refactor EPSMixin

* Fix EPSMixin for ConcurrentFutureSampler

* restrict distributed, see dask/distributed#7227

* tmp add bokeh for tests to avoid restrictions

* version+releasenotes 0.12.7
yannikschaelte authored Oct 30, 2022
2 parents c52091e + 9c86cc2 commit b92b4fb
Showing 10 changed files with 189 additions and 161 deletions.
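The central change in this commit is the EPSMixin refactor: per-sampler attribute bookkeeping moves into a shared EPSMixin.__init__. The eps_mixin.py diff itself is among the changed files not rendered on this page; judging only from the two call sites below, the refactored constructor plausibly looks like the following sketch (an inference, not the actual source):

```python
# Inferred sketch of the refactored EPSMixin constructor, reconstructed from
# the EPSMixin.__init__(...) call sites in the samplers below. The real
# pyabc/sampler/eps_mixin.py may differ.
class EPSMixin:
    def __init__(
        self,
        client,
        client_max_jobs: int,
        default_pickle: bool,
        batch_size: int,
    ):
        self.client = client  # executor/client that runs the work
        self.client_max_jobs = client_max_jobs
        self.default_pickle = default_pickle
        self.batch_size = batch_size
        # State previously initialized in each sampler's own __init__:
        self.jobs_queued = 0
        self.simulate_one = None
        self.accept_one = None
```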
2 changes: 2 additions & 0 deletions .flake8

```diff
@@ -11,6 +11,8 @@ extend-ignore =
     E203
     # Don't be crazy if line too long
     E501
+    # Empty method in an abstract base class
+    B027
 
 per-file-ignores =
     # Imported but unused
```
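For context, a minimal illustration of the pattern that flake8-bugbear's B027 rule flags — an empty, non-abstract method in a class deriving from ABC. The class below is hypothetical, not pyabc code; such empty methods are often intentional optional hooks, which is why the rule is ignored repo-wide here.

```python
from abc import ABC, abstractmethod


class BaseWorker(ABC):
    @abstractmethod
    def run(self) -> None:
        """Subclasses must implement the actual work."""

    def shutdown(self) -> None:
        """Optional cleanup hook; intentionally empty (this triggers B027)."""
```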
10 changes: 10 additions & 0 deletions CHANGELOG.rst

```diff
@@ -8,6 +8,16 @@ Release Notes
 ...........
 
 
+0.12.7 (2022-10-30)
+-------------------
+
+Minor:
+
+* Ignore B027 empty method in an abstract base class
+* Refactor EPSMixin
+* Fix EPSMixin for ConcurrentFutureSampler
+* Temporarily add bokeh to test due to dask error
+
 0.12.6 (2022-08-30)
 -------------------
 
```
5 changes: 3 additions & 2 deletions pyabc/population.py

```diff
@@ -104,9 +104,10 @@ def __init__(self, particles: List[Particle]):
             raise AssertionError(
                 "A population should only consist of accepted particles"
             )
-        if not np.isclose(sum(p.weight for p in particles), 1):
+
+        if not np.isclose(total_weight := sum(p.weight for p in particles), 1):
             raise AssertionError(
-                "The population total weight is not normalized."
+                f"The population total weight {total_weight} is not normalized."
             )
 
         self.calculate_model_probabilities()
```
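The new check uses an assignment expression (`:=`, Python 3.8+) so the total weight is computed once and reused in the error message. A standalone sketch of the same pattern, with illustrative values rather than pyabc code:

```python
import numpy as np

weights = [0.5, 0.3, 0.1]  # deliberately not normalized

# The walrus operator binds total_weight inside the condition, so the
# computed value is available for the error message below.
if not np.isclose(total_weight := sum(weights), 1):
    raise AssertionError(
        f"The population total weight {total_weight} is not normalized."
    )
```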
49 changes: 17 additions & 32 deletions pyabc/sampler/concurrent_future.py

```diff
@@ -1,3 +1,5 @@
+"""Sample via the python concurrent futures executor interface."""
+
 from .base import Sampler
 from .eps_mixin import EPSMixin
 
@@ -11,66 +13,49 @@ class ConcurrentFutureSampler(EPSMixin, Sampler):
     Parameters
     ----------
-    cfuture_executor: concurrent.futures.Executor, required
+    cfuture_executor: "concurrent.futures.Executor"
         Configured object that implements the concurrent.futures.Executor
         interface
     client_max_jobs:
         Maximum number of jobs that can submitted to the client at a time.
         If this value is smaller than the maximum number of cores provided by
         the distributed infrastructure, the infrastructure will not be utilized
         fully.
     default_pickle:
         Specify if the sampler uses pythons default pickle function to
         communicate the submit function to python; if this is the case, a
         cloud-pickle based workaround is used to pickle the simulate and
         evaluate functions. This allows utilization of locally defined
         functions, which can not be pickled using default pickle, at the cost
         of an additional pickling overhead.
-    batch_size: int, optional
+    batch_size:
         Number of parameter samples that are evaluated in one remote execution
         call. Batch submission can be used to reduce the communication overhead
         for fast (ms-s) model evaluations. Large batch sizes can result in un-
         necessary model evaluations. By default, batch_size=1, i.e. no batching
         is done.
     """
 
     def __init__(
         self,
         cfuture_executor=None,
-        client_max_jobs=200,
-        default_pickle=True,
-        batch_size=1,
+        client_max_jobs: int = 200,
+        default_pickle: bool = True,
+        batch_size: int = 1,
     ):
-        super().__init__()
-
-        # Assign Client
-        self.my_client = cfuture_executor
-
-        # Client options
-        self.client_max_jobs = client_max_jobs
-
-        # Job state
-        self.jobs_queued = 0
-
-        # Empty functions
-        self.simulate_one = None
-        self.accept_one = None
-
-        # Option pickling
-        self.default_pickle = default_pickle
-
-        # Batchsize
-        self.batch_size = batch_size
+        EPSMixin.__init__(
+            self,
+            client=cfuture_executor,
+            client_max_jobs=client_max_jobs,
+            default_pickle=default_pickle,
+            batch_size=batch_size,
+        )
+        Sampler.__init__(self)
 
     def __getstate__(self):
         d = dict(self.__dict__)
-        del d['my_client']
+        del d['client']
         return d
 
-    def client_cores(self):
+    def client_cores(self) -> int:
         return self.client_max_jobs
```
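A minimal usage sketch for the refactored sampler, assuming the pyabc API as of this release; the executor configuration is illustrative:

```python
from concurrent.futures import ProcessPoolExecutor

from pyabc.sampler import ConcurrentFutureSampler

executor = ProcessPoolExecutor(max_workers=4)
sampler = ConcurrentFutureSampler(
    cfuture_executor=executor,
    client_max_jobs=8,    # cap on jobs submitted to the executor at a time
    default_pickle=True,  # cloudpickle workaround for locally defined functions
    batch_size=1,         # evaluate one parameter sample per remote call
)
# The sampler is then passed to the inference machinery, e.g.
# pyabc.ABCSMC(model, prior, distance, sampler=sampler).
```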
46 changes: 21 additions & 25 deletions pyabc/sampler/dask_sampler.py

```diff
@@ -1,3 +1,5 @@
+"""Sample via dask."""
+
 import numpy as np
 from dask.distributed import Client
 
@@ -13,7 +15,7 @@ class DaskDistributedSampler(EPSMixin, Sampler):
     Parameters
     ----------
-    dask_client: dask.Client, optional
+    dask_client:
         The configured dask Client.
         If none is provided, then a local dask distributed cluster is created.
     client_max_jobs:
@@ -22,14 +24,14 @@ class DaskDistributedSampler(EPSMixin, Sampler):
         the distributed infrastructure, the infrastructure will not be utilized
         fully.
     default_pickle:
-        Specify if the sampler uses pythons default pickle function to
+        Specify if the sampler uses python's default pickle function to
         communicate the submit function to python; if this is the case, a
         cloud-pickle based workaround is used to pickle the simulate and
         evaluate functions. This allows utilization of locally defined
         functions, which can not be pickled using default pickle, at the cost
         of an additional pickling overhead. For dask, this workaround should
         not be necessary and it should be save to use default_pickle=false.
-    batch_size: int, optional
+    batch_size:
         Number of parameter samples that are evaluated in one remote execution
         call. Batchsubmission can be used to reduce the communication overhead
         for fast (ms-s) model evaluations. Large batch sizes can result in un-
@@ -39,41 +41,35 @@ class DaskDistributedSampler(EPSMixin, Sampler):
 
     def __init__(
         self,
-        dask_client=None,
-        client_max_jobs=np.inf,
-        default_pickle=False,
-        batch_size=1,
+        dask_client: Client = None,
+        client_max_jobs: int = np.inf,
+        default_pickle: bool = False,
+        batch_size: int = 1,
     ):
-        super().__init__()
-
-        # Assign Client
         if dask_client is None:
             dask_client = Client()
-        self.my_client = dask_client
-
-        # Client options
-        self.client_max_jobs = client_max_jobs
-
-        # Job state
-        self.jobs_queued = 0
-
-        # For dask, we use cloudpickle by default
-        self.default_pickle = default_pickle
-
-        # Batchsize
-        self.batch_size = batch_size
+        EPSMixin.__init__(
+            self,
+            client=dask_client,
+            client_max_jobs=client_max_jobs,
+            default_pickle=default_pickle,
+            batch_size=batch_size,
+        )
+        Sampler.__init__(self)
 
     def __getstate__(self):
         d = dict(self.__dict__)
-        del d['my_client']
+        del d['client']
         return d
 
-    def client_cores(self):
-        return sum(self.my_client.ncores().values())
+    def client_cores(self) -> int:
+        return sum(self.client.ncores().values())
 
     def shutdown(self):
         """Shutdown the dask client.
         If it was started without arguments, the
         local cluster that was started at the same time is also closed.
         """
-        self.my_client.close()
+        self.client.close()
```
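And the dask counterpart, likewise a sketch under the same assumptions; note that after the refactor the client is reachable as sampler.client rather than sampler.my_client:

```python
from dask.distributed import Client

from pyabc.sampler import DaskDistributedSampler

client = Client(n_workers=2)  # omit to let the sampler start a local cluster
sampler = DaskDistributedSampler(
    dask_client=client,
    default_pickle=False,  # dask serializes via cloudpickle on its own
)
# ... run inference, e.g. pyabc.ABCSMC(..., sampler=sampler) ...
sampler.shutdown()  # closes the client (and a locally started cluster)
```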
(The diffs of the remaining five changed files are not shown here.)
