| package apidApigeeSync | 
 |  | 
 | import ( | 
 | 	"github.com/30x/apid-core" | 
 | 	"github.com/apigee-labs/transicator/common" | 
 | 	. "github.com/onsi/ginkgo" | 
 | 	. "github.com/onsi/gomega" | 
 | 	"net/http" | 
 | 	"net/http/httptest" | 
 | ) | 
 |  | 
 | var _ = Describe("Sync", func() { | 
 |  | 
 | 	Context("Sync", func() { | 
 |  | 
 | 		var initializeContext = func() { | 
 | 			testRouter = apid.API().Router() | 
 | 			testServer = httptest.NewServer(testRouter) | 
 |  | 
 | 			// set up mock server | 
 | 			mockParms := MockParms{ | 
 | 				ReliableAPI:  false, | 
 | 				ClusterID:    config.GetString(configApidClusterId), | 
 | 				TokenKey:     config.GetString(configConsumerKey), | 
 | 				TokenSecret:  config.GetString(configConsumerSecret), | 
 | 				Scope:        "ert452", | 
 | 				Organization: "att", | 
 | 				Environment:  "prod", | 
 | 			} | 
 | 			testMock = Mock(mockParms, testRouter) | 
 |  | 
 | 			config.Set(configProxyServerBaseURI, testServer.URL) | 
 | 			config.Set(configSnapServerBaseURI, testServer.URL) | 
 | 			config.Set(configChangeServerBaseURI, testServer.URL) | 
 | 		} | 
 |  | 
 | 		var restoreContext = func() { | 
 |  | 
 | 			testServer.Close() | 
 |  | 
 | 			config.Set(configProxyServerBaseURI, dummyConfigValue) | 
 | 			config.Set(configSnapServerBaseURI, dummyConfigValue) | 
 | 			config.Set(configChangeServerBaseURI, dummyConfigValue) | 
 |  | 
 | 		} | 
 |  | 
 | 		It("should succesfully bootstrap from clean slate", func(done Done) { | 
 | 			log.Info("Starting sync tests...") | 
 | 			var closeDone <-chan bool | 
 | 			initializeContext() | 
 | 			// do not wipe DB after.  Lets use it | 
 | 			wipeDBAferTest = false | 
 | 			var lastSnapshot *common.Snapshot | 
 |  | 
 | 			expectedSnapshotTables := common.ChangeList{ | 
 | 				Changes: []common.Change{common.Change{Table: "kms.company"}, | 
 | 					common.Change{Table: "edgex.apid_cluster"}, | 
 | 					common.Change{Table: "edgex.data_scope"}}, | 
 | 			} | 
 |  | 
 | 			apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { | 
 | 				if s, ok := event.(*common.Snapshot); ok { | 
 |  | 
 | 					Expect(changesRequireDDLSync(expectedSnapshotTables)).To(BeFalse()) | 
 |  | 
 | 					//add apid_cluster and data_scope since those would present if this were a real scenario | 
 | 					knownTables["kms.app_credential"] = true | 
 | 					knownTables["kms.app_credential_apiproduct_mapper"] = true | 
 | 					knownTables["kms.developer"] = true | 
 | 					knownTables["kms.company_developer"] = true | 
 | 					knownTables["kms.api_product"] = true | 
 | 					knownTables["kms.app"] = true | 
 |  | 
 | 					lastSnapshot = s | 
 |  | 
 | 					for _, t := range s.Tables { | 
 | 						switch t.Name { | 
 |  | 
 | 						case "edgex.apid_cluster": | 
 | 							Expect(t.Rows).To(HaveLen(1)) | 
 | 							r := t.Rows[0] | 
 | 							var id string | 
 | 							r.Get("id", &id) | 
 | 							Expect(id).To(Equal("bootstrap")) | 
 |  | 
 | 						case "edgex.data_scope": | 
 | 							Expect(t.Rows).To(HaveLen(2)) | 
 | 							r := t.Rows[1] // get the non-cluster row | 
 |  | 
 | 							var id, clusterID, env, org, scope string | 
 | 							r.Get("id", &id) | 
 | 							r.Get("apid_cluster_id", &clusterID) | 
 | 							r.Get("env", &env) | 
 | 							r.Get("org", &org) | 
 | 							r.Get("scope", &scope) | 
 |  | 
 | 							Expect(id).To(Equal("ert452")) | 
 | 							Expect(scope).To(Equal("ert452")) | 
 | 							Expect(clusterID).To(Equal("bootstrap")) | 
 | 							Expect(env).To(Equal("prod")) | 
 | 							Expect(org).To(Equal("att")) | 
 | 						} | 
 | 					} | 
 |  | 
 | 				} else if cl, ok := event.(*common.ChangeList); ok { | 
 | 					closeDone = changeManager.close() | 
 | 					// ensure that snapshot switched DB versions | 
 | 					Expect(apidInfo.LastSnapshot).To(Equal(lastSnapshot.SnapshotInfo)) | 
 | 					expectedDB, err := dataService.DBVersion(lastSnapshot.SnapshotInfo) | 
 | 					Expect(err).NotTo(HaveOccurred()) | 
 | 					Expect(getDB() == expectedDB).Should(BeTrue()) | 
 |  | 
 | 					Expect(cl.Changes).To(HaveLen(6)) | 
 |  | 
 | 					var tables []string | 
 | 					for _, c := range cl.Changes { | 
 | 						tables = append(tables, c.Table) | 
 | 						Expect(c.NewRow).ToNot(BeNil()) | 
 |  | 
 | 						var tenantID string | 
 | 						c.NewRow.Get("tenant_id", &tenantID) | 
 | 						Expect(tenantID).To(Equal("ert452")) | 
 | 					} | 
 |  | 
 | 					Expect(tables).To(ContainElement("kms.app_credential")) | 
 | 					Expect(tables).To(ContainElement("kms.app_credential_apiproduct_mapper")) | 
 | 					Expect(tables).To(ContainElement("kms.developer")) | 
 | 					Expect(tables).To(ContainElement("kms.company_developer")) | 
 | 					Expect(tables).To(ContainElement("kms.api_product")) | 
 | 					Expect(tables).To(ContainElement("kms.app")) | 
 |  | 
 | 					go func() { | 
 | 						// when close done, all handlers for the first changeList have been executed | 
 | 						<-closeDone | 
 | 						defer GinkgoRecover() | 
 | 						// allow other handler to execute to insert last_sequence | 
 | 						var seq string | 
 | 						//for seq = ""; seq == ""; { | 
 | 						//	time.Sleep(50 * time.Millisecond) | 
 | 						err := getDB(). | 
 | 							QueryRow("SELECT last_sequence FROM APID_CLUSTER LIMIT 1;"). | 
 | 							Scan(&seq) | 
 | 						Expect(err).NotTo(HaveOccurred()) | 
 | 						//} | 
 | 						Expect(seq).To(Equal(cl.LastSequence)) | 
 |  | 
 | 						restoreContext() | 
 | 						close(done) | 
 | 					}() | 
 |  | 
 | 				} | 
 | 			}) | 
 | 			pie := apid.PluginsInitializedEvent{ | 
 | 				Description: "plugins initialized", | 
 | 			} | 
 | 			pie.Plugins = append(pie.Plugins, pluginData) | 
 | 			postInitPlugins(pie) | 
 | 		}, 5) | 
 |  | 
 | 		It("should bootstrap from local DB if present", func(done Done) { | 
 |  | 
 | 			var closeDone <-chan bool | 
 |  | 
 | 			initializeContext() | 
 | 			expectedTables := common.ChangeList{ | 
 | 				Changes: []common.Change{common.Change{Table: "kms.company"}, | 
 | 					common.Change{Table: "edgex.apid_cluster"}, | 
 | 					common.Change{Table: "edgex.data_scope"}}, | 
 | 			} | 
 | 			Expect(apidInfo.LastSnapshot).NotTo(BeEmpty()) | 
 |  | 
 | 			apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { | 
 |  | 
 | 				if s, ok := event.(*common.Snapshot); ok { | 
 | 					// In this test, the changeManager.pollChangeWithBackoff() has not been launched when changeManager closed | 
 | 					// This is because the changeManager.pollChangeWithBackoff() in bootstrap() happened after this handler | 
 | 					closeDone = changeManager.close() | 
 | 					go func() { | 
 | 						// when close done, all handlers for the first snapshot have been executed | 
 | 						<-closeDone | 
 | 						//verify that the knownTables array has been properly populated from existing DB | 
 | 						Expect(changesRequireDDLSync(expectedTables)).To(BeFalse()) | 
 |  | 
 | 						Expect(s.SnapshotInfo).Should(Equal(apidInfo.LastSnapshot)) | 
 | 						Expect(s.Tables).To(BeNil()) | 
 |  | 
 | 						restoreContext() | 
 | 						close(done) | 
 | 					}() | 
 |  | 
 | 				} | 
 | 			}) | 
 | 			pie := apid.PluginsInitializedEvent{ | 
 | 				Description: "plugins initialized", | 
 | 			} | 
 | 			pie.Plugins = append(pie.Plugins, pluginData) | 
 | 			postInitPlugins(pie) | 
 |  | 
 | 		}, 3) | 
 |  | 
 | 		It("should detect apid_cluster_id change in config yaml", func() { | 
 | 			Expect(apidInfo).ToNot(BeNil()) | 
 | 			Expect(apidInfo.ClusterID).To(Equal("bootstrap")) | 
 | 			Expect(apidInfo.InstanceID).ToNot(BeEmpty()) | 
 | 			previousInstanceId := apidInfo.InstanceID | 
 |  | 
 | 			config.Set(configApidClusterId, "new value") | 
 | 			apidInfo, err := getApidInstanceInfo() | 
 | 			Expect(err).NotTo(HaveOccurred()) | 
 | 			Expect(apidInfo.LastSnapshot).To(BeEmpty()) | 
 | 			Expect(apidInfo.InstanceID).ToNot(BeEmpty()) | 
 | 			Expect(apidInfo.InstanceID).ToNot(Equal(previousInstanceId)) | 
 | 			Expect(apidInfo.ClusterID).To(Equal("new value")) | 
 | 		}) | 
 |  | 
 | 		It("should correctly identify non-proper subsets with respect to maps", func() { | 
 |  | 
 | 			//test b proper subset of a | 
 | 			Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, | 
 | 				[]common.Change{common.Change{Table: "b"}}, | 
 | 			)).To(BeFalse()) | 
 |  | 
 | 			//test a == b | 
 | 			Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, | 
 | 				[]common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}}, | 
 | 			)).To(BeFalse()) | 
 |  | 
 | 			//test b superset of a | 
 | 			Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, | 
 | 				[]common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}, common.Change{Table: "c"}}, | 
 | 			)).To(BeTrue()) | 
 |  | 
 | 			//test b not subset of a | 
 | 			Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, | 
 | 				[]common.Change{common.Change{Table: "c"}}, | 
 | 			)).To(BeTrue()) | 
 |  | 
 | 			//test a empty | 
 | 			Expect(changesHaveNewTables(map[string]bool{}, | 
 | 				[]common.Change{common.Change{Table: "a"}}, | 
 | 			)).To(BeTrue()) | 
 |  | 
 | 			//test b empty | 
 | 			Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, | 
 | 				[]common.Change{}, | 
 | 			)).To(BeFalse()) | 
 |  | 
 | 			//test b nil | 
 | 			Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, nil)).To(BeFalse()) | 
 |  | 
 | 			//test a nil | 
 | 			Expect(changesHaveNewTables(nil, | 
 | 				[]common.Change{common.Change{Table: "a"}}, | 
 | 			)).To(BeTrue()) | 
 | 		}, 3) | 
 |  | 
 | 		// todo: disabled for now - | 
 | 		// there is precondition I haven't been able to track down that breaks this test on occasion | 
 | 		XIt("should process a new snapshot when change server requires it", func(done Done) { | 
 | 			oldSnap := apidInfo.LastSnapshot | 
 | 			apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { | 
 | 				defer GinkgoRecover() | 
 |  | 
 | 				if s, ok := event.(*common.Snapshot); ok { | 
 | 					Expect(s.SnapshotInfo).NotTo(Equal(oldSnap)) | 
 | 					close(done) | 
 | 				} | 
 | 			}) | 
 | 			testMock.forceNewSnapshot() | 
 | 		}) | 
 |  | 
 | 		It("Verify the Sequence Number Logic works as expected", func() { | 
 | 			Expect(getChangeStatus("1.1.1", "1.1.2")).To(Equal(1)) | 
 | 			Expect(getChangeStatus("1.1.1", "1.2.1")).To(Equal(1)) | 
 | 			Expect(getChangeStatus("1.2.1", "1.2.1")).To(Equal(0)) | 
 | 			Expect(getChangeStatus("1.2.1", "1.2.2")).To(Equal(1)) | 
 | 			Expect(getChangeStatus("2.2.1", "1.2.2")).To(Equal(-1)) | 
 | 			Expect(getChangeStatus("2.2.1", "2.2.0")).To(Equal(-1)) | 
 | 		}, 3) | 
 |  | 
 | 		/* | 
 | 		 * XAPID-869, there should not be any panic if received duplicate snapshots during bootstrap | 
 | 		 */ | 
 | 		It("Should be able to handle duplicate snapshot during bootstrap", func() { | 
 | 			initializeContext() | 
 | 			tokenManager = createTokenManager() | 
 | 			snapManager = createSnapShotManager() | 
 | 			events.Listen(ApigeeSyncEventSelector, &handler{}) | 
 |  | 
 | 			scopes := []string{apidInfo.ClusterID} | 
 | 			snapshot := &common.Snapshot{} | 
 | 			snapManager.downloadSnapshot(scopes, snapshot) | 
 | 			snapManager.storeBootSnapshot(snapshot) | 
 | 			snapManager.storeDataSnapshot(snapshot) | 
 | 			restoreContext() | 
 | 			<-snapManager.close() | 
 | 			tokenManager.close() | 
 | 		}, 3) | 
 |  | 
 | 		It("Reuse http.Client connection for multiple concurrent requests", func() { | 
 | 			server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | 
 | 			})) | 
 | 			tr := &http.Transport{ | 
 | 				MaxIdleConnsPerHost: maxIdleConnsPerHost, | 
 | 			} | 
 | 			var rspcnt int = 0 | 
 | 			ch := make(chan *http.Response) | 
 | 			client := &http.Client{Transport: tr} | 
 | 			for i := 0; i < 2*maxIdleConnsPerHost; i++ { | 
 | 				go func(client *http.Client) { | 
 | 					req, err := http.NewRequest("GET", server.URL, nil) | 
 | 					resp, err := client.Do(req) | 
 | 					if err != nil { | 
 | 						Fail("Unable to process Client request") | 
 | 					} | 
 | 					ch <- resp | 
 | 					resp.Body.Close() | 
 |  | 
 | 				}(client) | 
 | 			} | 
 | 			for { | 
 | 				select { | 
 | 				case resp := <-ch: | 
 | 					Expect(resp.StatusCode).To(Equal(http.StatusOK)) | 
 | 					if rspcnt >= 2*maxIdleConnsPerHost-1 { | 
 | 						return | 
 | 					} | 
 | 					rspcnt++ | 
 | 				default: | 
 | 				} | 
 | 			} | 
 |  | 
 | 		}, 3) | 
 |  | 
 | 	}) | 
 | }) |