Merge pull request #38 from 30x/go-1.8-fix

Fix issue where Go 1.8 would add a duplicate Authorization header
diff --git a/apigee_sync.go b/apigee_sync.go
index aa816bb..ab47d17 100644
--- a/apigee_sync.go
+++ b/apigee_sync.go
@@ -33,8 +33,8 @@
 		return
 	}
 
-	downloadBootSnapshot(nil)
-	downloadDataSnapshot(quitPollingSnapshotServer)
+	snapManager.downloadBootSnapshot()
+	snapManager.downloadDataSnapshot()
 
 	changeManager.pollChangeWithBackoff()
 
@@ -87,7 +87,7 @@
 }
 
 func addHeaders(req *http.Request) {
-	req.Header.Add("Authorization", "Bearer "+ tokenManager.getBearerToken())
+	req.Header.Set("Authorization", "Bearer "+tokenManager.getBearerToken())
 	req.Header.Set("apid_instance_id", apidInfo.InstanceID)
 	req.Header.Set("apid_cluster_Id", apidInfo.ClusterID)
 	req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339))
@@ -113,4 +113,4 @@
 
 func (a changeServerError) Error() string {
 	return a.Code
-}
\ No newline at end of file
+}
diff --git a/apigee_sync_test.go b/apigee_sync_test.go
index 9cdec11..68db99f 100644
--- a/apigee_sync_test.go
+++ b/apigee_sync_test.go
@@ -155,7 +155,7 @@
 			}
 			pie.Plugins = append(pie.Plugins, pluginData)
 			postInitPlugins(pie)
-		}, 3)
+		}, 5)
 
 		It("should bootstrap from local DB if present", func(done Done) {
 
@@ -254,7 +254,6 @@
 			testMock.forceNewSnapshot()
 		})
 
-
 		It("Verify the Sequence Number Logic works as expected", func() {
 			Expect(getChangeStatus("1.1.1", "1.1.2")).To(Equal(1))
 			Expect(getChangeStatus("1.1.1", "1.2.1")).To(Equal(1))
@@ -271,14 +270,17 @@
 			initializeContext()
 
 			tokenManager = createTokenManager()
+			snapManager = createSnapShotManager()
 			events.Listen(ApigeeSyncEventSelector, &handler{})
 
 			scopes := []string{apidInfo.ClusterID}
 			snapshot := &common.Snapshot{}
-			downloadSnapshot(scopes, snapshot, nil)
-			storeBootSnapshot(snapshot)
-			storeDataSnapshot(snapshot)
+			snapManager.downloadSnapshot(scopes, snapshot)
+			snapManager.storeBootSnapshot(snapshot)
+			snapManager.storeDataSnapshot(snapshot)
 			restoreContext()
+			<-snapManager.close()
+			tokenManager.close()
 		}, 3)
 	})
 })
diff --git a/backoff.go b/backoff.go
index e3a7403..94f411a 100644
--- a/backoff.go
+++ b/backoff.go
@@ -2,8 +2,8 @@
 
 import (
 	"math"
-	"time"
 	"math/rand"
+	"time"
 )
 
 const defaultInitial time.Duration = 200 * time.Millisecond
@@ -13,7 +13,7 @@
 type Backoff struct {
 	attempt         int
 	initial, max    time.Duration
-	jitter bool
+	jitter          bool
 	backoffStrategy func() time.Duration
 }
 
