Python: DBSCAN in 3 dimensional space
A

4

12

I have been searching for an implementation of DBSCAN for 3-dimensional points without much luck. Does anyone know of a library that handles this, or have any experience with doing this? I am assuming that the DBSCAN algorithm can handle 3 dimensions if the eps value is treated as a radius and the distance between points is measured as Euclidean separation. If anyone has tried implementing this and would like to share, that would also be greatly appreciated. Thanks.

Anthrax answered 7/10, 2014 at 21:59 Comment(3)
Have you tried sklearn? It has DBSCAN, and it can do three dimensions, too.Spragens
@Anony-Mousse I have and it doesn't work. Unless I am doing something wrong. I give it a list of 3 dimensional coordinates through dbscan.fit(X) and it gives me an error: expected dimension size 2 not 3. Otherwise, I know you can supply a distance matrix, in which case it doesn't have much value to me, I could just write a DBSCAN algorithm myself.Anthrax
Two options: 1. Implement DBSCAN yourself using an R-tree library; it is just a day or two of work. 2. Use Singular Value Decomposition.Tamberg
A
5

So this is what I came up with. I know it is not the most efficient implementation, but it works. For example, the region query, which is the main time eater of the algorithm, computes the distance between the same two points more than once instead of storing it for later use.

class DBSCAN(object):

    def __init__(self, eps=0, min_points=2):
        self.eps = eps
        self.min_points = min_points
        self.visited = []
        self.noise = []
        self.clusters = []
        self.dp = []

    def cluster(self, data_points):
        self.visited = []
        self.dp = data_points
        c = 0
        for point in data_points:
            if point not in self.visited:
                self.visited.append(point)
                neighbours = self.region_query(point)
                if len(neighbours) < self.min_points:
                    self.noise.append(point)
                else:
                    c += 1
                    self.expand_cluster(c, neighbours)

    def expand_cluster(self, cluster_number, p_neighbours):
        # A cluster is stored as a (name, list of points) tuple.
        cluster = ("Cluster: %d" % cluster_number, [])
        self.clusters.append(cluster)
        new_points = p_neighbours
        while new_points:
            new_points = self.pool(cluster, new_points)

    def region_query(self, p):
        # Brute-force scan: 3D Euclidean distance from p to every data point.
        result = []
        for d in self.dp:
            distance = (((d[0] - p[0])**2 + (d[1] - p[1])**2 + (d[2] - p[2])**2)**0.5)
            if distance <= self.eps:
                result.append(d)
        return result

    def pool(self, cluster, p_neighbours):
        new_neighbours = []
        for n in p_neighbours:
            if n not in self.visited:
                self.visited.append(n)
                n_neighbours = self.region_query(n)
                if len(n_neighbours) >= self.min_points:
                    new_neighbours = self.unexplored(p_neighbours, n_neighbours)
            for c in self.clusters:
                if n not in c[1] and n not in cluster[1]:
                    cluster[1].append(n)
        return new_neighbours

    @staticmethod
    def unexplored(x, y):
        # Points of y that are not already in x.
        z = []
        for p in y:
            if p not in x:
                z.append(p)
        return z
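
The answer mentions that the region query recomputes the same distances. One possible way around that, sketched here under the assumption that NumPy is available and that an n x n matrix fits in memory, is to compute all pairwise distances up front and derive the neighbour lists from them. The function name neighbour_lists is hypothetical and not part of the answer's class.

```python
import numpy as np

def neighbour_lists(points, eps):
    """Return, for each point, the indices of all points within eps (itself included)."""
    pts = np.asarray(points, dtype=float)
    # Pairwise differences via broadcasting: shape (n, n, d), any dimension d.
    diff = pts[:, None, :] - pts[None, :, :]
    # Euclidean distance matrix, shape (n, n), computed up front
    # instead of on every region query.
    dist = np.sqrt((diff ** 2).sum(axis=-1))
    return [np.flatnonzero(row <= eps).tolist() for row in dist]

points = [(0.0, 0.0, 0.0), (0.1, 0.0, 0.0), (0.0, 0.1, 0.0), (5.0, 5.0, 5.0)]
neighbours = neighbour_lists(points, eps=0.2)
print(neighbours)
```

The memory cost grows quadratically with the number of points, so for large data sets a spatial index (as suggested in the comments on the question) is probably the better trade-off.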
Anthrax answered 8/10, 2014 at 20:10 Comment(1)
Why bother with custom code as you can simply calculate 2D distance matrix and do DBSCAN on it?Gargle
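
One reading of this comment is to build the n x n matrix of pairwise distances and hand it to scikit-learn with metric="precomputed". A minimal sketch of that idea follows; the synthetic blobs and the eps and min_samples values are assumptions chosen only for illustration. For plain Euclidean distance the same result should come from passing the 3D points directly, which the sketch checks.

```python
import numpy as np
from sklearn.cluster import DBSCAN
from sklearn.metrics import pairwise_distances

rng = np.random.default_rng(0)
# Two tight 3D blobs far apart (synthetic data, for illustration only).
blob_a = rng.normal(loc=0.0, scale=0.05, size=(20, 3))
blob_b = rng.normal(loc=5.0, scale=0.05, size=(20, 3))
points = np.vstack([blob_a, blob_b])

# n x n Euclidean distance matrix, passed to DBSCAN as precomputed distances.
dist = pairwise_distances(points, metric="euclidean")
labels_pre = DBSCAN(eps=0.5, min_samples=3, metric="precomputed").fit_predict(dist)

# Same clustering, with the 3D coordinates passed directly.
labels_raw = DBSCAN(eps=0.5, min_samples=3).fit_predict(points)
print((labels_pre == labels_raw).all())
```

The precomputed route is mainly useful when the distance is something other than a built-in metric.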
V
13

You can use sklearn for DBSCAN. Here is some code that works for me:

from collections import Counter

import numpy as np
from sklearn.cluster import DBSCAN

# 500 random points, each with 3 coordinates
data = np.random.rand(500, 3)

db = DBSCAN(eps=0.12, min_samples=1).fit(data)
labels = db.labels_
Counter(labels)

The output I got was:

Counter({1: 342, 10: 30, 31: 13, 13: 11, 30: 10, 24: 5, 29: 5, 2: 4, 18: 4,
19: 4, 28: 4, 49: 4, 3: 3, 17: 3, 23: 3, 32: 3, 7: 2, 9: 2, 12: 2, 14: 2, 15: 2,
16: 2, 20: 2, 21: 2, 26: 2, 39: 2, 41: 2, 46: 2, 0: 1, 4: 1, 5: 1, 6: 1, 8: 1, 11:
1, 22: 1, 25: 1, 27: 1, 33: 1, 34: 1, 35: 1, 36: 1, 37: 1, 38: 1, 40: 1, 42: 1,
43: 1, 44: 1, 45: 1, 47: 1, 48: 1, 50: 1, 51: 1, 52: 1, 53: 1, 54: 1, 55: 1})

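So, the clustering identifies 56 clusters (labels 0 through 55), with the number of points in each cluster as shown above. Because min_samples=1, every point counts as a core point, so no point is labelled as noise (-1).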
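
As a follow-up sketch, here is one way the labels could be turned back into groups of 3D points, with noise separated out. The synthetic data and the eps and min_samples values are assumptions for illustration; with min_samples greater than 1, scikit-learn marks points that belong to no cluster with the label -1.

```python
from collections import defaultdict

import numpy as np
from sklearn.cluster import DBSCAN

rng = np.random.default_rng(42)
# One dense 3D blob plus two isolated points (synthetic, for illustration).
dense = rng.normal(loc=(0.0, 0.0, 0.0), scale=0.05, size=(50, 3))
outliers = np.array([[3.0, 3.0, 3.0], [-3.0, 2.0, 1.0]])
data = np.vstack([dense, outliers])

labels = DBSCAN(eps=0.3, min_samples=5).fit(data).labels_

# Group the original 3D points by cluster label.
clusters = defaultdict(list)
for point, label in zip(data, labels):
    clusters[int(label)].append(point)

# Label -1 is scikit-learn's marker for noise.
noise = clusters.pop(-1, [])
print(len(clusters), "cluster(s),", len(noise), "noise point(s)")
```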

Valedictory answered 14/2, 2015 at 3:23 Comment(3)
What is the point of this exercise? The OP asks for 3-dimensional data.Gargle
@SergeyBushmanov it is 3D data; it has 500 samples and each sample has 3 coordinates.Lehmbruck
This solution actually helped me understand a lot more, and I've been able to shape my data accordingly to cluster 5D data.Gerous
Q
1

After working with the code in the first answer for some time, I have concluded that it has significant issues: 1) noise points can appear in later clusters; 2) it produces additional clusters that are subsets of previously built clusters, because visited and unexplored points are not tracked correctly, so some clusters end up with fewer than min_points points; and 3) some points can end up in two clusters: they are reachable from both clusters and, in this code, may even be a core point of one of them. The standard DBSCAN algorithm places every core point in the cluster whose core it belongs to, but places a point that is merely reachable from two clusters in the first cluster it is found to be reachable from. This makes the assignment of such points depend on the order of the points in the data, but every point appears exactly once in the output: either in a single cluster or as noise. Some applications will want these shared points, which are reachable from two clusters, to be placed in both clusters, but core points should appear in only one cluster.
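
For comparison, the standard behaviour described above (each point ends up in exactly one cluster or in noise, and a border point goes to the first cluster that reaches it) can be sketched roughly as follows. This is a brute-force illustration only; the function name dbscan and the NOISE and UNSEEN markers are assumptions made for this sketch, and math.dist requires Python 3.8 or later.

```python
import math

NOISE = -1
UNSEEN = None

def dbscan(points, eps, min_points):
    """Label each point with a cluster id (0, 1, ...) or NOISE.

    A point is a core point if at least min_points points (itself included)
    lie within eps of it. Works for points of any dimension.
    """
    def region_query(i):
        # Brute-force scan over all points.
        return [j for j, q in enumerate(points) if math.dist(points[i], q) <= eps]

    labels = [UNSEEN] * len(points)
    cluster_id = -1
    for i in range(len(points)):
        if labels[i] is not UNSEEN:
            continue
        neighbours = region_query(i)
        if len(neighbours) < min_points:
            labels[i] = NOISE  # may be relabelled later as a border point
            continue
        cluster_id += 1
        labels[i] = cluster_id
        queue = list(neighbours)
        while queue:
            j = queue.pop()
            if labels[j] == NOISE:
                labels[j] = cluster_id  # border point: first cluster to reach it wins
            if labels[j] is not UNSEEN:
                continue
            labels[j] = cluster_id
            j_neighbours = region_query(j)
            if len(j_neighbours) >= min_points:
                queue.extend(j_neighbours)  # j is a core point, keep expanding
    return labels

points = [(0, 0, 0), (0.1, 0, 0), (0, 0.1, 0), (0.1, 0.1, 0),
          (5, 5, 5), (5.1, 5, 5), (5, 5.1, 5), (9, 9, 9)]
labels = dbscan(points, eps=0.2, min_points=3)
print(labels)
```

Because labels is a single list with one entry per point, a point cannot appear in two clusters or in both a cluster and the noise set.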

So this is what I came up with. It computes the distance between each pair of points twice and does not use any trees, but it eliminates points with no near neighbors immediately and builds the list of core points once, so only those points need to be considered when building the cores. It uses sets for membership testing. Note that this implementation does place shared points in every cluster they are reachable from.

class DBSCAN(object):
    def __init__(self, eps=0, min_points=2):
        self.eps = eps
        self.min_points = min_points
        self.noise = []
        self.clusters = []
        self.dp = []
        self.near_neighbours = []
        self.wp = set()
        self.proto_cores = set()

    def cluster(self, points):
        c = 0
        self.dp = points
        self.near_neighbours = self.nn(points)
        while self.proto_cores:
            near_points = set(self.proto_cores)
            for p in near_points:
                c += 1
                core = self.add_core(self.near_neighbours[p])
                complete_cluster = self.expand_cluster(core)
                self.clusters.append(["Cluster: %d" % c, complete_cluster])
                self.proto_cores -= core
                break
        for pt in self.dp:
            flag = True
            for c in self.clusters:
                if pt in c[1]:
                    flag = False
            if flag:
                self.noise.append(pt)

    # set up dictionary of near neighbours, create working_point and proto_core sets
    def nn(self, points):
        self.wp = set()
        self.proto_cores = set()
        i = -1
        near_neighbours = {}
        for p in points:
            i += 1
            j = -1
            neighbours = []
            for q in points:
                j += 1
                distance = (((q[0] - p[0]) ** 2 + (q[1] - p[1]) ** 2
                             + (q[2] - p[2]) ** 2) ** 0.5)
                if distance <= self.eps:
                    neighbours.append(j)
            near_neighbours[i] = neighbours
            if len(near_neighbours[i]) > 1:
                self.wp |= {i}
            if len(near_neighbours[i]) >= self.min_points:
                self.proto_cores |= {i}
        return near_neighbours

    # add cluster core points
    def add_core(self, neighbours):
        core_points = set(neighbours)
        visited = set()
        while neighbours:
            points = set(neighbours)
            neighbours = set()
            for p in points:
                visited |= {p}
                if len(self.near_neighbours[p]) >= self.min_points:
                    core_points |= set(self.near_neighbours[p])
                    neighbours |= set(self.near_neighbours[p])
            neighbours -= visited
        return core_points

    # expand cluster to reachable points and rebuild actual point values
    def expand_cluster(self, core):
        core_points = set(core)
        full_cluster = []
        for p in core_points:
            core |= set(self.near_neighbours[p])
        for point_number in core:
            full_cluster.append(self.dp[point_number])
        return full_cluster
Quaternary answered 5/5, 2017 at 22:9 Comment(1)
How do you run this script? What example code should I use to run this class in my notebook?Lipinski
C
1

Why not just flatten the data to 2 dimensions with PCA and use DBSCAN with only 2 dimensions? Seems easier than trying to custom build something else.
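
A rough sketch of what this suggestion could look like with scikit-learn is shown below; the synthetic blobs and the eps and min_samples values are assumptions for illustration. Note that projecting to 2 dimensions discards one direction of variation, so clusters that are separated mainly along the dropped direction could merge. Since scikit-learn's DBSCAN also accepts the 3D points directly, the sketch runs both for comparison.

```python
import numpy as np
from sklearn.cluster import DBSCAN
from sklearn.decomposition import PCA

rng = np.random.default_rng(1)
# Two 3D blobs separated along the x axis (synthetic, for illustration).
blob_a = rng.normal(loc=(0.0, 0.0, 0.0), scale=0.05, size=(30, 3))
blob_b = rng.normal(loc=(4.0, 0.0, 0.0), scale=0.05, size=(30, 3))
data = np.vstack([blob_a, blob_b])

# Project to the 2 directions of largest variance, then cluster in 2D.
projected = PCA(n_components=2).fit_transform(data)
labels_2d = DBSCAN(eps=0.3, min_samples=5).fit_predict(projected)

# Clustering the original 3D points directly, for comparison.
labels_3d = DBSCAN(eps=0.3, min_samples=5).fit_predict(data)
print(sorted(set(labels_2d.tolist())), sorted(set(labels_3d.tolist())))
```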

Canotas answered 31/7, 2021 at 13:15 Comment(0)
