| #include "median.h" |
| #include "hex.h" |
| #include <strings.h> // size_t |
| #include <stdio.h> // fprintf |
| #include <stdlib.h> // qsort |
| #include <string.h> // memcpy |
| |
| // The structure of the data buffer is this: |
| // ---------------------------------------------------------------------------- |
| // | Metadata | BlockData | Data * nMembers | BlockData | Data * nMembers ..... |
| // ---------------------------------------------------------------------------- |
| // Where the number of blocks is nLevels and stored in Metadata, and nMembers |
| // is stored in Metadata as well. |
| // The buffer is cast into this struct. |
| typedef struct metadata { |
| unsigned int gid; // The next available global id |
| size_t nLevels; // Number of buffers available. |
| size_t nMembers; // The number of members in each block. |
| size_t currentBufferIdx; // The index of the currently active buffer. |
| size_t spareSize; // How much spare space is there in the buffer. |
| unsigned char data[0]; // The start of the data. |
| } metadata_t; |
| |
| typedef struct bufferdata { |
| unsigned int id; // Block identity. |
| size_t level; // The level of this buffer. |
| size_t occupancy; // Number of occupants for this buffer. |
| median_data_t data[0]; // The data for this buffer. |
| } blockdata_t; |
| |
| |
| // ---------------------------------------------------------------------------- |
| // Helper functions. |
| // |
| // This turns epsilon into a suggested size for each block. |
| size_t calculate_nMembers(double epsilon) { |
| return (size_t) ((1.0 / epsilon) + 1); |
| } |
| |
| // This turns maxN into the number of levels needed. It's ceiling(log2(maxN)). |
| size_t calculate_nLevels(size_t maxN) { |
| size_t nLevels = 1; |
| while((maxN = (maxN >> 1))) nLevels++; |
| return nLevels; |
| } |
| |
| // This returns the blocksize. |
| size_t blocksize(metadata_t * m) { |
| return sizeof(blockdata_t) + m->nMembers * sizeof(median_data_t); |
| } |
| |
| // This returns the size of the entire buffer. |
| size_t buffersize(metadata_t * m) { |
| return sizeof(metadata_t) + m->nLevels * blocksize(m); |
| } |
| |
| // This is largely so that the "output" function |
| // can be run without ruining the buffer. |
| size_t sparespace(metadata_t * m) { |
| return 2 * buffersize(m); |
| } |
| |
| // Returns the first byte address which is out of bounds. |
| unsigned char * out_of_bounds(metadata_t * m) { |
| return m->data + buffersize(m) - sizeof(blockdata_t); |
| } |
| |
| // Get the current block. |
| blockdata_t * get_current_block(metadata_t *m) { |
| size_t offset = m->currentBufferIdx * blocksize(m); |
| return (blockdata_t *)(m->data + offset); |
| } |
| // Does the current block have space? |
| unsigned int current_block_has_space(metadata_t* m) { |
| return (get_current_block(m)->occupancy < m->nMembers); |
| } |
| |
| // First byte of the spare space. |
| void * spare_space(metadata_t * m) { |
| return out_of_bounds(m); |
| } |
| |
| // This will only be called in a safe situation. It's an internal function. |
| // Does not bound check! |
| // Do not call it unless you know what you are doing. |
| void insert_data_into_current_block(metadata_t *m, median_data_t data) { |
| blockdata_t * b = get_current_block(m); |
| b->data[b->occupancy++] = data; |
| } |
| |
| // Check if there is space in some block and if there is, then |
| // make that block the current block. |
| unsigned int occupy_available_empty_block(metadata_t * m) { |
| size_t idx = 0; |
| for(unsigned char * ptr = (unsigned char *) m->data; // Start of the data blocks. |
| ptr <= (m->data + buffersize(m) - blocksize(m)); // The next block will fit into the buffer. |
| ptr += blocksize(m)) { // Advance by a block |
| blockdata_t * b = (blockdata_t *)ptr; |
| if(b->occupancy < m->nMembers) { |
| // Empty block found! |
| m->currentBufferIdx = idx; |
| return 1; |
| } else { |
| idx++; |
| } |
| } |
| // Failed to find an empty block. |
| return 0; |
| } |
| |
| // A simple compare operator for the qsort routine. |
| int compare_data(const void *a, const void *b){ |
| median_data_t * i = (median_data_t *)a; |
| median_data_t * j = (median_data_t *)b; |
| return (*i < *j)?-1:+1; |
| } |
| |
| unsigned int merge_blocks(metadata_t *m,blockdata_t *i,blockdata_t *j){ |
| // Prepare an array for the data. |
| unsigned char * d = (unsigned char *)spare_space(m); |
| size_t datasz = m->nMembers * sizeof(median_data_t); |
| bzero(d, 2*datasz); |
| |
| // This is the size of the data. |
| // Copy the data to where it needs to be. |
| memcpy(d, i->data, datasz); |
| memcpy(d + datasz, j->data, datasz); |
| |
| // Sort it. In principle we do not need to |
| // sort things, instead we should only sort |
| // when level is 0 and merge otherwise. |
| // For now, since most of the time, we will |
| // be consuming level 0 buffers, I have left this |
| // inefficiency in. |
| // But we should fix it. |
| qsort(d,2*m->nMembers,sizeof(median_data_t),&compare_data); |
| median_data_t * sorted_data = (median_data_t *)d; |
| |
| // Now interleave the sorted buffer. |
| // We should track parity and take the |
| // odd and even values alternately for each |
| // merge. |
| for(size_t idx = 0; idx < 2*m->nMembers; idx+=2){ |
| i->data[idx/2] = sorted_data[idx]; |
| } |
| |
| // Update the metadata. i is full, j is empty. |
| // half the items are gone. |
| i->level++; |
| j->level = 0; |
| j->occupancy = 0; |
| bzero(j->data, datasz); |
| return 1; |
| } |
| |
| // Find the smallest level such that there are two blocks at the same level. |
| unsigned int identify_qualified_level(metadata_t *m, size_t *lvl) { |
| // Zero out the count buffer. |
| size_t * levelcount = (size_t *)spare_space(m); |
| bzero(levelcount, m->nLevels * sizeof(size_t)); |
| // Iterate over the blocks, and count up the blocks at each level. |
| for(unsigned char * ptr = (unsigned char *) m->data; // Start of the data blocks. |
| ptr <= (m->data + buffersize(m) - blocksize(m)); // The next block will fit into the buffer. |
| ptr += blocksize(m)) { // Advance by a block |
| blockdata_t * b = (blockdata_t*) ptr; |
| levelcount[b->level]++; |
| } |
| // Look for a level where the count is at least 2. |
| for(size_t idx = 0; idx < m->nLevels; idx++) { |
| if(levelcount[idx] > 1) { |
| // Found one, return it. |
| *lvl = idx; |
| return 1; |
| } |
| } |
| // There is no such thing. |
| return 0; |
| } |
| |
| unsigned int identify_compatible_block_pair(metadata_t *m, blockdata_t ** i, blockdata_t ** j) { |
| size_t qualified_level; |
| // Initial values to return. |
| *i = 0; |
| *j = 0; |
| // Find the qualified level. |
| if (!identify_qualified_level(m,&qualified_level)) { |
| return 0; |
| } |
| // State machine to find the two buffers. |
| for(unsigned char * ptr = (unsigned char *) m->data; // Start of the data blocks. |
| ptr <= (m->data + buffersize(m) - blocksize(m)); // The next block will fit into the buffer. |
| ptr += blocksize(m)) { // Advance by a block |
| blockdata_t * b = (blockdata_t*) ptr; |
| if (!(*i) && (b->level == qualified_level)) { |
| // Found the first one. |
| *i = b; |
| continue; |
| } |
| if (!(*j) && (b->level == qualified_level)) { |
| // Found both. |
| *j = b; |
| break; |
| } |
| } |
| return 1; |
| } |
| |
| |
| unsigned int create_empty_block(metadata_t * m) { |
| blockdata_t *i,*j; |
| if(!identify_compatible_block_pair(m, &i, &j)){ |
| // Nothing to do unless there is a compatible block pair |
| // to merge. |
| return 0; |
| } |
| return merge_blocks(m,i,j); |
| } |
| |
| // Shift the current block. |
| unsigned int shift_current_block(metadata_t * m) { |
| if(!occupy_available_empty_block(m)) { |
| if(create_empty_block(m)) { |
| // Now there should be an empty block! |
| // So occupy it. |
| occupy_available_empty_block(m); |
| } else { |
| // Stuck we are, block not. |
| return 0; |
| } |
| } |
| return 1; |
| } |
| |
| |
| // ---------------------------------------------------------------------------- |
| // Implementation of the interface functions (specified in median.h). |
| |
| // Suggest the size of the buffer needed for a fixed epsilon and maxN. |
| median_error_t median_suggest_buffer_size(double epsilon, size_t maxN, size_t *suggested_size){ |
| // First check for sane parameters. |
| |
| // Check that maxN is not zero. |
| if (maxN <= 0) return MEDIAN_ERROR_INVALID_PARAMETERS; |
| |
| // If n is too large, (i.e. epsilon is too small), you are looking for too fine an approximation. |
| // This is not the right algorithm. Maybe consider random sampling instead. |
| if (calculate_nMembers(epsilon) > 100000) return MEDIAN_ERROR_INVALID_PARAMETERS; |
| |
| // Calculate the required buffer size for the right nLevels and nMembers |
| // value. |
| metadata_t m; |
| m.nLevels = calculate_nLevels(maxN); |
| m.nMembers = calculate_nMembers(epsilon); |
| |
| // The size is the size of the buffer and the spare needed for merging |
| // and outputing. If we have many ongoing buffers, we can do with a single |
| // spare space buffer. We can figure out that optimization later. |
| // Ideally, this will be done by keeping a pointer to the global spare space |
| // within the metadata_t block. I prefer keeping this data structure |
| // relocatable and serializable. So we may need to think about what the |
| // alternatives are. |
| *suggested_size = buffersize(&m)+sparespace(&m); |
| |
| // Things worked. Return MEDIAN_OK |
| return MEDIAN_OK; |
| } |
| |
| // Initialize an allocated buffer. |
| median_error_t median_init_buffer(void * buffer, size_t buffer_size, double epsilon, size_t maxN, median_buffer_t *initialized_buffer){ |
| |
| // Initialize. |
| size_t expectedSize = 0; |
| median_error_t error = MEDIAN_OK; |
| |
| // First run median_suggest_buffer_size |
| if ((error = median_suggest_buffer_size(epsilon,maxN,&expectedSize)) != MEDIAN_OK) return error; |
| // Ensure that the buffer is large enough. |
| if (expectedSize > buffer_size) return MEDIAN_ERROR_BUFFER_TOO_SMALL; |
| // Cast the buffer as metadata_t so that we can initialize the metadata block. |
| metadata_t * m = (metadata_t *)buffer; |
| // Initialize the metadata block. |
| m->gid = 1; |
| m->nMembers = calculate_nMembers(epsilon); |
| m->nLevels = calculate_nLevels(maxN); |
| m->currentBufferIdx = 0; |
| m->spareSize = buffer_size - buffersize(m); |
| // Zero all the data following the metadata block. |
| // Strictly this should not be necessary, but I'm a coward! |
| bzero(m->data, buffersize(m) - sizeof(metadata_t)); |
| |
| // cast the data part as an array of blockdata_t fronted blocks. |
| // and initialize each of the blocks. |
| for(unsigned char * ptr = (unsigned char *) m->data; // Start of the data blocks. |
| ptr <= (m->data + buffersize(m) - blocksize(m)); // The next block will fit into the buffer. |
| ptr += blocksize(m)) { // Advance by a block |
| blockdata_t * t = (blockdata_t *) ptr; |
| t->id = (m->gid)++; |
| t->level = 0; |
| t->occupancy = 0; |
| } |
| |
| // Populate the return value. |
| *initialized_buffer = (median_buffer_t)buffer; |
| return error; |
| } |
| |
| |
| // Insert data into the buffer. |
| // We could have made this more concise and pretty if we used short-circuit |
| // execution of the boolean condition.. |
| // |
| // if (current_block_has_space(m) || shift_current_block(m)) { |
| // insert_data_into_current_block(m,data); |
| // return MEDIAN_OK; |
| // } else { |
| // return MEDIAN_ERROR_MAX_N_EXCEEDED; |
| // } |
| // |
| // But this code is harder to read, because of the side |
| // effect issue. |
| median_error_t median_insert_data(median_buffer_t buffer, |
| median_data_t data){ |
| metadata_t * m = (metadata_t *) buffer; |
| // If there is space in the current block.. |
| if(current_block_has_space(m)) { |
| // Insert data into the current block. This |
| // call is to an unchecked method, which will |
| // never fail. |
| insert_data_into_current_block(m,data); |
| } else { |
| // Current block is full. So try and shift it. |
| if(shift_current_block(m)) { |
| // The current block has shifted successfully. |
| // So we can now call the unchecked method again. |
| insert_data_into_current_block(m,data); |
| } else { |
| // Failed to shift block. Stuck we are. |
| return MEDIAN_ERROR_MAX_N_EXCEEDED; |
| } |
| } |
| // Everything worked. So return MEDIAN_OK. |
| return MEDIAN_OK; |
| } |
| |
| median_error_t median_output_median(const median_buffer_t buffer, |
| median_data_t *approximate_median){ |
| return MEDIAN_OK; |
| } |
| |
| // Dump the state to stderr. |
| median_error_t median_dump_stderr(const median_buffer_t buffer){ |
| // First print out all the metadata. |
| metadata_t * m = (metadata_t *) buffer; |
| blockdata_t * current = get_current_block(m); |
| fprintf(stderr, "Metadata:\n Next GID:%d\n nMembers:%d\n nLevels:%d\n CurrentBlock:%d\n SpareSize:%d\n",m->gid,(unsigned)m->nMembers,(unsigned)m->nLevels, current->id,(unsigned)m->spareSize); |
| |
| // Now dump the blocks. |
| for(unsigned char * ptr = (unsigned char *) m->data; // Start of the data blocks. |
| ptr <= (m->data + buffersize(m) - blocksize(m)); // The next block will fit into the buffer. |
| ptr += blocksize(m)) { // Advance by a block |
| blockdata_t * b = (blockdata_t *)ptr; |
| fprintf(stderr, "Block:%d\n Level:%d\n Occupancy:%d\n", b->id,(unsigned)b->level,(unsigned)b->occupancy); |
| // For every block, dump the first 8 keys in the block. |
| stderr_hexdmp("Data:",(unsigned char *)(b->data),8*sizeof(median_data_t)); |
| } |
| |
| // Done, return MEDIAN_OK. |
| return MEDIAN_OK; |
| } |