Rewrite the /current API logic to fix errors and match the spec
diff --git a/api.go b/api.go
index 6eb7212..7f34e74 100644
--- a/api.go
+++ b/api.go
@@ -66,44 +66,91 @@
func handleCurrentDeployment(w http.ResponseWriter, r *http.Request) {
- // If block greater than zero AND if request ETag header not empty AND if there is no new bundle list
- // available, then block for up to the specified number of seconds until a new bundle list becomes
- // available. If no new bundle list becomes available, then return an empty array.
+ // If returning without a bundle (immediately or after timeout), status = 404
+ // If returning If-None-Match value is equal to current deployment, status = 304
+ // If returning a new value, status = 200
+
+ // If timeout > 0 AND there is no deployment (or new deployment) available (per If-None-Match), then
+ // block for up to the specified number of seconds until a new deployment becomes available.
b := r.URL.Query().Get("block")
- var block int
+ var timeout int
if b != "" {
var err error
- block, err = strconv.Atoi(b)
+ timeout, err = strconv.Atoi(b)
if err != nil {
writeError(w, http.StatusBadRequest, ERROR_CODE_TODO, "bad block value, must be number of seconds")
return
}
}
- sent := sendDeployInfo(w, r, false)
+ // If If-None-Match header matches the ETag of current bundle list AND if the request does NOT have a 'block'
+ // query param > 0, the server returns a 304 Not Modified response indicating that the client already has the
+ // most recent bundle list.
+ priorDepID := r.Header.Get("If-None-Match")
- // todo: can we kill the timer & channel if client connection is lost?
+ depID, err := getCurrentDeploymentID()
+ if err != nil && err != sql.ErrNoRows{
+ writeDatabaseError(w)
+ return
+ }
- // blocking request (Long Poll)
- if sent == false && block > 0 && r.Header.Get("etag") != "" {
- log.Debug("Blocking request... Waiting for new Deployments.")
- newReq := make(chan string)
+ // not found, no timeout, send immediately
+ if depID == "" && timeout == 0 {
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
- // Update channel of a new request (subscriber)
- addSubscriber <- newReq
-
- // Block until timeout of new deployment
- select {
- case depID := <-newReq:
- // todo: depID could be used directly instead of getting it again in sendDeployInfo
- log.Debugf("DeploymentID = %s", depID)
- sendDeployInfo(w, r, false)
-
- case <-time.After(time.Duration(block) * time.Second):
- log.Debug("Blocking deployment request timed out.")
- sendDeployInfo(w, r, true)
+ // found, send immediately - if doesn't match prior ID
+ if depID != "" {
+ if depID == priorDepID {
+ if timeout == 0 {
+ w.WriteHeader(http.StatusNotModified)
+ return
+ } else {
+ // continue
+ }
+ } else {
+ sendDeployment(w, depID)
+ return
}
}
+
+ // can't send immediately, we need to block...
+ // todo: can we kill the timer & channel if client connection is lost?
+ // todo: resolve race condition - may miss a notification
+
+ log.Debug("Blocking request... Waiting for new Deployments.")
+ newReq := make(chan string)
+
+ // subscribe to new deployments
+ addSubscriber <- newReq
+
+ // block until new deployment or timeout
+ select {
+ case depID := <-newReq:
+ sendDeployment(w, depID)
+
+ case <-time.After(time.Duration(timeout) * time.Second):
+ log.Debug("Blocking deployment request timed out.")
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+}
+
+func sendDeployment(w http.ResponseWriter, depID string) {
+ deployment, err := getDeployment(depID)
+ if err != nil {
+ log.Errorf("unable to retrieve deployment [%s]: %s", depID, err)
+ w.WriteHeader(http.StatusInternalServerError)
+ }
+ b, err := json.Marshal(deployment)
+ if err != nil {
+ log.Errorf("unable to marshal deployment: %s", err)
+ w.WriteHeader(http.StatusInternalServerError)
+ } else {
+ w.Header().Set("ETag", depID)
+ w.Write(b)
+ }
}
func respHandler(w http.ResponseWriter, r *http.Request) {
@@ -146,7 +193,6 @@
updated = updateDeploymentFailure(depID, rsp.GWbunRsp, txn)
}
- log.Print("***** 1")
if !updated {
writeDatabaseError(w)
err = txn.Rollback()
@@ -156,17 +202,13 @@
return
}
- log.Print("***** 2")
err = txn.Commit()
if err != nil {
log.Errorf("Unable to commit transaction: %s", err)
writeDatabaseError(w)
}
- log.Print("***** 3")
-
return
-
}
func updateDeploymentSuccess(depID string, txn *sql.Tx) bool {
diff --git a/deployments.go b/deployments.go
index c746d04..daf104d 100644
--- a/deployments.go
+++ b/deployments.go
@@ -17,7 +17,6 @@
// todo: /current should return latest (regardless of status) if no ETag
-/* All Global Constants go here */
const DEPLOYMENT_STATE_UNUSED = 0
const DEPLOYMENT_STATE_INPROG = 1
const DEPLOYMENT_STATE_READY = 2
@@ -302,87 +301,48 @@
}
-func sendDeployInfo(w http.ResponseWriter, r *http.Request, sendEmpty bool) bool {
-
- // If If-None-Match header matches the ETag of current bundle list AND if the request does NOT have a 'block'
- // query param > 0, the server returns a 304 Not Modified response indicating that the client already has the
- // most recent bundle list.
- ifNoneMatch := r.Header.Get("If-None-Match")
-
- // Pick the most recent deployment
+// getCurrentDeploymentID returns the ID of what should be the "current" deployment
+func getCurrentDeploymentID() (string, error) {
var depID string
- // todo: fix /current
- err := db.QueryRow("SELECT id FROM BUNDLE_DEPLOYMENT WHERE deploy_status = ? ORDER BY created_at ASC LIMIT 1;",
- DEPLOYMENT_STATE_READY).Scan(&depID)
- //err = db.QueryRow("SELECT id FROM BUNDLE_DEPLOYMENT ORDER BY created_at ASC LIMIT 1;").Scan(&depID)
+ err := db.QueryRow("SELECT id FROM BUNDLE_DEPLOYMENT ORDER BY created_at ASC LIMIT 1;").Scan(&depID)
+ return depID, err
+}
+
+
+// getDeployment returns a fully populated deploymentResponse
+func getDeployment(depID string) (*deploymentResponse, error) {
+
+ rows, err := db.Query("SELECT file_url, id, type FROM BUNDLE_INFO WHERE deployment_id = ?;", depID)
if err != nil {
- log.Errorf("Database error: %s", err)
- return false
+ log.Errorf("Unable to query BUNDLE_INFO. Err: %s", err)
+ return nil, err
}
- // todo: is depID appropriate for eTag?
- if depID == ifNoneMatch {
- w.WriteHeader(http.StatusNotModified)
- return true
+ depRes := deploymentResponse{
+ Bundles: []bundle{},
+ DeploymentId: depID,
}
- w.Header().Set("ETag", depID)
- var bundleID, fileUrl string
- // todo: fix /current
- err = db.QueryRow("SELECT file_url, id FROM BUNDLE_INFO WHERE deploy_status = ? AND deployment_id = ? AND "+
- "type = 'sys';", DEPLOYMENT_STATE_READY, depID).Scan(&fileUrl, &bundleID)
- //err = db.QueryRow("SELECT file_url, id FROM BUNDLE_INFO WHERE deployment_id = ? AND " +
- // "type = 'sys';", DEPLOYMENT_STATE_READY, depID).Scan(&fileUrl, &bundleID)
- if err != nil {
- if err == sql.ErrNoRows {
- log.Debugf("No System Deployment ready: %s", err)
- if !sendEmpty {
- return false
+ for rows.Next() {
+ var bundleID, fileUrl, bundleType string
+ err = rows.Scan(&fileUrl, &bundleID, &bundleType)
+ if err != nil {
+ log.Errorf("BUNDLE_INFO fetch failed. Err: %s", err)
+ return nil, err
+ }
+ if bundleType == "sys" {
+ depRes.System = bundle{
+ BundleId: bundleID,
+ URL: fileUrl,
}
} else {
- log.Errorf("Database error: %s", err)
- return false
+ bd := bundle{
+ AuthCode: bundleID, // todo: authCode?
+ BundleId: bundleID,
+ URL: fileUrl,
+ }
+ depRes.Bundles = append(depRes.Bundles, bd)
}
}
-
- chItems := []bundle{}
-
- sysInfo := bundle{
- BundleId: bundleID,
- URL: fileUrl,
- }
-
- depResp := deploymentResponse{
- Bundles: chItems,
- DeploymentId: depID,
- System: sysInfo,
- }
-
- // todo: fix /current
- rows, err := db.Query("SELECT file_url, id FROM BUNDLE_INFO WHERE deploy_status = ? AND deployment_id = ? "+
- "AND type = 'dep';", DEPLOYMENT_STATE_READY, depID)
- //rows, err := db.Query("SELECT file_url, id FROM BUNDLE_INFO WHERE deployment_id = ? " +
- // "AND type = 'dep';", depID)
- if err != nil {
- log.Debugf("No Deployments ready: %s", err)
- if !sendEmpty {
- return false
- }
- }
- for rows.Next() {
- err = rows.Scan(&fileUrl, &bundleID)
- if err != nil {
- log.Errorf("Deployments fetch failed. Err: %s", err)
- return false
- }
- bd := bundle{
- AuthCode: bundleID, // todo: authCode
- BundleId: bundleID,
- URL: fileUrl,
- }
- depResp.Bundles = append(depResp.Bundles, bd)
- }
- b, err := json.Marshal(depResp)
- w.Write(b)
- return true
+ return &depRes, nil
}
diff --git a/listener_test.go b/listener_test.go
index 33b3eec..8858d4b 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -14,6 +14,8 @@
It("should store data from ApigeeSync in the database", func(done Done) {
+ entityID := "listener_test_1"
+
uri, err := url.Parse(testServer.URL)
Expect(err).ShouldNot(HaveOccurred())
uri.Path = "/bundle"
@@ -40,7 +42,7 @@
Data: DataPayload{
EntityType: "deployment",
Operation: "create",
- EntityIdentifier: "entityID",
+ EntityIdentifier: entityID,
PldCont: Payload{
CreatedAt: now,
Manifest: manifest,
@@ -62,13 +64,17 @@
// todo: should do a lot more checking here... maybe call another api instead?
var selectedManifest string
var createdAt int64
- err = db.QueryRow("SELECT manifest, created_at from bundle_deployment where id = ?", "entityID").
+ err = db.QueryRow("SELECT manifest, created_at from bundle_deployment where id = ?", entityID).
Scan(&selectedManifest, &createdAt)
Expect(err).ShouldNot(HaveOccurred())
Expect(manifest).Should(Equal(selectedManifest))
Expect(createdAt).Should(Equal(now))
+ // clean up
+ _, err := db.Exec("DELETE from bundle_deployment where id = ?", entityID)
+ Expect(err).ShouldNot(HaveOccurred())
+
close(done)
},
}