Cleanup old bundles, set DeployStatus on bundle download fail, refactoring
diff --git a/api.go b/api.go
index 8a54a31..49bf679 100644
--- a/api.go
+++ b/api.go
@@ -87,9 +87,9 @@
case msg := <-deploymentsChanged:
// todo: add a debounce w/ timeout to avoid sending on every single deployment?
subs := subscribers
- incrementETag() // todo: do this elsewhere? check error?
- subscribers = make(map[chan string]struct{})
log.Debugf("Delivering deployment change %s to %d subscribers", msg, len(subs))
+ subscribers = make(map[chan string]struct{})
+ incrementETag()
for subscriber := range subs {
select {
case subscriber <- msg:
diff --git a/api_test.go b/api_test.go
index 8ed074c..b371ea6 100644
--- a/api_test.go
+++ b/api_test.go
@@ -109,6 +109,7 @@
res, err := http.Get(uri.String())
Expect(err).ShouldNot(HaveOccurred())
defer res.Body.Close()
+ eTag := res.Header.Get("etag")
deploymentID = "api_get_current_blocking2"
go func() {
@@ -119,7 +120,7 @@
uri.RawQuery = query.Encode()
req, err := http.NewRequest("GET", uri.String(), nil)
req.Header.Add("Content-Type", "application/json")
- req.Header.Add("If-None-Match", res.Header.Get("etag"))
+ req.Header.Add("If-None-Match", eTag)
res, err := http.DefaultClient.Do(req)
Expect(err).ShouldNot(HaveOccurred())
@@ -318,16 +319,18 @@
DataScopeID: deploymentID,
BundleConfigJSON: string(bundleJson),
ConfigJSON: string(bundleJson),
- Status: "",
Created: "",
CreatedBy: "",
Updated: "",
UpdatedBy: "",
BundleName: deploymentID,
- BundleURI: "",
- BundleChecksum: "",
- BundleChecksumType: "",
+ BundleURI: bundle.URI,
+ BundleChecksum: bundle.Checksum,
+ BundleChecksumType: bundle.ChecksumType,
LocalBundleURI: "x",
+ DeployStatus: "",
+ DeployErrorCode: 0,
+ DeployErrorMessage: "",
}
err = InsertDeployment(tx, dep)
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go
index a4ff0e2..a43fb41 100644
--- a/apidGatewayDeploy_suite_test.go
+++ b/apidGatewayDeploy_suite_test.go
@@ -43,14 +43,14 @@
router := apid.API().Router()
// fake an unreliable bundle repo
backOffMultiplier = 10 * time.Millisecond
- count := 0
+ count := 1
router.HandleFunc("/bundles/{id}", func(w http.ResponseWriter, req *http.Request) {
count++
- if count % 2 == 0 {
+ vars := apid.API().Vars(req)
+ if count % 2 == 0 || vars["id"] == "alwaysfail" {
w.WriteHeader(500)
return
}
- vars := apid.API().Vars(req)
w.Write([]byte("/bundles/" + vars["id"]))
})
testServer = httptest.NewServer(router)
diff --git a/bundle.go b/bundle.go
index 4762165..3e018d0 100644
--- a/bundle.go
+++ b/bundle.go
@@ -25,26 +25,50 @@
backOffMultiplier = 10 * time.Second
)
-func downloadBundle(dep DataDeployment) error {
+func downloadBundle(dep DataDeployment) {
log.Debugf("starting bundle download process: %s", dep.BundleURI)
hashWriter, err := getHashWriter(dep.BundleChecksumType)
if err != nil {
- log.Errorf("invalid checksum type: %v", err)
- return err
+ msg := fmt.Sprintf("invalid bundle checksum type: %v", dep.BundleChecksumType)
+ log.Error(msg)
+ setDeploymentResults(apiDeploymentResults{
+ {
+ ID: dep.ID,
+ Status: RESPONSE_STATUS_FAIL,
+ ErrorCode: ERROR_CODE_TODO,
+ Message: msg,
+ },
+ })
+ return
}
- // retry
- var tempFile string
+ // todo: do forever with backoff - note: we'll want to abort if deployment is deleted, however
+ // todo: also, we'll still mark deployment result as "failed" - after some timeout
for i := 1; i <= DOWNLOAD_ATTEMPTS; i++ {
+ var tempFile, bundleFile string
tempFile, err = downloadFromURI(dep.BundleURI, hashWriter, dep.BundleChecksum)
+
+ if err == nil {
+ bundleFile = getBundleFile(dep)
+ err = os.Rename(tempFile, bundleFile)
+ if err != nil {
+ log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, bundleFile, err)
+ }
+ }
+
+ if tempFile != "" {
+ go safeDelete(tempFile)
+ }
+
+ if err == nil {
+ err = updateLocalBundleURI(dep.ID, bundleFile)
+ }
+
if err == nil {
break
}
- if tempFile != "" {
- os.Remove(tempFile)
- }
// simple back-off, we could potentially be more sophisticated
retryIn := time.Duration(i) * backOffMultiplier
@@ -52,29 +76,24 @@
time.Sleep(retryIn)
hashWriter.Reset()
}
-
if err != nil {
log.Errorf("failed %d download attempts. aborting.", DOWNLOAD_ATTEMPTS)
- return err
}
- bundleFile := getBundleFile(dep)
- err = os.Rename(tempFile, bundleFile)
if err != nil {
- log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, bundleFile, err)
- os.Remove(tempFile)
- return err
- }
-
- err = updateLocalURI(dep.ID, bundleFile)
- if err != nil {
- return err
+ setDeploymentResults(apiDeploymentResults{
+ {
+ ID: dep.ID,
+ Status: RESPONSE_STATUS_FAIL,
+ ErrorCode: ERROR_CODE_TODO,
+ Message: fmt.Sprintf("bundle download failed: %s", err),
+ },
+ })
+ return
}
// send deployments to client
deploymentsChanged<- dep.ID
-
- return nil
}
func getBundleFile(dep DataDeployment) string {
diff --git a/bundle_test.go b/bundle_test.go
new file mode 100644
index 0000000..2d1645c
--- /dev/null
+++ b/bundle_test.go
@@ -0,0 +1,81 @@
+package apiGatewayDeploy
+
+import (
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+ "net/url"
+ "encoding/json"
+)
+
+var _ = Describe("bundle", func() {
+
+ Context("download", func() {
+
+ It("should mark the status as failed if download fails", func() {
+
+ deploymentID := "bundle_download_fail"
+
+ uri, err := url.Parse(testServer.URL)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ uri.Path = "/bundles/alwaysfail"
+ bundleUri := uri.String()
+ bundle := bundleConfigJson{
+ Name: uri.Path,
+ URI: bundleUri,
+ ChecksumType: "crc-32",
+ }
+ bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
+ bundleJson, err := json.Marshal(bundle)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ tx, err := getDB().Begin()
+ Expect(err).ShouldNot(HaveOccurred())
+
+ dep := DataDeployment{
+ ID: deploymentID,
+ BundleConfigID: deploymentID,
+ ApidClusterID: deploymentID,
+ DataScopeID: deploymentID,
+ BundleConfigJSON: string(bundleJson),
+ ConfigJSON: string(bundleJson),
+ Created: "",
+ CreatedBy: "",
+ Updated: "",
+ UpdatedBy: "",
+ BundleName: deploymentID,
+ BundleURI: bundle.URI,
+ BundleChecksum: bundle.Checksum,
+ BundleChecksumType: bundle.ChecksumType,
+ LocalBundleURI: "",
+ DeployStatus: "",
+ DeployErrorCode: 0,
+ DeployErrorMessage: "",
+ }
+
+ err = InsertDeployment(tx, dep)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ err = tx.Commit()
+ Expect(err).ShouldNot(HaveOccurred())
+
+ var listener = make(chan string)
+ addSubscriber <- listener
+
+ downloadBundle(dep)
+
+ // get deployment
+ deployments, err := getDeployments("WHERE id=$1", deploymentID)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ Expect(len(deployments)).To(Equal(1))
+ d := deployments[0]
+
+ Expect(d.ID).To(Equal(deploymentID))
+ Expect(d.LocalBundleURI).To(BeEmpty())
+ Expect(d.DeployStatus).To(Equal(RESPONSE_STATUS_FAIL))
+ Expect(d.DeployErrorCode).To(Equal(ERROR_CODE_TODO))
+ Expect(d.DeployErrorMessage).ToNot(BeEmpty())
+ })
+ })
+})
diff --git a/data.go b/data.go
index 45ad641..42f6efd 100644
--- a/data.go
+++ b/data.go
@@ -19,16 +19,18 @@
DataScopeID string
BundleConfigJSON string
ConfigJSON string
- Status string
Created string
CreatedBy string
Updated string
UpdatedBy string
BundleName string
BundleURI string
+ LocalBundleURI string
BundleChecksum string
BundleChecksumType string
- LocalBundleURI string
+ DeployStatus string
+ DeployErrorCode int
+ DeployErrorMessage string
}
type SQLExec interface {
@@ -40,7 +42,7 @@
CREATE TABLE IF NOT EXISTS etag (
value integer
);
- INSERT INTO etag VALUES (1);
+ INSERT INTO etag (value) VALUES (1);
CREATE TABLE IF NOT EXISTS deployments (
id character varying(36) NOT NULL,
bundle_config_id varchar(36) NOT NULL,
@@ -48,7 +50,6 @@
data_scope_id varchar(36) NOT NULL,
bundle_config_json text NOT NULL,
config_json text NOT NULL,
- status text NOT NULL,
created timestamp without time zone,
created_by text,
updated timestamp without time zone,
@@ -56,12 +57,15 @@
bundle_name text,
bundle_uri text,
local_bundle_uri text,
+ bundle_checksum text,
+ bundle_checksum_type text,
deploy_status string,
deploy_error_code int,
deploy_error_message text,
PRIMARY KEY (id)
);
`)
+ // todo: is ID enough? must it be scoped by cluster id?
if err != nil {
return err
}
@@ -129,10 +133,11 @@
stmt, err := tx.Prepare(`
INSERT INTO deployments
(id, bundle_config_id, apid_cluster_id, data_scope_id,
- bundle_config_json, config_json, status, created,
- created_by, updated, updated_by, bundle_name,
- bundle_uri, local_bundle_uri)
- VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14);
+ bundle_config_json, config_json, created, created_by,
+ updated, updated_by, bundle_name, bundle_uri, local_bundle_uri,
+ bundle_checksum, bundle_checksum_type, deploy_status,
+ deploy_error_code, deploy_error_message)
+ VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18);
`)
if err != nil {
log.Errorf("prepare insert into deployments %s failed: %v", dep.ID, err)
@@ -142,9 +147,10 @@
_, err = stmt.Exec(
dep.ID, dep.BundleConfigID, dep.ApidClusterID, dep.DataScopeID,
- dep.BundleConfigJSON, dep.ConfigJSON, dep.Status, dep.Created,
- dep.CreatedBy, dep.Updated, dep.UpdatedBy, dep.BundleName,
- dep.BundleURI, dep.LocalBundleURI)
+ dep.BundleConfigJSON, dep.ConfigJSON, dep.Created, dep.CreatedBy,
+ dep.Updated, dep.UpdatedBy, dep.BundleName, dep.BundleURI,
+ dep.LocalBundleURI, dep.BundleChecksum, dep.BundleChecksumType, dep.DeployStatus,
+ dep.DeployErrorCode, dep.DeployErrorMessage)
if err != nil {
log.Errorf("insert into deployments %s failed: %v", dep.ID, err)
return err
@@ -179,35 +185,57 @@
// getReadyDeployments() returns array of deployments that are ready to deploy
func getReadyDeployments() (deployments []DataDeployment, err error) {
+ return getDeployments("WHERE local_bundle_uri != $1", "")
+}
+// getUnreadyDeployments() returns array of deployments that are not yet ready to deploy
+func getUnreadyDeployments() (deployments []DataDeployment, err error) {
+ return getDeployments("WHERE local_bundle_uri = $1 and deploy_status = $2", "", "")
+}
+
+// getDeployments() accepts a "WHERE ..." clause and optional parameters and returns the list of deployments
+func getDeployments(where string, a ...interface{}) (deployments []DataDeployment, err error) {
db := getDB()
- rows, err := db.Query(`
+
+ var stmt *sql.Stmt
+ stmt, err = db.Prepare(`
SELECT id, bundle_config_id, apid_cluster_id, data_scope_id,
- bundle_config_json, config_json, status, created,
- created_by, updated, updated_by, bundle_name,
- bundle_uri, local_bundle_uri
+ bundle_config_json, config_json, created, created_by,
+ updated, updated_by, bundle_name, bundle_uri,
+ local_bundle_uri, bundle_checksum, bundle_checksum_type, deploy_status,
+ deploy_error_code, deploy_error_message
FROM deployments
- WHERE local_bundle_uri != ""
- `)
+ ` + where)
+ if err != nil {
+ return
+ }
+ var rows *sql.Rows
+ rows, err = stmt.Query(a...)
if err != nil {
if err == sql.ErrNoRows {
- return deployments, nil
+ return
}
log.Errorf("Error querying deployments: %v", err)
return
}
defer rows.Close()
+ deployments = dataDeploymentsFromRows(rows)
+
+ return
+}
+
+func dataDeploymentsFromRows(rows *sql.Rows) (deployments []DataDeployment) {
for rows.Next() {
dep := DataDeployment{}
rows.Scan(&dep.ID, &dep.BundleConfigID, &dep.ApidClusterID, &dep.DataScopeID,
- &dep.BundleConfigJSON, &dep.ConfigJSON, &dep.Status, &dep.Created,
- &dep.CreatedBy, &dep.Updated, &dep.UpdatedBy, &dep.BundleName,
- &dep.BundleURI, &dep.LocalBundleURI,
+ &dep.BundleConfigJSON, &dep.ConfigJSON, &dep.Created, &dep.CreatedBy,
+ &dep.Updated, &dep.UpdatedBy, &dep.BundleName, &dep.BundleURI,
+ &dep.LocalBundleURI, &dep.BundleChecksum, &dep.BundleChecksumType, &dep.DeployStatus,
+ &dep.DeployErrorCode, &dep.DeployErrorMessage,
)
deployments = append(deployments, dep)
}
-
return
}
@@ -252,18 +280,11 @@
return err
}
-func updateLocalURI(depID, localBundleUri string) error {
+func updateLocalBundleURI(depID, localBundleUri string) error {
- tx, err := getDB().Begin()
+ stmt, err := getDB().Prepare("UPDATE deployments SET local_bundle_uri=$1 WHERE id=$2;")
if err != nil {
- log.Errorf("begin updateLocalURI failed: %v", err)
- return err
- }
- defer tx.Rollback()
-
- stmt, err := tx.Prepare("UPDATE deployments SET local_bundle_uri=$1 WHERE id=$2;")
- if err != nil {
- log.Errorf("prepare updateLocalURI failed: %v", err)
+ log.Errorf("prepare updateLocalBundleURI failed: %v", err)
return err
}
defer stmt.Close()
@@ -274,13 +295,16 @@
return err
}
- err = tx.Commit()
- if err != nil {
- log.Errorf("commit updateLocalURI failed: %v", err)
- return err
- }
-
log.Debugf("update deployments %s localBundleUri to %s succeeded", depID, localBundleUri)
return nil
}
+
+func getLocalBundleURI(tx *sql.Tx, depID string) (localBundleUri string, err error) {
+
+ err = tx.QueryRow("SELECT local_bundle_uri FROM deployments WHERE id=$1;", depID).Scan(&localBundleUri)
+ if err == sql.ErrNoRows {
+ err = nil
+ }
+ return
+}
diff --git a/listener.go b/listener.go
index 3b57542..91f0fe4 100644
--- a/listener.go
+++ b/listener.go
@@ -5,6 +5,7 @@
"encoding/json"
"github.com/30x/apid"
"github.com/apigee-labs/transicator/common"
+ "os"
)
const (
@@ -84,6 +85,18 @@
}
SetDB(db)
+
+ // if no tables, this a startup event for an existing DB, start bundle downloads that didn't finish
+ if len(snapshot.Tables) == 0 {
+ deployments, err := getUnreadyDeployments()
+ if err != nil {
+ log.Panicf("unable to query database for unready deployments: %v", err)
+ }
+ for _, dep := range deployments {
+ go downloadBundle(dep)
+ }
+ }
+
log.Debug("Snapshot processed")
}
@@ -94,6 +107,7 @@
log.Panicf("Error processing ChangeList: %v", err)
}
defer tx.Rollback()
+ var bundlesToDelete []string
for _, change := range changes.Changes {
var err error
switch change.Table {
@@ -103,10 +117,11 @@
err = addDeployment(tx, change.NewRow)
case common.Delete:
var id string
- err = change.OldRow.Get("id", &id)
+ change.OldRow.Get("id", &id)
+ localBundleUri, err := getLocalBundleURI(tx, id)
if err == nil {
+ bundlesToDelete = append(bundlesToDelete, localBundleUri)
err = deleteDeployment(tx, id)
- // todo: delete downloaded bundle file
}
default:
log.Errorf("unexpected operation: %s", change.Operation)
@@ -120,58 +135,37 @@
if err != nil {
log.Panicf("Error processing ChangeList: %v", err)
}
+
+ // clean up old bundles
+ if len(bundlesToDelete) > 0 {
+ log.Debugf("will delete %d old bundles", len(bundlesToDelete))
+ go func() {
+ // give clients a minute to avoid conflicts
+ // todo: configurable time
+ //time.Sleep(1 * time.Minute)
+ for _, b := range bundlesToDelete {
+ log.Debugf("removing old bundle: %v", b)
+ safeDelete(b)
+ }
+ }()
+ }
}
-func addDeployment(tx *sql.Tx, row common.Row) (err error) {
+func dataDeploymentFromRow(row common.Row) (d DataDeployment, err error) {
- d := DataDeployment{}
- err = row.Get("id", &d.ID)
- if err != nil {
- return
- }
- err = row.Get("bundle_config_id", &d.BundleConfigID)
- if err != nil {
- return
- }
- err = row.Get("apid_cluster_id", &d.ApidClusterID)
- if err != nil {
- return
- }
- err = row.Get("data_scope_id", &d.DataScopeID)
- if err != nil {
- return
- }
- err = row.Get("bundle_config_json", &d.BundleConfigJSON)
- if err != nil {
- return
- }
- err = row.Get("config_json", &d.ConfigJSON)
- if err != nil {
- return
- }
- err = row.Get("status", &d.Status)
- if err != nil {
- return
- }
- err = row.Get("created", &d.Created)
- if err != nil {
- return
- }
- err = row.Get("created_by", &d.CreatedBy)
- if err != nil {
- return
- }
- err = row.Get("updated", &d.Updated)
- if err != nil {
- return
- }
- err = row.Get("updated_by", &d.UpdatedBy)
- if err != nil {
- return
- }
+ row.Get("id", &d.ID)
+ row.Get("bundle_config_id", &d.BundleConfigID)
+ row.Get("apid_cluster_id", &d.ApidClusterID)
+ row.Get("data_scope_id", &d.DataScopeID)
+ row.Get("bundle_config_json", &d.BundleConfigJSON)
+ row.Get("config_json", &d.ConfigJSON)
+ row.Get("created", &d.Created)
+ row.Get("created_by", &d.CreatedBy)
+ row.Get("updated", &d.Updated)
+ row.Get("updated_by", &d.UpdatedBy)
var bc bundleConfigJson
- err = json.Unmarshal([]byte(d.BundleConfigJSON), &bc)
+ json.Unmarshal([]byte(d.BundleConfigJSON), &bc)
if err != nil {
log.Errorf("JSON decoding Manifest failed: %v", err)
return
@@ -182,6 +176,17 @@
d.BundleChecksumType = bc.ChecksumType
d.BundleChecksum = bc.Checksum
+ return
+}
+
+func addDeployment(tx *sql.Tx, row common.Row) (err error) {
+
+ var d DataDeployment
+ d, err = dataDeploymentFromRow(row)
+ if err != nil {
+ return
+ }
+
err = InsertDeployment(tx, d)
if err != nil {
return
@@ -191,3 +196,9 @@
go downloadBundle(d)
return
}
+
+func safeDelete(file string) {
+ if e := os.Remove(file); e != nil && !os.IsNotExist(e) {
+ log.Warnf("unable to delete file %s: %v", file, e)
+ }
+}