Some cleanup to separate the buffers

Output and state are now managed in separate buffers.
diff --git a/median.c b/median.c
index 1dad56d..9e2a572 100644
--- a/median.c
+++ b/median.c
@@ -12,15 +12,6 @@
 // Where the number of blocks is nLevels and stored in Metadata, and nMembers
 // is stored in Metadata as well.
 // The buffer is cast into this struct.
-typedef struct metadata {
-  unsigned int gid;        // The next available global id
-  size_t nLevels;          // Number of buffers available.
-  size_t nMembers;         // The number of members in each block.
-  size_t currentBufferIdx; // The index of the currently active buffer.
-  size_t spareSize;        // How much spare space is there in the buffer.
-  unsigned char data[0];   // The start of the data.
-} metadata_t;
-
 typedef struct bufferdata {
   unsigned int id;           // Block identity.
   size_t level;              // The level of this buffer.
@@ -28,6 +19,30 @@
   median_data_t data[0];     // The data for this buffer.
 } blockdata_t;
 
+typedef struct metadata {
+  unsigned int gid;        // The next available global id
+  size_t nLevels;          // Number of buffers available.
+  size_t nMembers;         // The number of members in each block.
+  size_t currentBufferIdx; // The index of the currently active buffer.
+  unsigned char data[0];   // The start of the data.
+  blockdata_t blocks[0];   // Makes the code cleaner and avoids unnecessary casting.
+} metadata_t;
+
+typedef struct sortrec {
+  median_data_t d;
+  size_t        sz;
+} sort_rec_t;
+
+typedef struct spare_buffer {
+  size_t spareSize;             // Size in bytes of the spare block.
+  size_t recordSize;            // Size in SortRecords
+  size_t datumSize;             // Size in number of Datum elements.
+  unsigned char data[0];        // Start of the data block.
+  sort_rec_t records[0];        // The same point cast as a sort_rec
+  median_data_t median_data[0]; // The same point cast as a median datum
+  size_t sizes[0];              // The same point case as size_t
+} spare_buffer_t;
+
 
 // ----------------------------------------------------------------------------
 // Helper functions.
@@ -44,11 +59,17 @@
   return nLevels;
 }
 
-// This returns the blocksize.
+// This returns the blocksize in bytes.
 size_t blocksize(metadata_t * m) {
   return sizeof(blockdata_t) + m->nMembers * sizeof(median_data_t);
 }
 
