Merge pull request #40 from 30x/XAPID886
Xapid886
diff --git a/changes.go b/changes.go
index 3a480e4..11fdc3a 100644
--- a/changes.go
+++ b/changes.go
@@ -2,14 +2,14 @@
import (
"encoding/json"
+ "github.com/apigee-labs/transicator/common"
"io/ioutil"
"net/http"
"net/url"
"path"
- "time"
-
- "github.com/apigee-labs/transicator/common"
+ "sort"
"sync/atomic"
+ "time"
)
var lastSequence string
@@ -227,9 +227,7 @@
*/
if lastSequence != "" &&
getChangeStatus(lastSequence, resp.LastSequence) != 1 {
- return changeServerError{
- Code: "Ignore change, already have newer changes",
- }
+ return nil
}
if changesRequireDDLSync(resp) {
@@ -251,6 +249,16 @@
updateSequence(resp.LastSequence)
+ /*
+ * Check to see if there was any change in scope. If found, handle it
+ * by getting a new snapshot
+ */
+ newScopes := findScopesForId(apidInfo.ClusterID)
+ cs := scopeChanged(newScopes, scopes)
+ if cs != nil {
+ return cs
+ }
+
return nil
}
@@ -264,8 +272,8 @@
log.Debugf("handleChangeServerError: changeManager has been closed")
return
}
- if _, ok := err.(changeServerError); ok {
- log.Info("Detected DDL changes, going to fetch a new snapshot to sync...")
+ if c, ok := err.(changeServerError); ok {
+ log.Debugf("%s. Fetch a new snapshot to sync...", c.Code)
snapManager.downloadDataSnapshot()
} else {
log.Debugf("Error connecting to changeserver: %v", err)
@@ -317,3 +325,25 @@
}
}
+
+/*
+ * Returns nil if a and b hold the same strings in any order; otherwise
+ * returns a changeServerError asking for a new snapshot. Sorted copies are
+ * compared so the callers' slices (possibly shared state) keep their order.
+ */
+func scopeChanged(a, b []string) error {
+	if len(a) != len(b) {
+		return changeServerError{Code: "Scope changes detected; must get new snapshot"}
+	}
+	// sort.Strings sorts in place, so work on copies instead of the arguments.
+	x := append([]string(nil), a...)
+	y := append([]string(nil), b...)
+	sort.Strings(x)
+	sort.Strings(y)
+	for i, v := range x {
+		if v != y[i] {
+			return changeServerError{Code: "Scope changes detected; must get new snapshot"}
+		}
+	}
+	return nil
+}
diff --git a/listener_test.go b/listener_test.go
index ee0b4cc..b5fea9b 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -50,6 +50,22 @@
Expect(func() { handler.Handle(&event) }).To(Panic())
}, 3)
+	It("should detect scope changes regardless of element order", func() {
+		// Covers differing contents, differing lengths, and reordered lists.
+		// Error value scopeChanged returns whenever the two scope lists differ.
+		changed := changeServerError{Code: "Scope changes detected; must get new snapshot"}
+
+		// Same length, different contents.
+		Expect(scopeChanged([]string{"foo"}, []string{"bar"})).To(Equal(changed))
+		// New list has an extra scope.
+		Expect(scopeChanged([]string{"foo", "bar"}, []string{"bar"})).To(Equal(changed))
+		// New list is missing a scope.
+		Expect(scopeChanged([]string{"foo"}, []string{"bar", "foo"})).To(Equal(changed))
+		// Same scopes in a different order count as unchanged.
+		Expect(scopeChanged([]string{"foo", "bar"}, []string{"bar", "foo"})).To(BeNil())
+
+	}, 3)
+
It("should process a valid Snapshot", func() {
event := common.Snapshot{
@@ -329,6 +345,7 @@
Expect(len(scopes)).To(Equal(2))
Expect(scopes[0]).To(Equal("s1"))
Expect(scopes[1]).To(Equal("s2"))
+
}, 3)
It("delete event should delete", func() {