Parallelized time series k-means barycenter update procedure #321
Conversation
I forgot to mention that some redundant calls to to_time_series_dataset() were also removed: they caused memory-allocation problems and were essentially not useful.
Hi @enricofallacara, I'm not the maintainer of tslearn, but I thought I'd share some comments since this would be useful for me.
You are completely right, sorry for the psutil mistake. I had used it for some things and forgot to remove it. Also, I know that I hard-coded the number of threads, but I was in a hurry and uploaded my implementation as-is. Tomorrow I will fix these things, thanks!
Your PR is now in conflict with the master branch since we have refactored the code. I can take care of the merge to resolve the conflicts if you want, after which I could do a review of your code.
Hello, no problem with that. Sorry, I didn't have time to fix the code due to work commitments; I really appreciate your help.
# Conflicts:
#	tslearn/barycenters/dba.py
#	tslearn/clustering/kmeans.py
#	tslearn/metrics/dtw_variants.py
Co-authored-by: enricofallacara <[email protected]>
@@ -148,7 +149,8 @@ def dtw_barycenter_averaging_petitjean(X, barycenter_size=None,
    for dynamic time warping, with applications to clustering. Pattern
    Recognition, Elsevier, 2011, Vol. 44, Num. 3, pp. 678-693
    """
X_ = to_time_series_dataset(X)
I understand that you want to avoid a copy here, but making sure that the data is in the right format is important. Maybe we should tweak the to_time_series_dataset function to add an avoid_copy_if_possible parameter, WDYT?
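For illustration, a minimal sketch of what that could look like; the avoid_copy_if_possible name comes from the suggestion above, and the fallback path is a simplified version of the converter, not tslearn's actual implementation:

import numpy

def to_time_series_dataset(dataset, dtype=float, avoid_copy_if_possible=False):
    # Hypothetical variant of the converter: if the input is already a
    # well-formed 3D array of the requested dtype, return it without copying.
    if (avoid_copy_if_possible and isinstance(dataset, numpy.ndarray)
            and dataset.ndim == 3 and dataset.dtype == dtype):
        return dataset
    # Otherwise fall back to the usual conversion: pad ragged series with NaN.
    series = [numpy.asarray(ts, dtype=dtype).reshape(len(ts), -1)
              for ts in dataset]
    max_sz = max(len(ts) for ts in series)
    d = series[0].shape[1]
    out = numpy.full((len(series), max_sz, d), numpy.nan, dtype=dtype)
    for i, ts in enumerate(series):
        out[i, :len(ts)] = ts
    return out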
@@ -176,7 +178,7 @@ def dtw_barycenter_averaging_petitjean(X, barycenter_size=None,
    return barycenter


def _mm_assignment(X, barycenter, weights, metric_params=None):
def _mm_assignment(X, barycenter, weights, metric_params=None, n_treads=15):
We use n_jobs everywhere as the parameter name, with None as its default; we should stick to the same convention here, I think.
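Concretely, the signature under that convention would read (a hypothetical rewrite of the PR's line, body elided):

def _mm_assignment(X, barycenter, weights, metric_params=None, n_jobs=None):
    # n_jobs follows the joblib/tslearn convention: None means a single job,
    # -1 means "use all available CPU cores".
    ...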
#with parallel_backend('threading'):
res = Parallel(backend='threading', prefer="threads", require='sharedmem',
               n_jobs=n_treads, verbose=10)(
    delayed(dtw_path)(barycenter, X[i], global_constraint="sakoe_chiba",
                      sakoe_chiba_radius=1)
    for i in range(n))
paths, dists = zip(*res)
paths = list(paths)
Do you really need these 2 casts?
cost += dist_i ** 2 * weights[i]
list_p_k.append(path)
#with parallel_backend('threading'):
res = Parallel(backend='threading', prefer="threads", require='sharedmem',
               n_jobs=n_treads, verbose=10)(
    delayed(dtw_path)(barycenter, X[i], global_constraint="sakoe_chiba",
                      sakoe_chiba_radius=1)
    for i in range(n))
Should we define a cdist_dtw_path function instead?
I think we should design a high-efficiency cdist_dtw_path function instead. In my view, the main bottlenecks are _mm_assignment and _subgradient_valence_warping. However, since DBA is communication-intensive, parallelizing its implementation needs careful tuning; a good target would be to keep all available computing units at the desired utilization level.
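For the record, a sketch of what such a helper might look like (cdist_dtw_path does not exist in tslearn; this is only the proposal above, built on the public dtw_path, with the threading backend mirroring the PR's choice):

from joblib import Parallel, delayed
from tslearn.metrics import dtw_path

def cdist_dtw_path(barycenter, dataset, n_jobs=None, verbose=0, **kwargs):
    # Compute the DTW path and distance between a barycenter and every series
    # in the dataset, parallelized with joblib. Any dtw_path keyword
    # (e.g. global_constraint, sakoe_chiba_radius) passes through unchanged.
    res = Parallel(n_jobs=n_jobs, backend="threading", verbose=verbose)(
        delayed(dtw_path)(barycenter, ts, **kwargs) for ts in dataset
    )
    paths, dists = zip(*res)
    return list(paths), list(dists)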
return list_p_k, cost


def _subgradient_valence_warping(list_p_k, barycenter_size, weights):
def _subgradient_valence_warping(list_p_k, barycenter_size, weights, n_treads):
We use n_jobs everywhere as the parameter name, with None as its default; we should stick to the same convention here, I think.
list_w_k.append(w_k * weights[k])
list_v_k.append(w_k.sum(axis=1) * weights[k])
return w_k
w_k_ones = Parallel(backend='threading', prefer='threads', require='sharedmem',
                    n_jobs=n_treads, verbose=True)(
    delayed(create_w_k)(p_k, barycenter_size) for p_k in list_p_k)
Is this part really crucial to parallelize? I would expect it to be negligible in terms of running time.
This function cannot fully utilize the CPU cores; even if we force DBA to be parallelized over the different clusters, it then becomes the bottleneck.
return list_v_k, list_w_k


def _mm_valence_warping(list_p_k, barycenter_size, weights):
def _mm_valence_warping(list_p_k, barycenter_size, weights, n_treads):
We use n_jobs everywhere as the parameter name, with None as its default; we should stick to the same convention here, I think.
#diag_sum_v_k = numpy.zeros(list_v_k[0].shape)
#for v_k in list_v_k:
#    diag_sum_v_k += v_k
diag_sum_v_k = Parallel(backend='threading', prefer='threads', require='sharedmem',
                        n_jobs=n_treads, verbose=True)(
    delayed(numpy.sum)(x) for x in zip(*numpy.array(list_v_k)))
Once again, I'm not sure we need to parallelize this part.
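For reference, the commented-out sequential loop above is equivalent to one vectorized call, which is likely cheaper than dispatching threads for such small sums (a sketch assuming list_v_k holds equal-shaped arrays):

import numpy
diag_sum_v_k = numpy.sum(numpy.array(list_v_k), axis=0)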
return diag_sum_v_k, list_w_k


def _mm_update_barycenter(X, diag_sum_v_k, list_w_k):
def _mm_update_barycenter(X, diag_sum_v_k, list_w_k, n_treads):
We use n_jobs everywhere as the parameter name, with None as its default; we should stick to the same convention here, I think.
#sum_w_x = numpy.zeros((barycenter_size, d))
#for k, (w_k, x_k) in enumerate(zip(list_w_k, X)):
#    sum_w_x += w_k.dot(x_k[:ts_size(x_k)])
sum_w_x = Parallel(backend='threading', prefer='threads', require='sharedmem',
                   n_jobs=n_treads, verbose=True)(
    delayed(numpy.dot)(w_k, x_k[:ts_size(x_k)]) for (w_k, x_k) in zip(list_w_k, X))
Once again, I'm not sure we need to parallelize this part.
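Likewise, the sequential accumulation here fits in one line without joblib (a sketch; ts_size is tslearn.utils.ts_size):

from tslearn.utils import ts_size
sum_w_x = sum(w_k.dot(x_k[:ts_size(x_k)]) for w_k, x_k in zip(list_w_k, X))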
@@ -392,7 +412,7 @@ def _subgradient_update_barycenter(X, list_diag_v_k, list_w_k, weights_sum,


def dtw_barycenter_averaging(X, barycenter_size=None, init_barycenter=None,
                             max_iter=30, tol=1e-5, weights=None,
                             max_iter=30, tol=1e-5, weights=None, n_treads=15,
We use n_jobs everywhere as the parameter name, with None as its default; we should stick to the same convention here, I think.
@@ -514,6 +534,7 @@ def dtw_barycenter_averaging(X, barycenter_size=None, init_barycenter=None,
def dtw_barycenter_averaging_one_init(X, barycenter_size=None,
                                      init_barycenter=None,
                                      max_iter=30, tol=1e-5, weights=None,
                                      n_treads=15,
We use n_jobs everywhere as the parameter name, with None as its default; we should stick to the same convention here, I think.
@@ -576,23 +597,23 @@ def dtw_barycenter_averaging_one_init(X, barycenter_size=None,
    for Averaging in Dynamic Time Warping Spaces.
    Pattern Recognition, 74, 340-358.
    """
X_ = to_time_series_dataset(X)
See comment above about the array copy issue.
@@ -692,23 +693,37 @@ def _assign(self, X, update_class_attributes=True):

def _update_centroids(self, X):
    metric_params = self._get_metric_params()
    X_list = []
Here, I guess we could use a list comprehension for better readability.
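Something like the following, as a sketch of that suggestion:

X_list = [X[self.labels_ == k] for k in range(self.n_clusters)]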
self.cluster_centers_[k] = euclidean_barycenter(
    X=X[self.labels_ == k])
X_list.append(X[self.labels_ == k])
cluster_centers = Parallel(backend='threading', prefer='threads',
                           require='sharedmem', n_jobs=self.n_clusters,
                           verbose=5)(delayed(dtw_barycenter_averaging)(
Take care that here, the function to be called depends on the value of self.metric.
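A sketch of what dispatching on the metric could look like (the barycenter function names are tslearn's public API; the helper itself is hypothetical):

from joblib import Parallel, delayed
from tslearn.barycenters import (euclidean_barycenter,
                                 dtw_barycenter_averaging,
                                 softdtw_barycenter)

def _compute_cluster_centers(X_list, metric, n_jobs):
    # Pick the barycenter routine that matches the clustering metric
    # before launching the per-cluster parallel jobs.
    barycenter_fun = {"euclidean": euclidean_barycenter,
                      "dtw": dtw_barycenter_averaging,
                      "softdtw": softdtw_barycenter}[metric]
    return Parallel(n_jobs=n_jobs, backend="threading")(
        delayed(barycenter_fun)(X_k) for X_k in X_list
    )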
Simply parallelizing DBA over the clusters gives a speedup of less than 0.5x; more effort is required.
@@ -9,48 +9,39 @@ def _cdist_generic(dist_fun, dataset1, dataset2, n_jobs, verbose,
                   compute_diagonal=True, dtype=numpy.float, *args, **kwargs):
    """Compute cross-similarity matrix with joblib parallelization for a given
    similarity function.
Could you please revert all these line deletions? They squash the docstrings.
@@ -60,7 +51,7 @@ def _cdist_generic(dist_fun, dataset1, dataset2, n_jobs, verbose,
                   k=0 if compute_diagonal else 1,
                   m=len(dataset1))
matrix[indices] = Parallel(n_jobs=n_jobs,
                           prefer="threads",
Should we always prefer this? Or should it be specified when calling the function?
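One option would be to expose the preference as a parameter of _cdist_generic and forward it to joblib. A sketch of a hypothetical signature, which for brevity computes the full matrix and skips the symmetry/diagonal optimizations of the real helper:

import numpy
from joblib import Parallel, delayed

def _cdist_generic(dist_fun, dataset1, dataset2, n_jobs=None, verbose=0,
                   prefer=None, *args, **kwargs):
    # The caller decides the joblib hint: prefer="threads" when dist_fun
    # releases the GIL, prefer=None to let joblib choose.
    matrix = Parallel(n_jobs=n_jobs, verbose=verbose, prefer=prefer)(
        delayed(dist_fun)(s1, s2, *args, **kwargs)
        for s1 in dataset1 for s2 in dataset2
    )
    return numpy.array(matrix).reshape((len(dataset1), len(dataset2)))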
dataset1[i], dataset2[j],
*args, **kwargs
#dataset2 = to_time_series_dataset(dataset2, dtype=dtype)
with parallel_backend('loky'):
Is it a good idea to specify the parallel_backend here? Will loky always be a better alternative, or could it depend on some specificities of one's data?
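For reference, joblib lets the caller make that choice at the call site instead of hard-coding it in the library (a sketch using tslearn's existing cdist_dtw and random_walks):

from joblib import parallel_backend
from tslearn.generators import random_walks
from tslearn.metrics import cdist_dtw

X = random_walks(n_ts=50, sz=128)
# The backend choice stays with the caller: 'loky' (processes) can win for
# heavy GIL-bound work, while 'threading' avoids pickling overhead.
with parallel_backend('loky'):
    dists = cdist_dtw(X, n_jobs=-1)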
I have left a few comments. Maybe the main point would be to assess the improvement (in terms of running times) that these changes offer on some benchmark.
Also, tests seem to fail at the moment (let me know if it's due to a bug in my merge).
First of all, I want to thank you for the amazing job you have done creating this Python package, which is awesome and very useful. I used it in my master's thesis, in which I had to cluster some types of players based on their behaviors. The problem with using time series k-means on my dataset was that the current implementation performs parallel computations only for the distance matrices (DTW in my case). I therefore rewrote the code related to the MM algorithm and to the barycenter update procedure in the following way: the barycenter update procedure creates a process for each cluster of the algorithm (equal to the value of k), and each process then creates a pool of threads (15 in my implementation) that are used to perform the barycenter update in a parallel and efficient way.
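A minimal sketch of that two-level scheme (update_all_barycenters is a hypothetical name; the PR uses joblib's threading backend for the outer level, and the inner per-barycenter thread pool is the PR's addition, which stock tslearn's dtw_barycenter_averaging does not expose):

from joblib import Parallel, delayed
from tslearn.barycenters import dtw_barycenter_averaging

def update_all_barycenters(X_list):
    # Outer level: one job per cluster, as described above. In the PR,
    # each DBA call additionally spawns its own pool of worker threads
    # inside the modified _mm_* helpers.
    return Parallel(n_jobs=len(X_list), backend="threading")(
        delayed(dtw_barycenter_averaging)(X_k) for X_k in X_list
    )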