Merge branch 'master' of github.com:30x/apidApigeeSync into xapid-846
diff --git a/apigeeSync_suite_test.go b/apigeeSync_suite_test.go index 6181ff0..afca88c 100644 --- a/apigeeSync_suite_test.go +++ b/apigeeSync_suite_test.go
@@ -64,11 +64,33 @@ // This is actually the first test :) // Tests that entire bootstrap and set of sync operations work var lastSnapshot *common.Snapshot + + expectedSnapshotTables := make(map[string] bool) + expectedSnapshotTables["kms.company"] = true + expectedSnapshotTables["edgex.apid_cluster"] = true + expectedSnapshotTables["edgex.data_scope"] = true + apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { defer GinkgoRecover() if s, ok := event.(*common.Snapshot); ok { + //verify that during downloadDataSnapshot, knownTables was correctly populated + Expect(mapIsSubset(knownTables, expectedSnapshotTables)).To(BeTrue()) + + /* After this, we will mock changes for tables not present in the initial snapshot. + * Until that is changed in the mock server, we have to spoof the known tables. + */ + + //add the kms tables targeted by the mocked changes, since those would be present if this were a real scenario + knownTables["kms.app_credential"] = true + knownTables["kms.app_credential_apiproduct_mapper"] = true + knownTables["kms.developer"] = true + knownTables["kms.company_developer"] = true + knownTables["kms.api_product"] = true + knownTables["kms.app"] = true + + lastSnapshot = s for _, t := range s.Tables {
diff --git a/apigee_sync.go b/apigee_sync.go index deead43..a4b5a16 100644 --- a/apigee_sync.go +++ b/apigee_sync.go
@@ -25,6 +25,7 @@ block string = "45" lastSequence string polling uint32 + knownTables = make(map[string]bool) ) /* @@ -44,7 +45,7 @@ err := pollChangeAgent() end := time.Now() if err != nil { - if _, ok := err.(apiError); ok { + if _, ok := err.(changeServerError); ok { downloadDataSnapshot() continue } @@ -130,7 +131,7 @@ continue case http.StatusBadRequest: - var apiErr apiError + var apiErr changeServerError var b []byte b, err = ioutil.ReadAll(r.Body) if err != nil { @@ -160,6 +161,13 @@ return err } + if changesRequireDDLSync(resp) { + log.Info("Detected DDL changes, going to fetch a new snapshot to sync...") + return changeServerError{ + Code: "DDL changes detected; must get new snapshot", + } + } + /* If valid data present, Emit to plugins */ if len(resp.Changes) > 0 { done := make(chan bool) @@ -186,6 +194,12 @@ } } + +func changesRequireDDLSync(changes common.ChangeList) bool { + + return !mapIsSubset(knownTables, extractTablesFromChangelist(changes)) +} + // simple doubling back-off func createBackOff(retryIn, maxBackOff time.Duration) func() { return func() { @@ -235,6 +249,14 @@ scopes = append(scopes, apidInfo.ClusterID) resp := downloadSnapshot(scopes) + knownTables = extractTablesFromSnapshot(resp) + + db, err := data.DBVersion(resp.SnapshotInfo) + if err != nil { + log.Panicf("Database inaccessible: %v", err) + } + persistKnownTablesToDB(knownTables, db) + done := make(chan bool) log.Info("Emitting Snapshot to plugins") events.EmitWithCallback(ApigeeSyncEventSelector, &resp, func(event apid.Event) { @@ -248,20 +270,104 @@ } } +func extractTablesFromSnapshot(snapshot common.Snapshot) (tables map[string]bool) { + + tables = make(map[string]bool) + + log.Debug("Extracting table names from snapshot") + if snapshot.Tables == nil { + //if this panic ever fires, it's a bug + log.Panicf("Attempt to extract known tables from snapshot without tables failed") + } + + for _, table := range snapshot.Tables { + tables[table.Name] = true + } + + return tables +} 
+ +func extractTablesFromChangelist(changes common.ChangeList) (tables map[string] bool) { + + tables = make(map[string]bool) + + for _, change := range changes.Changes { + tables[change.Table] = true + } + + return tables +} + +func extractTablesFromDB(db apid.DB) (tables map[string]bool) { + + tables = make(map[string]bool) + + log.Debug("Extracting table names from existing DB") + rows, err := db.Query("SELECT name FROM _known_tables;") + defer rows.Close() + + if err != nil { + log.Panicf("Error reading current set of tables: %v", err) + } + + for rows.Next() { + var table string + if err := rows.Scan(&table); err != nil { + log.Panicf("Error reading current set of tables: %v", err) + } + log.Debugf("Table %s found in existing db", table) + + tables[table] = true + } + return tables +} + +func persistKnownTablesToDB(tables map[string]bool, db apid.DB) { + log.Debugf("Inserting table names found in snapshot into db") + + tx, err := db.Begin() + if err != nil { + log.Panicf("Error starting transaction: %v", err) + } + defer tx.Rollback() + + _, err = tx.Exec("CREATE TABLE _known_tables (name text, PRIMARY KEY(name));") + if err != nil { + log.Panicf("Could not create _known_tables table: %s", err) + } + + for name := range tables { + log.Debugf("Inserting %s into _known_tables", name) + _, err := tx.Exec("INSERT INTO _known_tables VALUES(?);", name) + if err != nil { + log.Panicf("Error encountered inserting into known tables ", err) + } + + } + + err = tx.Commit() + if err != nil { + log.Panicf("Error committing transaction: %v", err) + + } +} + // Skip Downloading snapshot if there is already a snapshot available from previous run func startOnLocalSnapshot(snapshot string) { log.Infof("Starting on local snapshot: %s", snapshot) // ensure DB version will be accessible on behalf of dependant plugins - _, err := data.DBVersion(snapshot) + db, err := data.DBVersion(snapshot) if err != nil { log.Panicf("Database inaccessible: %v", err) } + knownTables = 
extractTablesFromDB(db) + + // allow plugins (including this one) to start immediately on existing database // Note: this MUST have no tables as that is used as an indicator snap := &common.Snapshot{ - SnapshotInfo: apidInfo.LastSnapshot, + SnapshotInfo: snapshot, } events.EmitWithCallback(ApigeeSyncEventSelector, snap, func(event apid.Event) { go pollForChanges() @@ -356,10 +462,29 @@ req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339)) } -type apiError struct { +type changeServerError struct { Code string `json:"code"` } -func (a apiError) Error() string { +func (a changeServerError) Error() string { return a.Code } + +/* + * Determine if map b is a subset of map a, i.e. every key of b maps to true in a + */ +func mapIsSubset(a map[string]bool, b map[string]bool) bool { + + //nil maps should not be passed in. Unlike an empty map, a nil map on either side makes the result false + if a == nil || b == nil { + return false; + } + + for k := range b { + if !a[k] { + return false + } + } + + return true +} \ No newline at end of file
diff --git a/apigee_sync_test.go b/apigee_sync_test.go index 597536d..12198e9 100644 --- a/apigee_sync_test.go +++ b/apigee_sync_test.go
@@ -11,12 +11,21 @@ It("should bootstrap from local DB if present", func(done Done) { + expectedTables := make(map[string] bool) + expectedTables["kms.company"] = true + expectedTables["edgex.apid_cluster"] = true + expectedTables["edgex.data_scope"] = true + Expect(apidInfo.LastSnapshot).NotTo(BeEmpty()) apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { defer GinkgoRecover() if s, ok := event.(*common.Snapshot); ok { + + //verify that the knownTables map has been properly populated from existing DB + Expect(mapIsSubset(knownTables, expectedTables)).To(BeTrue()) + + Expect(s.SnapshotInfo).Should(Equal(apidInfo.LastSnapshot)) Expect(s.Tables).To(BeNil()) @@ -27,6 +36,27 @@ bootstrap() }) + It("should correctly identify non-proper subsets with respect to maps", func() { + + //test b proper subset of a + Expect(mapIsSubset(map[string]bool{"a": true, "b": true}, map[string]bool{"b": true})).To(BeTrue()) + + //test a == b + Expect(mapIsSubset(map[string]bool{"a": true, "b": true}, map[string]bool{"a": true, "b": true})).To(BeTrue()) + + //test b superset of a + Expect(mapIsSubset(map[string]bool{"a": true, "b": true}, map[string]bool{"a": true, "b": true, "c": true})).To(BeFalse()) + + //test b not subset of a + Expect(mapIsSubset(map[string]bool{"a": true, "b": true}, map[string]bool{"c": true})).To(BeFalse()) + + //test b empty + Expect(mapIsSubset(map[string]bool{"a": true, "b": true}, map[string]bool{})).To(BeTrue()) + + //test a empty + Expect(mapIsSubset(map[string]bool{}, map[string]bool{"b": true})).To(BeFalse()) + }) + // todo: disabled for now - // there is precondition I haven't been able to track down that breaks this test on occasion XIt("should process a new snapshot when change server requires it", func(done Done) {
diff --git a/listener_test.go b/listener_test.go index 71adfc3..2b060de 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -10,11 +10,15 @@ var _ = Describe("listener", func() { handler := handler{} + var saveLastSnapshot string Context("ApigeeSync snapshot event", func() { It("should set DB to appropriate version", func() { + //save the last snapshot, so we can restore it at the end of this context + saveLastSnapshot = apidInfo.LastSnapshot + event := common.Snapshot{ SnapshotInfo: "test_snapshot", Tables: []common.Table{}, @@ -196,6 +200,9 @@ Expect(len(scopes)).To(Equal(2)) Expect(scopes[0]).To(Equal("s1")) Expect(scopes[1]).To(Equal("s2")) + + //restore the last snapshot + apidInfo.LastSnapshot = saveLastSnapshot }) }) @@ -204,6 +211,8 @@ Context(LISTENER_TABLE_APID_CLUSTER, func() { It("insert event should panic", func() { + //save the last snapshot, so we can restore it at the end of this context + saveLastSnapshot = apidInfo.LastSnapshot event := common.ChangeList{ LastSequence: "test", @@ -231,6 +240,8 @@ } Expect(func() { handler.Handle(&event) }).To(Panic()) + //restore the last snapshot + apidInfo.LastSnapshot = saveLastSnapshot }) PIt("delete event should kill all the things!") @@ -239,6 +250,9 @@ Context(LISTENER_TABLE_DATA_SCOPE, func() { It("insert event should add", func() { + //save the last snapshot, so we can restore it at the end of this context + saveLastSnapshot = apidInfo.LastSnapshot + event := common.ChangeList{ LastSequence: "test", Changes: []common.Change{ @@ -373,6 +387,8 @@ } Expect(func() { handler.Handle(&event) }).To(Panic()) + //restore the last snapshot + apidInfo.LastSnapshot = saveLastSnapshot }) })
diff --git a/mock_server.go b/mock_server.go index 71b6d97..6907755 100644 --- a/mock_server.go +++ b/mock_server.go
@@ -311,7 +311,7 @@ val := atomic.SwapInt32(m.newSnap, 0) if val > 0 { w.WriteHeader(http.StatusBadRequest) - apiErr := apiError{ + apiErr := changeServerError{ Code: "SNAPSHOT_TOO_OLD", } bytes, err := json.Marshal(apiErr)