I have been searching around for an implementation of DBSCAN for 3-dimensional points without much luck. Does anyone know of a library that handles this, or have any experience with doing this? I am assuming that the DBSCAN algorithm can handle 3 dimensions by treating the eps value as a radius and measuring the distance between points by Euclidean separation. If anyone has tried implementing this and would like to share, that would also be greatly appreciated. Thanks.
So this is what I came up with. I know it is not the most efficient implementation, but it works. For example, the region query, which is the main time-eater of the algorithm, computes the distance between two points more than once instead of storing it for later use.
class DBSCAN(object):
    """Density-based clustering (DBSCAN) of 3-D points with Euclidean distance.

    A point is a *core* point when at least ``min_points`` points (itself
    included) lie within ``eps`` of it.  Clusters grow outwards from core
    points; a non-core point within ``eps`` of a core point joins the first
    cluster that reaches it, and every other point is reported as noise.

    Points are compared by value (``==`` / ``in``), so they may be tuples or
    lists; two data points with identical coordinates are treated as one.

    After ``cluster()`` has run:
        clusters -- list of ``("Cluster: N", [points])`` tuples
        noise    -- list of points that belong to no cluster
        visited  -- list of points whose neighbourhood was examined
        dp       -- the data points passed to ``cluster()``
    """

    def __init__(self, eps=0, min_points=2):
        """Store the neighbourhood radius and the core-point threshold."""
        self.eps = eps
        self.min_points = min_points
        self.visited = []
        self.noise = []
        self.clusters = []
        self.dp = []

    def cluster(self, data_points):
        """Cluster ``data_points``, filling ``self.clusters`` and ``self.noise``.

        Results of any previous run are discarded first, so the instance can
        be reused for several data sets.
        """
        self.visited = []
        self.noise = []
        self.clusters = []
        self.dp = data_points
        cluster_number = 0
        for point in data_points:
            if point in self.visited:
                continue
            self.visited.append(point)
            neighbours = self.region_query(point)
            if len(neighbours) < self.min_points:
                # Provisional: the point is moved out of ``noise`` again if a
                # later cluster reaches it as a border point (see ``pool``).
                self.noise.append(point)
            else:
                cluster_number += 1
                self.expand_cluster(cluster_number, neighbours)

    def expand_cluster(self, cluster_number, p_neighbours):
        """Create cluster ``cluster_number`` and grow it from ``p_neighbours``.

        ``p_neighbours`` is the neighbourhood of the seeding core point.  The
        cluster is grown one frontier at a time until no new points turn up.
        """
        cluster = ("Cluster: %d" % cluster_number, [])
        self.clusters.append(cluster)
        frontier = p_neighbours
        while frontier:
            frontier = self.pool(cluster, frontier)

    def region_query(self, p):
        """Return every data point within ``eps`` of ``p`` (``p`` included).

        Only the first three coordinates of each point are used.
        """
        result = []
        for d in self.dp:
            distance = ((d[0] - p[0]) ** 2
                        + (d[1] - p[1]) ** 2
                        + (d[2] - p[2]) ** 2) ** 0.5
            if distance <= self.eps:
                result.append(d)
        return result

    def pool(self, cluster, p_neighbours):
        """Absorb one frontier into ``cluster`` and return the next frontier.

        Every point in ``p_neighbours`` that no cluster owns yet is added to
        ``cluster`` (and removed from ``self.noise`` if it had been marked as
        noise earlier).  The neighbourhoods of all newly visited core points
        are merged into the returned frontier.
        """
        new_neighbours = []
        members = cluster[1]
        for n in p_neighbours:
            if n not in self.visited:
                self.visited.append(n)
                n_neighbours = self.region_query(n)
                if len(n_neighbours) >= self.min_points:
                    # Accumulate instead of overwriting, so neighbours found
                    # by earlier core points in this frontier are not lost.
                    for candidate in self.unexplored(p_neighbours, n_neighbours):
                        if (candidate not in new_neighbours
                                and candidate not in members):
                            new_neighbours.append(candidate)
            already_clustered = n in members or any(
                n in other_members for _, other_members in self.clusters)
            if not already_clustered:
                members.append(n)
                # A point first seen as noise turned out to be a border point.
                if n in self.noise:
                    self.noise.remove(n)
        return new_neighbours

    @staticmethod
    def unexplored(x, y):
        """Return the items of ``y`` that are not in ``x``, keeping ``y``'s order."""
        z = []
        for p in y:
            if p not in x:
                z.append(p)
        return z
DBSCAN
on it? –
Gargle You can use sklearn for DBSCAN. Here is some code that works for me:
from collections import Counter

import numpy as np
from sklearn.cluster import DBSCAN

# 500 random 3-D points; no seed is set, so every run clusters different data.
data = np.random.rand(500, 3)

# Build the estimator, then fit it; ``fit`` returns the estimator itself, so
# ``db`` is the same fitted object as with the chained ``DBSCAN(...).fit(data)``.
db = DBSCAN(eps=0.12, min_samples=1)
db.fit(data)

# One integer label per input row.
labels = db.labels_

# Tally the number of points per label.  As a bare expression this is only
# echoed in an interactive session; wrap it in print() when run as a script.
Counter(labels)
The output I got was:
Counter({1: 342, 10: 30, 31: 13, 13: 11, 30: 10, 24: 5, 29: 5, 2: 4, 18: 4,
19: 4, 28: 4, 49: 4, 3: 3, 17: 3, 23: 3, 32: 3, 7: 2, 9: 2, 12: 2, 14: 2, 15: 2,
16: 2, 20: 2, 21: 2, 26: 2, 39: 2, 41: 2, 46: 2, 0: 1, 4: 1, 5: 1, 6: 1, 8: 1, 11:
1, 22: 1, 25: 1, 27: 1, 33: 1, 34: 1, 35: 1, 36: 1, 37: 1, 38: 1, 40: 1, 42: 1,
43: 1, 44: 1, 45: 1, 47: 1, 48: 1, 50: 1, 51: 1, 52: 1, 53: 1, 54: 1, 55: 1})
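So, the clustering identifies 56 clusters (labels 0 through 55), with the number of points in each cluster as shown above. Note that no point is labelled as noise (-1); this is expected with min_samples=1, since every point then counts as a core point.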
So this is what I came up with. I know it is not the most efficient implementation, but it works. For example, the region query, which is the main time-eater of the algorithm, computes the distance between two points more than once instead of storing it for later use.
class DBSCAN(object):
    """Density-based clustering (DBSCAN) of 3-D points with Euclidean distance.

    A point is a *core* point when at least ``min_points`` points (itself
    included) lie within ``eps`` of it.  Clusters grow outwards from core
    points; a non-core point within ``eps`` of a core point joins the first
    cluster that reaches it, and every other point is reported as noise.

    Points are compared by value (``==`` / ``in``), so they may be tuples or
    lists; two data points with identical coordinates are treated as one.

    After ``cluster()`` has run:
        clusters -- list of ``("Cluster: N", [points])`` tuples
        noise    -- list of points that belong to no cluster
        visited  -- list of points whose neighbourhood was examined
        dp       -- the data points passed to ``cluster()``
    """

    def __init__(self, eps=0, min_points=2):
        """Store the neighbourhood radius and the core-point threshold."""
        self.eps = eps
        self.min_points = min_points
        self.visited = []
        self.noise = []
        self.clusters = []
        self.dp = []

    def cluster(self, data_points):
        """Cluster ``data_points``, filling ``self.clusters`` and ``self.noise``.

        Results of any previous run are discarded first, so the instance can
        be reused for several data sets.
        """
        self.visited = []
        self.noise = []
        self.clusters = []
        self.dp = data_points
        cluster_number = 0
        for point in data_points:
            if point in self.visited:
                continue
            self.visited.append(point)
            neighbours = self.region_query(point)
            if len(neighbours) < self.min_points:
                # Provisional: the point is moved out of ``noise`` again if a
                # later cluster reaches it as a border point (see ``pool``).
                self.noise.append(point)
            else:
                cluster_number += 1
                self.expand_cluster(cluster_number, neighbours)

    def expand_cluster(self, cluster_number, p_neighbours):
        """Create cluster ``cluster_number`` and grow it from ``p_neighbours``.

        ``p_neighbours`` is the neighbourhood of the seeding core point.  The
        cluster is grown one frontier at a time until no new points turn up.
        """
        cluster = ("Cluster: %d" % cluster_number, [])
        self.clusters.append(cluster)
        frontier = p_neighbours
        while frontier:
            frontier = self.pool(cluster, frontier)

    def region_query(self, p):
        """Return every data point within ``eps`` of ``p`` (``p`` included).

        Only the first three coordinates of each point are used.
        """
        result = []
        for d in self.dp:
            distance = ((d[0] - p[0]) ** 2
                        + (d[1] - p[1]) ** 2
                        + (d[2] - p[2]) ** 2) ** 0.5
            if distance <= self.eps:
                result.append(d)
        return result

    def pool(self, cluster, p_neighbours):
        """Absorb one frontier into ``cluster`` and return the next frontier.

        Every point in ``p_neighbours`` that no cluster owns yet is added to
        ``cluster`` (and removed from ``self.noise`` if it had been marked as
        noise earlier).  The neighbourhoods of all newly visited core points
        are merged into the returned frontier.
        """
        new_neighbours = []
        members = cluster[1]
        for n in p_neighbours:
            if n not in self.visited:
                self.visited.append(n)
                n_neighbours = self.region_query(n)
                if len(n_neighbours) >= self.min_points:
                    # Accumulate instead of overwriting, so neighbours found
                    # by earlier core points in this frontier are not lost.
                    for candidate in self.unexplored(p_neighbours, n_neighbours):
                        if (candidate not in new_neighbours
                                and candidate not in members):
                            new_neighbours.append(candidate)
            already_clustered = n in members or any(
                n in other_members for _, other_members in self.clusters)
            if not already_clustered:
                members.append(n)
                # A point first seen as noise turned out to be a border point.
                if n in self.noise:
                    self.noise.remove(n)
        return new_neighbours

    @staticmethod
    def unexplored(x, y):
        """Return the items of ``y`` that are not in ``x``, keeping ``y``'s order."""
        z = []
        for p in y:
            if p not in x:
                z.append(p)
        return z
DBSCAN
on it? –
Gargle After working with the code provided in the first answer for some time, I have concluded that it has significant issues: 1) noise points can appear in later clusters; 2) it produces additional clusters that are subsets of previously built clusters, because of problems in how visited and unexplored points are tracked, resulting in clusters with fewer than min_points points; and 3) some points can end up in two clusters - they are reachable from both clusters and, in this code, may even be a core point for one of them. The official DBSCAN algorithm places any core point in the cluster whose core it belongs to, but places a point that is merely reachable from two clusters in the first cluster from which it is found to be reachable. This makes the clustering of such points dependent on the order of the points in the data, but every point appears only once in the output - either in a single cluster or as noise. Certain applications will want these shared points, which are reachable from two clusters, to be placed in both clusters, but core points should appear in only one cluster.
So this is what I came up with. It calculates the separation distance between each pair of points twice and does not use any trees, but it eliminates points with no near neighbours immediately and builds the list of core points once, so only those points need to be considered when building the cores. It uses sets for inclusion testing. Note that this implementation does place shared points in all clusters they are reachable from.
class DBSCAN(object):
    """Index-based DBSCAN for 3-D points with Euclidean distance.

    A point is a *core* point when at least ``min_points`` points (itself
    included) lie within ``eps`` of it.  Each cluster consists of a set of
    mutually reachable core points plus every point within ``eps`` of one of
    them.  A non-core point that is reachable from several clusters is placed
    in all of them; a core point belongs to exactly one cluster.

    After ``cluster()`` has run:
        clusters        -- list of ``["Cluster: N", [points]]`` entries
        noise           -- points that belong to no cluster
        dp              -- the points passed to ``cluster()``
        near_neighbours -- dict: point index -> indices of points within eps
        wp              -- indices of points with more than one neighbour entry
        proto_cores     -- core-point indices not yet assigned (empty at end)
    """

    def __init__(self, eps=0, min_points=2):
        """Store the neighbourhood radius and the core-point threshold."""
        self.eps = eps
        self.min_points = min_points
        self.noise = []
        self.clusters = []
        self.dp = []
        self.near_neighbours = []
        self.wp = set()
        self.proto_cores = set()

    def cluster(self, points):
        """Cluster ``points``, filling ``self.clusters`` and ``self.noise``.

        ``points`` must be indexable.  Results of a previous run are
        discarded.  Clusters are numbered in order of their lowest-indexed
        core point, and the members of each cluster are listed in input order.
        """
        self.dp = points
        self.noise = []
        self.clusters = []
        self.near_neighbours = self.nn(points)
        clustered = set()
        cluster_number = 0
        while self.proto_cores:
            # Lowest remaining index keeps the numbering deterministic.
            seed = min(self.proto_cores)
            cluster_number += 1
            core = self.add_core(self.near_neighbours[seed])
            # Guarantees progress even if the seed is missing from its own
            # neighbour list (only possible with a negative eps).
            core.add(seed)
            members = self._member_indices(core)
            self.clusters.append(
                ["Cluster: %d" % cluster_number,
                 [self.dp[index] for index in members]])
            clustered.update(members)
            # Only core points are consumed; border points stay available to
            # every cluster that can reach them.
            self.proto_cores -= core
        # Membership is tracked by index, so points with equal coordinates
        # are still told apart.
        self.noise = [point for index, point in enumerate(self.dp)
                      if index not in clustered]

    # set up dictionary of near neighbours, create working_point and proto_core sets
    def nn(self, points):
        """Return ``{index: [indices within eps]}`` and reset ``wp``/``proto_cores``.

        Only the first three coordinates of each point are used.  Each
        neighbour list is in ascending index order.
        """
        self.wp = set()
        self.proto_cores = set()
        near_neighbours = {}
        for i, p in enumerate(points):
            neighbours = []
            for j, q in enumerate(points):
                distance = ((q[0] - p[0]) ** 2
                            + (q[1] - p[1]) ** 2
                            + (q[2] - p[2]) ** 2) ** 0.5
                if distance <= self.eps:
                    neighbours.append(j)
            near_neighbours[i] = neighbours
            if len(neighbours) > 1:
                self.wp.add(i)
            if len(neighbours) >= self.min_points:
                self.proto_cores.add(i)
        return near_neighbours

    # add cluster core points
    def add_core(self, neighbours):
        """Return the set of core-point indices reachable from ``neighbours``.

        Starting from ``neighbours`` (the neighbourhood of a seed core point),
        the search only continues through core points, so the result contains
        core points exclusively.
        """
        core_points = set()
        visited = set()
        frontier = set(neighbours)
        while frontier:
            visited |= frontier
            next_frontier = set()
            for p in frontier:
                if len(self.near_neighbours[p]) >= self.min_points:
                    core_points.add(p)
                    next_frontier.update(self.near_neighbours[p])
            frontier = next_frontier - visited
        return core_points

    # expand cluster to reachable points and rebuild actual point values
    def expand_cluster(self, core):
        """Return the points of the cluster whose core-point indices are ``core``.

        The cluster is ``core`` plus every point within eps of a core point.
        ``core`` itself is left unmodified.
        """
        return [self.dp[index] for index in self._member_indices(core)]

    def _member_indices(self, core):
        """Return the sorted indices of ``core`` and all neighbours of its points."""
        members = set(core)
        for p in core:
            members.update(self.near_neighbours[p])
        return sorted(members)
Why not just flatten the data to 2 dimensions with PCA and use DBSCAN with only 2 dimensions? Seems easier than trying to custom build something else.
© 2022 - 2024 — McMap. All rights reserved.