Merge pull request #12 from 30x/XAPID-707
Xapid 707
diff --git a/api.go b/api.go
index 2fc41f4..a0e34cd 100644
--- a/api.go
+++ b/api.go
@@ -224,7 +224,7 @@
b, err := json.Marshal(apiDeps)
if err != nil {
- log.Errorf("unable to marshal deployments: %s", err)
+ log.Errorf("unable to marshal deployments: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
@@ -328,7 +328,7 @@
log.Errorf("failed to communicate with tracking service: %v", err)
} else {
b, _ := ioutil.ReadAll(resp.Body)
- log.Errorf("tracking service call failed to %s , code: %d, body: %s", apiPath, resp.StatusCode, string(b))
+ log.Errorf("tracking service call failed to %s, code: %d, body: %s", apiPath, resp.StatusCode, string(b))
}
backOffFunc()
continue
diff --git a/bundle.go b/bundle.go
index 95fb3f6..1163946 100644
--- a/bundle.go
+++ b/bundle.go
@@ -143,7 +143,7 @@
// the content of the URI is unfortunately not guaranteed not to change, so I can't just use dep.BundleURI
// unfortunately, this also means that a bundle cache isn't especially relevant
- fileName := dep.DataScopeID + dep.ID + dep.ID
+ fileName := dep.DataScopeID + "_" + dep.ID
return path.Join(bundlePath, base64.StdEncoding.EncodeToString([]byte(fileName)))
}
diff --git a/data.go b/data.go
index d8af041..d36e745 100644
--- a/data.go
+++ b/data.go
@@ -86,8 +86,12 @@
}
func InsertDeployment(tx *sql.Tx, dep DataDeployment) error {
+ return insertDeployments(tx, []DataDeployment{dep})
+}
- log.Debugf("insertDeployment: %s", dep.ID)
+func insertDeployments(tx *sql.Tx, deps []DataDeployment) error {
+
+ log.Debugf("inserting %d deployments", len(deps))
stmt, err := tx.Prepare(`
INSERT INTO deployments
@@ -99,23 +103,27 @@
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18);
`)
if err != nil {
- log.Errorf("prepare insert into deployments %s failed: %v", dep.ID, err)
+ log.Errorf("prepare insert into deployments failed: %v", err)
return err
}
defer stmt.Close()
- _, err = stmt.Exec(
- dep.ID, dep.BundleConfigID, dep.ApidClusterID, dep.DataScopeID,
- dep.BundleConfigJSON, dep.ConfigJSON, dep.Created, dep.CreatedBy,
- dep.Updated, dep.UpdatedBy, dep.BundleName, dep.BundleURI,
- dep.LocalBundleURI, dep.BundleChecksum, dep.BundleChecksumType, dep.DeployStatus,
- dep.DeployErrorCode, dep.DeployErrorMessage)
- if err != nil {
- log.Errorf("insert into deployments %s failed: %v", dep.ID, err)
- return err
+ for _, dep := range deps {
+ log.Debugf("insertDeployment: %s", dep.ID)
+
+ _, err = stmt.Exec(
+ dep.ID, dep.BundleConfigID, dep.ApidClusterID, dep.DataScopeID,
+ dep.BundleConfigJSON, dep.ConfigJSON, dep.Created, dep.CreatedBy,
+ dep.Updated, dep.UpdatedBy, dep.BundleName, dep.BundleURI,
+ dep.LocalBundleURI, dep.BundleChecksum, dep.BundleChecksumType, dep.DeployStatus,
+ dep.DeployErrorCode, dep.DeployErrorMessage)
+ if err != nil {
+ log.Errorf("insert into deployments %s failed: %v", dep.ID, err)
+ return err
+ }
}
- log.Debugf("insert into deployments %s succeeded", dep.ID)
+ log.Debug("inserting deployments succeeded")
return err
}
@@ -136,8 +144,6 @@
return err
}
- deploymentsChanged <- depID
-
log.Debugf("deleteDeployment %s succeeded", depID)
return err
}
@@ -260,9 +266,9 @@
return nil
}
-func getLocalBundleURI(tx *sql.Tx, depID string) (localBundleUri string, err error) {
+func getLocalBundleURI(depID string) (localBundleUri string, err error) {
- err = tx.QueryRow("SELECT local_bundle_uri FROM deployments WHERE id=$1;", depID).Scan(&localBundleUri)
+ err = getDB().QueryRow("SELECT local_bundle_uri FROM deployments WHERE id=$1;", depID).Scan(&localBundleUri)
if err == sql.ErrNoRows {
err = nil
}
diff --git a/listener.go b/listener.go
index c3b591f..0f5c493 100644
--- a/listener.go
+++ b/listener.go
@@ -1,11 +1,12 @@
package apiGatewayDeploy
import (
- "database/sql"
"encoding/json"
"os"
"time"
+ "fmt"
+
"github.com/30x/apid-core"
"github.com/apigee-labs/transicator/common"
)
@@ -58,28 +59,41 @@
log.Panicf("Unable to initialize database: %v", err)
}
- tx, err := db.Begin()
- if err != nil {
- log.Panicf("Error starting transaction: %v", err)
+ var deploymentsToInsert []DataDeployment
+ var errResults apiDeploymentResults
+ for _, table := range snapshot.Tables {
+ switch table.Name {
+ case DEPLOYMENT_TABLE:
+ for _, row := range table.Rows {
+ dep, err := dataDeploymentFromRow(row)
+ if err == nil {
+ deploymentsToInsert = append(deploymentsToInsert, dep)
+ } else {
+ result := apiDeploymentResult{
+ ID: dep.ID,
+ Status: RESPONSE_STATUS_FAIL,
+ ErrorCode: ERROR_CODE_TODO,
+ Message: fmt.Sprintf("unable to parse deployment: %v", err),
+ }
+ errResults = append(errResults, result)
+ }
+ }
+ }
}
// ensure that no new database updates are made on old database
dbMux.Lock()
defer dbMux.Unlock()
+ tx, err := db.Begin()
+ if err != nil {
+ log.Panicf("Error starting transaction: %v", err)
+ }
defer tx.Rollback()
- for _, table := range snapshot.Tables {
- var err error
- switch table.Name {
- case DEPLOYMENT_TABLE:
- log.Debugf("Snapshot of %s with %d rows", table.Name, len(table.Rows))
- for _, row := range table.Rows {
- err = addDeployment(tx, row)
- }
- }
- if err != nil {
- log.Panicf("Error processing Snapshot: %v", err)
- }
+
+ err = insertDeployments(tx, deploymentsToInsert)
+ if err != nil {
+ log.Panicf("Error processing Snapshot: %v", err)
}
err = tx.Commit()
@@ -89,6 +103,15 @@
SetDB(db)
+ for _, dep := range deploymentsToInsert {
+ queueDownloadRequest(dep)
+ }
+
+ // transmit parsing errors back immediately
+ if len(errResults) > 0 {
+ go transmitDeploymentResultsToServer(errResults)
+ }
+
// if no tables, this a startup event for an existing DB, start bundle downloads that didn't finish
if len(snapshot.Tables) == 0 {
go func() {
@@ -107,54 +130,89 @@
func processChangeList(changes *common.ChangeList) {
+ // gather deleted bundle info
+ var deploymentsToInsert, deploymentsToDelete []DataDeployment
+ var errResults apiDeploymentResults
+ for _, change := range changes.Changes {
+ switch change.Table {
+ case DEPLOYMENT_TABLE:
+ switch change.Operation {
+ case common.Insert:
+ dep, err := dataDeploymentFromRow(change.NewRow)
+ if err == nil {
+ deploymentsToInsert = append(deploymentsToInsert, dep)
+ } else {
+ result := apiDeploymentResult{
+ ID: dep.ID,
+ Status: RESPONSE_STATUS_FAIL,
+ ErrorCode: ERROR_CODE_TODO,
+ Message: fmt.Sprintf("unable to parse deployment: %v", err),
+ }
+ errResults = append(errResults, result)
+ }
+ case common.Delete:
+ var id, dataScopeID string
+ change.OldRow.Get("id", &id)
+ change.OldRow.Get("data_scope_id", &dataScopeID)
+ // only need these two fields to delete and determine bundle file
+ dep := DataDeployment{
+ ID: id,
+ DataScopeID: dataScopeID,
+ }
+ deploymentsToDelete = append(deploymentsToDelete, dep)
+ default:
+ log.Errorf("unexpected operation: %s", change.Operation)
+ }
+ }
+ }
+
+ // transmit parsing errors back immediately
+ if len(errResults) > 0 {
+ go transmitDeploymentResultsToServer(errResults)
+ }
+
tx, err := getDB().Begin()
if err != nil {
log.Panicf("Error processing ChangeList: %v", err)
}
defer tx.Rollback()
- // ensure bundle download and delete updates aren't attempted while in process
- dbMux.Lock()
- defer dbMux.Unlock()
-
- var bundlesToDelete []string
- for _, change := range changes.Changes {
- var err error
- switch change.Table {
- case DEPLOYMENT_TABLE:
- switch change.Operation {
- case common.Insert:
- err = addDeployment(tx, change.NewRow)
- case common.Delete:
- var id string
- change.OldRow.Get("id", &id)
- localBundleUri, err := getLocalBundleURI(tx, id)
- if err == nil {
- bundlesToDelete = append(bundlesToDelete, localBundleUri)
- err = deleteDeployment(tx, id)
- }
- default:
- log.Errorf("unexpected operation: %s", change.Operation)
- }
- }
+ for _, dep := range deploymentsToDelete {
+ err = deleteDeployment(tx, dep.ID)
if err != nil {
log.Panicf("Error processing ChangeList: %v", err)
}
}
+ err = insertDeployments(tx, deploymentsToInsert)
+ if err != nil {
+ log.Panicf("Error processing ChangeList: %v", err)
+ }
+
err = tx.Commit()
if err != nil {
log.Panicf("Error processing ChangeList: %v", err)
}
+ if len(deploymentsToDelete) > 0 {
+ deploymentsChanged <- deploymentsToDelete[0].ID // arbitrary, the ID doesn't matter
+ }
+
+ log.Debug("ChangeList processed")
+
+ for _, dep := range deploymentsToInsert {
+ queueDownloadRequest(dep)
+ }
+
// clean up old bundles
- if len(bundlesToDelete) > 0 {
- log.Debugf("will delete %d old bundles", len(bundlesToDelete))
+ if len(deploymentsToDelete) > 0 {
+ log.Debugf("will delete %d old bundles", len(deploymentsToDelete))
go func() {
// give clients a minute to avoid conflicts
time.Sleep(bundleCleanupDelay)
- for _, b := range bundlesToDelete {
- log.Debugf("removing old bundle: %v", b)
- safeDelete(b)
+ for _, dep := range deploymentsToDelete {
+ bundleFile := getBundleFile(dep)
+ log.Debugf("removing old bundle: %v", bundleFile)
+ safeDelete(bundleFile)
}
}()
}
@@ -188,23 +246,6 @@
return
}
-func addDeployment(tx *sql.Tx, row common.Row) (err error) {
-
- var d DataDeployment
- d, err = dataDeploymentFromRow(row)
- if err != nil {
- return
- }
-
- err = InsertDeployment(tx, d)
- if err != nil {
- return
- }
-
- queueDownloadRequest(d)
- return
-}
-
func safeDelete(file string) {
if e := os.Remove(file); e != nil && !os.IsNotExist(e) {
log.Warnf("unable to delete file %s: %v", file, e)
diff --git a/listener_test.go b/listener_test.go
index aeddbf9..dbee310 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -2,11 +2,12 @@
import (
"encoding/json"
+ "net/url"
+
"github.com/30x/apid-core"
"github.com/apigee-labs/transicator/common"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
- "net/url"
)
var _ = Describe("listener", func() {