This is interesting, and by some miracle the sample dataset is still accessible.
Kullback-Leibler divergence
The entropy() calls aliased as KL are extremely inefficient. Read the documentation to see what it's actually doing:
If qk is not None, then compute the relative entropy D = sum(pk * log(pk / qk)). This quantity is also known as the Kullback-Leibler divergence.
But you shouldn't write that verbatim. What they're actually describing is a dot product. Rewriting Kullback-Leibler as a direct dot product and removing the Scipy overhead alone produces a speedup of over 100x.
Metric tensor contraction symmetry
Even that is not enough. It is easy to prove that
- \$K(x,y) = K(y,x)\$
- \$K(x,x) = 0\$
So re-implement Kullback-Liebler as a pair of two tensor contractions. Numpy supports this via Einstein notation (einsum). For the case of intra-cluster distance, only apply it over over a little less than half of the matrix input space, i.e. the upper triangle of the Cartesian product indices excluding the diagonal; and double the subtotal.
Numpy
The program is riddled with Numpy API problems. Some of those problems are straightforward and some are complicated.
You need to remove every single call to asanyarray. It seems like it's being called on superstition. On the outer boundary of your program, you have control over how the data are being loaded; any conversion needed (hint: none) should happen there.
The code as-is doesn't run, and doesn't make any sense at the top level. C is a jagged list where the inner cluster sizes naturally differ, so then how do you expect to .shape[0] on it? It isn't - and can't be - an array. I did the minimum amount of rework on the original code to get it to run; this includes:
- Replace
C.shape[0] with len()
- Jettison some
asanyarray calls
- Fix the broken (twice!) indentation of
total_sum = total_sum + sub_sum
- Fully rewrite the
c1, c2 initialization block; this had been producing inner lists and you really shouldn't be doing that. Instead, generalise using a Pandas groupby.
- Comment out the line below
# sub_sum = sub_sum + which was probably also intended to be commented
Elsewhere: you have a superstition (perhaps inherited from C++) where numeric code needs to be generously sprinkled with float() casts. That isn't how Python works. Division auto-promotes to float.
Refactor (minus vectorisation)
For an end-to-end demonstration containing a regression test and benchmark, I had to severely truncate the input data to get it to run with the old code in reasonable time. Here the refactored functions have a refactored suffix. load() is a simple implementation that assumes only two clusters.
import time
import pandas as pd
from scipy.stats import entropy as KL
import numpy as np
def dis(di,dj):
di = np.asanyarray(di)
dj = np.asanyarray(dj)
m = 0.5 * (di+dj)
kl1 = KL(di,m)
kl2 = KL(dj,m)
return 0.5*(kl1+kl2)
def dis_refactored(di: np.ndarray, dj: np.ndarray) -> float:
"""
D = sum(pk * log(pk / qk)). This quantity is also known
as the Kullback-Leibler divergence.
"""
m = 2/(di + dj)
kl1 = np.log(di*m).dot(di)
kl2 = np.log(dj*m).dot(dj)
return 0.5*(kl1 + kl2)
def Intra_Cluster_dist(C):
# C = np.asanyarray(C)
K = float(len(C)) # .shape[0])
factor1 = 1.0/float(K)
total_sum = 0.0
for cluster in C:
cluster = np.asanyarray(cluster)
below1 = float(cluster.shape[0])
below2 = float(below1 - 1)
sub_sum = 0.0
for di in cluster:
#others = cluster[:]
#others.remove(di)
others = cluster[np.logical_not((cluster == np.array(di)).all(axis=1))]
#for dj in others:
# sub_sum = sub_sum +
# (2*float(dis(di,dj)))/(float(below1)*float(below2))
sub_sum = sub_sum + np.fromiter((((2*float(dis(di,dj)))/(float(below1)*float(below2))) for dj in others), dtype=float).sum()
total_sum = total_sum + sub_sum
return float(factor1 * total_sum)
def intra_cluster_dist_refactor(C: list[np.ndarray, np.ndarray]):
K = len(C)
total_sum = 0
for cluster in C:
below1 = cluster.shape[0]
below = 2/below1/(below1 - 1)
for di in cluster:
total_sum += below*sum(
dis_refactored(di, dj) for dj in cluster
if dj is not di
)
return total_sum/K
def Inter_Cluster_dist(C):
K = float(len(C))
factor1 = float((1/(K*(K-1))))
total_sum = 0.0
for cluster in C:
sub_sum = 0.0
other_clusters = C[:]
# other_clusters.remove(cluster)
other_clusters = list(filter(lambda c: c is not cluster, other_clusters))
below1 = float(len(cluster))
for other in other_clusters:
below2 = float(len(other))
for di in cluster:
for dj in other:
sub_sum = sub_sum + (float((dis(di, dj)))/float((below1*below2)))
total_sum = total_sum + sub_sum
return float(factor1 * total_sum )
def inter_cluster_dist_refactor(C: list[np.ndarray, np.ndarray]):
K = len(C)
total_sum = 0
for cluster in C:
for other in C:
if other is not cluster:
below = 1/len(cluster)/len(other)
for di in cluster:
for dj in other:
total_sum += below*dis_refactored(di, dj)
return total_sum/K/(K - 1)
def H_score(C):
return float(Intra_Cluster_dist(C))/float(Inter_Cluster_dist(C))
def H_score_refactored(C: list[np.ndarray, np.ndarray]) -> float:
return intra_cluster_dist_refactor(C)/inter_cluster_dist_refactor(C)
def load(df: pd.DataFrame) -> list[np.ndarray, np.ndarray]:
label_1 = df['labels'] == 1
features = df[['feature_1', 'feature_2']].values
c1 = features[~label_1]
c2 = features[label_1]
return [c1, c2]
def test() -> None:
print('Starting...')
df = pd.read_csv('example_data.csv').iloc[:15, :]
C = load(df)
print('OP implementation:')
start = time.perf_counter()
score = H_score(C)
stop = time.perf_counter()
print(score)
print(f'{stop - start:.5f} s')
print('Refactor:')
start = time.perf_counter()
score_refactored = H_score_refactored(C)
stop = time.perf_counter()
print(score_refactored)
print(f'{stop - start:.5f} s')
assert np.isclose(score, score_refactored, rtol=0, atol=1e-12)
if __name__ == '__main__':
test()
OP implementation:
0.5274884633416125
0.13273 s
Refactor:
0.5274884633416115
0.00122 s
Full vectorisation
The performance difference here is so large that it's not practical to test against the original code. This first tests against the refactored code above for a mid-size input subset, and then benchmarks the fully-vectorised implementation using the entire input. It also demonstrates a load() that could function with an arbitrary number of cluster labels.
import time
import typing
import pandas as pd
import numpy as np
MEAN = np.array((0.5, 0.5))
MEAN.flags.writeable = False
def dis_refactored(di: np.ndarray, dj: np.ndarray) -> float:
"""
D = sum(pk * log(pk / qk)). This quantity is also known
as the Kullback-Leibler divergence.
This returns 0 if di == dj.
dis(di, dj) == dis(dj, di).
"""
m = 2/(di + dj)
kl1 = np.log(di*m).dot(di)
kl2 = np.log(dj*m).dot(dj)
return 0.5*(kl1 + kl2)
def dis_vec(dij: np.ndarray, subscripts: str) -> float:
m = np.einsum('i,i...->...', MEAN, dij)
return np.einsum(
'i,' + subscripts + ',' + subscripts + '->',
MEAN, dij, np.log(dij/m),
)
def intra_cluster_dist_refactor(C: typing.Sequence[np.ndarray]) -> float:
K = len(C)
total_sum = 0
for cluster in C:
below1 = cluster.shape[0]
below = 2/below1/(below1 - 1)
for di in cluster:
total_sum += below*sum(
dis_refactored(di, dj) for dj in cluster
if dj is not di
)
return total_sum/K
def inter_cluster_dist_refactor(C: typing.Sequence[np.ndarray]):
K = len(C)
total_sum = 0
for cluster in C:
for other in C:
if other is not cluster:
below = 1/len(cluster)/len(other)
for di in cluster:
for dj in other:
total_sum += below*dis_refactored(di, dj)
return total_sum/K/(K - 1)
def h_score_refactored(C: typing.Sequence[np.ndarray]) -> float:
return intra_cluster_dist_refactor(C)/inter_cluster_dist_refactor(C)
def h_score_vectorised(C: typing.Sequence[np.ndarray]) -> float:
intra_dist = 0
inter_dist = 0
for cluster in C:
below1 = len(cluster)
ij = np.triu_indices(n=below1, k=1)
# Dimensions: (2=first,second), (n*(n-1)/2), nfeatures
subtotal = dis_vec(dij=cluster[ij, :], subscripts='ijk')
intra_dist += 4*subtotal/(below1*(below1 - 1))
for other in C:
if other is not cluster:
dij = np.stack(np.broadcast_arrays(
cluster[:, np.newaxis, :],
other[np.newaxis, :, :],
))
subtotal = dis_vec(dij, subscripts='ijkl')
inter_dist += subtotal/(below1*len(other))
k = len(C)
return intra_dist/inter_dist*(k - 1)
def load(df: pd.DataFrame) -> list[np.ndarray]:
feature_cols = [
col for col in df.columns
if col.startswith('feature')
]
return [
group.values
for label, group in df.groupby('labels')[feature_cols]
]
def demo() -> None:
short = 125
print('Starting...')
df = pd.read_csv('example_data.csv')
partial_c = load(df.iloc[:short, :])
print(f'Fully-vectorised implementation, n={short}:')
start = time.perf_counter()
score_vectorised = h_score_vectorised(partial_c)
stop = time.perf_counter()
print(score_vectorised)
print(f'{stop - start:.5f} s')
print()
print(f'Refactored, partially-vectorised implementation, n={short}:')
start = time.perf_counter()
score_refactored = h_score_refactored(partial_c)
stop = time.perf_counter()
print(score_refactored)
print(f'{stop - start:.5f} s')
print()
assert np.isclose(score_refactored, score_vectorised, rtol=0, atol=1e-12)
print(f'Vectorised time on full dataset, n={len(df)}:')
c = load(df)
start = time.perf_counter()
score_vectorised = h_score_vectorised(c)
stop = time.perf_counter()
print(score_vectorised)
print(f'{stop - start:.5f} s')
if __name__ == '__main__':
demo()
Fully-vectorised implementation, n=125:
0.20818338172329773
0.00112 s
Refactored, partially-vectorised implementation, n=125:
0.20818338172329806
0.11779 s
Vectorised time on full dataset, n=10224:
0.15981938717937083
6.61786 s