Some cleanup to separate the buffers Output and state are now managed in separate buffers.
diff --git a/median.c b/median.c index 1dad56d..9e2a572 100644 --- a/median.c +++ b/median.c
@@ -12,15 +12,6 @@ // Where the number of blocks is nLevels and stored in Metadata, and nMembers // is stored in Metadata as well. // The buffer is cast into this struct. -typedef struct metadata { - unsigned int gid; // The next available global id - size_t nLevels; // Number of buffers available. - size_t nMembers; // The number of members in each block. - size_t currentBufferIdx; // The index of the currently active buffer. - size_t spareSize; // How much spare space is there in the buffer. - unsigned char data[0]; // The start of the data. -} metadata_t; - typedef struct bufferdata { unsigned int id; // Block identity. size_t level; // The level of this buffer. @@ -28,6 +19,30 @@ median_data_t data[0]; // The data for this buffer. } blockdata_t; +typedef struct metadata { + unsigned int gid; // The next available global id + size_t nLevels; // Number of buffers available. + size_t nMembers; // The number of members in each block. + size_t currentBufferIdx; // The index of the currently active buffer. + unsigned char data[0]; // The start of the data. + blockdata_t blocks[0]; // Makes the code cleaner and avoids unnecessary casting. +} metadata_t; + +typedef struct sortrec { + median_data_t d; + size_t sz; +} sort_rec_t; + +typedef struct spare_buffer { + size_t spareSize; // Size in bytes of the spare block. + size_t recordSize; // Size in SortRecords + size_t datumSize; // Size in number of Datum elements. + unsigned char data[0]; // Start of the data block. + sort_rec_t records[0]; // The same point cast as a sort_rec + median_data_t median_data[0]; // The same point cast as a median datum + size_t sizes[0]; // The same point case as size_t +} spare_buffer_t; + // ---------------------------------------------------------------------------- // Helper functions. @@ -44,11 +59,17 @@ return nLevels; } -// This returns the blocksize. +// This returns the blocksize in bytes. size_t blocksize(metadata_t * m) { return sizeof(blockdata_t) + m->nMembers * sizeof(median_data_t); } +// This return the start of the next block given +// a pointer to a block. +blockdata_t * nextblock(blockdata_t * blk, metadata_t * m) { + return (blockdata_t *) (blk->data + m->nMembers); +} + // This returns the size of the entire buffer. size_t buffersize(metadata_t * m) { return sizeof(metadata_t) + m->nLevels * blocksize(m); @@ -57,12 +78,14 @@ // This is largely so that the "output" function // can be run without ruining the buffer. size_t sparespace(metadata_t * m) { - return 2 * buffersize(m); + return sizeof(spare_buffer_t) + m->nLevels * m->nMembers * sizeof(sort_rec_t); } -// Returns the first byte address which is out of bounds. -unsigned char * out_of_bounds(metadata_t * m) { - return m->data + buffersize(m) - sizeof(blockdata_t); +// Returns the first byte address which is too high to start a block regardless +// of size.. One blockdata worth less than the actual end of the buffer plus +// one. +blockdata_t * out_of_bounds(metadata_t * m) { + return (blockdata_t *) (m->data + buffersize(m) - blocksize(m)); } // Get the current block. @@ -70,16 +93,12 @@ size_t offset = m->currentBufferIdx * blocksize(m); return (blockdata_t *)(m->data + offset); } + // Does the current block have space? unsigned int current_block_has_space(metadata_t* m) { return (get_current_block(m)->occupancy < m->nMembers); } -// First byte of the spare space. -void * spare_space(metadata_t * m) { - return out_of_bounds(m); -} - // This will only be called in a safe situation. It's an internal function. // Does not bound check! // Do not call it unless you know what you are doing. @@ -91,17 +110,12 @@ // Check if there is space in some block and if there is, then // make that block the current block. unsigned int occupy_available_empty_block(metadata_t * m) { - size_t idx = 0; - for(unsigned char * ptr = (unsigned char *) m->data; // Start of the data blocks. - ptr <= (m->data + buffersize(m) - blocksize(m)); // The next block will fit into the buffer. - ptr += blocksize(m)) { // Advance by a block - blockdata_t * b = (blockdata_t *)ptr; + // Iterate over the blocks. + for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) { if(b->occupancy < m->nMembers) { // Empty block found! - m->currentBufferIdx = idx; + m->currentBufferIdx = b->id; return 1; - } else { - idx++; } } // Failed to find an empty block. @@ -115,10 +129,13 @@ return (*i < *j)?-1:+1; } -unsigned int merge_blocks(metadata_t *m,blockdata_t *i,blockdata_t *j){ +unsigned int merge_blocks(metadata_t *m,blockdata_t *i,blockdata_t *j, median_scratch_buffer_t spare){ // Prepare an array for the data. - unsigned char * d = (unsigned char *)spare_space(m); + unsigned char * d = spare->data; size_t datasz = m->nMembers * sizeof(median_data_t); + if (2*datasz > spare->spareSize) { + return 0; + } bzero(d, 2*datasz); // This is the size of the data. @@ -134,7 +151,7 @@ // inefficiency in. // But we should fix it. qsort(d,2*m->nMembers,sizeof(median_data_t),&compare_data); - median_data_t * sorted_data = (median_data_t *)d; + median_data_t * sorted_data = spare->median_data; // Now interleave the sorted buffer. // We should track parity and take the @@ -147,6 +164,8 @@ // Update the metadata. i is full, j is empty. // half the items are gone. i->level++; + + // Reset j to clean state. j->level = 0; j->occupancy = 0; bzero(j->data, datasz); @@ -154,17 +173,23 @@ } // Find the smallest level such that there are two blocks at the same level. -unsigned int identify_qualified_level(metadata_t *m, size_t *lvl) { - // Zero out the count buffer. - size_t * levelcount = (size_t *)spare_space(m); - bzero(levelcount, m->nLevels * sizeof(size_t)); - // Iterate over the blocks, and count up the blocks at each level. - for(unsigned char * ptr = (unsigned char *) m->data; // Start of the data blocks. - ptr <= (m->data + buffersize(m) - blocksize(m)); // The next block will fit into the buffer. - ptr += blocksize(m)) { // Advance by a block - blockdata_t * b = (blockdata_t*) ptr; +unsigned int identify_qualified_level(metadata_t *m, size_t *lvl, median_scratch_buffer_t spare) { + // Zero out the count buffer after validating the + // spare space buffer. + size_t datasz = m->nLevels * sizeof(size_t); + if (datasz > spare->spareSize) { + return 0; + } + + // Prepare the size array. + size_t * levelcount = spare->sizes; + bzero(levelcount, datasz); + + // Count up the level population. + for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) { levelcount[b->level]++; } + // Look for a level where the count is at least 2. for(size_t idx = 0; idx < m->nLevels; idx++) { if(levelcount[idx] > 1) { @@ -173,69 +198,75 @@ return 1; } } + // There is no such thing. return 0; } -unsigned int identify_compatible_block_pair(metadata_t *m, blockdata_t ** i, blockdata_t ** j) { - size_t qualified_level; +// This identifies the pair of blocks to merge. +unsigned int identify_compatible_block_pair(metadata_t *m, blockdata_t ** i, blockdata_t ** j, median_scratch_buffer_t spare) { + // The qualified level will reside here. + // Find the qualified level. + size_t qualified_level = 0; + if (!identify_qualified_level(m,&qualified_level,spare)) { + return 0; + } + // Initial values to return. *i = 0; *j = 0; - // Find the qualified level. - if (!identify_qualified_level(m,&qualified_level)) { - return 0; - } + // State machine to find the two buffers. - for(unsigned char * ptr = (unsigned char *) m->data; // Start of the data blocks. - ptr <= (m->data + buffersize(m) - blocksize(m)); // The next block will fit into the buffer. - ptr += blocksize(m)) { // Advance by a block - blockdata_t * b = (blockdata_t*) ptr; + // Iterate over the blocks. + for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) { if (!(*i) && (b->level == qualified_level)) { - // Found the first one. - *i = b; - continue; + *i = b; continue; // Found the first one. } if (!(*j) && (b->level == qualified_level)) { - // Found both. - *j = b; - break; + *j = b; return 1; // Found the second one. } } - return 1; + // Loop should not exit before the two are found. + return 0; } -unsigned int create_empty_block(metadata_t * m) { +// THis routine creates an empty block by merging two available blocks into a +// single block. Consequently this frees up one of them. +// If two compatible blocks cannot be found, then this routine returns 0. +// It also returns 0 if for some reason the two blocks cannot be merged. +unsigned int create_empty_block(metadata_t * m, median_scratch_buffer_t spare) { blockdata_t *i,*j; - if(!identify_compatible_block_pair(m, &i, &j)){ - // Nothing to do unless there is a compatible block pair - // to merge. + if(!identify_compatible_block_pair(m, &i, &j, spare)){ return 0; } - return merge_blocks(m,i,j); + return merge_blocks(m,i,j,spare); } -// Shift the current block. -unsigned int shift_current_block(metadata_t * m) { +// Shift the current block. If there is an empty block, great! +// If not, then empty a block by merging two blocks and then occupy the +// freed up block. +unsigned int shift_current_block(metadata_t * m, median_scratch_buffer_t spare) { if(!occupy_available_empty_block(m)) { - if(create_empty_block(m)) { - // Now there should be an empty block! - // So occupy it. - occupy_available_empty_block(m); + if(create_empty_block(m,spare)) { // Now there should be an empty block! + if(!occupy_available_empty_block(m)) { + return 0; // Stuck we are. Empty block is not, even though create it we did. + } } else { - // Stuck we are, block not. - return 0; + return 0; // Stuck we are. Block not. } } - return 1; + return 1; // Occupy a block we did. } // ---------------------------------------------------------------------------- // Implementation of the interface functions (specified in median.h). +// ---------------------------------------------------------------------------- +// ---------------------------------------------------------------------------- // Suggest the size of the buffer needed for a fixed epsilon and maxN. +// ---------------------------------------------------------------------------- median_error_t median_suggest_buffer_size(double epsilon, size_t maxN, size_t *suggested_size){ // First check for sane parameters. @@ -259,13 +290,24 @@ // within the metadata_t block. I prefer keeping this data structure // relocatable and serializable. So we may need to think about what the // alternatives are. - *suggested_size = buffersize(&m)+sparespace(&m); + *suggested_size = buffersize(&m); // Things worked. Return MEDIAN_OK return MEDIAN_OK; } +// ---------------------------------------------------------------------------- +// Suggest a buffer size for the scratch buffer. +// ---------------------------------------------------------------------------- +median_error_t median_suggest_scratch_buffer_size(median_buffer_t m, size_t *suggested_size){ + metadata_t * md = (metadata_t *) m; + *suggested_size = sparespace(md); + return MEDIAN_OK; +} + +// ---------------------------------------------------------------------------- // Initialize an allocated buffer. +// ---------------------------------------------------------------------------- median_error_t median_init_buffer(void * buffer, size_t buffer_size, double epsilon, size_t maxN, median_buffer_t *initialized_buffer){ // Initialize. @@ -279,24 +321,21 @@ // Cast the buffer as metadata_t so that we can initialize the metadata block. metadata_t * m = (metadata_t *)buffer; // Initialize the metadata block. - m->gid = 1; + m->gid = 0; m->nMembers = calculate_nMembers(epsilon); m->nLevels = calculate_nLevels(maxN); m->currentBufferIdx = 0; - m->spareSize = buffer_size - buffersize(m); // Zero all the data following the metadata block. // Strictly this should not be necessary, but I'm a coward! bzero(m->data, buffersize(m) - sizeof(metadata_t)); // cast the data part as an array of blockdata_t fronted blocks. // and initialize each of the blocks. - for(unsigned char * ptr = (unsigned char *) m->data; // Start of the data blocks. - ptr <= (m->data + buffersize(m) - blocksize(m)); // The next block will fit into the buffer. - ptr += blocksize(m)) { // Advance by a block - blockdata_t * t = (blockdata_t *) ptr; - t->id = (m->gid)++; - t->level = 0; - t->occupancy = 0; + // Iterate over the blocks. + for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) { + b->id = (m->gid)++; + b->level = 0; + b->occupancy = 0; } // Populate the return value. @@ -304,7 +343,17 @@ return error; } +// ---------------------------------------------------------------------------- +// Initialize the scratch buffer. +// ---------------------------------------------------------------------------- +median_error_t median_init_scratch_buffer(void * buffer, size_t buffer_size, median_buffer_t m, median_scratch_buffer_t * initialized_buffer){ + median_scratch_buffer_t s = buffer; + s->spareSize = buffer_size - sizeof(spare_buffer_t); + *initialized_buffer = s; + return MEDIAN_OK; +} +// ---------------------------------------------------------------------------- // Insert data into the buffer. // We could have made this more concise and pretty if we used short-circuit // execution of the boolean condition.. @@ -318,8 +367,10 @@ // // But this code is harder to read, because of the side // effect issue. +// ---------------------------------------------------------------------------- median_error_t median_insert_data(median_buffer_t buffer, - median_data_t data){ + median_data_t data, + median_scratch_buffer_t scratch){ metadata_t * m = (metadata_t *) buffer; // If there is space in the current block.. if(current_block_has_space(m)) { @@ -329,7 +380,7 @@ insert_data_into_current_block(m,data); } else { // Current block is full. So try and shift it. - if(shift_current_block(m)) { + if(shift_current_block(m, scratch)) { // The current block has shifted successfully. // So we can now call the unchecked method again. insert_data_into_current_block(m,data); @@ -342,23 +393,65 @@ return MEDIAN_OK; } +// ---------------------------------------------------------------------------- +// Compare two sort records. +// ---------------------------------------------------------------------------- +int compare_sort_rec(const void * a, const void *b) { + return ((sort_rec_t *)a)->d < ((sort_rec_t*)b)->d?-1:1; +} + +// ---------------------------------------------------------------------------- +// Output the estimated median. +// ---------------------------------------------------------------------------- median_error_t median_output_median(const median_buffer_t buffer, - median_data_t *approximate_median){ + median_data_t *approximate_median, + median_scratch_buffer_t scratch){ + // Initialize the data buffer to zero. + metadata_t * m = (metadata_t *) buffer; + size_t datasz = m->nMembers * m->nLevels * sizeof(sort_rec_t); + if(datasz > scratch->spareSize) { + return MEDIAN_ERROR_BUFFER_TOO_SMALL; + } + sort_rec_t * s = (sort_rec_t *) scratch->data; + bzero(s, datasz); + // Initialize the count and weight values. + size_t count = 0; + size_t weight = 0; + // First construct the sort buffer. + for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) { + for(size_t i = 0; i < b->occupancy; i++) { + s->d = b->data[i]; + s->sz = (1 << b->level); + weight += s->sz; + s++; count++; + } + } + qsort(scratch->data,count,sizeof(sort_rec_t),&compare_sort_rec); + + // This is the weight at which the median will be found. + weight = weight/2; + + // We now need to pick out the + // median. + for(s = (sort_rec_t *) scratch->data; + weight > s->sz; + weight -= (s++)->sz) ; + + *approximate_median = s->d; return MEDIAN_OK; } +// ---------------------------------------------------------------------------- // Dump the state to stderr. +// ---------------------------------------------------------------------------- median_error_t median_dump_stderr(const median_buffer_t buffer){ // First print out all the metadata. metadata_t * m = (metadata_t *) buffer; blockdata_t * current = get_current_block(m); - fprintf(stderr, "Metadata:\n Next GID:%d\n nMembers:%d\n nLevels:%d\n CurrentBlock:%d\n SpareSize:%d\n",m->gid,(unsigned)m->nMembers,(unsigned)m->nLevels, current->id,(unsigned)m->spareSize); + fprintf(stderr, "Metadata:\n Next GID:%d\n nMembers:%d\n nLevels:%d\n CurrentBlock:%d\n",m->gid,(unsigned)m->nMembers,(unsigned)m->nLevels, current->id); + for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) { // Now dump the blocks. - for(unsigned char * ptr = (unsigned char *) m->data; // Start of the data blocks. - ptr <= (m->data + buffersize(m) - blocksize(m)); // The next block will fit into the buffer. - ptr += blocksize(m)) { // Advance by a block - blockdata_t * b = (blockdata_t *)ptr; fprintf(stderr, "Block:%d\n Level:%d\n Occupancy:%d\n", b->id,(unsigned)b->level,(unsigned)b->occupancy); // For every block, dump the first 8 keys in the block. stderr_hexdmp("Data:",(unsigned char *)(b->data),8*sizeof(median_data_t)); @@ -367,3 +460,4 @@ // Done, return MEDIAN_OK. return MEDIAN_OK; } +// ----------------------------------------------------------------------------
diff --git a/median.h b/median.h index c62fbbd..1a89ae6 100644 --- a/median.h +++ b/median.h
@@ -20,9 +20,13 @@ // functions in this file. typedef int32_t median_error_t; +struct metadata_t; +struct spare_buffer_t; // A buffer which contains all the state held // within an approximate median calculator. -typedef void * median_buffer_t; +// And another type for scratch data. +typedef struct metadata * median_buffer_t; +typedef struct spare_buffer * median_scratch_buffer_t; // A single data element. For now this is defined // to be a 64 bit integer. But in principle this could be @@ -59,6 +63,9 @@ // In all reasonable calls, the returned value will be // MEDIAN_ERROR_OK. median_error_t median_suggest_buffer_size(double epsilon, size_t maxN, size_t *suggested_size); +// Suggest a size of the buffer for scratch space. This is needed +// so that the buffer is not corrupted by the output and merge steps. +median_error_t median_suggest_scratch_buffer_size(median_buffer_t m, size_t *suggested_size); // This routine will init the buffer. It will check if the buffer meets the // size restriction returned by #median_suggest_buffer_size(). If not, it will @@ -77,6 +84,10 @@ // In all reasonable calls, the returned value will be // MEDIAN_ERROR_OK. median_error_t median_init_buffer(void * buffer, size_t buffer_size, double epsilon, size_t maxN, median_buffer_t *initialized_buffer); +// Initialize the scratch buffer. +// This function is similar in behaviour to the +// init function for the main buffer. +median_error_t median_init_scratch_buffer(void * buffer, size_t buffer_size, median_buffer_t m, median_scratch_buffer_t * initialized_buffer); // This routine inserts a data point into the median computation. It's // implemented to be really cheap. However, it is not always constant time. @@ -93,7 +104,7 @@ // then MEDIAN_ERROR_INVALID_PARAMETERS will be returned. // In all reasonable calls, the returned value will be // MEDIAN_ERROR_OK. -median_error_t median_insert_data(median_buffer_t buffer, median_data_t data); +median_error_t median_insert_data(median_buffer_t buffer, median_data_t data, median_scratch_buffer_t scratch_buffer); // This routine returns the approximate median of the data seen thusfar. // This is implemented to be really cheap as well, but not as cheap as the @@ -108,7 +119,7 @@ // return an error. // If it does return an error, the error will be // MEDIAN_ERROR_INVALID_PARAMETERS -median_error_t median_output_median(const median_buffer_t buffer, median_data_t *approximate_median); +median_error_t median_output_median(const median_buffer_t buffer, median_data_t *approximate_median, median_scratch_buffer_t scratch_buffer); // This hexdumps the state of the buffer to stderr so that it can help debug if needed. median_error_t median_dump_stderr(const median_buffer_t buffer);
diff --git a/median.main.c b/median.main.c index 49dac5d..bff30d1 100644 --- a/median.main.c +++ b/median.main.c
@@ -1,9 +1,10 @@ +#include <stdio.h> // fprintf, stderr #include <stdlib.h> // size_t #include <assert.h> // assert #include "median.h" #include "hex.h" -#define TEST_SIZE 1024*1024*128 // 128M +#define TEST_SIZE 1024*1024*256 // 256M // A sinple main routine meant as an example, // and also to unit test most of the intersting // functions of the median module. @@ -11,21 +12,29 @@ // Call suggest buffer size to determine the size of the buffer // we need to build the data structure. size_t n = 0; - assert(median_suggest_buffer_size(.001, TEST_SIZE , &n) == MEDIAN_OK); // Allocate the buffer of the required size. And then // initialize it (median_init_buffer). + assert(median_suggest_buffer_size(.001, TEST_SIZE , &n) == MEDIAN_OK); void * b = malloc(n); median_buffer_t buf; assert(median_init_buffer(b, n, .001, TEST_SIZE, &buf) == MEDIAN_OK); + // Initialize the spare buffer. + assert(median_suggest_scratch_buffer_size(buf, &n) == MEDIAN_OK); + void * s = malloc(n); + median_scratch_buffer_t scratch; + assert(median_init_scratch_buffer(s,n,buf,&scratch) == MEDIAN_OK); + // Insert some data. for(median_data_t i = 1; i < TEST_SIZE; i++) { // 1G - assert(median_insert_data(buf,i) == MEDIAN_OK); + assert(median_insert_data(buf,i,scratch) == MEDIAN_OK); } - // Dump it. - assert(median_dump_stderr(buf) == MEDIAN_OK); + median_data_t median; + assert(median_output_median(buf, &median,scratch) == MEDIAN_OK); + + fprintf(stderr, "And the median is %x\n", (unsigned)median); // All done, return 0 return 0;