diff --git a/backoff_test.go b/backoff_test.go
index ae85909..f25399b 100644
--- a/backoff_test.go
+++ b/backoff_test.go
@@ -24,7 +24,7 @@
 		})
 
 		It("should properly apply exponential backoff strategy", func() {
-			b := NewExponentialBackoff(200 * time.Millisecond, 2 * time.Second, 2, false)
+			b := NewExponentialBackoff(200*time.Millisecond, 2*time.Second, 2, false)
 			Expect(200 * time.Millisecond).To(Equal(b.Duration()))
 			Expect(1).To(Equal(b.Attempt()))
 			Expect(400 * time.Millisecond).To(Equal(b.Duration()))
@@ -36,7 +36,7 @@
 		})
 
 		It("should reset properly", func() {
-			b := NewExponentialBackoff(200 * time.Millisecond, 2 * time.Second, 2, false)
+			b := NewExponentialBackoff(200*time.Millisecond, 2*time.Second, 2, false)
 			Expect(200 * time.Millisecond).To(Equal(b.Duration()))
 			Expect(1).To(Equal(b.Attempt()))
 			Expect(400 * time.Millisecond).To(Equal(b.Duration()))
diff --git a/changes.go b/changes.go
index aa8d822..0b77cba 100644
--- a/changes.go
+++ b/changes.go
@@ -44,18 +44,21 @@
 	if atomic.SwapInt32(c.isClosed, 1) == int32(1) {
 		log.Error("pollChangeManager: close() called on a closed pollChangeManager!")
 		go func() {
-			finishChan <- false
 			log.Debug("change manager closed")
+			finishChan <- false
 		}()
 		return finishChan
 	}
 	// not launched
 	if atomic.LoadInt32(c.isLaunched) == int32(0) {
-		log.Error("pollChangeManager: close() called when pollChangeWithBackoff unlaunched! close tokenManager!")
+		log.Debug("pollChangeManager: close() called when pollChangeWithBackoff unlaunched! Will wait until pollChangeWithBackoff is launched and then kill it and tokenManager!")
+		log.Warn("Attempt to close unstarted change manager")
 		go func() {
+			c.quitChan <- true
 			tokenManager.close()
-			finishChan <- false
+			<-snapManager.close()
 			log.Debug("change manager closed")
+			finishChan <- false
 		}()
 		return finishChan
 	}
@@ -64,8 +67,9 @@
 	go func() {
 		c.quitChan <- true
 		tokenManager.close()
-		finishChan <- true
+		<-snapManager.close()
 		log.Debug("change manager closed")
+		finishChan <- true
 	}()
 	return finishChan
 }
@@ -75,11 +79,6 @@
  */
 
 func (c *pollChangeManager) pollChangeWithBackoff() {
-	// closed
-	if atomic.LoadInt32(c.isClosed) == int32(1) {
-		log.Error("pollChangeManager: pollChangeWithBackoff() called after closed")
-		return
-	}
 	// has been launched before
 	if atomic.SwapInt32(c.isLaunched, 1) == int32(1) {
 		log.Error("pollChangeManager: pollChangeWithBackoff() has been launched before")
@@ -267,7 +266,7 @@
 	}
 	if _, ok := err.(changeServerError); ok {
 		log.Info("Detected DDL changes, going to fetch a new snapshot to sync...")
-		downloadDataSnapshot(c.quitChan)
+		snapManager.downloadDataSnapshot()
 	} else {
 		log.Debugf("Error connecting to changeserver: %v", err)
 	}
diff --git a/init.go b/init.go
index e1b31c5..50ed49d 100644
--- a/init.go
+++ b/init.go
@@ -30,15 +30,15 @@
 
 var (
 	/* All set during plugin initialization */
-	log                       apid.LogService
-	config                    apid.ConfigService
-	dataService               apid.DataService
-	events                    apid.EventsService
-	apidInfo                  apidInstanceInfo
-	newInstanceID             bool
-	tokenManager              *tokenMan
-	changeManager             *pollChangeManager
-	quitPollingSnapshotServer chan bool
+	log           apid.LogService
+	config        apid.ConfigService
+	dataService   apid.DataService
+	events        apid.EventsService
+	apidInfo      apidInstanceInfo
+	newInstanceID bool
+	tokenManager  *tokenMan
+	changeManager *pollChangeManager
+	snapManager   *snapShotManager
 
 	/* Set during post plugin initialization
 	 * set this as a default, so that it's guaranteed to be valid even if postInitPlugins isn't called
@@ -76,7 +76,7 @@
 	events = services.Events()
 	//TODO listen for arbitrary commands, these channels can be used to kill polling goroutines
 	//also useful for testing
-	quitPollingSnapshotServer = make(chan bool)
+	snapManager = createSnapShotManager()
 	changeManager = createChangeManager()
 
 	// set up default database
diff --git a/snapshot.go b/snapshot.go
index 5a1c576..7a3c102 100644
--- a/snapshot.go
+++ b/snapshot.go
@@ -12,41 +12,140 @@
 	"io/ioutil"
 	"net/url"
 	"path"
+	"sync/atomic"
 	"time"
 )
 
+type snapShotManager struct {
+	// to send quit signal to the downloading thread
+	quitChan chan bool
+	// to mark the graceful close of snapshotManager
+	finishChan chan bool
+	// 0 for not closed, 1 for closed
+	isClosed *int32
+	// 1 while a snapshot is being downloaded/processed, 0 otherwise; lets close() return immediately when nothing is in progress
+	isDownloading *int32
+}
+
+func createSnapShotManager() *snapShotManager {
+	isClosedInt := int32(0)
+	isDownloadingInt := int32(0)
+	return &snapShotManager{
+		quitChan:      make(chan bool, 1),
+		finishChan:    make(chan bool, 1),
+		isClosed:      &isClosedInt,
+		isDownloading: &isDownloadingInt,
+	}
+}
+
+/*
+ * Thread-safe close of snapShotManager.
+ * It marks the manager as closed immediately and signals the backoff download to quit.
+ * Use <-close() to block until the close has finished.
+ * It should only be called by pollChangeManager, because pollChangeManager depends on it.
+ */
+func (s *snapShotManager) close() <-chan bool {
+	//has been closed before
+	if atomic.SwapInt32(s.isClosed, 1) == int32(1) {
+		log.Error("snapShotManager: close() called on a closed snapShotManager!")
+		go func() {
+			s.finishChan <- false
+			log.Debug("change manager closed")
+		}()
+		return s.finishChan
+	}
+	s.quitChan <- true
+	// wait until no downloading
+	for atomic.LoadInt32(s.isDownloading) == int32(1) {
+		time.Sleep(time.Millisecond)
+	}
+	s.finishChan <- true
+	return s.finishChan
+}
+
 // retrieve boot information: apid_config and apid_config_scope
-func downloadBootSnapshot(quitPolling chan bool) {
+func (s *snapShotManager) downloadBootSnapshot() {
+	if atomic.SwapInt32(s.isDownloading, 1) == int32(1) {
+		log.Panic("downloadBootSnapshot: only 1 thread can download snapshot at the same time!")
+	}
+	defer atomic.StoreInt32(s.isDownloading, int32(0))
+
+	// has been closed
+	if atomic.LoadInt32(s.isClosed) == int32(1) {
+		log.Warn("snapShotManager: downloadBootSnapshot called on closed snapShotManager")
+		return
+	}
+
 	log.Debug("download Snapshot for boot data")
 
 	scopes := []string{apidInfo.ClusterID}
 	snapshot := &common.Snapshot{}
-	downloadSnapshot(scopes, snapshot, quitPolling)
-	storeBootSnapshot(snapshot)
+
+	err := s.downloadSnapshot(scopes, snapshot)
+	if err != nil {
+		// this may happen during shutdown
+		if _, ok := err.(quitSignalError); ok {
+			log.Warn("downloadBootSnapshot failed due to shutdown: " + err.Error())
+		}
+		return
+	}
+
+	// has been closed
+	if atomic.LoadInt32(s.isClosed) == int32(1) {
+		log.Error("snapShotManager: processSnapshot called on closed snapShotManager")
+		return
+	}
+
+	// note that for boot snapshot case, we don't need to inform plugins as they'll get the data snapshot
+	s.storeBootSnapshot(snapshot)
 }
 
-func storeBootSnapshot(snapshot *common.Snapshot) {
+func (s *snapShotManager) storeBootSnapshot(snapshot *common.Snapshot) {
 	processSnapshot(snapshot)
 }
 
 // use the scope IDs from the boot snapshot to get all the data associated with the scopes
-func downloadDataSnapshot(quitPolling chan bool) {
+func (s *snapShotManager) downloadDataSnapshot() {
+	if atomic.SwapInt32(s.isDownloading, 1) == int32(1) {
+		log.Panic("downloadDataSnapshot: only 1 thread can download snapshot at the same time!")
+	}
+	defer atomic.StoreInt32(s.isDownloading, int32(0))
+
+	// has been closed
+	if atomic.LoadInt32(s.isClosed) == int32(1) {
+		log.Warn("snapShotManager: downloadDataSnapshot called on closed snapShotManager")
+		return
+	}
+
 	log.Debug("download Snapshot for data scopes")
 
 	scopes := findScopesForId(apidInfo.ClusterID)
 	scopes = append(scopes, apidInfo.ClusterID)
 	snapshot := &common.Snapshot{}
-	downloadSnapshot(scopes, snapshot, quitPolling)
-	storeDataSnapshot(snapshot)
+	err := s.downloadSnapshot(scopes, snapshot)
+	if err != nil {
+		// this may happen during shutdown
+		if _, ok := err.(quitSignalError); ok {
+			log.Warn("downloadDataSnapshot failed due to shutdown: " + err.Error())
+		}
+		return
+	}
+	s.storeDataSnapshot(snapshot)
 }
 
-func storeDataSnapshot(snapshot *common.Snapshot) {
+func (s *snapShotManager) storeDataSnapshot(snapshot *common.Snapshot) {
 	knownTables = extractTablesFromSnapshot(snapshot)
 
 	db, err := dataService.DBVersion(snapshot.SnapshotInfo)
 	if err != nil {
 		log.Panicf("Database inaccessible: %v", err)
 	}
+
+	// if closed
+	if atomic.LoadInt32(s.isClosed) == int32(1) {
+		log.Warn("Trying to persistKnownTablesToDB with a closed snapShotManager")
+		return
+	}
 	persistKnownTablesToDB(knownTables, db)
 
 	log.Info("Emitting Snapshot to plugins")
@@ -55,6 +154,8 @@
 	case <-time.After(pluginTimeout):
 		log.Panic("Timeout. Plugins failed to respond to snapshot.")
 	case <-events.Emit(ApigeeSyncEventSelector, snapshot):
+		// the new snapshot has been processed
+		// if close() happens after persistKnownTablesToDB(), snapshot processing is not interrupted, in order to maintain consistency
 	}
 
 }
@@ -119,8 +220,15 @@
 	}
 }
 
+// a blocking method
 // will keep retrying with backoff until success
-func downloadSnapshot(scopes []string, snapshot *common.Snapshot, quitPolling chan bool) {
+
+func (s *snapShotManager) downloadSnapshot(scopes []string, snapshot *common.Snapshot) error {
+	// if closed
+	if atomic.LoadInt32(s.isClosed) == int32(1) {
+		log.Warn("Trying to download snapshot with a closed snapShotManager")
+		return quitSignalError{}
+	}
 
 	log.Debug("downloadSnapshot")
 
@@ -150,8 +258,8 @@
 	//pollWithBackoff only accepts function that accept a single quit channel
 	//to accommodate functions which need more parameters, wrap them in closures
 	attemptDownload := getAttemptDownloadClosure(client, snapshot, uri)
-
-	pollWithBackoff(quitPolling, attemptDownload, handleSnapshotServerError)
+	pollWithBackoff(s.quitChan, attemptDownload, handleSnapshotServerError)
+	return nil
 }
 
 func getAttemptDownloadClosure(client *http.Client, snapshot *common.Snapshot, uri string) func(chan bool) error {