Merge branch 'XAPID-999' into rj_XAPID-999
diff --git a/api.go b/api.go
index fd41fb8..6dbdba3 100644
--- a/api.go
+++ b/api.go
@@ -108,12 +108,17 @@
 	deploymentsChanged  chan interface{}
 	addSubscriber       chan chan deploymentsResult
 	removeSubscriber    chan chan deploymentsResult
+	apiInitialized      bool
 }
 
 func (a *apiManager) InitAPI() {
-	log.Debug("API endpoints initialized")
+	if a.apiInitialized { // routes were already registered by an earlier call
+		return
+	}
 	services.API().HandleFunc(a.deploymentsEndpoint, a.apiGetCurrentDeployments).Methods("GET")
 	services.API().HandleFunc(a.blobEndpoint, a.apiReturnBlobData).Methods("GET")
+	a.apiInitialized = true // NOTE(review): plain bool, no lock — confirm InitAPI is never called concurrently
+	log.Debug("API endpoints initialized")
 }
 
 func (a *apiManager) addChangedDeployment(id string) {
diff --git a/api_test.go b/api_test.go
index 66e790d..bbdd9c4 100644
--- a/api_test.go
+++ b/api_test.go
@@ -311,7 +311,6 @@
 		CreatedBy:      "haoming@google.com",
 		Updated:        time.Now().Format(time.RFC3339),
 		UpdatedBy:      "haoming@google.com",
-		BlobFSLocation: "BlobFSLocation",
 	}
 	return dep
 }
@@ -338,10 +337,11 @@
 	readyDeployments []DataDeployment
 	localFSLocation  string
 	fileResponse     chan string
+	version          string
 }
 
 func (d *dummyDbManager) setDbVersion(version string) {
-
+	d.version = version // remember the last version set so specs can assert on it
 }
 
 func (d *dummyDbManager) initDb() error {
diff --git a/bundle.go b/bundle.go
index b0dc434..a53f31c 100644
--- a/bundle.go
+++ b/bundle.go
@@ -36,7 +36,7 @@
 	queueDownloadRequest(*DataDeployment)
 	enqueueRequest(*DownloadRequest)
 	makeDownloadRequest(string) *DownloadRequest
-	//deleteBundles([]DataDeployment)
+	deleteBundles([]DataDeployment)
 	Close()
 }
 
@@ -128,7 +128,7 @@
 			for _, dep := range deletedDeployments {
 				bundleFile := getBlobFilePath(dep.BlobID)
 				log.Debugf("removing old bundle: %v", bundleFile)
-				// TODO Remove from the Database table edgex_blob_available
+				// TODO Remove from the Database table apid_blob_available
 				safeDelete(bundleFile)
 			}
 		}()
