feature: detect new DDL and replicate it by forcing a new snapshot
diff --git a/apigeeSync_suite_test.go b/apigeeSync_suite_test.go
index 6181ff0..afca88c 100644
--- a/apigeeSync_suite_test.go
+++ b/apigeeSync_suite_test.go
@@ -64,11 +64,33 @@
// This is actually the first test :)
// Tests that entire bootstrap and set of sync operations work
var lastSnapshot *common.Snapshot
+
+ expectedSnapshotTables := make(map[string] bool)
+ expectedSnapshotTables["kms.company"] = true
+ expectedSnapshotTables["edgex.apid_cluster"] = true
+ expectedSnapshotTables["edgex.data_scope"] = true
+
apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
defer GinkgoRecover()
if s, ok := event.(*common.Snapshot); ok {
+ //verify that during downloadDataSnapshot, knownTables was correctly populated
+ Expect(mapIsSubset(knownTables, expectedSnapshotTables)).To(BeTrue())
+
+ /* After this, we will mock changes for tables not present in the initial snapshot
+ * until that is changed in the mock server, we have to spoof the known tables
+ */
+
+ //add apid_cluster and data_scope since those would present if this were a real scenario
+ knownTables["kms.app_credential"] = true
+ knownTables["kms.app_credential_apiproduct_mapper"] = true
+ knownTables["kms.developer"] = true
+ knownTables["kms.company_developer"] = true
+ knownTables["kms.api_product"] = true
+ knownTables["kms.app"] = true
+
+
lastSnapshot = s
for _, t := range s.Tables {
diff --git a/apigee_sync.go b/apigee_sync.go
index deead43..0df37d6 100644
--- a/apigee_sync.go
+++ b/apigee_sync.go
@@ -25,6 +25,7 @@
block string = "45"
lastSequence string
polling uint32
+ knownTables = make(map[string]bool)
)
/*
@@ -44,7 +45,7 @@
err := pollChangeAgent()
end := time.Now()
if err != nil {
- if _, ok := err.(apiError); ok {
+ if _, ok := err.(changeServerError); ok {
downloadDataSnapshot()
continue
}
@@ -130,7 +131,7 @@
continue
case http.StatusBadRequest:
- var apiErr apiError
+ var apiErr changeServerError
var b []byte
b, err = ioutil.ReadAll(r.Body)
if err != nil {
@@ -160,6 +161,13 @@
return err
}
+ if changesRequireDDLSync(resp) {
+ log.Info("Detected DDL changes, going to fetch a new snapshot to sync...")
+ return changeServerError{
+ Code: "DDL changes detected; must get new snapshot",
+ }
+ }
+
/* If valid data present, Emit to plugins */
if len(resp.Changes) > 0 {
done := make(chan bool)
@@ -186,6 +194,12 @@
}
}
+
+func changesRequireDDLSync(changes common.ChangeList) bool {
+
+ return !mapIsSubset(knownTables, extractTablesFromChangelist(changes))
+}
+
// simple doubling back-off
func createBackOff(retryIn, maxBackOff time.Duration) func() {
return func() {
@@ -235,6 +249,14 @@
scopes = append(scopes, apidInfo.ClusterID)
resp := downloadSnapshot(scopes)
+ knownTables = extractTablesFromSnapshot(resp)
+
+ db, err := data.DBVersion(resp.SnapshotInfo)
+ if err != nil {
+ log.Panicf("Database inaccessible: %v", err)
+ }
+ persistKnownTablesToDB(knownTables, db)
+
done := make(chan bool)
log.Info("Emitting Snapshot to plugins")
events.EmitWithCallback(ApigeeSyncEventSelector, &resp, func(event apid.Event) {
@@ -248,20 +270,91 @@
}
}
+func extractTablesFromSnapshot(snapshot common.Snapshot) (tables map[string]bool) {
+
+ tables = make(map[string]bool)
+
+ log.Debug("Extracting table names from snapshot")
+ if snapshot.Tables == nil {
+ //if this panic ever fires, it's a bug
+ log.Panicf("Attempt to extract known tables from snapshot without tables failed")
+ }
+
+ for _, table := range snapshot.Tables {
+ tables[table.Name] = true
+ }
+
+ return tables
+}
+
+func extractTablesFromChangelist(changes common.ChangeList) (tables map[string] bool) {
+
+ tables = make(map[string]bool)
+
+ for _, change := range changes.Changes {
+ tables[change.Table] = true
+ }
+
+ return tables
+}
+
+func extractTablesFromDB(db apid.DB) (tables map[string]bool) {
+
+ tables = make(map[string]bool)
+
+ log.Debug("Extracting table names from existing DB")
+ rows, err := db.Query("SELECT name FROM _known_tables;")
+
+ if err != nil {
+ log.Panicf("Error reading current set of tables: %v", err)
+ }
+
+ for rows.Next() {
+ var table string
+ if err := rows.Scan(&table); err != nil {
+ log.Panicf("Error reading current set of tables: %v", err)
+ }
+ log.Debugf("Table %s found in existing db", table)
+
+ tables[table] = true
+ }
+ return tables
+}
+
+func persistKnownTablesToDB(tables map[string]bool, db apid.DB) {
+ log.Debugf("Inserting table names found in snapshot into db")
+
+ _, err := db.Exec(`CREATE TABLE _known_tables (name text, PRIMARY KEY(name));`)
+ if err != nil {
+ log.Panicf("Could not create _known_tables table: %s", err)
+ }
+
+ for name := range tables {
+ log.Debugf("Inserting %s into _known_tables", name)
+ _, err := db.Exec(`INSERT INTO _known_tables VALUES(?);`, name)
+ if err != nil {
+ log.Panicf("Error encountered inserting into known tables ", err)
+ }
+
+ }
+}
+
// Skip Downloading snapshot if there is already a snapshot available from previous run
func startOnLocalSnapshot(snapshot string) {
log.Infof("Starting on local snapshot: %s", snapshot)
// ensure DB version will be accessible on behalf of dependant plugins
- _, err := data.DBVersion(snapshot)
+ db, err := data.DBVersion(snapshot)
if err != nil {
log.Panicf("Database inaccessible: %v", err)
}
+ knownTables = extractTablesFromDB(db)
+
// allow plugins (including this one) to start immediately on existing database
// Note: this MUST have no tables as that is used as an indicator
snap := &common.Snapshot{
- SnapshotInfo: apidInfo.LastSnapshot,
+ SnapshotInfo: snapshot,
}
events.EmitWithCallback(ApigeeSyncEventSelector, snap, func(event apid.Event) {
go pollForChanges()
@@ -356,10 +449,28 @@
req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339))
}
-type apiError struct {
+type changeServerError struct {
Code string `json:"code"`
}
-func (a apiError) Error() string {
+func (a changeServerError) Error() string {
return a.Code
}
+
+/*
+ * Determine is map b is a subset of map a
+ */
+func mapIsSubset(a map[string]bool, b map[string]bool) bool {
+
+ if b == nil {
+ return true;
+ }
+
+ for k := range b {
+ if !a[k] {
+ return false
+ }
+ }
+
+ return true
+}
\ No newline at end of file
diff --git a/apigee_sync_test.go b/apigee_sync_test.go
index 597536d..3f8373c 100644
--- a/apigee_sync_test.go
+++ b/apigee_sync_test.go
@@ -11,12 +11,22 @@
It("should bootstrap from local DB if present", func(done Done) {
+ expectedTables := make(map[string] bool)
+ expectedTables["kms.company"] = true
+ expectedTables["edgex.apid_cluster"] = true
+ expectedTables["edgex.data_scope"] = true
+
+
Expect(apidInfo.LastSnapshot).NotTo(BeEmpty())
apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
defer GinkgoRecover()
if s, ok := event.(*common.Snapshot); ok {
+
+ //verify that the knownTables array has been properly populated from existing DB
+ Expect(mapIsSubset(knownTables, expectedTables)).To(BeTrue())
+
Expect(s.SnapshotInfo).Should(Equal(apidInfo.LastSnapshot))
Expect(s.Tables).To(BeNil())
diff --git a/mock_server.go b/mock_server.go
index 71b6d97..6907755 100644
--- a/mock_server.go
+++ b/mock_server.go
@@ -311,7 +311,7 @@
val := atomic.SwapInt32(m.newSnap, 0)
if val > 0 {
w.WriteHeader(http.StatusBadRequest)
- apiErr := apiError{
+ apiErr := changeServerError{
Code: "SNAPSHOT_TOO_OLD",
}
bytes, err := json.Marshal(apiErr)