Cleanup old bundles, set DeployStatus on bundle download fail, refactoring
diff --git a/api.go b/api.go index 8a54a31..49bf679 100644 --- a/api.go +++ b/api.go
@@ -87,9 +87,9 @@ case msg := <-deploymentsChanged: // todo: add a debounce w/ timeout to avoid sending on every single deployment? subs := subscribers - incrementETag() // todo: do this elsewhere? check error? - subscribers = make(map[chan string]struct{}) log.Debugf("Delivering deployment change %s to %d subscribers", msg, len(subs)) + subscribers = make(map[chan string]struct{}) + incrementETag() for subscriber := range subs { select { case subscriber <- msg:
diff --git a/api_test.go b/api_test.go index 8ed074c..b371ea6 100644 --- a/api_test.go +++ b/api_test.go
@@ -109,6 +109,7 @@ res, err := http.Get(uri.String()) Expect(err).ShouldNot(HaveOccurred()) defer res.Body.Close() + eTag := res.Header.Get("etag") deploymentID = "api_get_current_blocking2" go func() { @@ -119,7 +120,7 @@ uri.RawQuery = query.Encode() req, err := http.NewRequest("GET", uri.String(), nil) req.Header.Add("Content-Type", "application/json") - req.Header.Add("If-None-Match", res.Header.Get("etag")) + req.Header.Add("If-None-Match", eTag) res, err := http.DefaultClient.Do(req) Expect(err).ShouldNot(HaveOccurred()) @@ -318,16 +319,18 @@ DataScopeID: deploymentID, BundleConfigJSON: string(bundleJson), ConfigJSON: string(bundleJson), - Status: "", Created: "", CreatedBy: "", Updated: "", UpdatedBy: "", BundleName: deploymentID, - BundleURI: "", - BundleChecksum: "", - BundleChecksumType: "", + BundleURI: bundle.URI, + BundleChecksum: bundle.Checksum, + BundleChecksumType: bundle.ChecksumType, LocalBundleURI: "x", + DeployStatus: "", + DeployErrorCode: 0, + DeployErrorMessage: "", } err = InsertDeployment(tx, dep)
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go index a4ff0e2..a43fb41 100644 --- a/apidGatewayDeploy_suite_test.go +++ b/apidGatewayDeploy_suite_test.go
@@ -43,14 +43,14 @@ router := apid.API().Router() // fake an unreliable bundle repo backOffMultiplier = 10 * time.Millisecond - count := 0 + count := 1 router.HandleFunc("/bundles/{id}", func(w http.ResponseWriter, req *http.Request) { count++ - if count % 2 == 0 { + vars := apid.API().Vars(req) + if count % 2 == 0 || vars["id"] == "alwaysfail" { w.WriteHeader(500) return } - vars := apid.API().Vars(req) w.Write([]byte("/bundles/" + vars["id"])) }) testServer = httptest.NewServer(router)
diff --git a/bundle.go b/bundle.go index 4762165..3e018d0 100644 --- a/bundle.go +++ b/bundle.go
@@ -25,26 +25,50 @@ backOffMultiplier = 10 * time.Second ) -func downloadBundle(dep DataDeployment) error { +func downloadBundle(dep DataDeployment) { log.Debugf("starting bundle download process: %s", dep.BundleURI) hashWriter, err := getHashWriter(dep.BundleChecksumType) if err != nil { - log.Errorf("invalid checksum type: %v", err) - return err + msg := fmt.Sprintf("invalid bundle checksum type: %v", dep.BundleChecksumType) + log.Error(msg) + setDeploymentResults(apiDeploymentResults{ + { + ID: dep.ID, + Status: RESPONSE_STATUS_FAIL, + ErrorCode: ERROR_CODE_TODO, + Message: msg, + }, + }) + return } - // retry - var tempFile string + // todo: do forever with backoff - note: we'll want to abort if deployment is deleted, however + // todo: also, we'll still mark deployment result as "failed" - after some timeout for i := 1; i <= DOWNLOAD_ATTEMPTS; i++ { + var tempFile, bundleFile string tempFile, err = downloadFromURI(dep.BundleURI, hashWriter, dep.BundleChecksum) + + if err == nil { + bundleFile = getBundleFile(dep) + err = os.Rename(tempFile, bundleFile) + if err != nil { + log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, bundleFile, err) + } + } + + if tempFile != "" { + go safeDelete(tempFile) + } + + if err == nil { + err = updateLocalBundleURI(dep.ID, bundleFile) + } + if err == nil { break } - if tempFile != "" { - os.Remove(tempFile) - } // simple back-off, we could potentially be more sophisticated retryIn := time.Duration(i) * backOffMultiplier @@ -52,29 +76,24 @@ time.Sleep(retryIn) hashWriter.Reset() } - if err != nil { log.Errorf("failed %d download attempts. aborting.", DOWNLOAD_ATTEMPTS) - return err } - bundleFile := getBundleFile(dep) - err = os.Rename(tempFile, bundleFile) if err != nil { - log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, bundleFile, err) - os.Remove(tempFile) - return err - } - - err = updateLocalURI(dep.ID, bundleFile) - if err != nil { - return err + setDeploymentResults(apiDeploymentResults{ + { + ID: dep.ID, + Status: RESPONSE_STATUS_FAIL, + ErrorCode: ERROR_CODE_TODO, + Message: fmt.Sprintf("bundle download failed: %s", err), + }, + }) + return } // send deployments to client deploymentsChanged<- dep.ID - - return nil } func getBundleFile(dep DataDeployment) string {
diff --git a/bundle_test.go b/bundle_test.go new file mode 100644 index 0000000..2d1645c --- /dev/null +++ b/bundle_test.go
@@ -0,0 +1,81 @@ +package apiGatewayDeploy + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "net/url" + "encoding/json" +) + +var _ = Describe("bundle", func() { + + Context("download", func() { + + It("should mark the status as failed if download fails", func() { + + deploymentID := "bundle_download_fail" + + uri, err := url.Parse(testServer.URL) + Expect(err).ShouldNot(HaveOccurred()) + + uri.Path = "/bundles/alwaysfail" + bundleUri := uri.String() + bundle := bundleConfigJson{ + Name: uri.Path, + URI: bundleUri, + ChecksumType: "crc-32", + } + bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri) + bundleJson, err := json.Marshal(bundle) + Expect(err).ShouldNot(HaveOccurred()) + + tx, err := getDB().Begin() + Expect(err).ShouldNot(HaveOccurred()) + + dep := DataDeployment{ + ID: deploymentID, + BundleConfigID: deploymentID, + ApidClusterID: deploymentID, + DataScopeID: deploymentID, + BundleConfigJSON: string(bundleJson), + ConfigJSON: string(bundleJson), + Created: "", + CreatedBy: "", + Updated: "", + UpdatedBy: "", + BundleName: deploymentID, + BundleURI: bundle.URI, + BundleChecksum: bundle.Checksum, + BundleChecksumType: bundle.ChecksumType, + LocalBundleURI: "", + DeployStatus: "", + DeployErrorCode: 0, + DeployErrorMessage: "", + } + + err = InsertDeployment(tx, dep) + Expect(err).ShouldNot(HaveOccurred()) + + err = tx.Commit() + Expect(err).ShouldNot(HaveOccurred()) + + var listener = make(chan string) + addSubscriber <- listener + + downloadBundle(dep) + + // get deployment + deployments, err := getDeployments("WHERE id=$1", deploymentID) + Expect(err).ShouldNot(HaveOccurred()) + + Expect(len(deployments)).To(Equal(1)) + d := deployments[0] + + Expect(d.ID).To(Equal(deploymentID)) + Expect(d.LocalBundleURI).To(BeEmpty()) + Expect(d.DeployStatus).To(Equal(RESPONSE_STATUS_FAIL)) + Expect(d.DeployErrorCode).To(Equal(ERROR_CODE_TODO)) + Expect(d.DeployErrorMessage).ToNot(BeEmpty()) + }) + }) +})
diff --git a/data.go b/data.go index 45ad641..42f6efd 100644 --- a/data.go +++ b/data.go
@@ -19,16 +19,18 @@ DataScopeID string BundleConfigJSON string ConfigJSON string - Status string Created string CreatedBy string Updated string UpdatedBy string BundleName string BundleURI string + LocalBundleURI string BundleChecksum string BundleChecksumType string - LocalBundleURI string + DeployStatus string + DeployErrorCode int + DeployErrorMessage string } type SQLExec interface { @@ -40,7 +42,7 @@ CREATE TABLE IF NOT EXISTS etag ( value integer ); - INSERT INTO etag VALUES (1); + INSERT INTO etag (value) VALUES (1); CREATE TABLE IF NOT EXISTS deployments ( id character varying(36) NOT NULL, bundle_config_id varchar(36) NOT NULL, @@ -48,7 +50,6 @@ data_scope_id varchar(36) NOT NULL, bundle_config_json text NOT NULL, config_json text NOT NULL, - status text NOT NULL, created timestamp without time zone, created_by text, updated timestamp without time zone, @@ -56,12 +57,15 @@ bundle_name text, bundle_uri text, local_bundle_uri text, + bundle_checksum text, + bundle_checksum_type text, deploy_status string, deploy_error_code int, deploy_error_message text, PRIMARY KEY (id) ); `) + // todo: is ID enough? must it be scoped by cluster id? if err != nil { return err } @@ -129,10 +133,11 @@ stmt, err := tx.Prepare(` INSERT INTO deployments (id, bundle_config_id, apid_cluster_id, data_scope_id, - bundle_config_json, config_json, status, created, - created_by, updated, updated_by, bundle_name, - bundle_uri, local_bundle_uri) - VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14); + bundle_config_json, config_json, created, created_by, + updated, updated_by, bundle_name, bundle_uri, local_bundle_uri, + bundle_checksum, bundle_checksum_type, deploy_status, + deploy_error_code, deploy_error_message) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18); `) if err != nil { log.Errorf("prepare insert into deployments %s failed: %v", dep.ID, err) @@ -142,9 +147,10 @@ _, err = stmt.Exec( dep.ID, dep.BundleConfigID, dep.ApidClusterID, dep.DataScopeID, - dep.BundleConfigJSON, dep.ConfigJSON, dep.Status, dep.Created, - dep.CreatedBy, dep.Updated, dep.UpdatedBy, dep.BundleName, - dep.BundleURI, dep.LocalBundleURI) + dep.BundleConfigJSON, dep.ConfigJSON, dep.Created, dep.CreatedBy, + dep.Updated, dep.UpdatedBy, dep.BundleName, dep.BundleURI, + dep.LocalBundleURI, dep.BundleChecksum, dep.BundleChecksumType, dep.DeployStatus, + dep.DeployErrorCode, dep.DeployErrorMessage) if err != nil { log.Errorf("insert into deployments %s failed: %v", dep.ID, err) return err @@ -179,35 +185,57 @@ // getReadyDeployments() returns array of deployments that are ready to deploy func getReadyDeployments() (deployments []DataDeployment, err error) { + return getDeployments("WHERE local_bundle_uri != $1", "") +} +// getUnreadyDeployments() returns array of deployments that are not yet ready to deploy +func getUnreadyDeployments() (deployments []DataDeployment, err error) { + return getDeployments("WHERE local_bundle_uri = $1 and deploy_status = $2", "", "") +} + +// getDeployments() accepts a "WHERE ..." clause and optional parameters and returns the list of deployments +func getDeployments(where string, a ...interface{}) (deployments []DataDeployment, err error) { db := getDB() - rows, err := db.Query(` + + var stmt *sql.Stmt + stmt, err = db.Prepare(` SELECT id, bundle_config_id, apid_cluster_id, data_scope_id, - bundle_config_json, config_json, status, created, - created_by, updated, updated_by, bundle_name, - bundle_uri, local_bundle_uri + bundle_config_json, config_json, created, created_by, + updated, updated_by, bundle_name, bundle_uri, + local_bundle_uri, bundle_checksum, bundle_checksum_type, deploy_status, + deploy_error_code, deploy_error_message FROM deployments - WHERE local_bundle_uri != "" - `) + ` + where) + if err != nil { + return + } + var rows *sql.Rows + rows, err = stmt.Query(a...) if err != nil { if err == sql.ErrNoRows { - return deployments, nil + return } log.Errorf("Error querying deployments: %v", err) return } defer rows.Close() + deployments = dataDeploymentsFromRows(rows) + + return +} + +func dataDeploymentsFromRows(rows *sql.Rows) (deployments []DataDeployment) { for rows.Next() { dep := DataDeployment{} rows.Scan(&dep.ID, &dep.BundleConfigID, &dep.ApidClusterID, &dep.DataScopeID, - &dep.BundleConfigJSON, &dep.ConfigJSON, &dep.Status, &dep.Created, - &dep.CreatedBy, &dep.Updated, &dep.UpdatedBy, &dep.BundleName, - &dep.BundleURI, &dep.LocalBundleURI, + &dep.BundleConfigJSON, &dep.ConfigJSON, &dep.Created, &dep.CreatedBy, + &dep.Updated, &dep.UpdatedBy, &dep.BundleName, &dep.BundleURI, + &dep.LocalBundleURI, &dep.BundleChecksum, &dep.BundleChecksumType, &dep.DeployStatus, + &dep.DeployErrorCode, &dep.DeployErrorMessage, ) deployments = append(deployments, dep) } - return } @@ -252,18 +280,11 @@ return err } -func updateLocalURI(depID, localBundleUri string) error { +func updateLocalBundleURI(depID, localBundleUri string) error { - tx, err := getDB().Begin() + stmt, err := getDB().Prepare("UPDATE deployments SET local_bundle_uri=$1 WHERE id=$2;") if err != nil { - log.Errorf("begin updateLocalURI failed: %v", err) - return err - } - defer tx.Rollback() - - stmt, err := tx.Prepare("UPDATE deployments SET local_bundle_uri=$1 WHERE id=$2;") - if err != nil { - log.Errorf("prepare updateLocalURI failed: %v", err) + log.Errorf("prepare updateLocalBundleURI failed: %v", err) return err } defer stmt.Close() @@ -274,13 +295,16 @@ return err } - err = tx.Commit() - if err != nil { - log.Errorf("commit updateLocalURI failed: %v", err) - return err - } - log.Debugf("update deployments %s localBundleUri to %s succeeded", depID, localBundleUri) return nil } + +func getLocalBundleURI(tx *sql.Tx, depID string) (localBundleUri string, err error) { + + err = tx.QueryRow("SELECT local_bundle_uri FROM deployments WHERE id=$1;", depID).Scan(&localBundleUri) + if err == sql.ErrNoRows { + err = nil + } + return +}
diff --git a/listener.go b/listener.go index 3b57542..91f0fe4 100644 --- a/listener.go +++ b/listener.go
@@ -5,6 +5,7 @@ "encoding/json" "github.com/30x/apid" "github.com/apigee-labs/transicator/common" + "os" ) const ( @@ -84,6 +85,18 @@ } SetDB(db) + + // if no tables, this a startup event for an existing DB, start bundle downloads that didn't finish + if len(snapshot.Tables) == 0 { + deployments, err := getUnreadyDeployments() + if err != nil { + log.Panicf("unable to query database for unready deployments: %v", err) + } + for _, dep := range deployments { + go downloadBundle(dep) + } + } + log.Debug("Snapshot processed") } @@ -94,6 +107,7 @@ log.Panicf("Error processing ChangeList: %v", err) } defer tx.Rollback() + var bundlesToDelete []string for _, change := range changes.Changes { var err error switch change.Table { @@ -103,10 +117,11 @@ err = addDeployment(tx, change.NewRow) case common.Delete: var id string - err = change.OldRow.Get("id", &id) + change.OldRow.Get("id", &id) + localBundleUri, err := getLocalBundleURI(tx, id) if err == nil { + bundlesToDelete = append(bundlesToDelete, localBundleUri) err = deleteDeployment(tx, id) - // todo: delete downloaded bundle file } default: log.Errorf("unexpected operation: %s", change.Operation) @@ -120,58 +135,37 @@ if err != nil { log.Panicf("Error processing ChangeList: %v", err) } + + // clean up old bundles + if len(bundlesToDelete) > 0 { + log.Debugf("will delete %d old bundles", len(bundlesToDelete)) + go func() { + // give clients a minute to avoid conflicts + // todo: configurable time + //time.Sleep(1 * time.Minute) + for _, b := range bundlesToDelete { + log.Debugf("removing old bundle: %v", b) + safeDelete(b) + } + }() + } } -func addDeployment(tx *sql.Tx, row common.Row) (err error) { +func dataDeploymentFromRow(row common.Row) (d DataDeployment, err error) { - d := DataDeployment{} - err = row.Get("id", &d.ID) - if err != nil { - return - } - err = row.Get("bundle_config_id", &d.BundleConfigID) - if err != nil { - return - } - err = row.Get("apid_cluster_id", &d.ApidClusterID) - if err != nil { - return - } - err = row.Get("data_scope_id", &d.DataScopeID) - if err != nil { - return - } - err = row.Get("bundle_config_json", &d.BundleConfigJSON) - if err != nil { - return - } - err = row.Get("config_json", &d.ConfigJSON) - if err != nil { - return - } - err = row.Get("status", &d.Status) - if err != nil { - return - } - err = row.Get("created", &d.Created) - if err != nil { - return - } - err = row.Get("created_by", &d.CreatedBy) - if err != nil { - return - } - err = row.Get("updated", &d.Updated) - if err != nil { - return - } - err = row.Get("updated_by", &d.UpdatedBy) - if err != nil { - return - } + row.Get("id", &d.ID) + row.Get("bundle_config_id", &d.BundleConfigID) + row.Get("apid_cluster_id", &d.ApidClusterID) + row.Get("data_scope_id", &d.DataScopeID) + row.Get("bundle_config_json", &d.BundleConfigJSON) + row.Get("config_json", &d.ConfigJSON) + row.Get("created", &d.Created) + row.Get("created_by", &d.CreatedBy) + row.Get("updated", &d.Updated) + row.Get("updated_by", &d.UpdatedBy) var bc bundleConfigJson - err = json.Unmarshal([]byte(d.BundleConfigJSON), &bc) + json.Unmarshal([]byte(d.BundleConfigJSON), &bc) if err != nil { log.Errorf("JSON decoding Manifest failed: %v", err) return @@ -182,6 +176,17 @@ d.BundleChecksumType = bc.ChecksumType d.BundleChecksum = bc.Checksum + return +} + +func addDeployment(tx *sql.Tx, row common.Row) (err error) { + + var d DataDeployment + d, err = dataDeploymentFromRow(row) + if err != nil { + return + } + err = InsertDeployment(tx, d) if err != nil { return @@ -191,3 +196,9 @@ go downloadBundle(d) return } + +func safeDelete(file string) { + if e := os.Remove(file); e != nil && !os.IsNotExist(e) { + log.Warnf("unable to delete file %s: %v", file, e) + } +}