diff --git a/bundle_test.go b/bundle_test.go
index fa44b03..e848cec 100644
--- a/bundle_test.go
+++ b/bundle_test.go
@@ -132,9 +132,11 @@
 })
 
 type dummyApiManager struct {
+	initCalled bool // set to true by InitAPI; stays false until the first call
 }
 
 func (a *dummyApiManager) InitAPI() {
+	a.initCalled = true // records that a call happened; repeated calls are not counted
 }
 
 type dummyBlobServer struct {
diff --git a/data.go b/data.go
index e02cf60..366e010 100644
--- a/data.go
+++ b/data.go
@@ -38,7 +38,6 @@
 	CreatedBy      string
 	Updated        string
 	UpdatedBy      string
-	BlobFSLocation string
 }
 
 type SQLExec interface {
@@ -78,7 +77,7 @@
 
 func (dbc *dbManager) initDb() error {
 	_, err := dbc.getDb().Exec(`
-	CREATE TABLE IF NOT EXISTS edgex_blob_available (
+	CREATE TABLE IF NOT EXISTS apid_blob_available (
 		id text primary key,
    		local_fs_location text NOT NULL
 	);
@@ -87,7 +86,7 @@
 		return err
 	}
 
-	log.Debug("Database table edgex_blob_available created.")
+	log.Debug("Database table apid_blob_available created.")
 	return nil
 }
 
@@ -99,7 +98,7 @@
 	rows, err := dbc.getDb().Query(`
 	SELECT a.bean_blob_id
 		FROM metadata_runtime_entity_metadata as a
-		LEFT JOIN edgex_blob_available as b
+		LEFT JOIN apid_blob_available as b
 		ON a.bean_blob_id = b.id
 		WHERE b.id IS NULL;
 	`)
@@ -118,7 +117,7 @@
 	rows, err = dbc.getDb().Query(`
 	SELECT a.resource_blob_id
 		FROM metadata_runtime_entity_metadata as a
-		LEFT JOIN edgex_blob_available as b
+		LEFT JOIN apid_blob_available as b
 		ON a.resource_blob_id = b.id
 		WHERE (b.id IS NULL AND a.resource_blob_id IS NOT NULL AND a.resource_blob_id != '');
 	`)
@@ -140,6 +139,7 @@
 // TODO there's a bug in the db statement
 func (dbc *dbManager) getReadyDeployments() ([]DataDeployment, error) {
 
+
 	rows, err := dbc.getDb().Query(`
 		SELECT 	a.id,
 			a.organization_id,
@@ -181,7 +181,6 @@
 				ON a.bean_blob_id = b.id
 				WHERE a.resource_blob_id = ""
 		)
-
 	;
 	`)
 
@@ -209,7 +208,7 @@
 func (dbc *dbManager) updateLocalFsLocation(blobId, localFsLocation string) error {
 
 	stmt, err := dbc.getDb().Prepare(`
-		INSERT OR IGNORE INTO edgex_blob_available (
+		INSERT OR IGNORE INTO apid_blob_available (
 		id,
 		local_fs_location
 		) VALUES (?, ?);`)
@@ -221,11 +220,11 @@
 
 	_, err = stmt.Exec(blobId, localFsLocation)
 	if err != nil {
-		log.Errorf("UPDATE edgex_blob_available id {%s} local_fs_location {%s} failed", localFsLocation, err)
+		log.Errorf("INSERT apid_blob_available id {%s} local_fs_location {%s} failed: %v", blobId, localFsLocation, err)
 		return err
 	}
 
-	log.Debugf("INSERT edgex_blob_available {%s} local_fs_location {%s} succeeded", blobId, localFsLocation)
+	log.Debugf("INSERT apid_blob_available {%s} local_fs_location {%s} succeeded", blobId, localFsLocation)
 	return nil
 
 }
@@ -233,7 +232,7 @@
 func (dbc *dbManager) getLocalFSLocation(blobId string) (localFsLocation string, err error) {
 
 	log.Debugf("Getting the blob file for blobId {%s}", blobId)
-	rows, err := dbc.getDb().Query("SELECT local_fs_location FROM edgex_blob_available WHERE id = '" + blobId + "'")
+	rows, err := dbc.getDb().Query("SELECT local_fs_location FROM apid_blob_available WHERE id = ?", blobId) // bind blobId, never splice it into the SQL
 	if err != nil {
 		log.Errorf("SELECT local_fs_location failed %v", err)
 		return "", err
@@ -268,7 +267,6 @@
 			&dep.CreatedBy,
 			&dep.Updated,
 			&dep.UpdatedBy,
-			&dep.BlobFSLocation,
 		)
 		if err != nil {
 			return nil, err
diff --git a/data_test.go b/data_test.go
index d678828..29f777a 100644
--- a/data_test.go
+++ b/data_test.go
@@ -47,7 +47,8 @@
 		}
 		testDbMan.setDbVersion("test" + strconv.Itoa(testCount))
 		initTestDb(testDbMan.getDb())
-		testDbMan.initDb()
+		err := testDbMan.initDb()
+		Expect(err).Should(Succeed())
 		time.Sleep(100 * time.Millisecond)
 	})
 
@@ -57,10 +58,15 @@
 	})
 
 	Context("db tests", func() {
+		It("initDb() should be idempotent", func() {
+			err := testDbMan.initDb()
+			Expect(err).Should(Succeed())
+		})
+
 		It("should succefully initialized tables", func() {
-			// edgex_blob_available
+			// apid_blob_available
 			rows, err := testDbMan.getDb().Query(`
-				SELECT count(*) from edgex_blob_available;
+				SELECT count(*) from apid_blob_available;
 			`)
 			Expect(err).Should(Succeed())
 			defer rows.Close()
@@ -86,9 +92,9 @@
 
 			err := testDbMan.updateLocalFsLocation(readyBlobId, readyblobLocalFs)
 			Expect(err).Should(Succeed())
-			// edgex_blob_available
+			// apid_blob_available
 			rows, err := testDbMan.getDb().Query(`
-				SELECT count(*) from edgex_blob_available;
+				SELECT count(*) from apid_blob_available;
 			`)
 			Expect(err).Should(Succeed())
 			defer rows.Close()
@@ -104,7 +110,7 @@
 			err := testDbMan.updateLocalFsLocation(readyBlobId, readyblobLocalFs)
 			Expect(err).Should(Succeed())
 
-			// edgex_blob_available
+			// apid_blob_available
 			location, err := testDbMan.getLocalFSLocation(readyBlobId)
 			Expect(err).Should(Succeed())
 			Expect(location).Should(Equal(readyblobLocalFs))
@@ -118,7 +124,6 @@
 			Expect(err).Should(Succeed())
 			Expect(len(deps)).Should(Equal(1))
 			Expect(deps[0].BlobID).Should(Equal(readyBlobId))
-			Expect(deps[0].BlobFSLocation).Should(Equal(readyblobLocalFs))
 		})
 
 		It("should succefully get unready blob ids", func() {
diff --git a/init.go b/init.go
index 43ae0e0..d27b566 100644
--- a/init.go
+++ b/init.go
@@ -49,8 +49,7 @@
 	bundlePath       string
 	debounceDuration time.Duration
 	apiServerBaseURI *url.URL
-	apidInstanceID   string
-	apidClusterID    string
+	eventHandler     *apigeeSyncHandler
 )
 
 func init() {
@@ -78,16 +77,6 @@
 		return pluginData, fmt.Errorf("%s value %s parse err: %v", configApiServerBaseURI, apiServerBaseURI, err)
 	}
 
-	if !config.IsSet(configApidInstanceID) {
-		return pluginData, fmt.Errorf("Missing required config value: %s", configApidInstanceID)
-	}
-	apidInstanceID = config.GetString(configApidInstanceID)
-
-	if !config.IsSet(configApidClusterID) {
-		return pluginData, fmt.Errorf("Missing required config value: %s", configApidClusterID)
-	}
-	apidClusterID = config.GetString(configApidClusterID)
-
 	config.SetDefault(configBundleDirKey, "bundles")
 	config.SetDefault(configDebounceDuration, time.Second)
 	config.SetDefault(configBundleCleanupDelay, time.Minute)
@@ -135,6 +124,7 @@
 		deploymentsChanged:  make(chan interface{}, 5),
 		addSubscriber:       make(chan chan deploymentsResult),
 		removeSubscriber:    make(chan chan deploymentsResult),
+		apiInitialized:      false,
 	}
 
 	// initialize bundle manager
@@ -167,7 +157,15 @@
 	//TODO initialize apiMan.distributeEvents() for api call with "block"
 	//go apiMan.distributeEvents()
 
-	initListener(services, dbMan, apiMan, bundleMan)
+	// initialize event handler
+	eventHandler = &apigeeSyncHandler{
+		dbMan:     dbMan,
+		apiMan:    apiMan,
+		bundleMan: bundleMan,
+		closed:    false,
+	}
+
+	eventHandler.initListener(services)
 
 	log.Debug("end init")
 
diff --git a/listener.go b/listener.go
index a6b637c..8961bae 100644
--- a/listener.go
+++ b/listener.go
@@ -22,19 +22,18 @@
 
 const (
 	APIGEE_SYNC_EVENT     = "ApigeeSync"
-	CONFIG_METADATA_TABLE = "project.runtime_blob_metadata"
+	CONFIG_METADATA_TABLE = "metadata.runtime_entity_metadata"
 )
 
-var apiInitialized bool
+func (h *apigeeSyncHandler) initListener(services apid.Services) {
+	services.Events().Listen(APIGEE_SYNC_EVENT, h) // register h itself as the listener for ApigeeSync events
+}
 
-func initListener(services apid.Services, dbMan dbManagerInterface, apiMan apiManagerInterface, bundleMan bundleManagerInterface) {
-	handler := &apigeeSyncHandler{
-		dbMan:     dbMan,
-		apiMan:    apiMan,
-		bundleMan: bundleMan,
+func (h *apigeeSyncHandler) stopListener(services apid.Services) {
+	if !h.closed { // guard: StopListening is issued at most once per handler
+		services.Events().StopListening(APIGEE_SYNC_EVENT, h)
+		h.closed = true // later calls fall through without touching the event service
 	}
-
-	services.Events().Listen(APIGEE_SYNC_EVENT, handler)
 }
 
 type bundleConfigJson struct {
@@ -48,6 +47,7 @@
 	dbMan     dbManagerInterface
 	apiMan    apiManagerInterface
 	bundleMan bundleManagerInterface
+	closed    bool
 }
 
 func (h *apigeeSyncHandler) String() string {
@@ -72,10 +72,7 @@
 	h.dbMan.setDbVersion(snapshot.SnapshotInfo)
 
 	h.startupOnExistingDatabase()
-	if !apiInitialized {
-		h.apiMan.InitAPI()
-		apiInitialized = true
-	}
+	h.apiMan.InitAPI()
 	log.Debug("Snapshot processed")
 }
 
@@ -83,7 +80,7 @@
 func (h *apigeeSyncHandler) startupOnExistingDatabase() {
 	// start bundle downloads that didn't finish
 	go func() {
-		// create edgex_blob_available table
+		// create apid_blob_available table
 		h.dbMan.initDb()
 		blobIds, err := h.dbMan.getUnreadyBlobs()
 
@@ -130,15 +127,15 @@
 		}
 	*/
 
-	for _, dep := range insertedDeployments {
-		go h.bundleMan.queueDownloadRequest(&dep)
+	for i := range insertedDeployments {
+		go h.bundleMan.queueDownloadRequest(&insertedDeployments[i])
 	}
 
 	// clean up old bundles
 	if len(deletedDeployments) > 0 {
 		log.Debugf("will delete %d old bundles", len(deletedDeployments))
 		//TODO delete bundles for deleted deployments
-		//h.bundleMan.deleteBundles(deletedDeployments)
+		h.bundleMan.deleteBundles(deletedDeployments)
 	}
 }
 
diff --git a/listener_test.go b/listener_test.go
new file mode 100644
index 0000000..d82ce1c
--- /dev/null
+++ b/listener_test.go
@@ -0,0 +1,220 @@
+// Copyright 2017 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package apiGatewayConfDeploy
+
+import (
+	"fmt"
+	"github.com/30x/apid-core"
+	"github.com/apigee-labs/transicator/common"
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	"math/rand"
+	"reflect"
+	"time"
+)
+
+var _ = Describe("listener", func() {
+	var dummyDbMan *dummyDbManager
+	var dummyApiMan *dummyApiManager
+	var dummyBundleMan *dummyBundleManager
+	var testHandler *apigeeSyncHandler
+	var _ = BeforeEach(func() {
+		// stop handler created by initPlugin()
+		eventHandler.stopListener(services)
+		// build fresh dummies for every spec; all three channels are unbuffered
+		dummyApiMan = &dummyApiManager{}
+		dummyDbMan = &dummyDbManager{}
+		dummyBundleMan = &dummyBundleManager{
+			requestChan: make(chan *DownloadRequest),
+			depChan:     make(chan *DataDeployment),
+			delChan:     make(chan *DataDeployment),
+		}
+		testHandler = &apigeeSyncHandler{
+			dbMan:     dummyDbMan,
+			apiMan:    dummyApiMan,
+			bundleMan: dummyBundleMan,
+		}
+		testHandler.initListener(services)
+		time.Sleep(100 * time.Millisecond) // NOTE(review): presumably lets the listener registration settle — confirm
+	})
+
+	var _ = AfterEach(func() {
+		testHandler.stopListener(services) // detach this spec's handler from ApigeeSync events
+	})
+	Context("Snapshot", func() {
+
+		It("Snapshot event shoud enqueue download requests for all unready blobs", func() {
+			// seed the dummy db with 1-10 unready blob ids; every id starts with count 1
+			unreadyBlobIds := make([]string, 0)
+			blobMap := make(map[string]int)
+			for i, n := 0, 1+rand.Intn(10); i < n; i++ { // draw the count once, not on every iteration
+				id := GenerateUUID()
+				blobMap[id] = 1
+				unreadyBlobIds = append(unreadyBlobIds, id)
+			}
+			dummyDbMan.unreadyBlobIds = unreadyBlobIds
+
+			// emit snapshot
+			snapshot := &common.Snapshot{
+				SnapshotInfo: fmt.Sprint(rand.Uint32()),
+			}
+
+			apid.Events().Emit(APIGEE_SYNC_EVENT, snapshot)
+			// receive one request per seeded id; each receive bumps that id's count
+			for i := 0; i < len(unreadyBlobIds); i++ {
+				req := <-dummyBundleMan.requestChan
+				blobMap[req.blobId]++
+			}
+
+			// verify all unready blobids are enqueued: each count must have gone from 1 to 2
+			for _, val := range blobMap {
+				Expect(val).Should(Equal(2))
+			}
+		})
+
+		It("Snapshot events shoud set db version, and should only init API endpoint once", func() {
+			// NOTE(review): the dummy only records a bool, so "once" is not actually verified here
+			// emit 2-6 snapshots, blocking on the channel Emit returns before checking the version
+			for i, n := 0, 2+rand.Intn(5); i < n; i++ { // draw the count once, not on every iteration
+				version := fmt.Sprint(rand.Uint32())
+				snapshot := &common.Snapshot{
+					SnapshotInfo: version,
+				}
+				<-apid.Events().Emit(APIGEE_SYNC_EVENT, snapshot)
+				Expect(dummyDbMan.version).Should(Equal(version))
+			}
+
+			// verify init API called
+			Expect(dummyApiMan.initCalled).Should(BeTrue())
+		})
+
+	})
+
+	Context("Change list", func() {
+
+		It("Insert event shoud enqueue download requests for all inserted deployments", func() {
+			// build 1-10 insert changes and remember each deployment by id
+			changes := make([]common.Change, 0)
+			deployments := make(map[string]DataDeployment)
+			for i, n := 0, 1+rand.Intn(10); i < n; i++ { // draw the count once, not on every iteration
+				dep := makeTestDeployment()
+				change := common.Change{
+					Operation: common.Insert,
+					Table:     CONFIG_METADATA_TABLE,
+					NewRow:    rowFromDeployment(dep),
+				}
+				changes = append(changes, change)
+				deployments[dep.ID] = *dep
+			}
+
+			changeList := &common.ChangeList{
+				Changes: changes,
+			}
+
+			apid.Events().Emit(APIGEE_SYNC_EVENT, changeList)
+
+			// verify: every received deployment equals the one stored under its id, and none is left over
+			for i := 0; i < len(changes); i++ {
+				dep := <-dummyBundleMan.depChan
+				Expect(reflect.DeepEqual(deployments[dep.ID], *dep)).Should(BeTrue())
+				delete(deployments, dep.ID)
+			}
+			Expect(len(deployments)).Should(BeZero())
+		})
+
+		It("Delete event shoud deliver to the bundle manager", func() {
+			// build 1-10 delete changes and remember each deployment id
+			changes := make([]common.Change, 0)
+			deployments := make(map[string]bool)
+			for i, n := 0, 1+rand.Intn(10); i < n; i++ { // draw the count once, not on every iteration
+				dep := makeTestDeployment()
+				change := common.Change{
+					Operation: common.Delete,
+					Table:     CONFIG_METADATA_TABLE,
+					OldRow:    rowFromDeployment(dep),
+				}
+				changes = append(changes, change)
+				deployments[dep.ID] = true
+			}
+
+			changeList := &common.ChangeList{
+				Changes: changes,
+			}
+
+			apid.Events().Emit(APIGEE_SYNC_EVENT, changeList)
+
+			// verify: every deployment received on delChan was expected, and none is left over
+			for i := 0; i < len(changes); i++ {
+				dep := <-dummyBundleMan.delChan
+				Expect(deployments[dep.ID]).Should(BeTrue())
+				delete(deployments, dep.ID)
+			}
+			Expect(len(deployments)).Should(BeZero())
+		})
+	})
+})
+
+type dummyBundleManager struct {
+	requestChan chan *DownloadRequest // receives every request passed to enqueueRequest
+	depChan     chan *DataDeployment  // receives every deployment passed to queueDownloadRequest
+	delChan     chan *DataDeployment  // receives each element of the slice passed to deleteBundles
+}
+
+func (bm *dummyBundleManager) initializeBundleDownloading() {
+	// no-op in this test double
+}
+
+func (bm *dummyBundleManager) queueDownloadRequest(dep *DataDeployment) {
+	bm.depChan <- dep // hands the deployment to whoever is receiving on depChan
+}
+
+func (bm *dummyBundleManager) enqueueRequest(req *DownloadRequest) {
+	bm.requestChan <- req // hands the request to whoever is receiving on requestChan
+}
+
+func (bm *dummyBundleManager) makeDownloadRequest(blobId string) *DownloadRequest {
+	// only blobId is set; every other DownloadRequest field keeps its zero value
+	req := &DownloadRequest{blobId: blobId}
+	return req
+}
+
+func (bm *dummyBundleManager) deleteBundles(deployments []DataDeployment) {
+	for idx := 0; idx < len(deployments); idx++ {
+		bm.delChan <- &deployments[idx] // one send per element, pointing into the caller's slice
+	}
+}
+
+func (bm *dummyBundleManager) Close() {
+	// no-op in this test double
+}
+
+// rowFromDeployment maps each DataDeployment field read below to its column name in a common.Row.
+func rowFromDeployment(dep *DataDeployment) common.Row {
+	return common.Row{
+		"id":               &common.ColumnVal{Value: dep.ID},
+		"organization_id":  &common.ColumnVal{Value: dep.OrgID},
+		"environment_id":   &common.ColumnVal{Value: dep.EnvID},
+		"bean_blob_id":     &common.ColumnVal{Value: dep.BlobID},
+		"resource_blob_id": &common.ColumnVal{Value: dep.BlobResourceID},
+		"type":             &common.ColumnVal{Value: dep.Type},
+		"name":             &common.ColumnVal{Value: dep.Name},
+		"revision":         &common.ColumnVal{Value: dep.Revision},
+		"path":             &common.ColumnVal{Value: dep.Path},
+		"created_at":       &common.ColumnVal{Value: dep.Created},
+		"created_by":       &common.ColumnVal{Value: dep.CreatedBy},
+		"updated_at":       &common.ColumnVal{Value: dep.Updated},
+		"updated_by":       &common.ColumnVal{Value: dep.UpdatedBy},
+	}
+}