Remove queue, cleanup unnecessary locking, close rows
diff --git a/api_test.go b/api_test.go index 7c0e810..bdafa9d 100644 --- a/api_test.go +++ b/api_test.go
@@ -215,6 +215,7 @@ rows, err := db.Query("SELECT status from gateway_deploy_bundle WHERE id = ?;", deploymentID) Expect(err).ShouldNot(HaveOccurred()) + defer rows.Close() for rows.Next() { rows.Scan(&deployStatus) Expect(deployStatus).Should(Equal(DEPLOYMENT_STATE_SUCCESS))
diff --git a/data.go b/data.go index dd18cdb..9133750 100644 --- a/data.go +++ b/data.go
@@ -3,7 +3,6 @@ import ( "database/sql" "time" - "sync" ) const ( @@ -17,29 +16,6 @@ BUNDLE_TYPE_DEP = 2 ) -/* -Startup flow: - Check deployment queue - If anything in queue, initiate deployment retrieval -Listener flow: - Receive deployment event - Store deployment event in deployment queue - Initiate deployment retrieval -Deployment Retrieval: - Load deployment from deployment queue - Retrieve and store each bundle - Mark deployment as ready to deploy - Trigger deployment -Deployment: - -Tables: - gateway_deploy_queue - Deployment(s) received and not yet processed (one for now) - gateway_deploy_deployment - gateway_deploy_bundle - */ - - type SQLExec interface { Exec(query string, args ...interface{}) (sql.Result, error) } @@ -49,7 +25,7 @@ func initDB() { var count int - row := db.QueryRow("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='gateway_deploy_queue';") + row := db.QueryRow("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='gateway_deploy_deployment';") if err := row.Scan(&count); err != nil { log.Panicf("Unable to check for tables: %v", err) } @@ -65,13 +41,6 @@ } defer tx.Rollback() - _, err = tx.Exec("CREATE TABLE gateway_deploy_queue (" + - "id varchar(255), manifest text, created_at integer, " + - "PRIMARY KEY (id));") - if err != nil { - log.Panicf("Unable to initialize gateway_deploy_queue: %v", err) - } - _, err = tx.Exec("CREATE TABLE gateway_deploy_deployment (" + "id varchar(255), status integer, created_at integer, " + "modified_at integer, error_code varchar(255), " + @@ -97,99 +66,6 @@ } } -var tableLocksLock sync.Mutex -var tableLocks map[string]*sync.Mutex = map[string]*sync.Mutex{} - -func getTableLocker(table string) sync.Locker { - tableLocksLock.Lock() - defer tableLocksLock.Unlock() - - lock := tableLocks[table] - if lock == nil { - lock = &sync.Mutex{} - tableLocks[table] = lock - } - - return lock -} - -// currently only maintains 1 in the queue -func queueDeployment(deploymentID, manifestString string) error { - - log.Debugf("queuing deployment %s: %s", deploymentID, manifestString) - - getTableLocker("gateway_deploy_queue").Lock() - defer getTableLocker("gateway_deploy_queue").Unlock() - - // validate manifest - _, err := parseManifest(manifestString) - if err != nil { - return err - } - - // maintains queue at 1 - // todo: this should be transactional - _, err = db.Exec("DELETE FROM gateway_deploy_queue"); - if err != nil { - log.Errorf("DELETE FROM gateway_deploy_queue failed: %v", err) - return err - } - - _, err = db.Exec("INSERT INTO gateway_deploy_queue (id, manifest, created_at) VALUES (?,?,?);", - deploymentID, - manifestString, - dbTimeNow(), - ) - if err != nil { - log.Errorf("INSERT gateway_deploy_queue %s failed: %v", deploymentID, err) - return err - } - - log.Debugf("deployment %s queued", deploymentID) - - return nil -} - -// committing passed transaction will delete deployment from queue -// Call using the following guard: -func getQueuedDeployment() (depID, manifestString string) { - - log.Debug("getting queued deployment") - - getTableLocker("gateway_deploy_queue").Lock() - defer getTableLocker("gateway_deploy_queue").Unlock() - - err := db.QueryRow("SELECT id, manifest FROM gateway_deploy_queue ORDER BY created_at ASC LIMIT 1;"). - Scan(&depID, &manifestString) - if err != nil { - if err == sql.ErrNoRows { - log.Info("No Deployments available to be processed") - } else { - log.Errorf("SELECT on gateway_deploy_queue failed: %s", err) - } - } - - log.Debugf("got queued deployment: %s", depID) - - return -} - -func deleteDeploymentFromQueue(depID string) { - - log.Debugf("deleting deployment %s from queue", depID) - - getTableLocker("gateway_deploy_queue").Lock() - defer getTableLocker("gateway_deploy_queue").Unlock() - - _, err := db.Exec("DELETE FROM gateway_deploy_queue WHERE id=?;", depID) - if err != nil { - log.Errorf("DELETE from gateway_deploy_queue failed: %s", err) - return - } - - log.Debugf("deleted deployment %s from queue", depID) -} - func dbTimeNow() int64 { return int64(time.Now().UnixNano()) } @@ -198,23 +74,18 @@ log.Debugf("insertDeployment: %s", depID) - getTableLocker("gateway_deploy_deployment").Lock() - defer getTableLocker("gateway_deploy_deployment").Unlock() - getTableLocker("gateway_deploy_bundle").Lock() - defer getTableLocker("gateway_deploy_bundle").Unlock() - tx, err := db.Begin() + defer tx.Rollback() if err != nil { log.Errorf("insertDeployment begin transaction failed: %v", depID, err) return err } - defer tx.Rollback() timeNow := dbTimeNow() _, err = tx.Exec("INSERT INTO gateway_deploy_deployment " + "(id, status, created_at) VALUES(?,?,?);", - depID, DEPLOYMENT_STATE_INPROG, timeNow) + depID, DEPLOYMENT_STATE_READY, timeNow) if err != nil { log.Errorf("INSERT gateway_deploy_deployment %s failed: %v", depID, err) return err @@ -225,7 +96,7 @@ _, err = tx.Exec("INSERT INTO gateway_deploy_bundle " + "(id, deployment_id, scope, type, uri, status, created_at) " + "VALUES(?,?,?,?,?,?,?);", - dep.System.BundleID, depID, dep.System.Scope, BUNDLE_TYPE_SYS, dep.System.URI, DEPLOYMENT_STATE_INPROG, timeNow) + dep.System.BundleID, depID, dep.System.Scope, BUNDLE_TYPE_SYS, dep.System.URI, DEPLOYMENT_STATE_READY, timeNow) if err != nil { log.Errorf("INSERT gateway_deploy_bundle %s:%s failed: %v", depID, dep.System.BundleID, err) return err @@ -250,6 +121,7 @@ log.Debugf("INSERT gateway_deploy_deployment %s succeeded", depID) + incoming <- depID return err } @@ -257,20 +129,17 @@ log.Debugf("updateDeploymentAndBundles: %s", depID) - getTableLocker("gateway_deploy_deployment").Lock() - defer getTableLocker("gateway_deploy_deployment").Unlock() - getTableLocker("gateway_deploy_bundle").Lock() - defer getTableLocker("gateway_deploy_bundle").Unlock() - /* * If the state of deployment was success, update state of bundles and * its deployments as success as well */ + log.Print("begin 3") txn, err := db.Begin() if err != nil { log.Errorf("Unable to begin transaction: %s", err) return err } + log.Print("began 3") defer txn.Rollback() if rsp.Status == RESPONSE_STATUS_SUCCESS { @@ -300,6 +169,7 @@ return err } + log.Print("commit 3") err = txn.Commit() if err != nil { log.Errorf("Unable to commit updateDeploymentStatus transaction: %s", err) @@ -356,7 +226,7 @@ } } if err != nil { - log.Error("UPDATE gateway_deploy_bundle %s:%s failed: %v", depID, bundleID, err) + log.Errorf("UPDATE gateway_deploy_bundle %s:%s failed: %v", depID, bundleID, err) return err } @@ -377,11 +247,13 @@ // getDeployment returns a fully populated deploymentResponse func getDeployment(depID string) (*deployment, error) { - rows, err := db.Query("SELECT id, type, uri, COALESCE(scope, '') as scope FROM gateway_deploy_bundle WHERE deployment_id=?;", depID) + rows, err := db.Query("SELECT id, type, uri, COALESCE(scope, '') as scope " + + "FROM gateway_deploy_bundle WHERE deployment_id=?;", depID) if err != nil { log.Errorf("Unable to query gateway_deploy_bundle. Err: %s", err) return nil, err } + defer rows.Close() depRes := deployment{ Bundles: []bundle{},
diff --git a/deployments.go b/deployments.go index 3a7a13a..5c1b6b2 100644 --- a/deployments.go +++ b/deployments.go
@@ -11,7 +11,6 @@ "encoding/base64" "path" "errors" - "sync" ) var ( @@ -142,11 +141,6 @@ log.Debugf("preparing deployment: %s", depID) - getTableLocker("gateway_deploy_deployment").Lock() - defer getTableLocker("gateway_deploy_deployment").Unlock() - getTableLocker("gateway_deploy_bundle").Lock() - defer getTableLocker("gateway_deploy_bundle").Unlock() - err := insertDeployment(depID, dep) if err != nil { log.Errorf("insert deployment failed: %v", err) @@ -188,37 +182,6 @@ return updateDeploymentStatus(db, depID, DEPLOYMENT_STATE_READY, 0) } -var queueMutex sync.Mutex - -func serviceDeploymentQueue() { - - queueMutex.Lock() - defer queueMutex.Unlock() - - log.Debug("Checking for new deployments") - - depID, manifestString := getQueuedDeployment() - if depID == "" { - return - } - - manifest, err := parseManifest(manifestString) - if err != nil { - return - } - - err = prepareDeployment(depID, manifest) - if err != nil { - log.Errorf("serviceDeploymentQueue prepare deployment failed: %v", depID) - return - } - - deleteDeploymentFromQueue(depID) - - log.Debugf("Signaling new deployment ready: %s", depID) - incoming <- depID -} - func parseManifest(manifestString string) (dep deployment, err error) { err = json.Unmarshal([]byte(manifestString), &dep) if err != nil {
diff --git a/init.go b/init.go index 066590d..7fe5ebe 100644 --- a/init.go +++ b/init.go
@@ -55,8 +55,6 @@ initAPI(services) initListener(services) - go serviceDeploymentQueue() - log.Debug("end init") return nil
diff --git a/listener.go b/listener.go index 3cb3b4e..5fde4b3 100644 --- a/listener.go +++ b/listener.go
@@ -87,11 +87,30 @@ return err } - err = queueDeployment(deploymentID, manifest) - - if err == nil { - go serviceDeploymentQueue() - } + //err = queueDeployment(deploymentID, manifest) + // + //if err == nil { + // go serviceDeploymentQueue() + //} + bypassQueue(deploymentID, manifest) return err } + + +func bypassQueue(depID, manifestString string) { + + manifest, err := parseManifest(manifestString) + if err != nil { + return + } + + err = prepareDeployment(depID, manifest) + if err != nil { + log.Errorf("serviceDeploymentQueue prepare deployment failed: %v", depID) + return + } + + log.Debugf("Signaling new deployment ready: %s", depID) + incoming <- depID +} \ No newline at end of file
diff --git a/listener_test.go b/listener_test.go index 6872c55..b72f9d7 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -9,8 +9,6 @@ "github.com/apigee-labs/transicator/common" ) -// todo: test servicing the deployment queue - var _ = Describe("listener", func() { It("should process ApigeeSync snapshot event", func(done Done) { @@ -62,10 +60,9 @@ return } + depID, err := getCurrentDeploymentID() Expect(err).ShouldNot(HaveOccurred()) - depID, manString := getQueuedDeployment() Expect(depID).Should(Equal(deploymentID)) - Expect(manString).Should(Equal(string(depBytes))) close(done) }, @@ -126,9 +123,9 @@ return } - depID, manString := getQueuedDeployment() + depID, err := getCurrentDeploymentID() + Expect(err).ShouldNot(HaveOccurred()) Expect(depID).Should(Equal(deploymentID)) - Expect(manString).Should(Equal(string(depBytes))) close(done) },