Change locking behavior to assure correct transaction ordering, stop storing eTag in DB
diff --git a/api.go b/api.go index e677294..fec8239 100644 --- a/api.go +++ b/api.go
@@ -6,10 +6,11 @@ "fmt" "io/ioutil" "net/http" - "strconv" - "time" - "sync" "net/url" + "strconv" + "sync" + "sync/atomic" + "time" ) const ( @@ -23,6 +24,7 @@ var ( deploymentsChanged = make(chan string) addSubscriber = make(chan chan string) + eTag int64 ) type errorResponse struct { @@ -94,13 +96,14 @@ subscribers = make(map[chan string]struct{}) m := msg msg = "" + incrementETag() mut.Unlock() log.Debugf("Delivering deployment change %s to %d subscribers", m, len(subs)) - incrementETag() for subscriber := range subs { select { case subscriber <- m: log.Debugf("Handling deploy response for: %s", m) + log.Debugf("delivering TO: %v", subscriber) default: log.Debugf("listener too far behind, message dropped") } @@ -153,11 +156,7 @@ log.Debugf("if-none-match: %s", ifNoneMatch) // send unmodified if matches prior eTag and no timeout - eTag, err := getETag() - if err != nil { - writeDatabaseError(w) - return - } + eTag := getETag() if eTag == ifNoneMatch && timeout == 0 { w.WriteHeader(http.StatusNotModified) return @@ -210,16 +209,16 @@ for _, d := range dataDeps { apiDeps = append(apiDeps, ApiDeployment{ - ID: d.ID, - ScopeId: d.DataScopeID, - Created: d.Created, - CreatedBy: d.CreatedBy, - Updated: d.Updated, - UpdatedBy: d.UpdatedBy, - BundleConfigJson: []byte(d.BundleConfigJSON), - ConfigJson: []byte(d.ConfigJSON), - DisplayName: d.BundleName, - URI: d.LocalBundleURI, + ID: d.ID, + ScopeId: d.DataScopeID, + Created: d.Created, + CreatedBy: d.CreatedBy, + Updated: d.Updated, + UpdatedBy: d.UpdatedBy, + BundleConfigJson: []byte(d.BundleConfigJSON), + ConfigJson: []byte(d.ConfigJSON), + DisplayName: d.BundleName, + URI: d.LocalBundleURI, }) } @@ -329,3 +328,13 @@ return nil } } + +// call whenever the list of deployments changes +func incrementETag() { + atomic.AddInt64(&eTag, 1) +} + +func getETag() string { + e := atomic.LoadInt64(&eTag) + return strconv.FormatInt(e, 10) +}
diff --git a/data.go b/data.go index 6cd175a..dc46ab7 100644 --- a/data.go +++ b/data.go
@@ -39,10 +39,6 @@ func InitDB(db apid.DB) error { _, err := db.Exec(` - CREATE TABLE IF NOT EXISTS etag ( - value integer - ); - INSERT INTO etag (value) VALUES (1); CREATE TABLE IF NOT EXISTS deployments ( id character varying(36) NOT NULL, bundle_config_id varchar(36) NOT NULL, @@ -88,42 +84,6 @@ unsafeDB = db } -// call whenever the list of deployments changes -func incrementETag() error { - - stmt, err := getDB().Prepare("UPDATE etag SET value = value+1;") - if err != nil { - log.Errorf("prepare update etag failed: %v", err) - return err - } - defer stmt.Close() - - _, err = stmt.Exec() - if err != nil { - log.Errorf("update etag failed: %v", err) - return err - } - - log.Debugf("etag incremented") - return err -} - -func getETag() (string, error) { - - var eTag string - db := getDB() - row := db.QueryRow("SELECT value FROM etag") - err := row.Scan(&eTag) - //err := getDB().QueryRow("SELECT value FROM etag").Scan(&eTag) - if err != nil { - log.Errorf("select etag failed: %v", err) - return "", err - } - - log.Debugf("etag queried: %v", eTag) - return eTag, err -} - func InsertDeployment(tx *sql.Tx, dep DataDeployment) error { log.Debugf("insertDeployment: %s", dep.ID)
diff --git a/listener.go b/listener.go index 7d9b056..24f3d96 100644 --- a/listener.go +++ b/listener.go
@@ -64,6 +64,7 @@ // ensure that no new database updates are made on old database dbMux.Lock() + defer dbMux.Unlock() defer tx.Rollback() for _, table := range snapshot.Tables { @@ -86,7 +87,6 @@ } SetDB(db) - dbMux.Unlock() // if no tables, this a startup event for an existing DB, start bundle downloads that didn't finish if len(snapshot.Tables) == 0 { @@ -109,6 +109,11 @@ log.Panicf("Error processing ChangeList: %v", err) } defer tx.Rollback() + + // ensure bundle download and delete updates aren't attempted while in process + dbMux.Lock() + defer dbMux.Unlock() + var bundlesToDelete []string for _, change := range changes.Changes { var err error