+// This return the start of the next block given
+// a pointer to a block.
+blockdata_t * nextblock(blockdata_t * blk, metadata_t * m) {
+  return (blockdata_t *) (blk->data + m->nMembers);
+}
+
 // This returns the size of the entire buffer.
 size_t buffersize(metadata_t * m) {
   return sizeof(metadata_t) + m->nLevels * blocksize(m);
@@ -57,12 +78,14 @@
 // This is largely so that the "output" function
 // can be run without ruining the buffer.
 size_t sparespace(metadata_t * m) {
-  return 2 * buffersize(m);
+  return sizeof(spare_buffer_t) + m->nLevels * m->nMembers * sizeof(sort_rec_t);
 }
 
-// Returns the first byte address which is out of bounds.
-unsigned char * out_of_bounds(metadata_t * m) {
-  return m->data + buffersize(m) - sizeof(blockdata_t);
+// Returns the first byte address which is too high to start a block regardless
+// of size..  One blockdata worth less than the actual end of the buffer plus
+// one.
+blockdata_t * out_of_bounds(metadata_t * m) {
+  return (blockdata_t *) (m->data + buffersize(m) - blocksize(m));
 }
 
 // Get the current block.
@@ -70,16 +93,12 @@
   size_t offset = m->currentBufferIdx * blocksize(m);
   return (blockdata_t *)(m->data + offset);
 }
+
 // Does the current block have space?
 unsigned int current_block_has_space(metadata_t* m) {
   return (get_current_block(m)->occupancy < m->nMembers);
 }
 
-// First byte of the spare space.
-void * spare_space(metadata_t * m) {
-  return out_of_bounds(m);
-}
-
 // This will only be called in a safe situation. It's an internal function.
 // Does not bound check!
 // Do not call it unless you know what you are doing.
@@ -91,17 +110,12 @@
 // Check if there is space in some block and if there is, then
 // make that block the current block.
 unsigned int occupy_available_empty_block(metadata_t * m) {
-  size_t idx = 0;
-  for(unsigned char * ptr = (unsigned char *) m->data; // Start of the data blocks.
-      ptr <= (m->data + buffersize(m) - blocksize(m)); // The next block will fit into the buffer.
-      ptr += blocksize(m)) { // Advance by a block
-    blockdata_t * b = (blockdata_t *)ptr;
+  // Iterate over the blocks.
+  for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) {
     if(b->occupancy < m->nMembers) {
       // Empty block found!
-      m->currentBufferIdx = idx;
+      m->currentBufferIdx = b->id;
       return 1;
-    } else {
-      idx++;
     }
   }
   // Failed to find an empty block.
@@ -115,10 +129,13 @@
   return (*i < *j)?-1:+1;
 }
 
-unsigned int  merge_blocks(metadata_t *m,blockdata_t *i,blockdata_t *j){
+unsigned int  merge_blocks(metadata_t *m,blockdata_t *i,blockdata_t *j, median_scratch_buffer_t spare){
   // Prepare an array for the data.
-  unsigned char * d = (unsigned char *)spare_space(m);
+  unsigned char * d = spare->data;
   size_t datasz = m->nMembers * sizeof(median_data_t);
+  if (2*datasz > spare->spareSize) {
+    return 0;
+  }
   bzero(d, 2*datasz);
 
   // This is the size of the data.
@@ -134,7 +151,7 @@
   // inefficiency in.
   // But we should fix it.
   qsort(d,2*m->nMembers,sizeof(median_data_t),&compare_data);
-  median_data_t * sorted_data = (median_data_t *)d;
+  median_data_t * sorted_data = spare->median_data;
 
   // Now interleave the sorted buffer.
   // We should track parity and take the
@@ -147,6 +164,8 @@
   // Update the metadata. i is full, j is empty.
   // half the items are gone.
   i->level++;
+
+  // Reset j to clean state.
   j->level = 0;
   j->occupancy = 0;
   bzero(j->data, datasz);
@@ -154,17 +173,23 @@
 }
 
 // Find the smallest level such that there are two blocks at the same level.
-unsigned int identify_qualified_level(metadata_t *m, size_t *lvl) {
-  // Zero out the count buffer.
-  size_t * levelcount = (size_t *)spare_space(m);
-  bzero(levelcount, m->nLevels * sizeof(size_t));
-  // Iterate over the blocks, and count up the blocks at each level.
-  for(unsigned char * ptr = (unsigned char *) m->data; // Start of the data blocks.
-      ptr <= (m->data + buffersize(m) - blocksize(m)); // The next block will fit into the buffer.
-      ptr += blocksize(m)) { // Advance by a block
-    blockdata_t * b = (blockdata_t*) ptr;
+unsigned int identify_qualified_level(metadata_t *m, size_t *lvl, median_scratch_buffer_t spare) {
+  // Zero out the count buffer after validating the
+  // spare space buffer.
+  size_t datasz = m->nLevels * sizeof(size_t);
+  if (datasz > spare->spareSize) {
+    return 0;
+  }
+
+  // Prepare the size array.
+  size_t * levelcount = spare->sizes;
+  bzero(levelcount, datasz);
+
+  // Count up the level population.
+  for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) {
     levelcount[b->level]++;
   }
+
   // Look for a level where the count is at least 2.
   for(size_t idx = 0; idx < m->nLevels; idx++) {
      if(levelcount[idx] > 1) {
@@ -173,69 +198,75 @@
        return 1;
      }
   }
+
   // There is no such thing.
   return 0;
 }
 
-unsigned int identify_compatible_block_pair(metadata_t *m, blockdata_t ** i, blockdata_t ** j) {
-  size_t qualified_level;
+// This identifies the pair of blocks to merge.
+unsigned int identify_compatible_block_pair(metadata_t *m, blockdata_t ** i, blockdata_t ** j, median_scratch_buffer_t spare) {
+  // The qualified level will reside here.
+  // Find the qualified level.
+  size_t qualified_level = 0;
+  if (!identify_qualified_level(m,&qualified_level,spare)) {
+    return 0;
+  }
+
   // Initial values to return.
   *i = 0;
   *j = 0;
-  // Find the qualified level.
-  if (!identify_qualified_level(m,&qualified_level)) {
-    return 0;
-  }
+
   // State machine to find the two buffers.
-  for(unsigned char * ptr = (unsigned char *) m->data; // Start of the data blocks.
-      ptr <= (m->data + buffersize(m) - blocksize(m)); // The next block will fit into the buffer.
-      ptr += blocksize(m)) { // Advance by a block
-    blockdata_t * b = (blockdata_t*) ptr;
+  // Iterate over the blocks.
+  for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) {
     if (!(*i) && (b->level == qualified_level)) {
-      // Found the first one.
-      *i = b;
-      continue;
+      *i = b; continue; // Found the first one.
     }
     if (!(*j) && (b->level == qualified_level)) {
-      // Found both.
-      *j = b;
-      break;
+      *j = b; return 1; // Found the second one.
     }
   }
-  return 1;
+  // Loop should not exit before the two are found.
+  return 0;
 }
 
 
-unsigned int create_empty_block(metadata_t * m) {
+// THis routine creates an empty block by merging two available blocks into a
+// single block. Consequently this frees up one of them.
+// If two compatible blocks cannot be found, then this routine returns 0.
+// It also returns 0 if for some reason the two blocks cannot be merged.
+unsigned int create_empty_block(metadata_t * m, median_scratch_buffer_t spare) {
   blockdata_t *i,*j;
-  if(!identify_compatible_block_pair(m, &i, &j)){
-    // Nothing to do unless there is a compatible block pair
-    // to merge.
+  if(!identify_compatible_block_pair(m, &i, &j, spare)){
     return 0;
   }
-  return merge_blocks(m,i,j);
+  return merge_blocks(m,i,j,spare);
 }
 
-// Shift the current block.
-unsigned int shift_current_block(metadata_t * m) {
+// Shift the current block. If there is an empty block, great!
+// If not, then empty a block by merging two blocks and then occupy the
+// freed up block.
+unsigned int shift_current_block(metadata_t * m, median_scratch_buffer_t spare) {
   if(!occupy_available_empty_block(m)) {
-    if(create_empty_block(m)) {
-      // Now there should be an empty block!
-      // So occupy it.
-      occupy_available_empty_block(m);
+    if(create_empty_block(m,spare)) { // Now there should be an empty block!
+      if(!occupy_available_empty_block(m)) {
+        return 0; // Stuck we are. Empty block is not, even though create it we did.
+      }
     } else {
-      // Stuck we are, block not.
-      return 0;
+      return 0; // Stuck we are. Block not.
     }
   }
-  return 1;
+  return 1; // Occupy a block we did.
 }
 
 
 // ----------------------------------------------------------------------------
 // Implementation of the interface functions (specified in median.h).
+// ----------------------------------------------------------------------------
 
+// ----------------------------------------------------------------------------
 // Suggest the size of the buffer needed for a fixed epsilon and maxN.
+// ----------------------------------------------------------------------------
 median_error_t median_suggest_buffer_size(double epsilon, size_t maxN, size_t *suggested_size){
   // First check for sane parameters.
 
@@ -259,13 +290,24 @@
   // within the metadata_t block. I prefer keeping this data structure
   // relocatable and serializable. So we may need to think about what the
   // alternatives are.
-  *suggested_size = buffersize(&m)+sparespace(&m);
+  *suggested_size = buffersize(&m);
 
   // Things worked. Return MEDIAN_OK
   return MEDIAN_OK;
 }
 
+// ----------------------------------------------------------------------------
+// Suggest a buffer size for the scratch buffer.
+// ----------------------------------------------------------------------------
+median_error_t median_suggest_scratch_buffer_size(median_buffer_t m, size_t *suggested_size){
+  metadata_t * md = (metadata_t *) m;
+  *suggested_size = sparespace(md);
+  return MEDIAN_OK;
+}
+
+// ----------------------------------------------------------------------------
 // Initialize an allocated buffer.
+// ----------------------------------------------------------------------------
 median_error_t median_init_buffer(void * buffer, size_t buffer_size, double epsilon, size_t maxN, median_buffer_t *initialized_buffer){
 
   // Initialize.
@@ -279,24 +321,21 @@
   // Cast the buffer as metadata_t so that we can initialize the metadata block.
   metadata_t * m = (metadata_t *)buffer;
   // Initialize the metadata block.
-  m->gid = 1;
+  m->gid = 0;
   m->nMembers = calculate_nMembers(epsilon);
   m->nLevels = calculate_nLevels(maxN);
   m->currentBufferIdx = 0;
-  m->spareSize = buffer_size - buffersize(m);
   // Zero all the data following the metadata block.
   // Strictly this should not be necessary, but I'm a coward!
   bzero(m->data, buffersize(m) - sizeof(metadata_t));
 
   // cast the data part as an array of blockdata_t fronted blocks.
   // and initialize each of the blocks.
-  for(unsigned char * ptr = (unsigned char *) m->data; // Start of the data blocks.
-      ptr <= (m->data + buffersize(m) - blocksize(m)); // The next block will fit into the buffer.
-      ptr += blocksize(m)) { // Advance by a block
-    blockdata_t * t = (blockdata_t *) ptr;
-    t->id = (m->gid)++;
-    t->level  = 0;
-    t->occupancy = 0;
+  // Iterate over the blocks.
+  for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) {
+    b->id = (m->gid)++;
+    b->level  = 0;
+    b->occupancy = 0;
   }
 
   // Populate the return value.
@@ -304,7 +343,17 @@
   return error;
 }
 
+// ----------------------------------------------------------------------------
+// Initialize the scratch buffer.
+// ----------------------------------------------------------------------------
+median_error_t median_init_scratch_buffer(void * buffer, size_t buffer_size, median_buffer_t m, median_scratch_buffer_t * initialized_buffer){
+  median_scratch_buffer_t s = buffer;
+  s->spareSize = buffer_size - sizeof(spare_buffer_t);
+  *initialized_buffer = s;
+  return MEDIAN_OK;
+}
 
+// ----------------------------------------------------------------------------
 // Insert data into the buffer.
 // We could have made this more concise and pretty if we used short-circuit
 // execution of the boolean condition..
@@ -318,8 +367,10 @@
 //
 // But this code is harder to read, because of the side
 // effect issue.
+// ----------------------------------------------------------------------------
 median_error_t median_insert_data(median_buffer_t buffer,
-                                  median_data_t data){
+                                  median_data_t data,
+                                  median_scratch_buffer_t scratch){
   metadata_t * m = (metadata_t *) buffer;
   // If there is space in the current block..
   if(current_block_has_space(m)) {
@@ -329,7 +380,7 @@
     insert_data_into_current_block(m,data);
   } else {
     // Current block is full. So try and shift it.
-    if(shift_current_block(m)) {
+    if(shift_current_block(m, scratch)) {
       // The current block has shifted successfully.
       // So we can now call the unchecked method again.
       insert_data_into_current_block(m,data);
@@ -342,23 +393,65 @@
   return MEDIAN_OK;
 }
 
+// ----------------------------------------------------------------------------
+// Compare two sort records.
+// ----------------------------------------------------------------------------
+int compare_sort_rec(const void * a, const void *b) {
+  return ((sort_rec_t *)a)->d < ((sort_rec_t*)b)->d?-1:1;
+}
+
+// ----------------------------------------------------------------------------
+// Output the estimated median.
+// ----------------------------------------------------------------------------
 median_error_t median_output_median(const median_buffer_t buffer,
-                                    median_data_t *approximate_median){
+                                    median_data_t *approximate_median,
+                                    median_scratch_buffer_t scratch){
+  // Initialize the data buffer to zero.
+  metadata_t * m = (metadata_t *) buffer;
+  size_t datasz = m->nMembers * m->nLevels * sizeof(sort_rec_t);
+  if(datasz > scratch->spareSize) {
+    return MEDIAN_ERROR_BUFFER_TOO_SMALL;
+  }
+  sort_rec_t * s = (sort_rec_t *) scratch->data;
+  bzero(s, datasz);
+  // Initialize the count and weight values.
+  size_t count = 0;
+  size_t weight = 0;
+  // First construct the sort buffer.
+  for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) {
+    for(size_t i = 0; i < b->occupancy; i++) {
+      s->d = b->data[i];
+      s->sz = (1 << b->level);
+      weight += s->sz;
+      s++; count++;
+    }
+  }
+  qsort(scratch->data,count,sizeof(sort_rec_t),&compare_sort_rec);
+
+  // This is the weight at which the median will be found.
+  weight = weight/2;
+
+  // We now need to pick out the
+  // median.
+  for(s = (sort_rec_t *) scratch->data;
+      weight > s->sz;
+      weight -= (s++)->sz) ;
+
+  *approximate_median = s->d;
   return MEDIAN_OK;
 }
 
+// ----------------------------------------------------------------------------
 // Dump the state to stderr.
+// ----------------------------------------------------------------------------
 median_error_t median_dump_stderr(const median_buffer_t buffer){
   // First print out all the metadata.
   metadata_t * m = (metadata_t *) buffer;
   blockdata_t * current = get_current_block(m);
-  fprintf(stderr, "Metadata:\n Next GID:%d\n nMembers:%d\n nLevels:%d\n CurrentBlock:%d\n SpareSize:%d\n",m->gid,(unsigned)m->nMembers,(unsigned)m->nLevels, current->id,(unsigned)m->spareSize);
+  fprintf(stderr, "Metadata:\n Next GID:%d\n nMembers:%d\n nLevels:%d\n CurrentBlock:%d\n",m->gid,(unsigned)m->nMembers,(unsigned)m->nLevels, current->id);
 
+  for(blockdata_t * b = m->blocks; b < out_of_bounds(m); b = nextblock(b,m)) {
   // Now dump the blocks.
-  for(unsigned char * ptr = (unsigned char *) m->data; // Start of the data blocks.
-      ptr <= (m->data + buffersize(m) - blocksize(m)); // The next block will fit into the buffer.
-      ptr += blocksize(m)) { // Advance by a block
-    blockdata_t * b = (blockdata_t *)ptr;
     fprintf(stderr, "Block:%d\n Level:%d\n Occupancy:%d\n", b->id,(unsigned)b->level,(unsigned)b->occupancy);
     // For every block, dump the first 8 keys in the block.
     stderr_hexdmp("Data:",(unsigned char *)(b->data),8*sizeof(median_data_t));
@@ -367,3 +460,4 @@
   // Done, return MEDIAN_OK.
   return MEDIAN_OK;
 }
+// ----------------------------------------------------------------------------
diff --git a/median.h b/median.h
index c62fbbd..1a89ae6 100644
--- a/median.h
+++ b/median.h
@@ -20,9 +20,13 @@
 // functions in this file.
 typedef int32_t median_error_t;
 
+struct metadata_t;
+struct spare_buffer_t;
 // A buffer which contains all the state held
 // within an approximate median calculator.
-typedef void * median_buffer_t;
+// And another type for scratch data.
+typedef struct metadata * median_buffer_t;
+typedef struct spare_buffer * median_scratch_buffer_t;
 
 // A single data element. For now this is defined
 // to be a 64 bit integer. But in principle this could be
@@ -59,6 +63,9 @@
 // In all reasonable calls, the returned value will be
 // MEDIAN_ERROR_OK.
 median_error_t median_suggest_buffer_size(double epsilon, size_t maxN, size_t *suggested_size);
+// Suggest a size of the buffer for scratch space. This is needed
+// so that the buffer is not corrupted by the output and merge steps.
+median_error_t median_suggest_scratch_buffer_size(median_buffer_t m, size_t *suggested_size);
 
 // This routine will init the buffer. It will check if the buffer meets the
 // size restriction returned by #median_suggest_buffer_size(). If not, it will
@@ -77,6 +84,10 @@
 // In all reasonable calls, the returned value will be
 // MEDIAN_ERROR_OK.
 median_error_t median_init_buffer(void * buffer, size_t buffer_size, double epsilon, size_t maxN, median_buffer_t *initialized_buffer);
+// Initialize the scratch buffer.
+// This function is similar in behaviour to the
+// init function for the main buffer.
+median_error_t median_init_scratch_buffer(void * buffer, size_t buffer_size,  median_buffer_t m, median_scratch_buffer_t * initialized_buffer);
 
 // This routine inserts a data point into the median computation. It's
 // implemented to be really cheap. However, it is not always constant time.
@@ -93,7 +104,7 @@
 // then MEDIAN_ERROR_INVALID_PARAMETERS will be returned.
 // In all reasonable calls, the returned value will be
 // MEDIAN_ERROR_OK.
-median_error_t median_insert_data(median_buffer_t buffer, median_data_t data);
+median_error_t median_insert_data(median_buffer_t buffer, median_data_t data, median_scratch_buffer_t scratch_buffer);
 
 // This routine returns the approximate median of the data seen thusfar.
 // This is implemented to be really cheap as well, but not as cheap as the
@@ -108,7 +119,7 @@
 // return an error.
 // If it does return an error, the error will be
 // MEDIAN_ERROR_INVALID_PARAMETERS
-median_error_t median_output_median(const median_buffer_t buffer, median_data_t *approximate_median);
+median_error_t median_output_median(const median_buffer_t buffer, median_data_t *approximate_median, median_scratch_buffer_t scratch_buffer);
 
 // This hexdumps the state of the buffer to stderr so that it can help debug if needed.
 median_error_t median_dump_stderr(const median_buffer_t buffer);
diff --git a/median.main.c b/median.main.c
index 49dac5d..bff30d1 100644
--- a/median.main.c
+++ b/median.main.c
@@ -1,9 +1,10 @@
+#include <stdio.h>  // fprintf, stderr
 #include <stdlib.h> // size_t
 #include <assert.h> // assert
 #include "median.h"
 #include "hex.h"
 
-#define TEST_SIZE 1024*1024*128 // 128M
+#define TEST_SIZE 1024*1024*256 // 256M
 // A sinple main routine meant as an example,
 // and also to unit test most of the intersting
 // functions of the median module.
@@ -11,21 +12,29 @@
   // Call suggest buffer size to determine the size of the buffer
   // we need to build the data structure.
   size_t n = 0;
-  assert(median_suggest_buffer_size(.001, TEST_SIZE , &n) == MEDIAN_OK);
 
   // Allocate the buffer of the required size. And then
   // initialize it (median_init_buffer).
+  assert(median_suggest_buffer_size(.001, TEST_SIZE , &n) == MEDIAN_OK);
   void * b = malloc(n);
   median_buffer_t buf;
   assert(median_init_buffer(b, n, .001, TEST_SIZE, &buf) == MEDIAN_OK);
 
+  // Initialize the spare buffer.
+  assert(median_suggest_scratch_buffer_size(buf, &n) == MEDIAN_OK);
+  void * s = malloc(n);
+  median_scratch_buffer_t scratch;
+  assert(median_init_scratch_buffer(s,n,buf,&scratch) == MEDIAN_OK);
+
   // Insert some data.
   for(median_data_t i = 1; i < TEST_SIZE; i++) { // 1G
-    assert(median_insert_data(buf,i) == MEDIAN_OK);
+    assert(median_insert_data(buf,i,scratch) == MEDIAN_OK);
   }
 
-  // Dump it.
-  assert(median_dump_stderr(buf) == MEDIAN_OK);
+  median_data_t median;
+  assert(median_output_median(buf, &median,scratch) == MEDIAN_OK);
+
+  fprintf(stderr, "And the median is %x\n", (unsigned)median);
 
   // All done, return 0
   return 0;