The coclustering module provides the functionality to perform a co-clustering analysis of a positive data matrix with real-valued elements. The code implements the Bregman block-average co-clustering (BBAC) algorithm from Ref. 1 and was inspired by the Matlab code by Srujana Merugu and Arindam Banerjee. Algorithmic differences include:
- The current implementation is independent of the order in which the dimensions are provided (in the Matlab code, the column clustering followed the row clustering).
- Empty (co-)clusters are handled without the need for an additional numerical parameter.
The code was designed for geospatial applications (see the Tutorial section for some examples), where the array dimensions typically correspond to space and time.
Set Up the Analysis¶
For an array
import numpy as np
Z = np.array([[1., 1., 2., 4.],
              [1., 1., 2., 4.],
              [3., 3., 3., 5.]])
the co-clustering analysis is set up by initializing a Coclustering object:
from cgc.coclustering import Coclustering

cc = Coclustering(
    Z,                              # data matrix
    nclusters_row=2,                # number of row clusters
    nclusters_col=3,                # number of column clusters
    max_iterations=100,             # maximum number of iterations
    conv_threshold=1.e-5,           # error convergence threshold
    nruns=10,                       # number of differently-initialized runs
    output_filename='results.json'  # JSON file where to write output
)
Here, we have set the maximum number of row and column clusters to 2 and 3, respectively. However, a lower number of clusters can be identified by the algorithm (some of the clusters may remain empty). The algorithm entails an iterative procedure that is considered converged when the error of two consecutive iterations differs by less than a threshold (the default value is 1.e-5).
Multiple runs should be performed in order to limit the influence of the choice of initial cluster assignment on the result. A numerical parameter guarantees that no zero-valued arguments are encountered in the logarithm that appears in the I-divergence expression, which is employed as an objective function. Results are (optionally) written to a JSON file.
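As an illustration of the objective function mentioned above, the I-divergence can be sketched as follows. Note that the i_divergence function and the exact placement of the eps guard are our own illustration, not part of the cgc API:

```python
import numpy as np

def i_divergence(Z, Z_approx, eps=1.e-8):
    """I-divergence between Z and an approximation of it.

    The small eps guards the logarithm against zero-valued
    arguments, playing the role of the numerical parameter
    mentioned in the text (illustrative choice, not cgc's).
    """
    return np.sum(Z * np.log(Z / (Z_approx + eps) + eps) - Z + Z_approx)

Z = np.array([[1., 1., 2., 4.],
              [1., 1., 2., 4.],
              [3., 3., 3., 5.]])
# a perfect approximation yields a (near-)zero error
err = i_divergence(Z, Z)
```

The I-divergence is non-negative for positive arguments and vanishes only when the approximation matches the data, which makes it a natural error measure for the iterative procedure.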
Local (Numpy-based)¶
The first implementation, based on Numpy, is suitable for running the algorithm on a single machine. To make efficient use of architectures with multi-core CPUs, the differently-initialized co-clustering runs can be executed as multiple threads. They are, in fact, embarrassingly parallel tasks that require no communication between each other. The co-clustering analysis is run using e.g. 4 threads as:
results = cc.run_with_threads(nthreads=4)
Only one thread is spawned if the nthreads argument is not provided. This first implementation, which is the default Numpy-based one, makes use of (fast) matrix multiplications to calculate cluster-based properties, such as averages and distances. However, if the array dimensions are large, large auxiliary matrices need to be stored in memory, so that the memory requirement of this implementation quickly becomes a bottleneck.
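The matrix-multiplication idea can be sketched with one-hot cluster-membership matrices. The cluster assignments below are hypothetical, and this is a simplified illustration rather than the package's internal code:

```python
import numpy as np

Z = np.array([[1., 1., 2., 4.],
              [1., 1., 2., 4.],
              [3., 3., 3., 5.]])
row_clusters = np.array([0, 0, 1])     # hypothetical row assignment
col_clusters = np.array([0, 0, 1, 2])  # hypothetical column assignment

# one-hot membership matrices: R is (nrows, nclusters_row),
# C is (ncols, nclusters_col)
R = np.eye(2)[row_clusters]
C = np.eye(3)[col_clusters]

# co-cluster sums and element counts via matrix products; their
# ratio gives the co-cluster averages
sums = R.T @ Z @ C
counts = R.T @ np.ones_like(Z) @ C
coc_means = sums / counts
```

The auxiliary products involved in such calculations are what drives the memory footprint when the array dimensions grow.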
Local (Numpy-based), low-memory footprint¶
A second Numpy-based implementation makes use of an algorithm with a much lower memory footprint, and can be selected
with the optional low_memory flag:
results = cc.run_with_threads(nthreads=4, low_memory=True)
Also here, only one thread is spawned if the
nthreads argument is not provided. The reduced memory requirement
comes at some cost in performance. Fortunately, Numba's just-in-time compilation is applied in the
low_memory implementation, which significantly reduces this performance cost.
Distributed (Dask-based)¶
An alternative implementation makes use of Dask and is thus suitable for running the co-clustering algorithm on distributed systems (e.g. on a cluster of compute nodes). Dask arrays are employed to process the data in chunks, which are distributed across the cluster. This approach is thus suitable for tackling large matrices that do not fit in the memory of a single node.
If a Dask cluster is already running, we can connect to it and run the co-clustering analysis in the following way:
from dask.distributed import Client

client = Client('tcp://daskscheduler:8786')  # connect to the Dask scheduler
results = cc.run_with_dask(client)
Dask clusters can be run on different types of distributed systems: clusters of nodes connected by SSH, HPC systems,
Kubernetes clusters on cloud services. A local Dask cluster (
LocalCluster) allows one to make use of the same
framework but using the local (multi-core) CPU(s). If no client is provided as an argument, a default
client is instantiated (see the Dask docs).
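As a sketch of such a local setup (the worker count here is an arbitrary choice, and cc stands for the Coclustering instance created earlier):

```python
from dask.distributed import Client, LocalCluster

# thread-based local cluster: same Dask machinery, single machine
cluster = LocalCluster(n_workers=4, processes=False)
client = Client(cluster)
# results = cc.run_with_dask(client)  # cc: the Coclustering instance from above
```

Using processes=False keeps all workers in the same process, which avoids inter-process data transfer on a single machine.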
In a second Dask-based implementation, the various co-clustering runs are submitted to the Dask scheduler, which
distributes them across the cluster. This implementation, which is activated by setting low_memory=False in run_with_dask, is experimental and typically leads to very large memory usage.
This notebook presents a performance comparison of the various co-clustering implementations for varying input data sizes and numbers of clusters. For the system sizes considered, the default Numpy implementation is found to be 2-5 times faster than its low-memory counterpart, with the exception of the smallest dataset sizes (where similar timings are obtained). The comparison also includes timings obtained with the Dask implementation, which was tested with a local thread-based cluster and four workers. While the overhead linked to using Dask is found to be significant for small datasets, the gap between the Dask and the Numpy implementations narrows with increasing dataset size and number of clusters. Only for sufficiently large matrices and numbers of clusters is the former projected to become faster than the latter. It is important to stress, however, that the Dask implementation was not designed for improved performance, but to handle large datasets that could not otherwise be tackled due to memory limitations.
The CoclusteringResults object returned by the run methods contains the final row and column cluster assignments (results.row_clusters and results.col_clusters, respectively) as well as the approximation error of the co-clustering (results.error). A few other metadata fields are also present, including the input parameters employed to set up the analysis.
- class cgc.coclustering.Coclustering(Z, nclusters_row, nclusters_col, conv_threshold=1e-05, max_iterations=1, nruns=1, output_filename='', row_clusters_init=None, col_clusters_init=None)¶
Perform a co-clustering analysis for a two-dimensional array.
Z (numpy.ndarray or dask.array.Array) – Data matrix for which to run the co-clustering analysis
nclusters_row (int) – Number of row clusters.
nclusters_col (int) – Number of column clusters.
conv_threshold (float, optional) – Convergence threshold for the objective function.
max_iterations (int, optional) – Maximum number of iterations.
nruns (int, optional) – Number of differently-initialized runs.
output_filename (string, optional) – Name of the JSON file where to write the results.
row_clusters_init (numpy.ndarray or array_like, optional) – Initial row cluster assignment.
col_clusters_init (numpy.ndarray or array_like, optional) – Initial column cluster assignment.
>>> import numpy as np
>>> Z = np.random.randint(1, 100, size=(10, 8)).astype('float64')
>>> cc = Coclustering(Z, nclusters_row=5, nclusters_col=4,
...                   max_iterations=50, nruns=10)
- run_with_dask(client=None, low_memory=True)¶
Run the co-clustering analysis using Dask.
client (dask.distributed.Client, optional) – Dask client. If not specified, the default LocalCluster is employed.
low_memory (bool, optional) – If False, all runs are submitted to the Dask cluster (experimental feature, discouraged).
- run_with_threads(nthreads=1, low_memory=False)¶
Run the co-clustering using an algorithm based on Numpy plus threading (only suitable for local runs).
nthreads (int, optional) – Number of threads employed to simultaneously run differently-initialized co-clustering analyses.
low_memory (bool, optional) – Make use of a low-memory version of the algorithm with Numba JIT acceleration.
- class cgc.coclustering.CoclusteringResults(**input_parameters)¶
Contains results and metadata of a co-clustering calculation.
row_clusters – Final row cluster assignment.
col_clusters – Final column cluster assignment.
error – Approximation error of the co-clustering.
nruns_completed – Number of successfully completed runs.
nruns_converged – Number of converged runs.