better, more efficient tx handling
diff --git a/api.go b/api.go index 2fc41f4..a0e34cd 100644 --- a/api.go +++ b/api.go
@@ -224,7 +224,7 @@ b, err := json.Marshal(apiDeps) if err != nil { - log.Errorf("unable to marshal deployments: %s", err) + log.Errorf("unable to marshal deployments: %v", err) w.WriteHeader(http.StatusInternalServerError) return } @@ -328,7 +328,7 @@ log.Errorf("failed to communicate with tracking service: %v", err) } else { b, _ := ioutil.ReadAll(resp.Body) - log.Errorf("tracking service call failed to %s , code: %d, body: %s", apiPath, resp.StatusCode, string(b)) + log.Errorf("tracking service call failed to %s, code: %d, body: %s", apiPath, resp.StatusCode, string(b)) } backOffFunc() continue
diff --git a/bundle.go b/bundle.go index 95fb3f6..1163946 100644 --- a/bundle.go +++ b/bundle.go
@@ -143,7 +143,7 @@ // the content of the URI is unfortunately not guaranteed not to change, so I can't just use dep.BundleURI // unfortunately, this also means that a bundle cache isn't especially relevant - fileName := dep.DataScopeID + dep.ID + dep.ID + fileName := dep.DataScopeID + "_" + dep.ID return path.Join(bundlePath, base64.StdEncoding.EncodeToString([]byte(fileName))) }
diff --git a/data.go b/data.go index d8af041..ae8aafd 100644 --- a/data.go +++ b/data.go
@@ -86,8 +86,12 @@ } func InsertDeployment(tx *sql.Tx, dep DataDeployment) error { + return insertDeployments(tx, []DataDeployment{dep}) +} - log.Debugf("insertDeployment: %s", dep.ID) +func insertDeployments(tx *sql.Tx, deps []DataDeployment) error { + + log.Debugf("inserting %d deployments", len(deps)) stmt, err := tx.Prepare(` INSERT INTO deployments @@ -99,23 +103,27 @@ VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18); `) if err != nil { - log.Errorf("prepare insert into deployments %s failed: %v", dep.ID, err) + log.Errorf("prepare insert into deployments failed: %v", err) return err } defer stmt.Close() - _, err = stmt.Exec( - dep.ID, dep.BundleConfigID, dep.ApidClusterID, dep.DataScopeID, - dep.BundleConfigJSON, dep.ConfigJSON, dep.Created, dep.CreatedBy, - dep.Updated, dep.UpdatedBy, dep.BundleName, dep.BundleURI, - dep.LocalBundleURI, dep.BundleChecksum, dep.BundleChecksumType, dep.DeployStatus, - dep.DeployErrorCode, dep.DeployErrorMessage) - if err != nil { - log.Errorf("insert into deployments %s failed: %v", dep.ID, err) - return err + for _, dep := range deps { + log.Debugf("insertDeployment: %s", dep.ID) + + _, err = stmt.Exec( + dep.ID, dep.BundleConfigID, dep.ApidClusterID, dep.DataScopeID, + dep.BundleConfigJSON, dep.ConfigJSON, dep.Created, dep.CreatedBy, + dep.Updated, dep.UpdatedBy, dep.BundleName, dep.BundleURI, + dep.LocalBundleURI, dep.BundleChecksum, dep.BundleChecksumType, dep.DeployStatus, + dep.DeployErrorCode, dep.DeployErrorMessage) + if err != nil { + log.Errorf("insert into deployments %s failed: %v", dep.ID, err) + return err + } } - log.Debugf("insert into deployments %s succeeded", dep.ID) + log.Debug("inserting deployments succeeded") return err } @@ -260,9 +268,9 @@ return nil } -func getLocalBundleURI(tx *sql.Tx, depID string) (localBundleUri string, err error) { +func getLocalBundleURI(depID string) (localBundleUri string, err error) { - err = tx.QueryRow("SELECT local_bundle_uri FROM deployments WHERE id=$1;", depID).Scan(&localBundleUri) + err = getDB().QueryRow("SELECT local_bundle_uri FROM deployments WHERE id=$1;", depID).Scan(&localBundleUri) if err == sql.ErrNoRows { err = nil }
diff --git a/listener.go b/listener.go index c3b591f..96ca324 100644 --- a/listener.go +++ b/listener.go
@@ -6,6 +6,8 @@ "os" "time" + "fmt" + "github.com/30x/apid-core" "github.com/apigee-labs/transicator/common" ) @@ -58,28 +60,41 @@ log.Panicf("Unable to initialize database: %v", err) } - tx, err := db.Begin() - if err != nil { - log.Panicf("Error starting transaction: %v", err) + var deploymentsToInsert []DataDeployment + var errResults apiDeploymentResults + for _, table := range snapshot.Tables { + switch table.Name { + case DEPLOYMENT_TABLE: + for _, row := range table.Rows { + dep, err := dataDeploymentFromRow(row) + if err == nil { + deploymentsToInsert = append(deploymentsToInsert, dep) + } else { + result := apiDeploymentResult{ + ID: dep.ID, + Status: RESPONSE_STATUS_FAIL, + ErrorCode: ERROR_CODE_TODO, + Message: fmt.Sprintf("unable to parse deployment: %v", err), + } + errResults = append(errResults, result) + } + } + } } // ensure that no new database updates are made on old database dbMux.Lock() defer dbMux.Unlock() + tx, err := db.Begin() + if err != nil { + log.Panicf("Error starting transaction: %v", err) + } defer tx.Rollback() - for _, table := range snapshot.Tables { - var err error - switch table.Name { - case DEPLOYMENT_TABLE: - log.Debugf("Snapshot of %s with %d rows", table.Name, len(table.Rows)) - for _, row := range table.Rows { - err = addDeployment(tx, row) - } - } - if err != nil { - log.Panicf("Error processing Snapshot: %v", err) - } + + err = addDeployments(tx, deploymentsToInsert) + if err != nil { + log.Panicf("Error processing Snapshot: %v", err) } err = tx.Commit() @@ -89,6 +104,11 @@ SetDB(db) + // transmit parsing errors back immediately + if len(errResults) > 0 { + go transmitDeploymentResultsToServer(errResults) + } + // if no tables, this a startup event for an existing DB, start bundle downloads that didn't finish if len(snapshot.Tables) == 0 { go func() { @@ -107,54 +127,77 @@ func processChangeList(changes *common.ChangeList) { + // gather deleted bundle info + var deploymentsToInsert, deploymentsToDelete []DataDeployment + var errResults apiDeploymentResults + for _, change := range changes.Changes { + switch change.Table { + case DEPLOYMENT_TABLE: + switch change.Operation { + case common.Insert: + dep, err := dataDeploymentFromRow(change.NewRow) + if err == nil { + deploymentsToInsert = append(deploymentsToInsert, dep) + } else { + result := apiDeploymentResult{ + ID: dep.ID, + Status: RESPONSE_STATUS_FAIL, + ErrorCode: ERROR_CODE_TODO, + Message: fmt.Sprintf("unable to parse deployment: %v", err), + } + errResults = append(errResults, result) + } + case common.Delete: + var id, dataScopeID string + change.OldRow.Get("id", &id) + change.OldRow.Get("data_scope_id", &dataScopeID) + // only need these two fields to delete and determine bundle file + dep := DataDeployment{ + ID: id, + DataScopeID: dataScopeID, + } + deploymentsToDelete = append(deploymentsToDelete, dep) + } + } + } + + // transmit parsing errors back immediately + if len(errResults) > 0 { + go transmitDeploymentResultsToServer(errResults) + } + tx, err := getDB().Begin() if err != nil { log.Panicf("Error processing ChangeList: %v", err) } defer tx.Rollback() - // ensure bundle download and delete updates aren't attempted while in process - dbMux.Lock() - defer dbMux.Unlock() - - var bundlesToDelete []string - for _, change := range changes.Changes { - var err error - switch change.Table { - case DEPLOYMENT_TABLE: - switch change.Operation { - case common.Insert: - err = addDeployment(tx, change.NewRow) - case common.Delete: - var id string - change.OldRow.Get("id", &id) - localBundleUri, err := getLocalBundleURI(tx, id) - if err == nil { - bundlesToDelete = append(bundlesToDelete, localBundleUri) - err = deleteDeployment(tx, id) - } - default: - log.Errorf("unexpected operation: %s", change.Operation) - } - } + for _, dep := range deploymentsToDelete { + err = deleteDeployment(tx, dep.ID) if err != nil { log.Panicf("Error processing ChangeList: %v", err) } } - err = tx.Commit() + err = addDeployments(tx, deploymentsToInsert) if err != nil { log.Panicf("Error processing ChangeList: %v", err) } + err = tx.Commit() + if err != nil { + log.Panicf("Error committing Snapshot change: %v", err) + } + // clean up old bundles - if len(bundlesToDelete) > 0 { - log.Debugf("will delete %d old bundles", len(bundlesToDelete)) + if len(deploymentsToDelete) > 0 { + log.Debugf("will delete %d old bundles", len(deploymentsToDelete)) go func() { // give clients a minute to avoid conflicts time.Sleep(bundleCleanupDelay) - for _, b := range bundlesToDelete { - log.Debugf("removing old bundle: %v", b) - safeDelete(b) + for _, dep := range deploymentsToDelete { + bundleFile := getBundleFile(dep) + log.Debugf("removing old bundle: %v", bundleFile) + safeDelete(bundleFile) } }() } @@ -188,20 +231,16 @@ return } -func addDeployment(tx *sql.Tx, row common.Row) (err error) { +func addDeployments(tx *sql.Tx, deps []DataDeployment) (err error) { - var d DataDeployment - d, err = dataDeploymentFromRow(row) + err = insertDeployments(tx, deps) if err != nil { return } - err = InsertDeployment(tx, d) - if err != nil { - return + for _, dep := range deps { + queueDownloadRequest(dep) } - - queueDownloadRequest(d) return }
diff --git a/listener_test.go b/listener_test.go index aeddbf9..dbee310 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -2,11 +2,12 @@ import ( "encoding/json" + "net/url" + "github.com/30x/apid-core" "github.com/apigee-labs/transicator/common" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "net/url" ) var _ = Describe("listener", func() {