blob: 9e2a5724caefc2c06fdc847f0b36cff571704dc7 [file] [log] [blame]
#include "median.h"
#include "hex.h"
#include <strings.h> // size_t
#include <stdio.h> // fprintf
#include <stdlib.h> // qsort
#include <string.h> // memcpy
// The structure of the data buffer is this:
// ----------------------------------------------------------------------------
// | Metadata | BlockData | Data * nMembers | BlockData | Data * nMembers .....
// ----------------------------------------------------------------------------
// Where the number of blocks is nLevels and stored in Metadata, and nMembers
// is stored in Metadata as well.
// The buffer is cast into this struct.
typedef struct bufferdata {
unsigned int id; // Block identity.
size_t level; // The level of this buffer.
size_t occupancy; // Number of occupants for this buffer.
median_data_t data[0]; // The data for this buffer.
} blockdata_t;
typedef struct metadata {
unsigned int gid; // The next available global id
size_t nLevels; // Number of buffers available.
size_t nMembers; // The number of members in each block.
size_t currentBufferIdx; // The index of the currently active buffer.
unsigned char data[0]; // The start of the data.
blockdata_t blocks[0]; // Makes the code cleaner and avoids unnecessary casting.
} metadata_t;
typedef struct sortrec {
median_data_t d;
size_t sz;
} sort_rec_t;
typedef struct spare_buffer {
size_t spareSize; // Size in bytes of the spare block.
size_t recordSize; // Size in SortRecords
size_t datumSize; // Size in number of Datum elements.
unsigned char data[0]; // Start of the data block.
sort_rec_t records[0]; // The same point cast as a sort_rec
median_data_t median_data[0]; // The same point cast as a median datum
size_t sizes[0]; // The same point case as size_t
} spare_buffer_t;
// ----------------------------------------------------------------------------
// Helper functions.
//
// This turns epsilon into a suggested size for each block.
size_t calculate_nMembers(double epsilon) {
return (size_t) ((1.0 / epsilon) + 1);
}
// This turns maxN into the number of levels needed. It's ceiling(log2(maxN)).
size_t calculate_nLevels(size_t maxN) {
size_t nLevels = 1;
while((maxN = (maxN >> 1))) nLevels++;
return nLevels;
}
// This returns the blocksize in bytes.
size_t blocksize(metadata_t * m) {
return sizeof(blockdata_t) + m->nMembers * sizeof(median_data_t);
}
// This return the start of the next block given
// a pointer to a block.
blockdata_t * nextblock(blockdata_t * blk, metadata_t * m) {
return (blockdata_t *) (blk->data + m->nMembers);
}
// This returns the size of the entire buffer.
size_t buffersize(metadata_t * m) {
return sizeof(metadata_t) + m->nLevels * blocksize(m);
}
// This is largely so that the "output" function
// can be run without ruining the buffer.
size_t sparespace(metadata_t * m) {
return sizeof(spare_buffer_t) + m->nLevels * m->nMembers * sizeof(sort_rec_t);
}
// Returns the first byte address which is too high to start a block regardless
// of size.. One blockdata worth less than the actual end of the buffer plus
// one.
blockdata_t * out_of_bounds(metadata_t * m) {
return (blockdata_t *) (m->data + buffersize(m) - blocksize(m));
}
// Get the current block.
blockdata_t * get_current_block(metadata_t *m) {
size_t offset = m->currentBufferIdx * blocksize(m);
return (blockdata_t *)(m->data + offset);
}
// Does the current block have space?
unsigned int current_block_has_space(metadata_t* m) {
return (get_current_block(m)->occupancy < m->nMembers);
}
// This will only be called in a safe situation. It's an internal function.
// Does not bound check!
// Do not call it unless you know what you are doing.
void insert_data_into_current_block(metadata_t *m, median_data_t data) {
blockdata_t * b = get_current_block(m);
b->data[b->occupancy++] = data;
}
// Check if there is space in some block and if there is, then
// make that block the current block.
unsigned int occupy_available_empty_block(metadata_t * m) {
// Iterate over the blocks.
for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) {
if(b->occupancy < m->nMembers) {
// Empty block found!
m->currentBufferIdx = b->id;
return 1;
}
}
// Failed to find an empty block.
return 0;
}
// A simple compare operator for the qsort routine.
int compare_data(const void *a, const void *b){
median_data_t * i = (median_data_t *)a;
median_data_t * j = (median_data_t *)b;
return (*i < *j)?-1:+1;
}
unsigned int merge_blocks(metadata_t *m,blockdata_t *i,blockdata_t *j, median_scratch_buffer_t spare){
// Prepare an array for the data.
unsigned char * d = spare->data;
size_t datasz = m->nMembers * sizeof(median_data_t);
if (2*datasz > spare->spareSize) {
return 0;
}
bzero(d, 2*datasz);
// This is the size of the data.
// Copy the data to where it needs to be.
memcpy(d, i->data, datasz);
memcpy(d + datasz, j->data, datasz);
// Sort it. In principle we do not need to
// sort things, instead we should only sort
// when level is 0 and merge otherwise.
// For now, since most of the time, we will
// be consuming level 0 buffers, I have left this
// inefficiency in.
// But we should fix it.
qsort(d,2*m->nMembers,sizeof(median_data_t),&compare_data);
median_data_t * sorted_data = spare->median_data;
// Now interleave the sorted buffer.
// We should track parity and take the
// odd and even values alternately for each
// merge.
for(size_t idx = 0; idx < 2*m->nMembers; idx+=2){
i->data[idx/2] = sorted_data[idx];
}
// Update the metadata. i is full, j is empty.
// half the items are gone.
i->level++;
// Reset j to clean state.
j->level = 0;
j->occupancy = 0;
bzero(j->data, datasz);
return 1;
}
// Find the smallest level such that there are two blocks at the same level.
unsigned int identify_qualified_level(metadata_t *m, size_t *lvl, median_scratch_buffer_t spare) {
// Zero out the count buffer after validating the
// spare space buffer.
size_t datasz = m->nLevels * sizeof(size_t);
if (datasz > spare->spareSize) {
return 0;
}
// Prepare the size array.
size_t * levelcount = spare->sizes;
bzero(levelcount, datasz);
// Count up the level population.
for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) {
levelcount[b->level]++;
}
// Look for a level where the count is at least 2.
for(size_t idx = 0; idx < m->nLevels; idx++) {
if(levelcount[idx] > 1) {
// Found one, return it.
*lvl = idx;
return 1;
}
}
// There is no such thing.
return 0;
}
// This identifies the pair of blocks to merge.
unsigned int identify_compatible_block_pair(metadata_t *m, blockdata_t ** i, blockdata_t ** j, median_scratch_buffer_t spare) {
// The qualified level will reside here.
// Find the qualified level.
size_t qualified_level = 0;
if (!identify_qualified_level(m,&qualified_level,spare)) {
return 0;
}
// Initial values to return.
*i = 0;
*j = 0;
// State machine to find the two buffers.
// Iterate over the blocks.
for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) {
if (!(*i) && (b->level == qualified_level)) {
*i = b; continue; // Found the first one.
}
if (!(*j) && (b->level == qualified_level)) {
*j = b; return 1; // Found the second one.
}
}
// Loop should not exit before the two are found.
return 0;
}
// THis routine creates an empty block by merging two available blocks into a
// single block. Consequently this frees up one of them.
// If two compatible blocks cannot be found, then this routine returns 0.
// It also returns 0 if for some reason the two blocks cannot be merged.
unsigned int create_empty_block(metadata_t * m, median_scratch_buffer_t spare) {
blockdata_t *i,*j;
if(!identify_compatible_block_pair(m, &i, &j, spare)){
return 0;
}
return merge_blocks(m,i,j,spare);
}
// Shift the current block. If there is an empty block, great!
// If not, then empty a block by merging two blocks and then occupy the
// freed up block.
unsigned int shift_current_block(metadata_t * m, median_scratch_buffer_t spare) {
if(!occupy_available_empty_block(m)) {
if(create_empty_block(m,spare)) { // Now there should be an empty block!
if(!occupy_available_empty_block(m)) {
return 0; // Stuck we are. Empty block is not, even though create it we did.
}
} else {
return 0; // Stuck we are. Block not.
}
}
return 1; // Occupy a block we did.
}
// ----------------------------------------------------------------------------
// Implementation of the interface functions (specified in median.h).
// ----------------------------------------------------------------------------
// ----------------------------------------------------------------------------
// Suggest the size of the buffer needed for a fixed epsilon and maxN.
// ----------------------------------------------------------------------------
median_error_t median_suggest_buffer_size(double epsilon, size_t maxN, size_t *suggested_size){
// First check for sane parameters.
// Check that maxN is not zero.
if (maxN <= 0) return MEDIAN_ERROR_INVALID_PARAMETERS;
// If n is too large, (i.e. epsilon is too small), you are looking for too fine an approximation.
// This is not the right algorithm. Maybe consider random sampling instead.
if (calculate_nMembers(epsilon) > 100000) return MEDIAN_ERROR_INVALID_PARAMETERS;
// Calculate the required buffer size for the right nLevels and nMembers
// value.
metadata_t m;
m.nLevels = calculate_nLevels(maxN);
m.nMembers = calculate_nMembers(epsilon);
// The size is the size of the buffer and the spare needed for merging
// and outputing. If we have many ongoing buffers, we can do with a single
// spare space buffer. We can figure out that optimization later.
// Ideally, this will be done by keeping a pointer to the global spare space
// within the metadata_t block. I prefer keeping this data structure
// relocatable and serializable. So we may need to think about what the
// alternatives are.
*suggested_size = buffersize(&m);
// Things worked. Return MEDIAN_OK
return MEDIAN_OK;
}
// ----------------------------------------------------------------------------
// Suggest a buffer size for the scratch buffer.
// ----------------------------------------------------------------------------
median_error_t median_suggest_scratch_buffer_size(median_buffer_t m, size_t *suggested_size){
metadata_t * md = (metadata_t *) m;
*suggested_size = sparespace(md);
return MEDIAN_OK;
}
// ----------------------------------------------------------------------------
// Initialize an allocated buffer.
// ----------------------------------------------------------------------------
median_error_t median_init_buffer(void * buffer, size_t buffer_size, double epsilon, size_t maxN, median_buffer_t *initialized_buffer){
// Initialize.
size_t expectedSize = 0;
median_error_t error = MEDIAN_OK;
// First run median_suggest_buffer_size
if ((error = median_suggest_buffer_size(epsilon,maxN,&expectedSize)) != MEDIAN_OK) return error;
// Ensure that the buffer is large enough.
if (expectedSize > buffer_size) return MEDIAN_ERROR_BUFFER_TOO_SMALL;
// Cast the buffer as metadata_t so that we can initialize the metadata block.
metadata_t * m = (metadata_t *)buffer;
// Initialize the metadata block.
m->gid = 0;
m->nMembers = calculate_nMembers(epsilon);
m->nLevels = calculate_nLevels(maxN);
m->currentBufferIdx = 0;
// Zero all the data following the metadata block.
// Strictly this should not be necessary, but I'm a coward!
bzero(m->data, buffersize(m) - sizeof(metadata_t));
// cast the data part as an array of blockdata_t fronted blocks.
// and initialize each of the blocks.
// Iterate over the blocks.
for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) {
b->id = (m->gid)++;
b->level = 0;
b->occupancy = 0;
}
// Populate the return value.
*initialized_buffer = (median_buffer_t)buffer;
return error;
}
// ----------------------------------------------------------------------------
// Initialize the scratch buffer.
// ----------------------------------------------------------------------------
median_error_t median_init_scratch_buffer(void * buffer, size_t buffer_size, median_buffer_t m, median_scratch_buffer_t * initialized_buffer){
median_scratch_buffer_t s = buffer;
s->spareSize = buffer_size - sizeof(spare_buffer_t);
*initialized_buffer = s;
return MEDIAN_OK;
}
// ----------------------------------------------------------------------------
// Insert data into the buffer.
// We could have made this more concise and pretty if we used short-circuit
// execution of the boolean condition..
//
// if (current_block_has_space(m) || shift_current_block(m)) {
// insert_data_into_current_block(m,data);
// return MEDIAN_OK;
// } else {
// return MEDIAN_ERROR_MAX_N_EXCEEDED;
// }
//
// But this code is harder to read, because of the side
// effect issue.
// ----------------------------------------------------------------------------
median_error_t median_insert_data(median_buffer_t buffer,
median_data_t data,
median_scratch_buffer_t scratch){
metadata_t * m = (metadata_t *) buffer;
// If there is space in the current block..
if(current_block_has_space(m)) {
// Insert data into the current block. This
// call is to an unchecked method, which will
// never fail.
insert_data_into_current_block(m,data);
} else {
// Current block is full. So try and shift it.
if(shift_current_block(m, scratch)) {
// The current block has shifted successfully.
// So we can now call the unchecked method again.
insert_data_into_current_block(m,data);
} else {
// Failed to shift block. Stuck we are.
return MEDIAN_ERROR_MAX_N_EXCEEDED;
}
}
// Everything worked. So return MEDIAN_OK.
return MEDIAN_OK;
}
// ----------------------------------------------------------------------------
// Compare two sort records.
// ----------------------------------------------------------------------------
int compare_sort_rec(const void * a, const void *b) {
return ((sort_rec_t *)a)->d < ((sort_rec_t*)b)->d?-1:1;
}
// ----------------------------------------------------------------------------
// Output the estimated median.
// ----------------------------------------------------------------------------
median_error_t median_output_median(const median_buffer_t buffer,
median_data_t *approximate_median,
median_scratch_buffer_t scratch){
// Initialize the data buffer to zero.
metadata_t * m = (metadata_t *) buffer;
size_t datasz = m->nMembers * m->nLevels * sizeof(sort_rec_t);
if(datasz > scratch->spareSize) {
return MEDIAN_ERROR_BUFFER_TOO_SMALL;
}
sort_rec_t * s = (sort_rec_t *) scratch->data;
bzero(s, datasz);
// Initialize the count and weight values.
size_t count = 0;
size_t weight = 0;
// First construct the sort buffer.
for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) {
for(size_t i = 0; i < b->occupancy; i++) {
s->d = b->data[i];
s->sz = (1 << b->level);
weight += s->sz;
s++; count++;
}
}
qsort(scratch->data,count,sizeof(sort_rec_t),&compare_sort_rec);
// This is the weight at which the median will be found.
weight = weight/2;
// We now need to pick out the
// median.
for(s = (sort_rec_t *) scratch->data;
weight > s->sz;
weight -= (s++)->sz) ;
*approximate_median = s->d;
return MEDIAN_OK;
}
// ----------------------------------------------------------------------------
// Dump the state to stderr.
// ----------------------------------------------------------------------------
median_error_t median_dump_stderr(const median_buffer_t buffer){
// First print out all the metadata.
metadata_t * m = (metadata_t *) buffer;
blockdata_t * current = get_current_block(m);
fprintf(stderr, "Metadata:\n Next GID:%d\n nMembers:%d\n nLevels:%d\n CurrentBlock:%d\n",m->gid,(unsigned)m->nMembers,(unsigned)m->nLevels, current->id);
for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) {
// Now dump the blocks.
fprintf(stderr, "Block:%d\n Level:%d\n Occupancy:%d\n", b->id,(unsigned)b->level,(unsigned)b->occupancy);
// For every block, dump the first 8 keys in the block.
stderr_hexdmp("Data:",(unsigned char *)(b->data),8*sizeof(median_data_t));
}
// Done, return MEDIAN_OK.
return MEDIAN_OK;
}
// ----------------------------------------------------------------------------