Some cleanup to separate the buffers
Output and state are now managed in separate buffers.
diff --git a/median.c b/median.c
index 1dad56d..9e2a572 100644
--- a/median.c
+++ b/median.c
@@ -12,15 +12,6 @@
// Where the number of blocks is nLevels and stored in Metadata, and nMembers
// is stored in Metadata as well.
// The buffer is cast into this struct.
-typedef struct metadata {
- unsigned int gid; // The next available global id
- size_t nLevels; // Number of buffers available.
- size_t nMembers; // The number of members in each block.
- size_t currentBufferIdx; // The index of the currently active buffer.
- size_t spareSize; // How much spare space is there in the buffer.
- unsigned char data[0]; // The start of the data.
-} metadata_t;
-
typedef struct bufferdata {
unsigned int id; // Block identity.
size_t level; // The level of this buffer.
@@ -28,6 +19,30 @@
median_data_t data[0]; // The data for this buffer.
} blockdata_t;
+typedef struct metadata {
+ unsigned int gid; // The next available global id
+ size_t nLevels; // Number of buffers available.
+ size_t nMembers; // The number of members in each block.
+ size_t currentBufferIdx; // The index of the currently active buffer.
+ unsigned char data[0]; // The start of the data.
+ blockdata_t blocks[0]; // Makes the code cleaner and avoids unnecessary casting.
+} metadata_t;
+
+typedef struct sortrec {
+ median_data_t d;
+ size_t sz;
+} sort_rec_t;
+
+typedef struct spare_buffer {
+ size_t spareSize; // Size in bytes of the spare block.
+ size_t recordSize; // Size in SortRecords
+ size_t datumSize; // Size in number of Datum elements.
+ unsigned char data[0]; // Start of the data block.
+ sort_rec_t records[0]; // The same point cast as a sort_rec
+ median_data_t median_data[0]; // The same point cast as a median datum
+ size_t sizes[0]; // The same point case as size_t
+} spare_buffer_t;
+
// ----------------------------------------------------------------------------
// Helper functions.
@@ -44,11 +59,17 @@
return nLevels;
}
-// This returns the blocksize.
+// This returns the blocksize in bytes.
size_t blocksize(metadata_t * m) {
return sizeof(blockdata_t) + m->nMembers * sizeof(median_data_t);
}
+// This return the start of the next block given
+// a pointer to a block.
+blockdata_t * nextblock(blockdata_t * blk, metadata_t * m) {
+ return (blockdata_t *) (blk->data + m->nMembers);
+}
+
// This returns the size of the entire buffer.
size_t buffersize(metadata_t * m) {
return sizeof(metadata_t) + m->nLevels * blocksize(m);
@@ -57,12 +78,14 @@
// This is largely so that the "output" function
// can be run without ruining the buffer.
size_t sparespace(metadata_t * m) {
- return 2 * buffersize(m);
+ return sizeof(spare_buffer_t) + m->nLevels * m->nMembers * sizeof(sort_rec_t);
}
-// Returns the first byte address which is out of bounds.
-unsigned char * out_of_bounds(metadata_t * m) {
- return m->data + buffersize(m) - sizeof(blockdata_t);
+// Returns the first byte address which is too high to start a block regardless
+// of size.. One blockdata worth less than the actual end of the buffer plus
+// one.
+blockdata_t * out_of_bounds(metadata_t * m) {
+ return (blockdata_t *) (m->data + buffersize(m) - blocksize(m));
}
// Get the current block.
@@ -70,16 +93,12 @@
size_t offset = m->currentBufferIdx * blocksize(m);
return (blockdata_t *)(m->data + offset);
}
+
// Does the current block have space?
unsigned int current_block_has_space(metadata_t* m) {
return (get_current_block(m)->occupancy < m->nMembers);
}
-// First byte of the spare space.
-void * spare_space(metadata_t * m) {
- return out_of_bounds(m);
-}
-
// This will only be called in a safe situation. It's an internal function.
// Does not bound check!
// Do not call it unless you know what you are doing.
@@ -91,17 +110,12 @@
// Check if there is space in some block and if there is, then
// make that block the current block.
unsigned int occupy_available_empty_block(metadata_t * m) {
- size_t idx = 0;
- for(unsigned char * ptr = (unsigned char *) m->data; // Start of the data blocks.
- ptr <= (m->data + buffersize(m) - blocksize(m)); // The next block will fit into the buffer.
- ptr += blocksize(m)) { // Advance by a block
- blockdata_t * b = (blockdata_t *)ptr;
+ // Iterate over the blocks.
+ for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) {
if(b->occupancy < m->nMembers) {
// Empty block found!
- m->currentBufferIdx = idx;
+ m->currentBufferIdx = b->id;
return 1;
- } else {
- idx++;
}
}
// Failed to find an empty block.
@@ -115,10 +129,13 @@
return (*i < *j)?-1:+1;
}
-unsigned int merge_blocks(metadata_t *m,blockdata_t *i,blockdata_t *j){
+unsigned int merge_blocks(metadata_t *m,blockdata_t *i,blockdata_t *j, median_scratch_buffer_t spare){
// Prepare an array for the data.
- unsigned char * d = (unsigned char *)spare_space(m);
+ unsigned char * d = spare->data;
size_t datasz = m->nMembers * sizeof(median_data_t);
+ if (2*datasz > spare->spareSize) {
+ return 0;
+ }
bzero(d, 2*datasz);
// This is the size of the data.
@@ -134,7 +151,7 @@
// inefficiency in.
// But we should fix it.
qsort(d,2*m->nMembers,sizeof(median_data_t),&compare_data);
- median_data_t * sorted_data = (median_data_t *)d;
+ median_data_t * sorted_data = spare->median_data;
// Now interleave the sorted buffer.
// We should track parity and take the
@@ -147,6 +164,8 @@
// Update the metadata. i is full, j is empty.
// half the items are gone.
i->level++;
+
+ // Reset j to clean state.
j->level = 0;
j->occupancy = 0;
bzero(j->data, datasz);
@@ -154,17 +173,23 @@
}
// Find the smallest level such that there are two blocks at the same level.
-unsigned int identify_qualified_level(metadata_t *m, size_t *lvl) {
- // Zero out the count buffer.
- size_t * levelcount = (size_t *)spare_space(m);
- bzero(levelcount, m->nLevels * sizeof(size_t));
- // Iterate over the blocks, and count up the blocks at each level.
- for(unsigned char * ptr = (unsigned char *) m->data; // Start of the data blocks.
- ptr <= (m->data + buffersize(m) - blocksize(m)); // The next block will fit into the buffer.
- ptr += blocksize(m)) { // Advance by a block
- blockdata_t * b = (blockdata_t*) ptr;
+unsigned int identify_qualified_level(metadata_t *m, size_t *lvl, median_scratch_buffer_t spare) {
+ // Zero out the count buffer after validating the
+ // spare space buffer.
+ size_t datasz = m->nLevels * sizeof(size_t);
+ if (datasz > spare->spareSize) {
+ return 0;
+ }
+
+ // Prepare the size array.
+ size_t * levelcount = spare->sizes;
+ bzero(levelcount, datasz);
+
+ // Count up the level population.
+ for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) {
levelcount[b->level]++;
}
+
// Look for a level where the count is at least 2.
for(size_t idx = 0; idx < m->nLevels; idx++) {
if(levelcount[idx] > 1) {
@@ -173,69 +198,75 @@
return 1;
}
}
+
// There is no such thing.
return 0;
}
-unsigned int identify_compatible_block_pair(metadata_t *m, blockdata_t ** i, blockdata_t ** j) {
- size_t qualified_level;
+// This identifies the pair of blocks to merge.
+unsigned int identify_compatible_block_pair(metadata_t *m, blockdata_t ** i, blockdata_t ** j, median_scratch_buffer_t spare) {
+ // The qualified level will reside here.
+ // Find the qualified level.
+ size_t qualified_level = 0;
+ if (!identify_qualified_level(m,&qualified_level,spare)) {
+ return 0;
+ }
+
// Initial values to return.
*i = 0;
*j = 0;
- // Find the qualified level.
- if (!identify_qualified_level(m,&qualified_level)) {
- return 0;
- }
+
// State machine to find the two buffers.
- for(unsigned char * ptr = (unsigned char *) m->data; // Start of the data blocks.
- ptr <= (m->data + buffersize(m) - blocksize(m)); // The next block will fit into the buffer.
- ptr += blocksize(m)) { // Advance by a block
- blockdata_t * b = (blockdata_t*) ptr;
+ // Iterate over the blocks.
+ for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) {
if (!(*i) && (b->level == qualified_level)) {
- // Found the first one.
- *i = b;
- continue;
+ *i = b; continue; // Found the first one.
}
if (!(*j) && (b->level == qualified_level)) {
- // Found both.
- *j = b;
- break;
+ *j = b; return 1; // Found the second one.
}
}
- return 1;
+ // Loop should not exit before the two are found.
+ return 0;
}
-unsigned int create_empty_block(metadata_t * m) {
+// THis routine creates an empty block by merging two available blocks into a
+// single block. Consequently this frees up one of them.
+// If two compatible blocks cannot be found, then this routine returns 0.
+// It also returns 0 if for some reason the two blocks cannot be merged.
+unsigned int create_empty_block(metadata_t * m, median_scratch_buffer_t spare) {
blockdata_t *i,*j;
- if(!identify_compatible_block_pair(m, &i, &j)){
- // Nothing to do unless there is a compatible block pair
- // to merge.
+ if(!identify_compatible_block_pair(m, &i, &j, spare)){
return 0;
}
- return merge_blocks(m,i,j);
+ return merge_blocks(m,i,j,spare);
}
-// Shift the current block.
-unsigned int shift_current_block(metadata_t * m) {
+// Shift the current block. If there is an empty block, great!
+// If not, then empty a block by merging two blocks and then occupy the
+// freed up block.
+unsigned int shift_current_block(metadata_t * m, median_scratch_buffer_t spare) {
if(!occupy_available_empty_block(m)) {
- if(create_empty_block(m)) {
- // Now there should be an empty block!
- // So occupy it.
- occupy_available_empty_block(m);
+ if(create_empty_block(m,spare)) { // Now there should be an empty block!
+ if(!occupy_available_empty_block(m)) {
+ return 0; // Stuck we are. Empty block is not, even though create it we did.
+ }
} else {
- // Stuck we are, block not.
- return 0;
+ return 0; // Stuck we are. Block not.
}
}
- return 1;
+ return 1; // Occupy a block we did.
}
// ----------------------------------------------------------------------------
// Implementation of the interface functions (specified in median.h).
+// ----------------------------------------------------------------------------
+// ----------------------------------------------------------------------------
// Suggest the size of the buffer needed for a fixed epsilon and maxN.
+// ----------------------------------------------------------------------------
median_error_t median_suggest_buffer_size(double epsilon, size_t maxN, size_t *suggested_size){
// First check for sane parameters.
@@ -259,13 +290,24 @@
// within the metadata_t block. I prefer keeping this data structure
// relocatable and serializable. So we may need to think about what the
// alternatives are.
- *suggested_size = buffersize(&m)+sparespace(&m);
+ *suggested_size = buffersize(&m);
// Things worked. Return MEDIAN_OK
return MEDIAN_OK;
}
+// ----------------------------------------------------------------------------
+// Suggest a buffer size for the scratch buffer.
+// ----------------------------------------------------------------------------
+median_error_t median_suggest_scratch_buffer_size(median_buffer_t m, size_t *suggested_size){
+ metadata_t * md = (metadata_t *) m;
+ *suggested_size = sparespace(md);
+ return MEDIAN_OK;
+}
+
+// ----------------------------------------------------------------------------
// Initialize an allocated buffer.
+// ----------------------------------------------------------------------------
median_error_t median_init_buffer(void * buffer, size_t buffer_size, double epsilon, size_t maxN, median_buffer_t *initialized_buffer){
// Initialize.
@@ -279,24 +321,21 @@
// Cast the buffer as metadata_t so that we can initialize the metadata block.
metadata_t * m = (metadata_t *)buffer;
// Initialize the metadata block.
- m->gid = 1;
+ m->gid = 0;
m->nMembers = calculate_nMembers(epsilon);
m->nLevels = calculate_nLevels(maxN);
m->currentBufferIdx = 0;
- m->spareSize = buffer_size - buffersize(m);
// Zero all the data following the metadata block.
// Strictly this should not be necessary, but I'm a coward!
bzero(m->data, buffersize(m) - sizeof(metadata_t));
// cast the data part as an array of blockdata_t fronted blocks.
// and initialize each of the blocks.
- for(unsigned char * ptr = (unsigned char *) m->data; // Start of the data blocks.
- ptr <= (m->data + buffersize(m) - blocksize(m)); // The next block will fit into the buffer.
- ptr += blocksize(m)) { // Advance by a block
- blockdata_t * t = (blockdata_t *) ptr;
- t->id = (m->gid)++;
- t->level = 0;
- t->occupancy = 0;
+ // Iterate over the blocks.
+ for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) {
+ b->id = (m->gid)++;
+ b->level = 0;
+ b->occupancy = 0;
}
// Populate the return value.
@@ -304,7 +343,17 @@
return error;
}
+// ----------------------------------------------------------------------------
+// Initialize the scratch buffer.
+// ----------------------------------------------------------------------------
+median_error_t median_init_scratch_buffer(void * buffer, size_t buffer_size, median_buffer_t m, median_scratch_buffer_t * initialized_buffer){
+ median_scratch_buffer_t s = buffer;
+ s->spareSize = buffer_size - sizeof(spare_buffer_t);
+ *initialized_buffer = s;
+ return MEDIAN_OK;
+}
+// ----------------------------------------------------------------------------
// Insert data into the buffer.
// We could have made this more concise and pretty if we used short-circuit
// execution of the boolean condition..
@@ -318,8 +367,10 @@
//
// But this code is harder to read, because of the side
// effect issue.
+// ----------------------------------------------------------------------------
median_error_t median_insert_data(median_buffer_t buffer,
- median_data_t data){
+ median_data_t data,
+ median_scratch_buffer_t scratch){
metadata_t * m = (metadata_t *) buffer;
// If there is space in the current block..
if(current_block_has_space(m)) {
@@ -329,7 +380,7 @@
insert_data_into_current_block(m,data);
} else {
// Current block is full. So try and shift it.
- if(shift_current_block(m)) {
+ if(shift_current_block(m, scratch)) {
// The current block has shifted successfully.
// So we can now call the unchecked method again.
insert_data_into_current_block(m,data);
@@ -342,23 +393,65 @@
return MEDIAN_OK;
}
+// ----------------------------------------------------------------------------
+// Compare two sort records.
+// ----------------------------------------------------------------------------
+int compare_sort_rec(const void * a, const void *b) {
+ return ((sort_rec_t *)a)->d < ((sort_rec_t*)b)->d?-1:1;
+}
+
+// ----------------------------------------------------------------------------
+// Output the estimated median.
+// ----------------------------------------------------------------------------
median_error_t median_output_median(const median_buffer_t buffer,
- median_data_t *approximate_median){
+ median_data_t *approximate_median,
+ median_scratch_buffer_t scratch){
+ // Initialize the data buffer to zero.
+ metadata_t * m = (metadata_t *) buffer;
+ size_t datasz = m->nMembers * m->nLevels * sizeof(sort_rec_t);
+ if(datasz > scratch->spareSize) {
+ return MEDIAN_ERROR_BUFFER_TOO_SMALL;
+ }
+ sort_rec_t * s = (sort_rec_t *) scratch->data;
+ bzero(s, datasz);
+ // Initialize the count and weight values.
+ size_t count = 0;
+ size_t weight = 0;
+ // First construct the sort buffer.
+ for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) {
+ for(size_t i = 0; i < b->occupancy; i++) {
+ s->d = b->data[i];
+ s->sz = (1 << b->level);
+ weight += s->sz;
+ s++; count++;
+ }
+ }
+ qsort(scratch->data,count,sizeof(sort_rec_t),&compare_sort_rec);
+
+ // This is the weight at which the median will be found.
+ weight = weight/2;
+
+ // We now need to pick out the
+ // median.
+ for(s = (sort_rec_t *) scratch->data;
+ weight > s->sz;
+ weight -= (s++)->sz) ;
+
+ *approximate_median = s->d;
return MEDIAN_OK;
}
+// ----------------------------------------------------------------------------
// Dump the state to stderr.
+// ----------------------------------------------------------------------------
median_error_t median_dump_stderr(const median_buffer_t buffer){
// First print out all the metadata.
metadata_t * m = (metadata_t *) buffer;
blockdata_t * current = get_current_block(m);
- fprintf(stderr, "Metadata:\n Next GID:%d\n nMembers:%d\n nLevels:%d\n CurrentBlock:%d\n SpareSize:%d\n",m->gid,(unsigned)m->nMembers,(unsigned)m->nLevels, current->id,(unsigned)m->spareSize);
+ fprintf(stderr, "Metadata:\n Next GID:%d\n nMembers:%d\n nLevels:%d\n CurrentBlock:%d\n",m->gid,(unsigned)m->nMembers,(unsigned)m->nLevels, current->id);
+ for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) {
// Now dump the blocks.
- for(unsigned char * ptr = (unsigned char *) m->data; // Start of the data blocks.
- ptr <= (m->data + buffersize(m) - blocksize(m)); // The next block will fit into the buffer.
- ptr += blocksize(m)) { // Advance by a block
- blockdata_t * b = (blockdata_t *)ptr;
fprintf(stderr, "Block:%d\n Level:%d\n Occupancy:%d\n", b->id,(unsigned)b->level,(unsigned)b->occupancy);
// For every block, dump the first 8 keys in the block.
stderr_hexdmp("Data:",(unsigned char *)(b->data),8*sizeof(median_data_t));
@@ -367,3 +460,4 @@
// Done, return MEDIAN_OK.
return MEDIAN_OK;
}
+// ----------------------------------------------------------------------------
diff --git a/median.h b/median.h
index c62fbbd..1a89ae6 100644
--- a/median.h
+++ b/median.h
@@ -20,9 +20,13 @@
// functions in this file.
typedef int32_t median_error_t;
+struct metadata_t;
+struct spare_buffer_t;
// A buffer which contains all the state held
// within an approximate median calculator.
-typedef void * median_buffer_t;
+// And another type for scratch data.
+typedef struct metadata * median_buffer_t;
+typedef struct spare_buffer * median_scratch_buffer_t;
// A single data element. For now this is defined
// to be a 64 bit integer. But in principle this could be
@@ -59,6 +63,9 @@
// In all reasonable calls, the returned value will be
// MEDIAN_ERROR_OK.
median_error_t median_suggest_buffer_size(double epsilon, size_t maxN, size_t *suggested_size);
+// Suggest a size of the buffer for scratch space. This is needed
+// so that the buffer is not corrupted by the output and merge steps.
+median_error_t median_suggest_scratch_buffer_size(median_buffer_t m, size_t *suggested_size);
// This routine will init the buffer. It will check if the buffer meets the
// size restriction returned by #median_suggest_buffer_size(). If not, it will
@@ -77,6 +84,10 @@
// In all reasonable calls, the returned value will be
// MEDIAN_ERROR_OK.
median_error_t median_init_buffer(void * buffer, size_t buffer_size, double epsilon, size_t maxN, median_buffer_t *initialized_buffer);
+// Initialize the scratch buffer.
+// This function is similar in behaviour to the
+// init function for the main buffer.
+median_error_t median_init_scratch_buffer(void * buffer, size_t buffer_size, median_buffer_t m, median_scratch_buffer_t * initialized_buffer);
// This routine inserts a data point into the median computation. It's
// implemented to be really cheap. However, it is not always constant time.
@@ -93,7 +104,7 @@
// then MEDIAN_ERROR_INVALID_PARAMETERS will be returned.
// In all reasonable calls, the returned value will be
// MEDIAN_ERROR_OK.
-median_error_t median_insert_data(median_buffer_t buffer, median_data_t data);
+median_error_t median_insert_data(median_buffer_t buffer, median_data_t data, median_scratch_buffer_t scratch_buffer);
// This routine returns the approximate median of the data seen thusfar.
// This is implemented to be really cheap as well, but not as cheap as the
@@ -108,7 +119,7 @@
// return an error.
// If it does return an error, the error will be
// MEDIAN_ERROR_INVALID_PARAMETERS
-median_error_t median_output_median(const median_buffer_t buffer, median_data_t *approximate_median);
+median_error_t median_output_median(const median_buffer_t buffer, median_data_t *approximate_median, median_scratch_buffer_t scratch_buffer);
// This hexdumps the state of the buffer to stderr so that it can help debug if needed.
median_error_t median_dump_stderr(const median_buffer_t buffer);
diff --git a/median.main.c b/median.main.c
index 49dac5d..bff30d1 100644
--- a/median.main.c
+++ b/median.main.c
@@ -1,9 +1,10 @@
+#include <stdio.h> // fprintf, stderr
#include <stdlib.h> // size_t
#include <assert.h> // assert
#include "median.h"
#include "hex.h"
-#define TEST_SIZE 1024*1024*128 // 128M
+#define TEST_SIZE 1024*1024*256 // 256M
// A sinple main routine meant as an example,
// and also to unit test most of the intersting
// functions of the median module.
@@ -11,21 +12,29 @@
// Call suggest buffer size to determine the size of the buffer
// we need to build the data structure.
size_t n = 0;
- assert(median_suggest_buffer_size(.001, TEST_SIZE , &n) == MEDIAN_OK);
// Allocate the buffer of the required size. And then
// initialize it (median_init_buffer).
+ assert(median_suggest_buffer_size(.001, TEST_SIZE , &n) == MEDIAN_OK);
void * b = malloc(n);
median_buffer_t buf;
assert(median_init_buffer(b, n, .001, TEST_SIZE, &buf) == MEDIAN_OK);
+ // Initialize the spare buffer.
+ assert(median_suggest_scratch_buffer_size(buf, &n) == MEDIAN_OK);
+ void * s = malloc(n);
+ median_scratch_buffer_t scratch;
+ assert(median_init_scratch_buffer(s,n,buf,&scratch) == MEDIAN_OK);
+
// Insert some data.
for(median_data_t i = 1; i < TEST_SIZE; i++) { // 1G
- assert(median_insert_data(buf,i) == MEDIAN_OK);
+ assert(median_insert_data(buf,i,scratch) == MEDIAN_OK);
}
- // Dump it.
- assert(median_dump_stderr(buf) == MEDIAN_OK);
+ median_data_t median;
+ assert(median_output_median(buf, &median,scratch) == MEDIAN_OK);
+
+ fprintf(stderr, "And the median is %x\n", (unsigned)median);
// All done, return 0
return 0;