Merge code done. We need to do the output phase.
diff --git a/Makefile b/Makefile index 27fbc86..bf373a8 100644 --- a/Makefile +++ b/Makefile
@@ -5,7 +5,7 @@ MAINO = $(patsubst %.c,%.o,$(MAIN)) DEPS = $(patsubst %.c,%.d,$(SRCS) $(MAIN)) EXECS = $(patsubst %.c,%,$(MAIN)) -FLAGS = -std=c99 -Wall +FLAGS = -std=c99 -Wall -g all : $(EXECS) clean : rm -f $(OBJS) $(DEPS) $(MAINO) $(EXECS)
diff --git a/median.c b/median.c index 6c413ef..1dad56d 100644 --- a/median.c +++ b/median.c
@@ -1,7 +1,9 @@ #include "median.h" #include "hex.h" -#include <strings.h> -#include <stdio.h> +#include <strings.h> // size_t +#include <stdio.h> // fprintf +#include <stdlib.h> // qsort +#include <string.h> // memcpy // The structure of the data buffer is this: // ---------------------------------------------------------------------------- @@ -15,6 +17,7 @@ size_t nLevels; // Number of buffers available. size_t nMembers; // The number of members in each block. size_t currentBufferIdx; // The index of the currently active buffer. + size_t spareSize; // How much spare space is there in the buffer. unsigned char data[0]; // The start of the data. } metadata_t; @@ -51,6 +54,12 @@ return sizeof(metadata_t) + m->nLevels * blocksize(m); } +// This is largely so that the "output" function +// can be run without ruining the buffer. +size_t sparespace(metadata_t * m) { + return 2 * buffersize(m); +} + // Returns the first byte address which is out of bounds. unsigned char * out_of_bounds(metadata_t * m) { return m->data + buffersize(m) - sizeof(blockdata_t); @@ -66,6 +75,11 @@ return (get_current_block(m)->occupancy < m->nMembers); } +// First byte of the spare space. +void * spare_space(metadata_t * m) { + return out_of_bounds(m); +} + // This will only be called in a safe situation. It's an internal function. // Does not bound check! // Do not call it unless you know what you are doing. @@ -90,14 +104,118 @@ idx++; } } + // Failed to find an empty block. return 0; } -unsigned int create_empty_block(metadata_t * m) { - // TODO +// A simple compare operator for the qsort routine. +int compare_data(const void *a, const void *b){ + median_data_t * i = (median_data_t *)a; + median_data_t * j = (median_data_t *)b; + return (*i < *j)?-1:+1; +} + +unsigned int merge_blocks(metadata_t *m,blockdata_t *i,blockdata_t *j){ + // Prepare an array for the data. + unsigned char * d = (unsigned char *)spare_space(m); + size_t datasz = m->nMembers * sizeof(median_data_t); + bzero(d, 2*datasz); + + // This is the size of the data. + // Copy the data to where it needs to be. + memcpy(d, i->data, datasz); + memcpy(d + datasz, j->data, datasz); + + // Sort it. In principle we do not need to + // sort things, instead we should only sort + // when level is 0 and merge otherwise. + // For now, since most of the time, we will + // be consuming level 0 buffers, I have left this + // inefficiency in. + // But we should fix it. + qsort(d,2*m->nMembers,sizeof(median_data_t),&compare_data); + median_data_t * sorted_data = (median_data_t *)d; + + // Now interleave the sorted buffer. + // We should track parity and take the + // odd and even values alternately for each + // merge. + for(size_t idx = 0; idx < 2*m->nMembers; idx+=2){ + i->data[idx/2] = sorted_data[idx]; + } + + // Update the metadata. i is full, j is empty. + // half the items are gone. + i->level++; + j->level = 0; + j->occupancy = 0; + bzero(j->data, datasz); + return 1; +} + +// Find the smallest level such that there are two blocks at the same level. +unsigned int identify_qualified_level(metadata_t *m, size_t *lvl) { + // Zero out the count buffer. + size_t * levelcount = (size_t *)spare_space(m); + bzero(levelcount, m->nLevels * sizeof(size_t)); + // Iterate over the blocks, and count up the blocks at each level. + for(unsigned char * ptr = (unsigned char *) m->data; // Start of the data blocks. + ptr <= (m->data + buffersize(m) - blocksize(m)); // The next block will fit into the buffer. + ptr += blocksize(m)) { // Advance by a block + blockdata_t * b = (blockdata_t*) ptr; + levelcount[b->level]++; + } + // Look for a level where the count is at least 2. + for(size_t idx = 0; idx < m->nLevels; idx++) { + if(levelcount[idx] > 1) { + // Found one, return it. + *lvl = idx; + return 1; + } + } + // There is no such thing. return 0; } +unsigned int identify_compatible_block_pair(metadata_t *m, blockdata_t ** i, blockdata_t ** j) { + size_t qualified_level; + // Initial values to return. + *i = 0; + *j = 0; + // Find the qualified level. + if (!identify_qualified_level(m,&qualified_level)) { + return 0; + } + // State machine to find the two buffers. + for(unsigned char * ptr = (unsigned char *) m->data; // Start of the data blocks. + ptr <= (m->data + buffersize(m) - blocksize(m)); // The next block will fit into the buffer. + ptr += blocksize(m)) { // Advance by a block + blockdata_t * b = (blockdata_t*) ptr; + if (!(*i) && (b->level == qualified_level)) { + // Found the first one. + *i = b; + continue; + } + if (!(*j) && (b->level == qualified_level)) { + // Found both. + *j = b; + break; + } + } + return 1; +} + + +unsigned int create_empty_block(metadata_t * m) { + blockdata_t *i,*j; + if(!identify_compatible_block_pair(m, &i, &j)){ + // Nothing to do unless there is a compatible block pair + // to merge. + return 0; + } + return merge_blocks(m,i,j); +} + // Shift the current block. unsigned int shift_current_block(metadata_t * m) { if(!occupy_available_empty_block(m)) { @@ -116,6 +234,8 @@ // ---------------------------------------------------------------------------- // Implementation of the interface functions (specified in median.h). + +// Suggest the size of the buffer needed for a fixed epsilon and maxN. median_error_t median_suggest_buffer_size(double epsilon, size_t maxN, size_t *suggested_size){ // First check for sane parameters. @@ -131,11 +251,21 @@ metadata_t m; m.nLevels = calculate_nLevels(maxN); m.nMembers = calculate_nMembers(epsilon); - *suggested_size = buffersize(&m); + + // The size is the size of the buffer and the spare needed for merging + // and outputing. If we have many ongoing buffers, we can do with a single + // spare space buffer. We can figure out that optimization later. + // Ideally, this will be done by keeping a pointer to the global spare space + // within the metadata_t block. I prefer keeping this data structure + // relocatable and serializable. So we may need to think about what the + // alternatives are. + *suggested_size = buffersize(&m)+sparespace(&m); // Things worked. Return MEDIAN_OK return MEDIAN_OK; } + +// Initialize an allocated buffer. median_error_t median_init_buffer(void * buffer, size_t buffer_size, double epsilon, size_t maxN, median_buffer_t *initialized_buffer){ // Initialize. @@ -153,6 +283,7 @@ m->nMembers = calculate_nMembers(epsilon); m->nLevels = calculate_nLevels(maxN); m->currentBufferIdx = 0; + m->spareSize = buffer_size - buffersize(m); // Zero all the data following the metadata block. // Strictly this should not be necessary, but I'm a coward! bzero(m->data, buffersize(m) - sizeof(metadata_t)); @@ -221,7 +352,7 @@ // First print out all the metadata. metadata_t * m = (metadata_t *) buffer; blockdata_t * current = get_current_block(m); - fprintf(stderr, "Metadata:\n Next GID:%d\n nMembers:%d\n nLevels:%d\n CurrentBlock:%d\n",m->gid,(unsigned)m->nMembers,(unsigned)m->nLevels, current->id); + fprintf(stderr, "Metadata:\n Next GID:%d\n nMembers:%d\n nLevels:%d\n CurrentBlock:%d\n SpareSize:%d\n",m->gid,(unsigned)m->nMembers,(unsigned)m->nLevels, current->id,(unsigned)m->spareSize); // Now dump the blocks. for(unsigned char * ptr = (unsigned char *) m->data; // Start of the data blocks.
diff --git a/median.main.c b/median.main.c index 1aef5c7..49dac5d 100644 --- a/median.main.c +++ b/median.main.c
@@ -3,6 +3,7 @@ #include "median.h" #include "hex.h" +#define TEST_SIZE 1024*1024*128 // 128M // A sinple main routine meant as an example, // and also to unit test most of the intersting // functions of the median module. @@ -10,16 +11,16 @@ // Call suggest buffer size to determine the size of the buffer // we need to build the data structure. size_t n = 0; - assert(median_suggest_buffer_size(.001, 1000, &n) == MEDIAN_OK); + assert(median_suggest_buffer_size(.001, TEST_SIZE , &n) == MEDIAN_OK); // Allocate the buffer of the required size. And then // initialize it (median_init_buffer). void * b = malloc(n); median_buffer_t buf; - assert(median_init_buffer(b, n, .001, 1000, &buf) == MEDIAN_OK); + assert(median_init_buffer(b, n, .001, TEST_SIZE, &buf) == MEDIAN_OK); // Insert some data. - for(median_data_t i = 1; i < 3010; i++) { + for(median_data_t i = 1; i < TEST_SIZE; i++) { // 1G assert(median_insert_data(buf,i) == MEDIAN_OK); }