snapshot db protocol, cleanup
diff --git a/api.go b/api.go
index 40acb90..9ce3bea 100644
--- a/api.go
+++ b/api.go
@@ -28,7 +28,7 @@
 	Reason    string `json:"reason"`
 }
 
-func initAPI(services apid.Services) {
+func initAPI() {
 	services.API().HandleFunc("/deployments/current", handleCurrentDeployment).Methods("GET")
 	services.API().HandleFunc("/deployments/{deploymentID}", handleDeploymentResult).Methods("POST")
 }
diff --git a/api_test.go b/api_test.go
index be371be..1076ff8 100644
--- a/api_test.go
+++ b/api_test.go
@@ -19,6 +19,7 @@
 
 		It("should get 404 if no deployments", func() {
 
+			db := getDB()
 			_, err := db.Exec("DELETE FROM gateway_deploy_deployment")
 			Expect(err).ShouldNot(HaveOccurred())
 
@@ -77,6 +78,7 @@
 
 		It("should get 404 after blocking if no deployment", func() {
 
+			db := getDB()
 			_, err := db.Exec("DELETE FROM gateway_deploy_deployment")
 			Expect(err).ShouldNot(HaveOccurred())
 
@@ -184,6 +186,7 @@
 
 		It("should mark a deployment as deployed", func() {
 
+			db := getDB()
 			deploymentID := "api_mark_deployed"
 			insertTestDeployment(testServer, deploymentID)
 
@@ -221,6 +224,7 @@
 
 		It("should mark a deployment as failed", func() {
 
+			db := getDB()
 			deploymentID := "api_test_3"
 			insertTestDeployment(testServer, deploymentID)
 
@@ -264,6 +268,7 @@
 
 func insertTestDeployment(server *httptest.Server, depID string) {
 
+	db := getDB()
 	uri, err := url.Parse(server.URL)
 	Expect(err).ShouldNot(HaveOccurred())
 	uri.Path = "/bundle"
@@ -284,7 +289,7 @@
 		},
 	}
 
-	err = insertDeployment(depID, dep)
+	err = insertDeployment(db, depID, dep)
 	Expect(err).ShouldNot(HaveOccurred())
 
 	err = updateDeploymentStatus(db, depID, DEPLOYMENT_STATE_READY, 0)
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go
index c8be184..7287e3e 100644
--- a/apidGatewayDeploy_suite_test.go
+++ b/apidGatewayDeploy_suite_test.go
@@ -30,11 +30,15 @@
 
 	config.Set("local_storage_path", tmpDir)
 
-	// init() will create the tables
 	apid.InitializePlugins()
 
+	db, err := data.DB()
+	Expect(err).NotTo(HaveOccurred())
+	initDB(db)
+	setDB(db)
+
 	router := apid.API().Router()
-	// fake unreliable bundle repo
+	// fake an unreliable bundle repo
 	downloadMultiplier = 10 * time.Millisecond
 	count := 0
 	router.HandleFunc("/bundle/{id}", func(w http.ResponseWriter, req *http.Request) {
diff --git a/data.go b/data.go
index f1da64f..45d2cc1 100644
--- a/data.go
+++ b/data.go
@@ -3,6 +3,8 @@
 import (
 	"database/sql"
 	"time"
+	"github.com/30x/apid"
+	"sync"
 )
 
 const (
@@ -16,59 +18,57 @@
 	BUNDLE_TYPE_DEP = 2
 )
 
+var (
+	unsafeDB apid.DB
+	dbMux    sync.RWMutex
+)
+
 type SQLExec interface {
 	Exec(query string, args ...interface{}) (sql.Result, error)
 }
 
-func initDB() {
+func initDB(db apid.DB) {
 
-	var count int
-	row := db.QueryRow("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='gateway_deploy_deployment';")
-	if err := row.Scan(&count); err != nil {
-		log.Panicf("Unable to check for tables: %v", err)
-	}
-	if count > 0 {
-		return
-	}
+	_, err := db.Exec(`
+	CREATE TABLE IF NOT EXISTS gateway_deploy_deployment (
+		id varchar(255), status integer, created_at integer,
+		modified_at integer, error_code varchar(255),
+		PRIMARY KEY (id));
 
-	log.Debug("Creating database tables...")
-
-	tx, err := db.Begin()
+	CREATE TABLE IF NOT EXISTS gateway_deploy_bundle (
+		deployment_id varchar(255), id varchar(255), scope varchar(255), uri varchar(255), type integer,
+		created_at integer, modified_at integer, status integer, error_code integer, error_reason text,
+		PRIMARY KEY (deployment_id, id),
+		FOREIGN KEY (deployment_id) references gateway_deploy_deployment(id) ON DELETE CASCADE);
+	`)
 	if err != nil {
-		log.Panicf("Unable to start transaction: %v", err)
-	}
-	defer tx.Rollback()
-
-	_, err = tx.Exec("CREATE TABLE gateway_deploy_deployment (" +
-		"id varchar(255), status integer, created_at integer, " +
-		"modified_at integer, error_code varchar(255), " +
-		"PRIMARY KEY (id));")
-	if err != nil {
-		log.Panicf("Unable to initialize gateway_deploy_deployment: %v", err)
+		log.Panicf("Unable to initialize database: %v", err)
 	}
 
-	_, err = tx.Exec("CREATE TABLE gateway_deploy_bundle (" +
-		"deployment_id varchar(255), id varchar(255), scope varchar(255), uri varchar(255), type integer, " +
-		"created_at integer, modified_at integer, status integer, error_code integer, error_reason text, " +
-		"PRIMARY KEY (deployment_id, id), " +
-		"FOREIGN KEY (deployment_id) references gateway_deploy_deployment(id) ON DELETE CASCADE);")
-	if err != nil {
-		log.Panicf("Unable to initialize gateway_deploy_bundle: %v", err)
-	}
-
-	err = tx.Commit()
-	if err != nil {
-		log.Panicf("Unable to commit transaction: %v", err)
-	} else {
-		log.Debug("Database tables created.")
-	}
+	log.Debug("Database tables created.")
 }
 
 func dbTimeNow() int64 {
 	return int64(time.Now().UnixNano())
 }
 
-func insertDeployment(depID string, dep deployment) error {
+func getDB() apid.DB {
+	dbMux.RLock()
+	db := unsafeDB
+	dbMux.RUnlock()
+	return db
+}
+
+func setDB(db apid.DB) {
+	dbMux.Lock()
+	if unsafeDB == nil { // init API when DB is initialized
+		go initAPI()
+	}
+	unsafeDB = db
+	dbMux.Unlock()
+}
+
+func insertDeployment(db apid.DB, depID string, dep deployment) error {
 
 	log.Debugf("insertDeployment: %s", depID)
 
@@ -127,6 +127,8 @@
 
 	log.Debugf("updateDeploymentAndBundles: %s", depID)
 
+	db := getDB()
+
 	/*
 	 * If the state of deployment was success, update state of bundles and
 	 * its deployments as success as well
@@ -232,6 +234,7 @@
 // getCurrentDeploymentID returns the ID of what should be the "current" deployment
 func getCurrentDeploymentID() (string, error) {
 
+	db := getDB()
 	var depID string
 	err := db.QueryRow("SELECT id FROM gateway_deploy_deployment " +
 		"WHERE status >= ? ORDER BY created_at DESC LIMIT 1;", DEPLOYMENT_STATE_READY).Scan(&depID)
@@ -242,6 +245,7 @@
 // getDeployment returns a fully populated deploymentResponse
 func getDeployment(depID string) (*deployment, error) {
 
+	db := getDB()
 	rows, err := db.Query("SELECT id, type, uri, COALESCE(scope, '') as scope " +
 		"FROM gateway_deploy_bundle WHERE deployment_id=?;", depID)
 	if err != nil {
diff --git a/deployments.go b/deployments.go
index c10a212..6d1a624 100644
--- a/deployments.go
+++ b/deployments.go
@@ -12,6 +12,7 @@
 	"errors"
 	"io/ioutil"
 	"time"
+	"github.com/30x/apid"
 )
 
 // todo: remove downloaded bundle files from old deployments
@@ -180,11 +181,11 @@
 
 // returns first bundle download error
 // all bundles will be attempted regardless of errors, in the future we could retry
-func prepareDeployment(depID string, dep deployment) error {
+func prepareDeployment(db apid.DB, depID string, dep deployment) error {
 
 	log.Debugf("preparing deployment: %s", depID)
 
-	err := insertDeployment(depID, dep)
+	err := insertDeployment(db, depID, dep)
 	if err != nil {
 		log.Errorf("insert deployment failed: %v", err)
 		return err
diff --git a/init.go b/init.go
index 4468f08..07b3094 100644
--- a/init.go
+++ b/init.go
@@ -11,8 +11,9 @@
 )
 
 var (
+	services   apid.Services
 	log        apid.LogService
-	db         apid.DB
+	data       apid.DataService
 	bundlePath string
 )
 
@@ -20,14 +21,16 @@
 	apid.RegisterPlugin(initPlugin)
 }
 
-func initPlugin(services apid.Services) error {
+func initPlugin(s apid.Services) error {
+	services = s
 	log = services.Log().ForModule("apiGatewayDeploy")
 	log.Debug("start init")
 
 	config := services.Config()
 	config.SetDefault(configBundleDirKey, "bundles")
 
-	var err error
+	data = services.Data()
+
 	relativeBundlePath := config.GetString(configBundleDirKey)
 	if err := os.MkdirAll(relativeBundlePath, 0700); err != nil {
 		log.Panicf("Failed bundle directory creation: %v", err)
@@ -36,15 +39,8 @@
 	bundlePath = path.Join(storagePath, relativeBundlePath)
 	log.Infof("Bundle directory path is %s", bundlePath)
 
-	db, err = services.Data().DB()
-	if err != nil {
-		log.Panic("Unable to access DB", err)
-	}
-	initDB()
-
 	go distributeEvents()
 
-	initAPI(services)
 	initListener(services)
 
 	log.Debug("end init")
diff --git a/listener.go b/listener.go
index 8ad5411..915a6b5 100644
--- a/listener.go
+++ b/listener.go
@@ -28,12 +28,21 @@
 	} else if snapData, ok := e.(*common.Snapshot); ok {
 		processSnapshot(snapData)
 	} else {
-		log.Errorf("Received invalid event: %v", e)
+		log.Errorf("Received invalid event. Ignoring. %v", e)
 	}
 }
 
 func processSnapshot(snapshot *common.Snapshot) {
 
+	log.Debugf("Snapshot received. Switching to DB version: %s", snapshot.SnapshotInfo)
+
+	db, err := data.DBVersion(snapshot.SnapshotInfo)
+	if err != nil {
+		log.Panicf("Unable to access database: %v", err)
+	}
+
+	initDB(db)
+
 	for _, table := range snapshot.Tables {
 		var err error
 		switch table.Name {
@@ -44,18 +53,20 @@
 			}
 			// todo: should be 0 or 1 *per system*!! - TBD
 			row := table.Rows[len(table.Rows)-1]
-			err = processNewManifest(row)
+			err = processNewManifest(db, row)
 		}
 		if err != nil {
 			log.Panicf("Error processing Snapshot: %v", err)
 		}
 	}
 
+	setDB(db)
 	log.Debug("Snapshot processed")
 }
 
 func processChangeList(changes *common.ChangeList) {
 
+	db := getDB()
 	for _, change := range changes.Changes {
 		log.Debugf("change table: %s operation: %s", change.Table, change.Operation)
 
@@ -64,7 +75,7 @@
 		case MANIFEST_TABLE:
 			switch change.Operation {
 			case common.Insert:
-				err = processNewManifest(change.NewRow)
+				err = processNewManifest(db, change.NewRow)
 			default:
 				log.Error("unexpected operation: %s", change.Operation)
 			}
@@ -75,7 +86,7 @@
 	}
 }
 
-func processNewManifest(row common.Row) error {
+func processNewManifest(db apid.DB, row common.Row) error {
 
 	var deploymentID, manifestString string
 	err := row.Get("id", &deploymentID)
@@ -93,7 +104,7 @@
 		return err
 	}
 
-	err = prepareDeployment(deploymentID, manifest)
+	err = prepareDeployment(db, deploymentID, manifest)
 	if err != nil {
 		log.Errorf("serviceDeploymentQueue prepare deployment failed: %s", deploymentID)
 		return err
diff --git a/listener_test.go b/listener_test.go
index 28d6ce3..a7cb03f 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -49,11 +49,13 @@
 		row["id"] = &common.ColumnVal{Value: deploymentID}
 		row["manifest_body"] = &common.ColumnVal{Value: string(depBytes)}
 
-		var event = common.Snapshot{}
-		event.Tables = []common.Table{
-			{
-				Name: MANIFEST_TABLE,
-				Rows: []common.Row{row},
+		var event = common.Snapshot{
+			SnapshotInfo: "test",
+			Tables: []common.Table{
+				{
+					Name: MANIFEST_TABLE,
+					Rows: []common.Row{row},
+				},
 			},
 		}
 
@@ -76,7 +78,7 @@
 
 		apid.Events().Listen(APIGEE_SYNC_EVENT, h)
 		apid.Events().Emit(APIGEE_SYNC_EVENT, &event)              // for standard listener
-		apid.Events().Emit(APIGEE_SYNC_EVENT, &common.Snapshot{}) // for test listener
+		apid.Events().Emit(APIGEE_SYNC_EVENT, &common.Snapshot{SnapshotInfo: "test"}) // for test listener
  	})
 
 	It("should process ApigeeSync change event", func(done Done) {