report all known deployment statuses on start
diff --git a/api.go b/api.go index a0e34cd..0366f7f 100644 --- a/api.go +++ b/api.go
@@ -322,7 +322,6 @@ addHeaders(req) resp, err := http.DefaultClient.Do(req) - defer resp.Body.Close() if err != nil || resp.StatusCode != http.StatusOK { if err != nil { log.Errorf("failed to communicate with tracking service: %v", err) @@ -330,9 +329,11 @@ b, _ := ioutil.ReadAll(resp.Body) log.Errorf("tracking service call failed to %s, code: %d, body: %s", apiPath, resp.StatusCode, string(b)) + resp.Body.Close() } backOffFunc() continue } + resp.Body.Close() return nil } }
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go index ccb41b2..74c8c62 100644 --- a/apidGatewayDeploy_suite_test.go +++ b/apidGatewayDeploy_suite_test.go
@@ -9,7 +9,7 @@ "github.com/30x/apid-core" "github.com/30x/apid-core/factory" - "io/ioutil" + "io/ioutil" "net/http" "net/http/httptest" "net/url" @@ -99,9 +99,6 @@ }).Methods("PUT") testServer = httptest.NewServer(router) - - apiServerBaseURI, err = url.Parse(testServer.URL) - Expect(err).NotTo(HaveOccurred()) }) var _ = AfterSuite(func() { @@ -113,7 +110,11 @@ }) var _ = BeforeEach(func() { - _, err := getDB().Exec("DELETE FROM deployments") + var err error + apiServerBaseURI, err = url.Parse(testServer.URL) + Expect(err).NotTo(HaveOccurred()) + + _, err = getDB().Exec("DELETE FROM deployments") Expect(err).ShouldNot(HaveOccurred()) _, err = getDB().Exec("UPDATE etag SET value=1") })
diff --git a/listener.go b/listener.go index 938b9d2..e4290d5 100644 --- a/listener.go +++ b/listener.go
@@ -112,23 +112,51 @@ go transmitDeploymentResultsToServer(errResults) } - // if no tables, this a startup event for an existing DB, start bundle downloads that didn't finish + // if no tables, this is a startup event for an existing DB if len(snapshot.Tables) == 0 { - go func() { - deployments, err := getUnreadyDeployments() - if err != nil { - log.Panicf("unable to query database for unready deployments: %v", err) - } - log.Debugf("Queuing %d deployments for bundle download", len(deployments)) - for _, dep := range deployments { - queueDownloadRequest(dep) - } - }() + startupOnExistingDatabase() } log.Debug("Snapshot processed") } +func startupOnExistingDatabase() { + // ensure all deployment statuses have been sent to tracker + go func() { + deployments, err := getDeployments("WHERE deploy_status != $1", "") + if err != nil { + log.Panicf("unable to query database for ready deployments: %v", err) + } + log.Debugf("Sending %d deployment statuses to tracking service", len(deployments)) + + var results apiDeploymentResults + for _, dep := range deployments { + result := apiDeploymentResult{ + ID: dep.ID, + Status: dep.DeployStatus, + ErrorCode: dep.DeployErrorCode, + Message: dep.DeployErrorMessage, + } + results = append(results, result) + } + if len(results) > 0 { + transmitDeploymentResultsToServer(results) + } + }() + + // start bundle downloads that didn't finish + go func() { + deployments, err := getUnreadyDeployments() + if err != nil { + log.Panicf("unable to query database for unready deployments: %v", err) + } + log.Debugf("Queuing %d deployments for bundle download", len(deployments)) + for _, dep := range deployments { + queueDownloadRequest(dep) + } + }() +} + func processChangeList(changes *common.ChangeList) { // gather deleted bundle info
diff --git a/listener_test.go b/listener_test.go index 40c9ce4..397c8cc 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -4,6 +4,10 @@ "encoding/json" "net/url" + "net/http/httptest" + + "net/http" + "github.com/30x/apid-core" "github.com/apigee-labs/transicator/common" . "github.com/onsi/ginkgo" @@ -67,7 +71,7 @@ close(done) }) - It("should set DB and process unready on startup event", func(done Done) { + It("should process unready on existing db startup event", func(done Done) { deploymentID := "startup_test" @@ -133,6 +137,90 @@ Expect(d.ID).To(Equal(deploymentID)) close(done) }) + + It("should send deployment statuses on existing db startup event", func(done Done) { + + successDep := DataDeployment{ + ID: "success", + LocalBundleURI: "x", + DeployStatus: RESPONSE_STATUS_SUCCESS, + DeployErrorCode: 1, + DeployErrorMessage: "message", + } + + failDep := DataDeployment{ + ID: "fail", + LocalBundleURI: "x", + DeployStatus: RESPONSE_STATUS_FAIL, + DeployErrorCode: 1, + DeployErrorMessage: "message", + } + + blankDep := DataDeployment{ + ID: "blank", + LocalBundleURI: "x", + } + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer GinkgoRecover() + + var results apiDeploymentResults + err := json.NewDecoder(r.Body).Decode(&results) + Expect(err).ToNot(HaveOccurred()) + + Expect(results).To(HaveLen(2)) + + Expect(results).To(ContainElement(apiDeploymentResult{ + ID: successDep.ID, + Status: successDep.DeployStatus, + ErrorCode: successDep.DeployErrorCode, + Message: successDep.DeployErrorMessage, + })) + Expect(results).To(ContainElement(apiDeploymentResult{ + ID: failDep.ID, + Status: failDep.DeployStatus, + ErrorCode: failDep.DeployErrorCode, + Message: failDep.DeployErrorMessage, + })) + + close(done) + })) + + var err error + apiServerBaseURI, err = url.Parse(ts.URL) + Expect(err).NotTo(HaveOccurred()) + + // init without info == startup on existing DB + var snapshot = common.Snapshot{ + SnapshotInfo: "test", + Tables: []common.Table{}, + } + + db, err := data.DBVersion(snapshot.SnapshotInfo) + if err != nil { + log.Panicf("Unable to access database: %v", err) + } + + err = InitDB(db) + if err != nil { + log.Panicf("Unable to initialize database: %v", err) + } + + tx, err := db.Begin() + Expect(err).ShouldNot(HaveOccurred()) + + err = InsertDeployment(tx, successDep) + Expect(err).ShouldNot(HaveOccurred()) + err = InsertDeployment(tx, failDep) + Expect(err).ShouldNot(HaveOccurred()) + err = InsertDeployment(tx, blankDep) + Expect(err).ShouldNot(HaveOccurred()) + + err = tx.Commit() + Expect(err).ShouldNot(HaveOccurred()) + + apid.Events().Emit(APIGEE_SYNC_EVENT, &snapshot) + }) }) Context("ApigeeSync change event", func() {