PostGIS  2.4.9dev-r@@SVN_REVISION@@
kmeans.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2 *
3 * kmeans.c
4 * Generic k-means implementation
5 *
6 * Copyright (c) 2016, Paul Ramsey <pramsey@cleverelephant.ca>
7 *
8 *------------------------------------------------------------------------*/
9 
10 #include <assert.h>
11 #include <float.h>
12 #include <math.h>
13 #include <stdlib.h>
14 #include <stdio.h>
15 #include <string.h>
16 
17 #include "kmeans.h"
18 
19 #ifdef KMEANS_THREADED
20 #include <pthread.h>
21 #endif
22 
23 static void
25 {
26  int i;
27 
28  for (i = 0; i < config->num_objs; i++)
29  {
30  double distance, curr_distance;
31  int cluster, curr_cluster;
32  Pointer obj;
33 
34  assert(config->objs != NULL);
35  assert(config->num_objs > 0);
36  assert(config->centers);
37  assert(config->clusters);
38 
39  obj = config->objs[i];
40 
41  /*
42  * Don't try to cluster NULL objects, just add them
43  * to the "unclusterable cluster"
44  */
45  if (!obj)
46  {
47  config->clusters[i] = KMEANS_NULL_CLUSTER;
48  continue;
49  }
50 
51  /* Initialize with distance to first cluster */
52  curr_distance = (config->distance_method)(obj, config->centers[0]);
53  curr_cluster = 0;
54 
55  /* Check all other cluster centers and find the nearest */
56  for (cluster = 1; cluster < config->k; cluster++)
57  {
58  distance = (config->distance_method)(obj, config->centers[cluster]);
59  if (distance < curr_distance)
60  {
61  curr_distance = distance;
62  curr_cluster = cluster;
63  }
64  }
65 
66  /* Store the nearest cluster this object is in */
67  config->clusters[i] = curr_cluster;
68  }
69 }
70 
71 static void
73 {
74  int i;
75 
76  for (i = 0; i < config->k; i++)
77  {
78  /* Update the centroid for this cluster */
79  (config->centroid_method)(config->objs, config->clusters, config->num_objs, i, config->centers[i]);
80  }
81 }
82 
83 #ifdef KMEANS_THREADED
84 
85 static void * update_r_threaded_main(void *args)
86 {
87  kmeans_config *config = (kmeans_config*)args;
88  update_r(config);
89  pthread_exit(args);
90 }
91 
92 static void update_r_threaded(kmeans_config *config)
93 {
94  /* Computational complexity is function of objs/clusters */
95  /* We only spin up threading infra if we need more than one core */
96  /* running. We keep the threshold high so the overhead of */
97  /* thread management is small compared to thread compute time */
98  int num_threads = config->num_objs * config->k / KMEANS_THR_THRESHOLD;
99 
100  /* Can't run more threads than the maximum */
101  num_threads = (num_threads > KMEANS_THR_MAX ? KMEANS_THR_MAX : num_threads);
102 
103  /* If the problem size is small, don't bother w/ threading */
104  if (num_threads < 1)
105  {
106  update_r(config);
107  }
108  else
109  {
110  pthread_t thread[KMEANS_THR_MAX];
111  pthread_attr_t thread_attr;
112  kmeans_config thread_config[KMEANS_THR_MAX];
113  int obs_per_thread = config->num_objs / num_threads;
114  int i, rc;
115 
116  for (i = 0; i < num_threads; i++)
117  {
118  /*
119  * Each thread gets a copy of the config, but with the list pointers
120  * offest to the start of the batch the thread is responsible for, and the
121  * object count number adjusted similarly.
122  */
123  memcpy(&(thread_config[i]), config, sizeof(kmeans_config));
124  thread_config[i].objs += i*obs_per_thread;
125  thread_config[i].clusters += i*obs_per_thread;
126  thread_config[i].num_objs = obs_per_thread;
127  if (i == num_threads-1)
128  {
129  thread_config[i].num_objs += config->num_objs - num_threads*obs_per_thread;
130  }
131 
132  /* Initialize and set thread detached attribute */
133  pthread_attr_init(&thread_attr);
134  pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE);
135 
136  /* Now we just run the thread, on its subset of the data */
137  rc = pthread_create(&thread[i], &thread_attr, update_r_threaded_main, (void *) &thread_config[i]);
138  if (rc)
139  {
140  printf("ERROR: return code from pthread_create() is %d\n", rc);
141  exit(-1);
142  }
143  }
144 
145  /* Free attribute and wait for the other threads */
146  pthread_attr_destroy(&thread_attr);
147 
148  /* Wait for all calculations to complete */
149  for (i = 0; i < num_threads; i++)
150  {
151  void *status;
152  rc = pthread_join(thread[i], &status);
153  if (rc)
154  {
155  printf("ERROR: return code from pthread_join() is %d\n", rc);
156  exit(-1);
157  }
158  }
159  }
160 }
161 
162 int update_means_k;
163 pthread_mutex_t update_means_k_mutex;
164 
165 static void *
166 update_means_threaded_main(void *arg)
167 {
168  kmeans_config *config = (kmeans_config*)arg;
169  int i = 0;
170 
171  do
172  {
173  pthread_mutex_lock (&update_means_k_mutex);
174  i = update_means_k;
175  update_means_k++;
176  pthread_mutex_unlock (&update_means_k_mutex);
177 
178  if (i < config->k)
179  (config->centroid_method)(config->objs, config->clusters, config->num_objs, i, config->centers[i]);
180  }
181  while (i < config->k);
182 
183  pthread_exit(arg);
184 }
185 
186 static void
187 update_means_threaded(kmeans_config *config)
188 {
189  /* We only spin up threading infra if we need more than one core */
190  /* running. We keep the threshold high so the overhead of */
191  /* thread management is small compared to thread compute time */
192  int num_threads = config->num_objs / KMEANS_THR_THRESHOLD;
193 
194  /* Can't run more threads than the maximum */
195  num_threads = (num_threads > KMEANS_THR_MAX ? KMEANS_THR_MAX : num_threads);
196 
197  /* If the problem size is small, don't bother w/ threading */
198  if (num_threads < 1)
199  {
200  update_means(config);
201  }
202  else
203  {
204  /* Mutex protected counter to drive threads */
205  pthread_t thread[KMEANS_THR_MAX];
206  pthread_attr_t thread_attr;
207  int i, rc;
208 
209  pthread_mutex_init(&update_means_k_mutex, NULL);
210  update_means_k = 0;
211 
212  pthread_attr_init(&thread_attr);
213  pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE);
214 
215  /* Create threads to perform computation */
216  for (i = 0; i < num_threads; i++)
217  {
218 
219  /* Now we just run the thread, on its subset of the data */
220  rc = pthread_create(&thread[i], &thread_attr, update_means_threaded_main, (void *) config);
221  if (rc)
222  {
223  printf("ERROR: return code from pthread_create() is %d\n", rc);
224  exit(-1);
225  }
226  }
227 
228  pthread_attr_destroy(&thread_attr);
229 
230  /* Watch until completion */
231  for (i = 0; i < num_threads; i++)
232  {
233  void *status;
234  rc = pthread_join(thread[i], &status);
235  if (rc)
236  {
237  printf("ERROR: return code from pthread_join() is %d\n", rc);
238  exit(-1);
239  }
240  }
241 
242  pthread_mutex_destroy(&update_means_k_mutex);
243  }
244 }
245 
246 #endif /* KMEANS_THREADED */
247 
250 {
251  int iterations = 0;
252  int *clusters_last;
253  size_t clusters_sz = sizeof(int)*config->num_objs;
254 
255  assert(config);
256  assert(config->objs);
257  assert(config->num_objs);
258  assert(config->distance_method);
259  assert(config->centroid_method);
260  assert(config->centers);
261  assert(config->k);
262  assert(config->clusters);
263  assert(config->k <= config->num_objs);
264 
265  /* Zero out cluster numbers, just in case user forgets */
266  memset(config->clusters, 0, clusters_sz);
267 
268  /* Set default max iterations if necessary */
269  if (!config->max_iterations)
271 
272  /*
273  * Previous cluster state array. At this time, r doesn't mean anything
274  * but it's ok
275  */
276  clusters_last = kmeans_malloc(clusters_sz);
277 
278  while (1)
279  {
280  LW_ON_INTERRUPT(kmeans_free(clusters_last); return KMEANS_ERROR);
281 
282  /* Store the previous state of the clustering */
283  memcpy(clusters_last, config->clusters, clusters_sz);
284 
285 #ifdef KMEANS_THREADED
286  update_r_threaded(config);
287  update_means_threaded(config);
288 #else
289  update_r(config);
290  update_means(config);
291 #endif
292  /*
293  * if all the cluster numbers are unchanged since last time,
294  * we are at a stable solution, so we can stop here
295  */
296  if (memcmp(clusters_last, config->clusters, clusters_sz) == 0)
297  {
298  kmeans_free(clusters_last);
299  config->total_iterations = iterations;
300  return KMEANS_OK;
301  }
302 
303  if (iterations++ > config->max_iterations)
304  {
305  kmeans_free(clusters_last);
306  config->total_iterations = iterations;
308  }
309 
310  }
311 
312  kmeans_free(clusters_last);
313  config->total_iterations = iterations;
314  return KMEANS_ERROR;
315 }
316 
317 
args
Definition: ovdump.py:44
int * clusters
Definition: kmeans.h:120
void * Pointer
Definition: kmeans.h:59
kmeans_result kmeans(kmeans_config *config)
Definition: kmeans.c:249
kmeans_result
Definition: kmeans.h:61
static void update_r(kmeans_config *config)
Definition: kmeans.c:24
#define LW_ON_INTERRUPT(x)
#define kmeans_free(ptr)
Definition: kmeans.h:57
#define kmeans_malloc(size)
Definition: kmeans.h:56
#define KMEANS_NULL_CLUSTER
Definition: kmeans.h:37
Pointer * centers
Definition: kmeans.h:107
unsigned int total_iterations
Definition: kmeans.h:117
Datum distance(PG_FUNCTION_ARGS)
unsigned int max_iterations
Definition: kmeans.h:114
size_t num_objs
Definition: kmeans.h:100
static void update_means(kmeans_config *config)
Definition: kmeans.c:72
unsigned int k
Definition: kmeans.h:110
Pointer * objs
Definition: kmeans.h:97
kmeans_distance_method distance_method
Definition: kmeans.h:85
kmeans_centroid_method centroid_method
Definition: kmeans.h:88
#define KMEANS_MAX_ITERATIONS
Definition: kmeans.h:43