| #include "median.h" |
| #include "hex.h" |
| #include <strings.h> // size_t |
| #include <stdio.h> // fprintf |
| #include <stdlib.h> // qsort |
| #include <string.h> // memcpy |
| #include <pthread.h> // pthread_mutex_* |
| |
| // The structure of the data buffer is this: |
| // ---------------------------------------------------------------------------- |
| // | Metadata | BlockData | Data * nMembers | BlockData | Data * nMembers ..... |
| // ---------------------------------------------------------------------------- |
| // Where the number of blocks is nLevels and stored in Metadata, and nMembers |
| // is stored in Metadata as well. |
| // The buffer is cast into this struct. |
| |
| |
| // ---------------------------------------------------------------------------- |
| // The block header. |
| // ---------------------------------------------------------------------------- |
| typedef struct blockdata { |
| unsigned int id; // Block identity. |
| size_t level; // The level of this buffer. |
| size_t occupancy; // Number of occupants for this buffer. |
| unsigned char data[0]; // Start of the data block, cast as unsigned char. |
| median_data_t median_data[0]; // The data for this buffer cast as median_data_t. |
| } blockdata_t; |
| |
| // ---------------------------------------------------------------------------- |
| // The overall header |
| // ---------------------------------------------------------------------------- |
| typedef struct metadata { |
| pthread_mutex_t mutex_ptr[0]; // Pointer to the mutex. |
| pthread_mutex_t mutex; // The mutex for this median counter. |
| unsigned int gid; // The next available global id |
| size_t nLevels; // Number of buffers available. |
| size_t nMembers; // The number of members in each block. |
| size_t currentBufferIdx; // The index of the currently active buffer. |
| unsigned char data[0]; // The start of the data. |
| blockdata_t block_data[0]; // Makes the code cleaner and avoids unnecessary casting. |
| } metadata_t; |
| |
| // ---------------------------------------------------------------------------- |
| // The record used for sorting the output |
| // ---------------------------------------------------------------------------- |
| typedef struct sortrec { |
| median_data_t d; // The data point. |
| size_t sz; // The leveled size of the point. |
| } sortrec_t; |
| |
| // ---------------------------------------------------------------------------- |
| // The spare buffer space header. |
| // ---------------------------------------------------------------------------- |
| typedef struct spare_buffer { |
| size_t spareSize; // Size in bytes of the spare block. |
| unsigned char data[0]; // Start of the data block. |
| sortrec_t sortrec_data[0]; // The same point cast as a sortrec |
| median_data_t median_data[0]; // The same point cast as a median datum |
| size_t size_data[0]; // The same point case as size_t |
| } spare_buffer_t; |
| |
| |
| // ---------------------------------------------------------------------------- |
| // Helper functions. |
| // |
| // This turns epsilon into a suggested size for each block. |
| size_t calculate_nMembers(double epsilon) { |
| return (size_t) ((1.0 / epsilon) + 1); |
| } |
| |
| // This turns maxN into the number of levels needed. It's ceiling(log2(maxN)). |
| size_t calculate_nLevels(size_t maxN) { |
| size_t nLevels = 1; |
| while((maxN = (maxN >> 1))) nLevels++; |
| return nLevels; |
| } |
| |
| // This returns the blocksize in bytes. |
| // The blocksize is the header plus m->nMembers many |
| // data points, each of type median_data_t |
| size_t blocksize(metadata_t * m) { |
| return sizeof(blockdata_t) + m->nMembers * sizeof(median_data_t); |
| } |
| |
| // This return the start of the next block given |
| // a pointer to a block. The easiest way is to jump |
| // m->nMembers median_data_t items away. |
| blockdata_t * nextblock(blockdata_t * blk, metadata_t * m) { |
| return (blockdata_t *) (blk->median_data + m->nMembers); |
| } |
| |
| // This returns the size of the entire buffer in bytes. |
| // The size of the metadata block, plus m->nLevels many |
| // blocks each of size blocksize(m) |
| size_t buffersize(metadata_t * m) { |
| return sizeof(metadata_t) + m->nLevels * blocksize(m); |
| } |
| |
| // Returns the first byte address which is too high to start a block regardless |
| // of size.. One blockdata worth less than the actual end of the buffer plus |
| // one. The computation is done in byte units and then cast as a blockdata_t |
| // pointer. |
| blockdata_t * out_of_bounds(metadata_t * m) { |
| return (blockdata_t *) (m->data + buffersize(m) - sizeof(metadata_t) - blocksize(m)); |
| } |
| |
| // Get the current block. |
| // The computation is done in bytes, and then cast as a blockdata_t pointer. |
| blockdata_t * get_current_block(metadata_t *m) { |
| size_t offset = m->currentBufferIdx * blocksize(m); |
| return (blockdata_t *)(m->data + offset); |
| } |
| |
| // This function inserts data into the current block if possible. |
| // Otherwise, it returns false. |
| unsigned int insert_into_current_block_if_possible(metadata_t *m, median_data_t data) { |
| blockdata_t * b = get_current_block(m); |
| if (b->occupancy < m->nMembers) { |
| b->median_data[b->occupancy++] = data; |
| return 1; |
| } |
| return 0; |
| } |
| |
| // Check if there is space in some block and if there is, then |
| // make that block the current block. |
| unsigned int occupy_available_empty_block(metadata_t * m) { |
| // Iterate over the blocks. |
| for(blockdata_t * b = m->block_data; b < out_of_bounds(m); b = nextblock(b,m)) { |
| if(b->occupancy < m->nMembers) { |
| // Empty block found! |
| m->currentBufferIdx = b->id; |
| return 1; |
| } |
| } |
| // Failed to find an empty block. |
| return 0; |
| } |
| |
| // A simple compare operator for the qsort routine. |
| int compare_data(const void *a, const void *b){ |
| median_data_t * i = (median_data_t *)a; |
| median_data_t * j = (median_data_t *)b; |
| return (*i < *j)?-1:+1; |
| } |
| |
| unsigned int merge_blocks(metadata_t *m,blockdata_t *i,blockdata_t *j, median_scratch_buffer_t spare){ |
| // Prepare an array for the data. |
| unsigned char * d = spare->data; |
| size_t datasz = m->nMembers * sizeof(median_data_t); |
| if (2*datasz > spare->spareSize) { |
| return 0; |
| } |
| bzero(d, 2*datasz); |
| |
| // This is the size of the data. |
| // Copy the data to where it needs to be. |
| memcpy(d, i->data, datasz); |
| memcpy(d + datasz, j->data, datasz); |
| |
| // Sort it. In principle we do not need to |
| // sort things, instead we should only sort |
| // when level is 0 and merge otherwise. |
| // For now, since most of the time, we will |
| // be consuming level 0 buffers, I have left this |
| // inefficiency in. |
| // But we should fix it. |
| qsort(spare->median_data,2*m->nMembers,sizeof(median_data_t),&compare_data); |
| |
| // Now interleave the sorted buffer. |
| // We should track parity and take the |
| // odd and even values alternately for each |
| // merge. So this loop should look like |
| // for (size_t idx = parity; idx < 2*......) |
| // Here parity should be equally likely to be |
| // zero or one. |
| // However, this does make the routine somewhat non |
| // deterministic, but offers some extra accuracy. |
| for(size_t idx = 0; idx < 2*m->nMembers; idx+=2){ |
| i->median_data[idx/2] = spare->median_data[idx]; |
| } |
| |
| // Update the metadata. i is full, j is empty. |
| // half the items are gone. |
| i->level++; |
| |
| // Reset j to clean state. |
| j->level = 0; |
| j->occupancy = 0; |
| bzero(j->data, datasz); |
| |
| // Done, so go back. |
| return 1; |
| } |
| |
| // Find the smallest level such that there are two blocks at the same level. |
| unsigned int identify_qualified_level(metadata_t *m, size_t *lvl, median_scratch_buffer_t spare) { |
| // Zero out the count buffer after validating the |
| // spare space buffer. |
| size_t datasz = m->nLevels * sizeof(size_t); |
| if (datasz > spare->spareSize) { |
| return 0; |
| } |
| |
| // Prepare the size array. |
| size_t * levelcount = spare->size_data; |
| bzero(levelcount, datasz); |
| |
| // Count up the level population. |
| for(blockdata_t * b = m->block_data; b < out_of_bounds(m); b = nextblock(b,m)) { |
| levelcount[b->level]++; |
| } |
| |
| // Look for a level where the count is at least 2. |
| for(size_t idx = 0; idx < m->nLevels; idx++) { |
| if(levelcount[idx] > 1) { |
| // Found one, return it. |
| *lvl = idx; |
| return 1; |
| } |
| } |
| |
| // There is no such thing. |
| return 0; |
| } |
| |
| // This identifies the pair of blocks to merge. |
| unsigned int identify_compatible_block_pair(metadata_t *m, blockdata_t ** i, blockdata_t ** j, median_scratch_buffer_t spare) { |
| // The qualified level will reside here. |
| // Find the qualified level. |
| size_t qualified_level = 0; |
| if (!identify_qualified_level(m,&qualified_level,spare)) { |
| return 0; |
| } |
| |
| // Initial values to return. |
| *i = 0; |
| *j = 0; |
| |
| // State machine to find the two buffers. |
| // Iterate over the blocks. |
| for(blockdata_t * b = m->block_data; b < out_of_bounds(m); b = nextblock(b,m)) { |
| if (!(*i) && (b->level == qualified_level)) { |
| *i = b; continue; // Found the first one. |
| } |
| if (!(*j) && (b->level == qualified_level)) { |
| *j = b; return 1; // Found the second one. |
| } |
| } |
| // Loop should not exit before the two are found. |
| return 0; |
| } |
| |
| |
| // THis routine creates an empty block by merging two available blocks into a |
| // single block. Consequently this frees up one of them. |
| // If two compatible blocks cannot be found, then this routine returns 0. |
| // It also returns 0 if for some reason the two blocks cannot be merged. |
| unsigned int create_empty_block(metadata_t * m, median_scratch_buffer_t spare) { |
| blockdata_t *i,*j; |
| if(!identify_compatible_block_pair(m, &i, &j, spare)){ |
| return 0; |
| } |
| return merge_blocks(m,i,j,spare); |
| } |
| |
| // Shift the current block. If there is an empty block, great! |
| // If not, then empty a block by merging two blocks and then occupy the |
| // freed up block. |
| unsigned int shift_current_block(metadata_t * m, median_scratch_buffer_t spare) { |
| if(!occupy_available_empty_block(m)) { |
| if(create_empty_block(m,spare)) { // Now there should be an empty block! |
| if(!occupy_available_empty_block(m)) { |
| return 0; // Stuck we are. Empty block is not, even though create it we did. |
| } |
| } else { |
| return 0; // Stuck we are. Block not. |
| } |
| } |
| return 1; // Occupy a block we did. |
| } |
| |
| |
| // ---------------------------------------------------------------------------- |
| // Implementation of the interface functions (specified in median.h). |
| // ---------------------------------------------------------------------------- |
| |
| // ---------------------------------------------------------------------------- |
| // Suggest the size of the buffer needed for a fixed epsilon and maxN. |
| // ---------------------------------------------------------------------------- |
| median_error_t median_suggest_buffer_size(double epsilon, size_t maxN, size_t *suggested_size){ |
| // First check for sane parameters. |
| |
| // Check that maxN is not zero. |
| if (maxN <= 0) return MEDIAN_ERROR_INVALID_PARAMETERS; |
| |
| // If n is too large, (i.e. epsilon is too small), you are looking for too fine an approximation. |
| // This is not the right algorithm. Maybe consider random sampling instead. |
| if (calculate_nMembers(epsilon) > 100000) return MEDIAN_ERROR_INVALID_PARAMETERS; |
| |
| // Calculate the required buffer size for the right nLevels and nMembers |
| // value. |
| metadata_t m; |
| m.nLevels = calculate_nLevels(maxN); |
| m.nMembers = calculate_nMembers(epsilon); |
| |
| // The size is the size of the buffer and the spare needed for merging |
| // and outputing. If we have many ongoing buffers, we can do with a single |
| // spare space buffer. We can figure out that optimization later. |
| // Ideally, this will be done by keeping a pointer to the global spare space |
| // within the metadata_t block. I prefer keeping this data structure |
| // relocatable and serializable. So we may need to think about what the |
| // alternatives are. |
| *suggested_size = buffersize(&m); |
| |
| // Things worked. Return MEDIAN_OK |
| return MEDIAN_OK; |
| } |
| |
| // ---------------------------------------------------------------------------- |
| // Suggest a buffer size for the scratch buffer. |
| // ---------------------------------------------------------------------------- |
| median_error_t median_suggest_scratch_buffer_size(median_buffer_t m, size_t *suggested_size){ |
| *suggested_size = m->nLevels * m->nMembers * sizeof(sortrec_t) + sizeof(spare_buffer_t); |
| return MEDIAN_OK; |
| } |
| |
| // ---------------------------------------------------------------------------- |
| // Initialize an allocated buffer. |
| // ---------------------------------------------------------------------------- |
| median_error_t median_init_buffer(void * buffer, size_t buffer_size, double epsilon, size_t maxN, median_buffer_t *initialized_buffer){ |
| // Initialize. |
| size_t expectedSize = 0; |
| median_error_t error = MEDIAN_OK; |
| |
| // First run median_suggest_buffer_size |
| if ((error = median_suggest_buffer_size(epsilon,maxN,&expectedSize)) != MEDIAN_OK) return error; |
| // Ensure that the buffer is large enough. |
| if (expectedSize > buffer_size) return MEDIAN_ERROR_BUFFER_TOO_SMALL; |
| // Cast the buffer as metadata_t so that we can initialize the metadata block. |
| metadata_t * m = (metadata_t *)buffer; |
| // Initialize the mutex. |
| pthread_mutex_init(m->mutex_ptr, NULL); |
| // Initialize the metadata block. |
| m->gid = 0; |
| m->nMembers = calculate_nMembers(epsilon); |
| m->nLevels = calculate_nLevels(maxN); |
| m->currentBufferIdx = 0; |
| // Zero all the data following the metadata block. |
| // Strictly this should not be necessary, but I'm a coward! |
| bzero(m->data, buffersize(m) - sizeof(metadata_t)); |
| |
| // cast the data part as an array of blockdata_t fronted blocks. |
| // and initialize each of the blocks. |
| // Iterate over the blocks. |
| for(blockdata_t * b = m->block_data; b < out_of_bounds(m); b = nextblock(b,m)) { |
| b->id = (m->gid)++; |
| b->level = 0; |
| b->occupancy = 0; |
| } |
| |
| // Populate the return value. |
| *initialized_buffer = (median_buffer_t)buffer; |
| return error; |
| } |
| |
| // Deinit the buffer, frees up the mutex. |
| median_error_t median_deinit_buffer(median_buffer_t m) { |
| pthread_mutex_destroy(m->mutex_ptr); |
| return MEDIAN_OK; |
| } |
| |
| // ---------------------------------------------------------------------------- |
| // Initialize the scratch buffer. |
| // ---------------------------------------------------------------------------- |
| median_error_t median_init_scratch_buffer(void * buffer, size_t buffer_size, median_buffer_t m, median_scratch_buffer_t * initialized_buffer){ |
| median_scratch_buffer_t s = buffer; |
| s->spareSize = buffer_size - sizeof(spare_buffer_t); |
| *initialized_buffer = s; |
| return MEDIAN_OK; |
| } |
| |
| // ---------------------------------------------------------------------------- |
| // Insert data into the buffer. |
| // ---------------------------------------------------------------------------- |
| |
| // Unprotected function. |
| median_error_t _median_insert_data(median_buffer_t m, |
| median_data_t data, |
| median_scratch_buffer_t scratch){ |
| // If there is space in the current block.. |
| if(!insert_into_current_block_if_possible(m,data)) { |
| // Shift |
| if(!shift_current_block(m,scratch)) { |
| return MEDIAN_ERROR_MAX_N_EXCEEDED; |
| } |
| // Try to insert again, this should work because |
| // we just shifted. |
| if(!insert_into_current_block_if_possible(m,data)) { |
| return MEDIAN_ERROR_BUG; |
| } |
| } |
| return MEDIAN_OK; |
| } |
| |
| // Locked function. |
| median_error_t median_insert_data(median_buffer_t m, |
| median_data_t data, |
| median_scratch_buffer_t scratch){ |
| // Lock. |
| pthread_mutex_lock(m->mutex_ptr); |
| // Call the unprotected routine. |
| median_error_t ret = _median_insert_data(m,data,scratch); |
| // Unlock |
| pthread_mutex_unlock(m->mutex_ptr); |
| // return |
| return ret; |
| } |
| |
| // ---------------------------------------------------------------------------- |
| // Compare two sort records. |
| // ---------------------------------------------------------------------------- |
| int compare_sortrec(const void * a, const void *b) { |
| return ((sortrec_t *)a)->d < ((sortrec_t*)b)->d?-1:1; |
| } |
| |
| // ---------------------------------------------------------------------------- |
| // Output the estimated median. |
| // ---------------------------------------------------------------------------- |
| |
| // Unprotected implementation. |
| median_error_t _median_output_median(const median_buffer_t m, |
| median_data_t *approximate_median, |
| median_scratch_buffer_t scratch){ |
| // Initialize the data buffer to zero. |
| size_t datasz = m->nMembers * m->nLevels * sizeof(sortrec_t); |
| if(datasz > scratch->spareSize) { |
| return MEDIAN_ERROR_BUFFER_TOO_SMALL; |
| } |
| sortrec_t * s = scratch->sortrec_data; |
| bzero(s, datasz); |
| // Initialize the count and weight values. |
| size_t count = 0; |
| size_t weight = 0; |
| // First construct the sort buffer. |
| for(blockdata_t * b = m->block_data; b < out_of_bounds(m); b = nextblock(b,m)) { |
| for(size_t i = 0; i < b->occupancy; i++) { |
| s->d = b->median_data[i]; |
| s->sz = (1 << b->level); |
| weight += s->sz; |
| s++; count++; |
| } |
| } |
| qsort(scratch->sortrec_data,count,sizeof(sortrec_t),&compare_sortrec); |
| |
| // This is the weight at which the median will be found. |
| weight = weight/2; |
| |
| // We now need to pick out the |
| // median. |
| for(s = scratch->sortrec_data; |
| weight > s->sz; |
| weight -= (s++)->sz); |
| |
| *approximate_median = s->d; |
| return MEDIAN_OK; |
| } |
| |
| // Locked call. |
| median_error_t median_output_median(const median_buffer_t m, |
| median_data_t *approximate_median, |
| median_scratch_buffer_t scratch){ |
| pthread_mutex_lock(m->mutex_ptr); |
| median_error_t ret = _median_output_median(m, approximate_median, scratch); |
| pthread_mutex_unlock(m->mutex_ptr); |
| return ret; |
| } |
| // ---------------------------------------------------------------------------- |
| // Dump the state to stderr. |
| // ---------------------------------------------------------------------------- |
| |
| // Unprotected call. |
| median_error_t _median_dump_stderr(const median_buffer_t m){ |
| // First print out all the metadata. |
| blockdata_t * current = get_current_block(m); |
| fprintf(stderr, "Metadata:\n Next GID:%d\n nMembers:%d\n nLevels:%d\n CurrentBlock:%d\n",m->gid,(unsigned)m->nMembers,(unsigned)m->nLevels, current->id); |
| |
| for(blockdata_t * b = m->block_data; b < out_of_bounds(m); b = nextblock(b,m)) { |
| // Now dump the blocks. |
| fprintf(stderr, "Block:%d\n Level:%d\n Occupancy:%d\n", b->id,(unsigned)b->level,(unsigned)b->occupancy); |
| // For every block, dump the first 8 keys in the block. |
| stderr_hexdmp("Data:",b->data,8*sizeof(median_data_t)); |
| } |
| |
| // Done, return MEDIAN_OK. |
| return MEDIAN_OK; |
| } |
| |
| // Protected call. |
| median_error_t median_dump_stderr(const median_buffer_t m){ |
| pthread_mutex_lock(m->mutex_ptr); |
| median_error_t ret = _median_dump_stderr(m); |
| pthread_mutex_unlock(m->mutex_ptr); |
| return ret; |
| } |
| |
| // ---------------------------------------------------------------------------- |