feature: detect new DDL and replicate it by forcing a new snapshot
diff --git a/apigeeSync_suite_test.go b/apigeeSync_suite_test.go index 6181ff0..afca88c 100644 --- a/apigeeSync_suite_test.go +++ b/apigeeSync_suite_test.go
@@ -64,11 +64,33 @@ // This is actually the first test :) // Tests that entire bootstrap and set of sync operations work var lastSnapshot *common.Snapshot + + expectedSnapshotTables := make(map[string] bool) + expectedSnapshotTables["kms.company"] = true + expectedSnapshotTables["edgex.apid_cluster"] = true + expectedSnapshotTables["edgex.data_scope"] = true + apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { defer GinkgoRecover() if s, ok := event.(*common.Snapshot); ok { + //verify that during downloadDataSnapshot, knownTables was correctly populated + Expect(mapIsSubset(knownTables, expectedSnapshotTables)).To(BeTrue()) + + /* After this, we will mock changes for tables not present in the initial snapshot + * until that is changed in the mock server, we have to spoof the known tables + */ + + //add apid_cluster and data_scope since those would present if this were a real scenario + knownTables["kms.app_credential"] = true + knownTables["kms.app_credential_apiproduct_mapper"] = true + knownTables["kms.developer"] = true + knownTables["kms.company_developer"] = true + knownTables["kms.api_product"] = true + knownTables["kms.app"] = true + + lastSnapshot = s for _, t := range s.Tables {
diff --git a/apigee_sync.go b/apigee_sync.go index deead43..0df37d6 100644 --- a/apigee_sync.go +++ b/apigee_sync.go
@@ -25,6 +25,7 @@ block string = "45" lastSequence string polling uint32 + knownTables = make(map[string]bool) ) /* @@ -44,7 +45,7 @@ err := pollChangeAgent() end := time.Now() if err != nil { - if _, ok := err.(apiError); ok { + if _, ok := err.(changeServerError); ok { downloadDataSnapshot() continue } @@ -130,7 +131,7 @@ continue case http.StatusBadRequest: - var apiErr apiError + var apiErr changeServerError var b []byte b, err = ioutil.ReadAll(r.Body) if err != nil { @@ -160,6 +161,13 @@ return err } + if changesRequireDDLSync(resp) { + log.Info("Detected DDL changes, going to fetch a new snapshot to sync...") + return changeServerError{ + Code: "DDL changes detected; must get new snapshot", + } + } + /* If valid data present, Emit to plugins */ if len(resp.Changes) > 0 { done := make(chan bool) @@ -186,6 +194,12 @@ } } + +func changesRequireDDLSync(changes common.ChangeList) bool { + + return !mapIsSubset(knownTables, extractTablesFromChangelist(changes)) +} + // simple doubling back-off func createBackOff(retryIn, maxBackOff time.Duration) func() { return func() { @@ -235,6 +249,14 @@ scopes = append(scopes, apidInfo.ClusterID) resp := downloadSnapshot(scopes) + knownTables = extractTablesFromSnapshot(resp) + + db, err := data.DBVersion(resp.SnapshotInfo) + if err != nil { + log.Panicf("Database inaccessible: %v", err) + } + persistKnownTablesToDB(knownTables, db) + done := make(chan bool) log.Info("Emitting Snapshot to plugins") events.EmitWithCallback(ApigeeSyncEventSelector, &resp, func(event apid.Event) { @@ -248,20 +270,91 @@ } } +func extractTablesFromSnapshot(snapshot common.Snapshot) (tables map[string]bool) { + + tables = make(map[string]bool) + + log.Debug("Extracting table names from snapshot") + if snapshot.Tables == nil { + //if this panic ever fires, it's a bug + log.Panicf("Attempt to extract known tables from snapshot without tables failed") + } + + for _, table := range snapshot.Tables { + tables[table.Name] = true + } + + return tables +} + +func extractTablesFromChangelist(changes common.ChangeList) (tables map[string] bool) { + + tables = make(map[string]bool) + + for _, change := range changes.Changes { + tables[change.Table] = true + } + + return tables +} + +func extractTablesFromDB(db apid.DB) (tables map[string]bool) { + + tables = make(map[string]bool) + + log.Debug("Extracting table names from existing DB") + rows, err := db.Query("SELECT name FROM _known_tables;") + + if err != nil { + log.Panicf("Error reading current set of tables: %v", err) + } + + for rows.Next() { + var table string + if err := rows.Scan(&table); err != nil { + log.Panicf("Error reading current set of tables: %v", err) + } + log.Debugf("Table %s found in existing db", table) + + tables[table] = true + } + return tables +} + +func persistKnownTablesToDB(tables map[string]bool, db apid.DB) { + log.Debugf("Inserting table names found in snapshot into db") + + _, err := db.Exec(`CREATE TABLE _known_tables (name text, PRIMARY KEY(name));`) + if err != nil { + log.Panicf("Could not create _known_tables table: %s", err) + } + + for name := range tables { + log.Debugf("Inserting %s into _known_tables", name) + _, err := db.Exec(`INSERT INTO _known_tables VALUES(?);`, name) + if err != nil { + log.Panicf("Error encountered inserting into known tables ", err) + } + + } +} + // Skip Downloading snapshot if there is already a snapshot available from previous run func startOnLocalSnapshot(snapshot string) { log.Infof("Starting on local snapshot: %s", snapshot) // ensure DB version will be accessible on behalf of dependant plugins - _, err := data.DBVersion(snapshot) + db, err := data.DBVersion(snapshot) if err != nil { log.Panicf("Database inaccessible: %v", err) } + knownTables = extractTablesFromDB(db) + // allow plugins (including this one) to start immediately on existing database // Note: this MUST have no tables as that is used as an indicator snap := &common.Snapshot{ - SnapshotInfo: apidInfo.LastSnapshot, + SnapshotInfo: snapshot, } events.EmitWithCallback(ApigeeSyncEventSelector, snap, func(event apid.Event) { go pollForChanges() @@ -356,10 +449,28 @@ req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339)) } -type apiError struct { +type changeServerError struct { Code string `json:"code"` } -func (a apiError) Error() string { +func (a changeServerError) Error() string { return a.Code } + +/* + * Determine is map b is a subset of map a + */ +func mapIsSubset(a map[string]bool, b map[string]bool) bool { + + if b == nil { + return true; + } + + for k := range b { + if !a[k] { + return false + } + } + + return true +} \ No newline at end of file
diff --git a/apigee_sync_test.go b/apigee_sync_test.go index 597536d..3f8373c 100644 --- a/apigee_sync_test.go +++ b/apigee_sync_test.go
@@ -11,12 +11,22 @@ It("should bootstrap from local DB if present", func(done Done) { + expectedTables := make(map[string] bool) + expectedTables["kms.company"] = true + expectedTables["edgex.apid_cluster"] = true + expectedTables["edgex.data_scope"] = true + + Expect(apidInfo.LastSnapshot).NotTo(BeEmpty()) apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { defer GinkgoRecover() if s, ok := event.(*common.Snapshot); ok { + + //verify that the knownTables array has been properly populated from existing DB + Expect(mapIsSubset(knownTables, expectedTables)).To(BeTrue()) + Expect(s.SnapshotInfo).Should(Equal(apidInfo.LastSnapshot)) Expect(s.Tables).To(BeNil())
diff --git a/mock_server.go b/mock_server.go index 71b6d97..6907755 100644 --- a/mock_server.go +++ b/mock_server.go
@@ -311,7 +311,7 @@ val := atomic.SwapInt32(m.newSnap, 0) if val > 0 { w.WriteHeader(http.StatusBadRequest) - apiErr := apiError{ + apiErr := changeServerError{ Code: "SNAPSHOT_TOO_OLD", } bytes, err := json.Marshal(apiErr)