19 #ifdef KMEANS_THREADED 28 for (i = 0; i < config->
num_objs; i++)
31 int cluster, curr_cluster;
34 assert(config->
objs != NULL);
39 obj = config->
objs[i];
56 for (cluster = 1; cluster < config->
k; cluster++)
59 if (distance < curr_distance)
62 curr_cluster = cluster;
76 for (i = 0; i < config->
k; i++)
83 #ifdef KMEANS_THREADED 85 static void * update_r_threaded_main(
void *
args)
98 int num_threads = config->
num_objs * config->
k / KMEANS_THR_THRESHOLD;
101 num_threads = (num_threads > KMEANS_THR_MAX ? KMEANS_THR_MAX : num_threads);
110 pthread_t thread[KMEANS_THR_MAX];
111 pthread_attr_t thread_attr;
113 int obs_per_thread = config->
num_objs / num_threads;
116 for (i = 0; i < num_threads; i++)
124 thread_config[i].
objs += i*obs_per_thread;
125 thread_config[i].
clusters += i*obs_per_thread;
126 thread_config[i].
num_objs = obs_per_thread;
127 if (i == num_threads-1)
133 pthread_attr_init(&thread_attr);
134 pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE);
137 rc = pthread_create(&thread[i], &thread_attr, update_r_threaded_main, (
void *) &thread_config[i]);
140 printf(
"ERROR: return code from pthread_create() is %d\n", rc);
146 pthread_attr_destroy(&thread_attr);
149 for (i = 0; i < num_threads; i++)
152 rc = pthread_join(thread[i], &status);
155 printf(
"ERROR: return code from pthread_join() is %d\n", rc);
163 pthread_mutex_t update_means_k_mutex;
166 update_means_threaded_main(
void *arg)
173 pthread_mutex_lock (&update_means_k_mutex);
176 pthread_mutex_unlock (&update_means_k_mutex);
181 while (i < config->k);
192 int num_threads = config->
num_objs / KMEANS_THR_THRESHOLD;
195 num_threads = (num_threads > KMEANS_THR_MAX ? KMEANS_THR_MAX : num_threads);
205 pthread_t thread[KMEANS_THR_MAX];
206 pthread_attr_t thread_attr;
209 pthread_mutex_init(&update_means_k_mutex, NULL);
212 pthread_attr_init(&thread_attr);
213 pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE);
216 for (i = 0; i < num_threads; i++)
220 rc = pthread_create(&thread[i], &thread_attr, update_means_threaded_main, (
void *) config);
223 printf(
"ERROR: return code from pthread_create() is %d\n", rc);
228 pthread_attr_destroy(&thread_attr);
231 for (i = 0; i < num_threads; i++)
234 rc = pthread_join(thread[i], &status);
237 printf(
"ERROR: return code from pthread_join() is %d\n", rc);
242 pthread_mutex_destroy(&update_means_k_mutex);
253 size_t clusters_sz =
sizeof(int)*config->
num_objs;
256 assert(config->
objs);
266 memset(config->
clusters, 0, clusters_sz);
283 memcpy(clusters_last, config->
clusters, clusters_sz);
285 #ifdef KMEANS_THREADED 286 update_r_threaded(config);
287 update_means_threaded(config);
296 if (memcmp(clusters_last, config->
clusters, clusters_sz) == 0)
kmeans_result kmeans(kmeans_config *config)
static void update_r(kmeans_config *config)
#define LW_ON_INTERRUPT(x)
#define kmeans_malloc(size)
#define KMEANS_NULL_CLUSTER
unsigned int total_iterations
Datum distance(PG_FUNCTION_ARGS)
unsigned int max_iterations
static void update_means(kmeans_config *config)
kmeans_distance_method distance_method
kmeans_centroid_method centroid_method
#define KMEANS_MAX_ITERATIONS