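More code cleanup: removed the pervasive pointer casting and made the code more strongly typed. Added mutex locking around the public entry points (insert, output median, dump), with unlocked internal variants and a median_deinit_buffer() call to destroy the mutex.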
diff --git a/median.c b/median.c index 9e2a572..1162fbd 100644 --- a/median.c +++ b/median.c
@@ -4,6 +4,7 @@ #include <stdio.h> // fprintf #include <stdlib.h> // qsort #include <string.h> // memcpy +#include <pthread.h> // pthread_mutex_* // The structure of the data buffer is this: // ---------------------------------------------------------------------------- @@ -12,35 +13,50 @@ // Where the number of blocks is nLevels and stored in Metadata, and nMembers // is stored in Metadata as well. // The buffer is cast into this struct. -typedef struct bufferdata { - unsigned int id; // Block identity. - size_t level; // The level of this buffer. - size_t occupancy; // Number of occupants for this buffer. - median_data_t data[0]; // The data for this buffer. + + +// ---------------------------------------------------------------------------- +// The block header. +// ---------------------------------------------------------------------------- +typedef struct blockdata { + unsigned int id; // Block identity. + size_t level; // The level of this buffer. + size_t occupancy; // Number of occupants for this buffer. + unsigned char data[0]; // Start of the data block, cast as unsigned char. + median_data_t median_data[0]; // The data for this buffer cast as median_data_t. } blockdata_t; +// ---------------------------------------------------------------------------- +// The overall header +// ---------------------------------------------------------------------------- typedef struct metadata { - unsigned int gid; // The next available global id - size_t nLevels; // Number of buffers available. - size_t nMembers; // The number of members in each block. - size_t currentBufferIdx; // The index of the currently active buffer. - unsigned char data[0]; // The start of the data. - blockdata_t blocks[0]; // Makes the code cleaner and avoids unnecessary casting. + pthread_mutex_t mutex_ptr[0]; // Pointer to the mutex. + pthread_mutex_t mutex; // The mutex for this median counter. 
+ unsigned int gid; // The next available global id + size_t nLevels; // Number of buffers available. + size_t nMembers; // The number of members in each block. + size_t currentBufferIdx; // The index of the currently active buffer. + unsigned char data[0]; // The start of the data. + blockdata_t block_data[0]; // Makes the code cleaner and avoids unnecessary casting. } metadata_t; +// ---------------------------------------------------------------------------- +// The record used for sorting the output +// ---------------------------------------------------------------------------- typedef struct sortrec { - median_data_t d; - size_t sz; -} sort_rec_t; + median_data_t d; // The data point. + size_t sz; // The leveled size of the point. +} sortrec_t; +// ---------------------------------------------------------------------------- +// The spare buffer space header. +// ---------------------------------------------------------------------------- typedef struct spare_buffer { - size_t spareSize; // Size in bytes of the spare block. - size_t recordSize; // Size in SortRecords - size_t datumSize; // Size in number of Datum elements. - unsigned char data[0]; // Start of the data block. - sort_rec_t records[0]; // The same point cast as a sort_rec - median_data_t median_data[0]; // The same point cast as a median datum - size_t sizes[0]; // The same point case as size_t + size_t spareSize; // Size in bytes of the spare block. + unsigned char data[0]; // Start of the data block. + sortrec_t sortrec_data[0]; // The same point cast as a sortrec + median_data_t median_data[0]; // The same point cast as a median datum + size_t size_data[0]; // The same point case as size_t } spare_buffer_t; @@ -60,58 +76,57 @@ } // This returns the blocksize in bytes. 
+// The blocksize is the header plus m->nMembers many +// data points, each of type median_data_t size_t blocksize(metadata_t * m) { return sizeof(blockdata_t) + m->nMembers * sizeof(median_data_t); } // This return the start of the next block given -// a pointer to a block. +// a pointer to a block. The easiest way is to jump +// m->nMembers median_data_t items away. blockdata_t * nextblock(blockdata_t * blk, metadata_t * m) { - return (blockdata_t *) (blk->data + m->nMembers); + return (blockdata_t *) (blk->median_data + m->nMembers); } -// This returns the size of the entire buffer. +// This returns the size of the entire buffer in bytes. +// The size of the metadata block, plus m->nLevels many +// blocks each of size blocksize(m) size_t buffersize(metadata_t * m) { return sizeof(metadata_t) + m->nLevels * blocksize(m); } -// This is largely so that the "output" function -// can be run without ruining the buffer. -size_t sparespace(metadata_t * m) { - return sizeof(spare_buffer_t) + m->nLevels * m->nMembers * sizeof(sort_rec_t); -} - // Returns the first byte address which is too high to start a block regardless // of size.. One blockdata worth less than the actual end of the buffer plus -// one. +// one. The computation is done in byte units and then cast as a blockdata_t +// pointer. blockdata_t * out_of_bounds(metadata_t * m) { - return (blockdata_t *) (m->data + buffersize(m) - blocksize(m)); + return (blockdata_t *) (m->data + buffersize(m) - sizeof(metadata_t) - blocksize(m)); } // Get the current block. +// The computation is done in bytes, and then cast as a blockdata_t pointer. blockdata_t * get_current_block(metadata_t *m) { size_t offset = m->currentBufferIdx * blocksize(m); return (blockdata_t *)(m->data + offset); } -// Does the current block have space? -unsigned int current_block_has_space(metadata_t* m) { - return (get_current_block(m)->occupancy < m->nMembers); -} - -// This will only be called in a safe situation. It's an internal function. 
-// Does not bound check! -// Do not call it unless you know what you are doing. -void insert_data_into_current_block(metadata_t *m, median_data_t data) { +// This function inserts data into the current block if possible. +// Otherwise, it returns false. +unsigned int insert_into_current_block_if_possible(metadata_t *m, median_data_t data) { blockdata_t * b = get_current_block(m); - b->data[b->occupancy++] = data; + if (b->occupancy < m->nMembers) { + b->median_data[b->occupancy++] = data; + return 1; + } + return 0; } // Check if there is space in some block and if there is, then // make that block the current block. unsigned int occupy_available_empty_block(metadata_t * m) { // Iterate over the blocks. - for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) { + for(blockdata_t * b = m->block_data; b < out_of_bounds(m); b = nextblock(b,m)) { if(b->occupancy < m->nMembers) { // Empty block found! m->currentBufferIdx = b->id; @@ -150,15 +165,19 @@ // be consuming level 0 buffers, I have left this // inefficiency in. // But we should fix it. - qsort(d,2*m->nMembers,sizeof(median_data_t),&compare_data); - median_data_t * sorted_data = spare->median_data; + qsort(spare->median_data,2*m->nMembers,sizeof(median_data_t),&compare_data); // Now interleave the sorted buffer. // We should track parity and take the // odd and even values alternately for each - // merge. + // merge. So this loop should look like + // for (size_t idx = parity; idx < 2*......) + // Here parity should be equally likely to be + // zero or one. + // However, this does make the routine somewhat non + // deterministic, but offers some extra accuracy. for(size_t idx = 0; idx < 2*m->nMembers; idx+=2){ - i->data[idx/2] = sorted_data[idx]; + i->median_data[idx/2] = spare->median_data[idx]; } // Update the metadata. i is full, j is empty. @@ -169,6 +188,8 @@ j->level = 0; j->occupancy = 0; bzero(j->data, datasz); + + // Done, so go back. 
return 1; } @@ -182,11 +203,11 @@ } // Prepare the size array. - size_t * levelcount = spare->sizes; + size_t * levelcount = spare->size_data; bzero(levelcount, datasz); // Count up the level population. - for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) { + for(blockdata_t * b = m->block_data; b < out_of_bounds(m); b = nextblock(b,m)) { levelcount[b->level]++; } @@ -218,7 +239,7 @@ // State machine to find the two buffers. // Iterate over the blocks. - for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) { + for(blockdata_t * b = m->block_data; b < out_of_bounds(m); b = nextblock(b,m)) { if (!(*i) && (b->level == qualified_level)) { *i = b; continue; // Found the first one. } @@ -300,8 +321,7 @@ // Suggest a buffer size for the scratch buffer. // ---------------------------------------------------------------------------- median_error_t median_suggest_scratch_buffer_size(median_buffer_t m, size_t *suggested_size){ - metadata_t * md = (metadata_t *) m; - *suggested_size = sparespace(md); + *suggested_size = m->nLevels * m->nMembers * sizeof(sortrec_t) + sizeof(spare_buffer_t); return MEDIAN_OK; } @@ -309,7 +329,6 @@ // Initialize an allocated buffer. // ---------------------------------------------------------------------------- median_error_t median_init_buffer(void * buffer, size_t buffer_size, double epsilon, size_t maxN, median_buffer_t *initialized_buffer){ - // Initialize. size_t expectedSize = 0; median_error_t error = MEDIAN_OK; @@ -320,6 +339,8 @@ if (expectedSize > buffer_size) return MEDIAN_ERROR_BUFFER_TOO_SMALL; // Cast the buffer as metadata_t so that we can initialize the metadata block. metadata_t * m = (metadata_t *)buffer; + // Initialize the mutex. + pthread_mutex_init(m->mutex_ptr, NULL); // Initialize the metadata block. m->gid = 0; m->nMembers = calculate_nMembers(epsilon); @@ -332,7 +353,7 @@ // cast the data part as an array of blockdata_t fronted blocks. // and initialize each of the blocks. 
// Iterate over the blocks. - for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) { + for(blockdata_t * b = m->block_data; b < out_of_bounds(m); b = nextblock(b,m)) { b->id = (m->gid)++; b->level = 0; b->occupancy = 0; @@ -343,6 +364,12 @@ return error; } +// Deinit the buffer, frees up the mutex. +median_error_t median_deinit_buffer(median_buffer_t m) { + pthread_mutex_destroy(m->mutex_ptr); + return MEDIAN_OK; +} + // ---------------------------------------------------------------------------- // Initialize the scratch buffer. // ---------------------------------------------------------------------------- @@ -355,109 +382,126 @@ // ---------------------------------------------------------------------------- // Insert data into the buffer. -// We could have made this more concise and pretty if we used short-circuit -// execution of the boolean condition.. -// -// if (current_block_has_space(m) || shift_current_block(m)) { -// insert_data_into_current_block(m,data); -// return MEDIAN_OK; -// } else { -// return MEDIAN_ERROR_MAX_N_EXCEEDED; -// } -// -// But this code is harder to read, because of the side -// effect issue. // ---------------------------------------------------------------------------- -median_error_t median_insert_data(median_buffer_t buffer, - median_data_t data, - median_scratch_buffer_t scratch){ - metadata_t * m = (metadata_t *) buffer; + +// Unprotected function. +median_error_t _median_insert_data(median_buffer_t m, + median_data_t data, + median_scratch_buffer_t scratch){ // If there is space in the current block.. - if(current_block_has_space(m)) { - // Insert data into the current block. This - // call is to an unchecked method, which will - // never fail. - insert_data_into_current_block(m,data); - } else { - // Current block is full. So try and shift it. - if(shift_current_block(m, scratch)) { - // The current block has shifted successfully. - // So we can now call the unchecked method again. 
- insert_data_into_current_block(m,data); - } else { - // Failed to shift block. Stuck we are. + if(!insert_into_current_block_if_possible(m,data)) { + // Shift + if(!shift_current_block(m,scratch)) { return MEDIAN_ERROR_MAX_N_EXCEEDED; } + // Try to insert again, this should work because + // we just shifted. + if(!insert_into_current_block_if_possible(m,data)) { + return MEDIAN_ERROR_BUG; + } } - // Everything worked. So return MEDIAN_OK. return MEDIAN_OK; } +// Locked function. +median_error_t median_insert_data(median_buffer_t m, + median_data_t data, + median_scratch_buffer_t scratch){ + // Lock. + pthread_mutex_lock(m->mutex_ptr); + // Call the unprotected routine. + median_error_t ret = _median_insert_data(m,data,scratch); + // Unlock + pthread_mutex_unlock(m->mutex_ptr); + // return + return ret; +} + // ---------------------------------------------------------------------------- // Compare two sort records. // ---------------------------------------------------------------------------- -int compare_sort_rec(const void * a, const void *b) { - return ((sort_rec_t *)a)->d < ((sort_rec_t*)b)->d?-1:1; +int compare_sortrec(const void * a, const void *b) { + return ((sortrec_t *)a)->d < ((sortrec_t*)b)->d?-1:1; } // ---------------------------------------------------------------------------- // Output the estimated median. // ---------------------------------------------------------------------------- -median_error_t median_output_median(const median_buffer_t buffer, + +// Unprotected implementation. +median_error_t _median_output_median(const median_buffer_t m, median_data_t *approximate_median, median_scratch_buffer_t scratch){ // Initialize the data buffer to zero. 
- metadata_t * m = (metadata_t *) buffer; - size_t datasz = m->nMembers * m->nLevels * sizeof(sort_rec_t); + size_t datasz = m->nMembers * m->nLevels * sizeof(sortrec_t); if(datasz > scratch->spareSize) { return MEDIAN_ERROR_BUFFER_TOO_SMALL; } - sort_rec_t * s = (sort_rec_t *) scratch->data; + sortrec_t * s = scratch->sortrec_data; bzero(s, datasz); // Initialize the count and weight values. size_t count = 0; size_t weight = 0; // First construct the sort buffer. - for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) { + for(blockdata_t * b = m->block_data; b < out_of_bounds(m); b = nextblock(b,m)) { for(size_t i = 0; i < b->occupancy; i++) { - s->d = b->data[i]; + s->d = b->median_data[i]; s->sz = (1 << b->level); weight += s->sz; s++; count++; } } - qsort(scratch->data,count,sizeof(sort_rec_t),&compare_sort_rec); + qsort(scratch->sortrec_data,count,sizeof(sortrec_t),&compare_sortrec); // This is the weight at which the median will be found. weight = weight/2; // We now need to pick out the // median. - for(s = (sort_rec_t *) scratch->data; + for(s = scratch->sortrec_data; weight > s->sz; - weight -= (s++)->sz) ; + weight -= (s++)->sz); *approximate_median = s->d; return MEDIAN_OK; } +// Locked call. +median_error_t median_output_median(const median_buffer_t m, + median_data_t *approximate_median, + median_scratch_buffer_t scratch){ + pthread_mutex_lock(m->mutex_ptr); + median_error_t ret = _median_output_median(m, approximate_median, scratch); + pthread_mutex_unlock(m->mutex_ptr); + return ret; +} // ---------------------------------------------------------------------------- // Dump the state to stderr. // ---------------------------------------------------------------------------- -median_error_t median_dump_stderr(const median_buffer_t buffer){ + +// Unprotected call. +median_error_t _median_dump_stderr(const median_buffer_t m){ // First print out all the metadata. 
- metadata_t * m = (metadata_t *) buffer; blockdata_t * current = get_current_block(m); fprintf(stderr, "Metadata:\n Next GID:%d\n nMembers:%d\n nLevels:%d\n CurrentBlock:%d\n",m->gid,(unsigned)m->nMembers,(unsigned)m->nLevels, current->id); - for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) { + for(blockdata_t * b = m->block_data; b < out_of_bounds(m); b = nextblock(b,m)) { // Now dump the blocks. fprintf(stderr, "Block:%d\n Level:%d\n Occupancy:%d\n", b->id,(unsigned)b->level,(unsigned)b->occupancy); // For every block, dump the first 8 keys in the block. - stderr_hexdmp("Data:",(unsigned char *)(b->data),8*sizeof(median_data_t)); + stderr_hexdmp("Data:",b->data,8*sizeof(median_data_t)); } // Done, return MEDIAN_OK. return MEDIAN_OK; } + +// Protected call. +median_error_t median_dump_stderr(const median_buffer_t m){ + pthread_mutex_lock(m->mutex_ptr); + median_error_t ret = _median_dump_stderr(m); + pthread_mutex_unlock(m->mutex_ptr); + return ret; +} + // ----------------------------------------------------------------------------
diff --git a/median.h b/median.h index 1a89ae6..5e02bf8 100644 --- a/median.h +++ b/median.h
@@ -47,6 +47,10 @@ // Overflow because MaxN has been exceeded. static const median_error_t MEDIAN_ERROR_MAX_N_EXCEEDED = -3; +// This is returned when some invariant is broken. +// It should not happen in normal operation. +static const median_error_t MEDIAN_ERROR_BUG = -1000; + // Indicates that the size of the largest possible dataset is not known. static const size_t MEDIAN_MAX_N_UNKNOWN = 0; @@ -84,6 +88,9 @@ // In all reasonable calls, the returned value will be // MEDIAN_ERROR_OK. median_error_t median_init_buffer(void * buffer, size_t buffer_size, double epsilon, size_t maxN, median_buffer_t *initialized_buffer); +// This frees up any internal resource consumed with the buffer, including +// mutex locks and other provisioned resources. +median_error_t median_deinit_buffer(median_buffer_t m); // Initialize the scratch buffer. // This function is similar in behaviour to the // init function for the main buffer.
diff --git a/median.main.c b/median.main.c index bff30d1..32beae4 100644 --- a/median.main.c +++ b/median.main.c
@@ -31,11 +31,21 @@ assert(median_insert_data(buf,i,scratch) == MEDIAN_OK); } + // Dump the buffer. + assert(median_dump_stderr(buf) == MEDIAN_OK); + median_data_t median; - assert(median_output_median(buf, &median,scratch) == MEDIAN_OK); + assert(median_output_median(buf, &median, scratch) == MEDIAN_OK); fprintf(stderr, "And the median is %x\n", (unsigned)median); + // Free up the resources. + assert(median_deinit_buffer(buf) == MEDIAN_OK); + + // Free up the malloc'd space. + free(b); + free(s); + // All done, return 0 return 0; }