report all known deployment statuses on start
diff --git a/api.go b/api.go
index a0e34cd..0366f7f 100644
--- a/api.go
+++ b/api.go
@@ -322,7 +322,6 @@
addHeaders(req)
resp, err := http.DefaultClient.Do(req)
- defer resp.Body.Close()
if err != nil || resp.StatusCode != http.StatusOK {
if err != nil {
log.Errorf("failed to communicate with tracking service: %v", err)
@@ -330,9 +329,11 @@
b, _ := ioutil.ReadAll(resp.Body)
log.Errorf("tracking service call failed to %s, code: %d, body: %s", apiPath, resp.StatusCode, string(b))
}
+ resp.Body.Close()
backOffFunc()
continue
}
+ resp.Body.Close()
return nil
}
}
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go
index ccb41b2..74c8c62 100644
--- a/apidGatewayDeploy_suite_test.go
+++ b/apidGatewayDeploy_suite_test.go
@@ -9,7 +9,7 @@
"github.com/30x/apid-core"
"github.com/30x/apid-core/factory"
- "io/ioutil"
+ "io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
@@ -99,9 +99,6 @@
}).Methods("PUT")
testServer = httptest.NewServer(router)
-
- apiServerBaseURI, err = url.Parse(testServer.URL)
- Expect(err).NotTo(HaveOccurred())
})
var _ = AfterSuite(func() {
@@ -113,7 +110,11 @@
})
var _ = BeforeEach(func() {
- _, err := getDB().Exec("DELETE FROM deployments")
+ var err error
+ apiServerBaseURI, err = url.Parse(testServer.URL)
+ Expect(err).NotTo(HaveOccurred())
+
+ _, err = getDB().Exec("DELETE FROM deployments")
Expect(err).ShouldNot(HaveOccurred())
_, err = getDB().Exec("UPDATE etag SET value=1")
})
diff --git a/listener.go b/listener.go
index 938b9d2..e4290d5 100644
--- a/listener.go
+++ b/listener.go
@@ -112,23 +112,51 @@
go transmitDeploymentResultsToServer(errResults)
}
- // if no tables, this a startup event for an existing DB, start bundle downloads that didn't finish
+ // if no tables, this a startup event for an existing DB
if len(snapshot.Tables) == 0 {
- go func() {
- deployments, err := getUnreadyDeployments()
- if err != nil {
- log.Panicf("unable to query database for unready deployments: %v", err)
- }
- log.Debugf("Queuing %d deployments for bundle download", len(deployments))
- for _, dep := range deployments {
- queueDownloadRequest(dep)
- }
- }()
+ startupOnExistingDatabase()
}
log.Debug("Snapshot processed")
}
+func startupOnExistingDatabase() {
+ // ensure all deployment statuses have been sent to tracker
+ go func() {
+ deployments, err := getDeployments("WHERE deploy_status != $1", "")
+ if err != nil {
+ log.Panicf("unable to query database for ready deployments: %v", err)
+ }
+ log.Debugf("Queuing %d deployments for bundle download", len(deployments))
+
+ var results apiDeploymentResults
+ for _, dep := range deployments {
+ result := apiDeploymentResult{
+ ID: dep.ID,
+ Status: dep.DeployStatus,
+ ErrorCode: dep.DeployErrorCode,
+ Message: dep.DeployErrorMessage,
+ }
+ results = append(results, result)
+ }
+ if len(results) > 0 {
+ transmitDeploymentResultsToServer(results)
+ }
+ }()
+
+ // start bundle downloads that didn't finish
+ go func() {
+ deployments, err := getUnreadyDeployments()
+ if err != nil {
+ log.Panicf("unable to query database for unready deployments: %v", err)
+ }
+ log.Debugf("Queuing %d deployments for bundle download", len(deployments))
+ for _, dep := range deployments {
+ queueDownloadRequest(dep)
+ }
+ }()
+}
+
func processChangeList(changes *common.ChangeList) {
// gather deleted bundle info
diff --git a/listener_test.go b/listener_test.go
index 40c9ce4..397c8cc 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -4,6 +4,10 @@
"encoding/json"
"net/url"
+ "net/http/httptest"
+
+ "net/http"
+
"github.com/30x/apid-core"
"github.com/apigee-labs/transicator/common"
. "github.com/onsi/ginkgo"
@@ -67,7 +71,7 @@
close(done)
})
- It("should set DB and process unready on startup event", func(done Done) {
+ It("should process unready on existing db startup event", func(done Done) {
deploymentID := "startup_test"
@@ -133,6 +137,90 @@
Expect(d.ID).To(Equal(deploymentID))
close(done)
})
+
+ It("should send deployment statuses on existing db startup event", func(done Done) {
+
+ successDep := DataDeployment{
+ ID: "success",
+ LocalBundleURI: "x",
+ DeployStatus: RESPONSE_STATUS_SUCCESS,
+ DeployErrorCode: 1,
+ DeployErrorMessage: "message",
+ }
+
+ failDep := DataDeployment{
+ ID: "fail",
+ LocalBundleURI: "x",
+ DeployStatus: RESPONSE_STATUS_FAIL,
+ DeployErrorCode: 1,
+ DeployErrorMessage: "message",
+ }
+
+ blankDep := DataDeployment{
+ ID: "blank",
+ LocalBundleURI: "x",
+ }
+
+ ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ defer GinkgoRecover()
+
+ var results apiDeploymentResults
+ err := json.NewDecoder(r.Body).Decode(&results)
+ Expect(err).ToNot(HaveOccurred())
+
+ Expect(results).To(HaveLen(2))
+
+ Expect(results).To(ContainElement(apiDeploymentResult{
+ ID: successDep.ID,
+ Status: successDep.DeployStatus,
+ ErrorCode: successDep.DeployErrorCode,
+ Message: successDep.DeployErrorMessage,
+ }))
+ Expect(results).To(ContainElement(apiDeploymentResult{
+ ID: failDep.ID,
+ Status: failDep.DeployStatus,
+ ErrorCode: failDep.DeployErrorCode,
+ Message: failDep.DeployErrorMessage,
+ }))
+
+ close(done)
+ }))
+
+ var err error
+ apiServerBaseURI, err = url.Parse(ts.URL)
+ Expect(err).NotTo(HaveOccurred())
+
+ // init without info == startup on existing DB
+ var snapshot = common.Snapshot{
+ SnapshotInfo: "test",
+ Tables: []common.Table{},
+ }
+
+ db, err := data.DBVersion(snapshot.SnapshotInfo)
+ if err != nil {
+ log.Panicf("Unable to access database: %v", err)
+ }
+
+ err = InitDB(db)
+ if err != nil {
+ log.Panicf("Unable to initialize database: %v", err)
+ }
+
+ tx, err := db.Begin()
+ Expect(err).ShouldNot(HaveOccurred())
+
+ err = InsertDeployment(tx, successDep)
+ Expect(err).ShouldNot(HaveOccurred())
+ err = InsertDeployment(tx, failDep)
+ Expect(err).ShouldNot(HaveOccurred())
+ err = InsertDeployment(tx, blankDep)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ err = tx.Commit()
+ Expect(err).ShouldNot(HaveOccurred())
+
+ apid.Events().Emit(APIGEE_SYNC_EVENT, &snapshot)
+ })
})
Context("ApigeeSync change event", func() {