| #include "median.h" |
| #include <pthread.h> // pthread_mutex_* |
| #include <stdio.h> // fprintf |
| #include <stdlib.h> // qsort |
| #include <string.h> // memcpy |
| #include <strings.h> // size_t |
| #include "hex.h" |
| |
| // The structure of the data buffer is this: |
| // ---------------------------------------------------------------------------- |
| // | Metadata | BlockData | Data * nMembers | BlockData | Data * nMembers ..... |
| // ---------------------------------------------------------------------------- |
| // Where the number of blocks is nLevels and stored in Metadata, and nMembers |
| // is stored in Metadata as well. |
| // The buffer is cast into this struct. |
| |
| // ---------------------------------------------------------------------------- |
| // The block header. |
| // ---------------------------------------------------------------------------- |
| struct blockdata { |
| unsigned int id; // Block identity. |
| size_t level; // The level of this buffer. |
| size_t occupancy; // Number of occupants for this buffer. |
| unsigned char data[0]; // Start of the data block, cast as unsigned char. |
| median_data_t |
| median_data[0]; // The data for this buffer cast as median_data_t. |
| }; |
| |
| // ---------------------------------------------------------------------------- |
| // The overall header |
| // ---------------------------------------------------------------------------- |
| struct metadata { |
| pthread_mutex_t mutex_ptr[0]; // Pointer to the mutex. |
| pthread_mutex_t mutex; // The mutex for this median counter. |
| unsigned int gid; // The next available global id |
| size_t nLevels; // Number of buffers available. |
| size_t nMembers; // The number of members in each block. |
| size_t currentBufferIdx; // The index of the currently active buffer. |
| unsigned char data[0]; // The start of the data. |
| struct blockdata block_data[0]; // Makes the code cleaner and avoids |
| // unnecessary casting. |
| }; |
| |
| // ---------------------------------------------------------------------------- |
| // These structs are used internally for algorithmic work. |
| // |
| // |
| |
| // ---------------------------------------------------------------------------- |
| // The record used for sorting the output |
| // ---------------------------------------------------------------------------- |
| struct sortrec { |
| median_data_t d; // The data point. |
| size_t sz; // The leveled size of the point. |
| }; |
| |
| // ---------------------------------------------------------------------------- |
| // The spare buffer header. |
| // ---------------------------------------------------------------------------- |
| struct spare_buffer { |
| size_t spareSize; // Size in bytes of the spare block. |
| unsigned char data[0]; // Start of the data block. |
| struct sortrec sortrec_data[0]; // The same point cast as a sortrec |
| median_data_t median_data[0]; // The same point cast as a median datum |
| size_t size_data[0]; // The same point case as size_t |
| }; |
| |
| // ---------------------------------------------------------------------------- |
| // This is the struct used to hold a JSON output string. |
| // ---------------------------------------------------------------------------- |
| // A buffer to hold the output string. |
| // ---------------------------------------------------------------------------- |
| struct output_buffer { |
| size_t outputBufferSize; // The buffer size in char |
| char mesg[0]; // The message. |
| }; |
| |
| // ---------------------------------------------------------------------------- |
| // Helper functions. |
| // |
| // This turns epsilon into a suggested size for each block. |
| size_t calculate_nMembers(double epsilon) { |
| return (size_t)((1.0 / epsilon) + 1); |
| } |
| |
| // This turns maxN into the number of levels needed. It's ceiling(log2(maxN)). |
| size_t calculate_nLevels(size_t maxN) { |
| size_t nLevels = 1; |
| while ((maxN = (maxN >> 1))) nLevels++; |
| return nLevels; |
| } |
| |
| // This returns the blocksize in bytes. |
| // The blocksize is the header plus m->nMembers many |
| // data points, each of type median_data_t |
| size_t blocksize(struct metadata *m) { |
| return sizeof(struct blockdata) + m->nMembers * sizeof(median_data_t); |
| } |
| |
| // This return the start of the next block given |
| // a pointer to a block. The easiest way is to jump |
| // m->nMembers median_data_t items away. |
| struct blockdata *nextblock(struct blockdata *blk, struct metadata *m) { |
| return (struct blockdata *)(blk->median_data + m->nMembers); |
| } |
| |
| // This returns the size of the entire buffer in bytes. |
| // The size of the metadata block, plus m->nLevels many |
| // blocks each of size blocksize(m) |
| size_t buffersize(struct metadata *m) { |
| return sizeof(struct metadata) + m->nLevels * blocksize(m); |
| } |
| |
| // Returns the first byte address which is too high to start a block regardless |
| // of size.. One blockdata worth less than the actual end of the buffer plus |
| // one. The computation is done in byte units and then cast as a struct |
| // blockdata pointer. |
| struct blockdata *out_of_bounds(struct metadata *m) { |
| return (struct blockdata *)(m->data + buffersize(m) - |
| sizeof(struct metadata) - blocksize(m)); |
| } |
| |
| // Get the current block. |
| // The computation is done in bytes, and then cast as a struct blockdata |
| // pointer. |
| struct blockdata *get_current_block(struct metadata *m) { |
| size_t offset = m->currentBufferIdx * blocksize(m); |
| return (struct blockdata *)(m->data + offset); |
| } |
| |
| // This function inserts data into the current block if possible. |
| // Otherwise, it returns false. |
| unsigned int insert_into_current_block_if_possible(struct metadata *m, |
| median_data_t data) { |
| struct blockdata *b = get_current_block(m); |
| if (b->occupancy < m->nMembers) { |
| b->median_data[b->occupancy++] = data; |
| return 1; |
| } |
| return 0; |
| } |
| |
| // Check if there is space in some block and if there is, then |
| // make that block the current block. |
| unsigned int occupy_available_empty_block(struct metadata *m) { |
| // Iterate over the blocks. |
| for (struct blockdata *b = m->block_data; b < out_of_bounds(m); |
| b = nextblock(b, m)) { |
| if (b->occupancy < m->nMembers) { |
| // Empty block found! |
| m->currentBufferIdx = b->id; |
| return 1; |
| } |
| } |
| // Failed to find an empty block. |
| return 0; |
| } |
| |
| // A simple compare operator for the qsort routine. |
| int compare_data(const void *a, const void *b) { |
| median_data_t *i = (median_data_t *)a; |
| median_data_t *j = (median_data_t *)b; |
| return (*i < *j) ? -1 : +1; |
| } |
| |
| unsigned int merge_blocks(struct metadata *m, struct blockdata *i, |
| struct blockdata *j, median_scratch_buffer_t spare) { |
| // Prepare an array for the data. |
| unsigned char *d = spare->data; |
| size_t datasz = m->nMembers * sizeof(median_data_t); |
| if (2 * datasz > spare->spareSize) { |
| return 0; |
| } |
| bzero(d, 2 * datasz); |
| |
| // This is the size of the data. |
| // Copy the data to where it needs to be. |
| memcpy(d, i->data, datasz); |
| memcpy(d + datasz, j->data, datasz); |
| |
| // Sort it. In principle we do not need to |
| // sort things, instead we should only sort |
| // when level is 0 and merge otherwise. |
| // For now, since most of the time, we will |
| // be consuming level 0 buffers, I have left this |
| // inefficiency in. |
| // But we should fix it. |
| qsort(spare->median_data, 2 * m->nMembers, sizeof(median_data_t), |
| &compare_data); |
| |
| // Now interleave the sorted buffer. |
| // We should track parity and take the |
| // odd and even values alternately for each |
| // merge. So this loop should look like |
| // for (size_t idx = parity; idx < 2*......) |
| // Here parity should be equally likely to be |
| // zero or one. |
| // However, this does make the routine somewhat non |
| // deterministic, but offers some extra accuracy. |
| for (size_t idx = 0; idx < 2 * m->nMembers; idx += 2) { |
| i->median_data[idx / 2] = spare->median_data[idx]; |
| } |
| |
| // Update the metadata. i is full, j is empty. |
| // half the items are gone. |
| i->level++; |
| |
| // Reset j to clean state. |
| j->level = 0; |
| j->occupancy = 0; |
| bzero(j->data, datasz); |
| |
| // Done, so go back. |
| return 1; |
| } |
| |
| // Find the smallest level such that there are two blocks at the same level. |
| unsigned int identify_qualified_level(struct metadata *m, size_t *lvl, |
| median_scratch_buffer_t spare) { |
| // Zero out the count buffer after validating the |
| // spare space buffer. |
| size_t datasz = m->nLevels * sizeof(size_t); |
| if (datasz > spare->spareSize) { |
| return 0; |
| } |
| |
| // Prepare the size array. |
| size_t *levelcount = spare->size_data; |
| bzero(levelcount, datasz); |
| |
| // Count up the level population. |
| for (struct blockdata *b = m->block_data; b < out_of_bounds(m); |
| b = nextblock(b, m)) { |
| levelcount[b->level]++; |
| } |
| |
| // Look for a level where the count is at least 2. |
| for (size_t idx = 0; idx < m->nLevels; idx++) { |
| if (levelcount[idx] > 1) { |
| // Found one, return it. |
| *lvl = idx; |
| return 1; |
| } |
| } |
| |
| // There is no such thing. |
| return 0; |
| } |
| |
| // This identifies the pair of blocks to merge. |
| unsigned int identify_compatible_block_pair(struct metadata *m, |
| struct blockdata **i, |
| struct blockdata **j, |
| median_scratch_buffer_t spare) { |
| // The qualified level will reside here. |
| // Find the qualified level. |
| size_t qualified_level = 0; |
| if (!identify_qualified_level(m, &qualified_level, spare)) { |
| return 0; |
| } |
| |
| // Initial values to return. |
| *i = 0; |
| *j = 0; |
| |
| // State machine to find the two buffers. |
| // Iterate over the blocks. |
| for (struct blockdata *b = m->block_data; b < out_of_bounds(m); |
| b = nextblock(b, m)) { |
| if (!(*i) && (b->level == qualified_level)) { |
| *i = b; |
| continue; // Found the first one. |
| } |
| if (!(*j) && (b->level == qualified_level)) { |
| *j = b; |
| return 1; // Found the second one. |
| } |
| } |
| // Loop should not exit before the two are found. |
| return 0; |
| } |
| |
| // THis routine creates an empty block by merging two available blocks into a |
| // single block. Consequently this frees up one of them. |
| // If two compatible blocks cannot be found, then this routine returns 0. |
| // It also returns 0 if for some reason the two blocks cannot be merged. |
| unsigned int create_empty_block(struct metadata *m, |
| median_scratch_buffer_t spare) { |
| struct blockdata *i, *j; |
| if (!identify_compatible_block_pair(m, &i, &j, spare)) { |
| return 0; |
| } |
| return merge_blocks(m, i, j, spare); |
| } |
| |
| // Shift the current block. If there is an empty block, great! |
| // If not, then empty a block by merging two blocks and then occupy the |
| // freed up block. |
| unsigned int shift_current_block(struct metadata *m, |
| median_scratch_buffer_t spare) { |
| if (!occupy_available_empty_block(m)) { |
| if (create_empty_block(m, spare)) { // Now there should be an empty block! |
| if (!occupy_available_empty_block(m)) { |
| return 0; // Stuck we are. Empty block is not, even though create it we |
| // did. |
| } |
| } else { |
| return 0; // Stuck we are. Block not. |
| } |
| } |
| return 1; // Occupy a block we did. |
| } |
| |
| // ---------------------------------------------------------------------------- |
| // Implementation of the interface functions (specified in median.h). |
| // ---------------------------------------------------------------------------- |
| |
| // ---------------------------------------------------------------------------- |
| // Suggest the size of the buffer needed for a fixed epsilon and maxN. |
| // ---------------------------------------------------------------------------- |
| median_error_t median_suggest_buffer_size(double epsilon, |
| size_t maxN, |
| size_t nWindows, |
| size_t *suggested_size) { |
| // First check for sane parameters. |
| |
| // Check that maxN is not zero. |
| if (maxN <= 0) return MEDIAN_ERROR_INVALID_PARAMETERS; |
| |
| // If n is too large, (i.e. epsilon is too small), you are looking for too |
| // fine an approximation. This is not the right algorithm. Maybe consider |
| // random sampling instead. |
| if (calculate_nMembers(epsilon) > 100000) |
| return MEDIAN_ERROR_INVALID_PARAMETERS; |
| |
| // Calculate the required buffer size for the right nLevels and nMembers |
| // value. |
| struct metadata m; |
| m.nLevels = calculate_nLevels(maxN); |
| m.nMembers = calculate_nMembers(epsilon); |
| |
| // The size is the size of the buffer and the spare needed for merging |
| // and outputing. If we have many ongoing buffers, we can do with a single |
| // spare space buffer. We can figure out that optimization later. |
| // Ideally, this will be done by keeping a pointer to the global spare space |
| // within the struct metadata block. I prefer keeping this data structure |
| // relocatable and serializable. So we may need to think about what the |
| // alternatives are. |
| *suggested_size = buffersize(&m); |
| |
| // Things worked. Return MEDIAN_OK |
| return MEDIAN_OK; |
| } |
| |
| // ---------------------------------------------------------------------------- |
| // Suggest a buffer size for the scratch buffer. |
| // ---------------------------------------------------------------------------- |
| median_error_t median_suggest_scratch_buffer_size(median_buffer_t m, |
| size_t *suggested_size) { |
| *suggested_size = m->nLevels * m->nMembers * sizeof(struct sortrec) + |
| sizeof(struct spare_buffer); |
| return MEDIAN_OK; |
| } |
| |
| // ---------------------------------------------------------------------------- |
| // Suggest a size of the output buffer. |
| // ---------------------------------------------------------------------------- |
| median_error_t median_suggest_json_string_size(median_buffer_t m, |
| size_t *suggested_size) { |
| // ToDo improve this estimate. |
| *suggested_size = 256 * 1024; // Constant size of 256K for now. |
| return MEDIAN_OK; |
| } |
| |
| // ---------------------------------------------------------------------------- |
| // Initialize an allocated buffer. |
| // ---------------------------------------------------------------------------- |
| median_error_t median_init_buffer(void *buffer, size_t buffer_size, |
| double epsilon, size_t maxN, size_t nWindows, |
| median_buffer_t *initialized_buffer) { |
| // Initialize. |
| size_t expectedSize = 0; |
| median_error_t error = MEDIAN_OK; |
| |
| // First run median_suggest_buffer_size |
| if ((error = median_suggest_buffer_size(epsilon, maxN, nWindows, &expectedSize)) != |
| MEDIAN_OK) |
| return error; |
| // Ensure that the buffer is large enough. |
| if (expectedSize > buffer_size) return MEDIAN_ERROR_BUFFER_TOO_SMALL; |
| // Cast the buffer as struct metadata so that we can initialize the metadata |
| // block. |
| struct metadata *m = (struct metadata *)buffer; |
| // Initialize the mutex. |
| pthread_mutex_init(m->mutex_ptr, NULL); |
| // Initialize the metadata block. |
| m->gid = 0; |
| m->nMembers = calculate_nMembers(epsilon); |
| m->nLevels = calculate_nLevels(maxN); |
| m->currentBufferIdx = 0; |
| // Zero all the data following the metadata block. |
| // Strictly this should not be necessary, but I'm a coward! |
| bzero(m->data, buffersize(m) - sizeof(struct metadata)); |
| |
| // cast the data part as an array of struct blockdata fronted blocks. |
| // and initialize each of the blocks. |
| // Iterate over the blocks. |
| for (struct blockdata *b = m->block_data; b < out_of_bounds(m); |
| b = nextblock(b, m)) { |
| b->id = (m->gid)++; |
| b->level = 0; |
| b->occupancy = 0; |
| } |
| |
| // Populate the return value. |
| *initialized_buffer = (median_buffer_t)buffer; |
| return error; |
| } |
| |
| // Deinit the buffer, frees up the mutex. |
| median_error_t median_deinit_buffer(median_buffer_t m) { |
| pthread_mutex_destroy(m->mutex_ptr); |
| return MEDIAN_OK; |
| } |
| |
| // Nothing to do, yet! |
| median_error_t median_deinit_scratch_buffer(median_scratch_buffer_t s) { |
| return MEDIAN_OK; |
| } |
| |
| // Nothing to do, yet! |
| median_error_t median_deinit_json_string(median_json_string_t o) { |
| return MEDIAN_OK; |
| } |
| // ---------------------------------------------------------------------------- |
| // Initialize the scratch buffer. |
| // ---------------------------------------------------------------------------- |
| median_error_t median_init_scratch_buffer( |
| void *buffer, size_t buffer_size, median_buffer_t m, |
| median_scratch_buffer_t *initialized_buffer) { |
| median_scratch_buffer_t s = buffer; |
| s->spareSize = buffer_size - sizeof(struct spare_buffer); |
| *initialized_buffer = s; |
| return MEDIAN_OK; |
| } |
| |
| median_error_t median_init_json_string(void *buffer, size_t buffer_size, |
| median_buffer_t m, |
| median_json_string_t *output_buffer) { |
| median_json_string_t o = buffer; |
| o->outputBufferSize = buffer_size - sizeof(struct output_buffer); |
| *output_buffer = o; |
| return MEDIAN_OK; |
| } |
| |
| // ---------------------------------------------------------------------------- |
| // Insert data into the buffer. |
| // ---------------------------------------------------------------------------- |
| |
| // Unprotected function. |
| median_error_t _median_insert_data(median_buffer_t m, median_data_t data, |
| median_scratch_buffer_t scratch) { |
| // If there is space in the current block.. |
| if (!insert_into_current_block_if_possible(m, data)) { |
| // Shift |
| if (!shift_current_block(m, scratch)) { |
| return MEDIAN_ERROR_MAX_N_EXCEEDED; |
| } |
| // Try to insert again, this should work because |
| // we just shifted. |
| if (!insert_into_current_block_if_possible(m, data)) { |
| return MEDIAN_ERROR_BUG; |
| } |
| } |
| return MEDIAN_OK; |
| } |
| |
| // Locked function. |
| median_error_t median_insert_data(median_buffer_t m, median_data_t data, |
| median_scratch_buffer_t scratch) { |
| // Lock. |
| pthread_mutex_lock(m->mutex_ptr); |
| // Call the unprotected routine. |
| median_error_t ret = _median_insert_data(m, data, scratch); |
| // Unlock |
| pthread_mutex_unlock(m->mutex_ptr); |
| // return |
| return ret; |
| } |
| |
| // ---------------------------------------------------------------------------- |
| // Compare two sort records. |
| // ---------------------------------------------------------------------------- |
| int compare_sortrec(const void *a, const void *b) { |
| return ((struct sortrec *)a)->d < ((struct sortrec *)b)->d ? -1 : 1; |
| } |
| |
| // ---------------------------------------------------------------------------- |
| // Output the estimated median. |
| // ---------------------------------------------------------------------------- |
| |
| // Unprotected implementation. |
| median_error_t _median_output_median(const median_buffer_t m, |
| median_data_t *approximate_median, |
| median_scratch_buffer_t scratch) { |
| // Initialize the data buffer to zero. |
| size_t datasz = m->nMembers * m->nLevels * sizeof(struct sortrec); |
| if (datasz > scratch->spareSize) { |
| return MEDIAN_ERROR_BUFFER_TOO_SMALL; |
| } |
| struct sortrec *s = scratch->sortrec_data; |
| bzero(s, datasz); |
| // Initialize the count and weight values. |
| size_t count = 0; |
| size_t weight = 0; |
| // First construct the sort buffer. |
| for (struct blockdata *b = m->block_data; b < out_of_bounds(m); |
| b = nextblock(b, m)) { |
| for (size_t i = 0; i < b->occupancy; i++) { |
| s->d = b->median_data[i]; |
| s->sz = (1 << b->level); |
| weight += s->sz; |
| s++; |
| count++; |
| } |
| } |
| qsort(scratch->sortrec_data, count, sizeof(struct sortrec), &compare_sortrec); |
| |
| // This is the weight at which the median will be found. |
| weight = weight / 2; |
| |
| // We now need to pick out the |
| // median. |
| for (s = scratch->sortrec_data; weight > s->sz; weight -= (s++)->sz) |
| ; |
| |
| *approximate_median = s->d; |
| return MEDIAN_OK; |
| } |
| |
| // Locked call. |
| median_error_t median_output_median(const median_buffer_t m, |
| median_data_t *approximate_median, |
| median_scratch_buffer_t scratch) { |
| pthread_mutex_lock(m->mutex_ptr); |
| median_error_t ret = _median_output_median(m, approximate_median, scratch); |
| pthread_mutex_unlock(m->mutex_ptr); |
| return ret; |
| } |
| |
| median_error_t _median_output_json(const median_buffer_t m, |
| median_json_string_t output) { |
| return MEDIAN_OK; |
| } |
| median_error_t median_output_json(const median_buffer_t m, |
| median_json_string_t output) { |
| pthread_mutex_lock(m->mutex_ptr); |
| median_error_t ret = _median_output_json(m, output); |
| pthread_mutex_unlock(m->mutex_ptr); |
| return ret; |
| } |
| // ---------------------------------------------------------------------------- |
| // Dump the state to stderr. |
| // ---------------------------------------------------------------------------- |
| |
| // Unprotected call. |
| median_error_t _median_dump_stderr(const median_buffer_t m) { |
| // First print out all the metadata. |
| struct blockdata *current = get_current_block(m); |
| fprintf( |
| stderr, |
| "Metadata:\n Next GID:%d\n nMembers:%d\n nLevels:%d\n CurrentBlock:%d\n", |
| m->gid, (unsigned)m->nMembers, (unsigned)m->nLevels, current->id); |
| |
| for (struct blockdata *b = m->block_data; b < out_of_bounds(m); |
| b = nextblock(b, m)) { |
| // Now dump the blocks. |
| fprintf(stderr, "Block:%d\n Level:%d\n Occupancy:%d\n", b->id, |
| (unsigned)b->level, (unsigned)b->occupancy); |
| // For every block, dump the first 8 keys in the block. |
| stderr_hexdmp("Data:", b->data, 8 * sizeof(median_data_t)); |
| } |
| |
| // Done, return MEDIAN_OK. |
| return MEDIAN_OK; |
| } |
| |
| // Protected call. |
| median_error_t median_dump_stderr(const median_buffer_t m) { |
| pthread_mutex_lock(m->mutex_ptr); |
| median_error_t ret = _median_dump_stderr(m); |
| pthread_mutex_unlock(m->mutex_ptr); |
| return ret; |
| } |
| |
| // ---------------------------------------------------------------------------- |