Change locking behavior to assure correct transaction ordering, stop storing eTag in DB
diff --git a/api.go b/api.go
index e677294..fec8239 100644
--- a/api.go
+++ b/api.go
@@ -6,10 +6,11 @@
"fmt"
"io/ioutil"
"net/http"
- "strconv"
- "time"
- "sync"
"net/url"
+ "strconv"
+ "sync"
+ "sync/atomic"
+ "time"
)
const (
@@ -23,6 +24,7 @@
var (
deploymentsChanged = make(chan string)
addSubscriber = make(chan chan string)
+ eTag int64
)
type errorResponse struct {
@@ -94,13 +96,14 @@
subscribers = make(map[chan string]struct{})
m := msg
msg = ""
+ incrementETag()
mut.Unlock()
log.Debugf("Delivering deployment change %s to %d subscribers", m, len(subs))
- incrementETag()
for subscriber := range subs {
select {
case subscriber <- m:
log.Debugf("Handling deploy response for: %s", m)
+ log.Debugf("delivering TO: %v", subscriber)
default:
log.Debugf("listener too far behind, message dropped")
}
@@ -153,11 +156,7 @@
log.Debugf("if-none-match: %s", ifNoneMatch)
// send unmodified if matches prior eTag and no timeout
- eTag, err := getETag()
- if err != nil {
- writeDatabaseError(w)
- return
- }
+ eTag := getETag()
if eTag == ifNoneMatch && timeout == 0 {
w.WriteHeader(http.StatusNotModified)
return
@@ -210,16 +209,16 @@
for _, d := range dataDeps {
apiDeps = append(apiDeps, ApiDeployment{
- ID: d.ID,
- ScopeId: d.DataScopeID,
- Created: d.Created,
- CreatedBy: d.CreatedBy,
- Updated: d.Updated,
- UpdatedBy: d.UpdatedBy,
- BundleConfigJson: []byte(d.BundleConfigJSON),
- ConfigJson: []byte(d.ConfigJSON),
- DisplayName: d.BundleName,
- URI: d.LocalBundleURI,
+ ID: d.ID,
+ ScopeId: d.DataScopeID,
+ Created: d.Created,
+ CreatedBy: d.CreatedBy,
+ Updated: d.Updated,
+ UpdatedBy: d.UpdatedBy,
+ BundleConfigJson: []byte(d.BundleConfigJSON),
+ ConfigJson: []byte(d.ConfigJSON),
+ DisplayName: d.BundleName,
+ URI: d.LocalBundleURI,
})
}
@@ -329,3 +328,13 @@
return nil
}
}
+
+// call whenever the list of deployments changes
+func incrementETag() {
+ atomic.AddInt64(&eTag, 1)
+}
+
+func getETag() string {
+ e := atomic.LoadInt64(&eTag)
+ return strconv.FormatInt(e, 10)
+}
diff --git a/data.go b/data.go
index 6cd175a..dc46ab7 100644
--- a/data.go
+++ b/data.go
@@ -39,10 +39,6 @@
func InitDB(db apid.DB) error {
_, err := db.Exec(`
- CREATE TABLE IF NOT EXISTS etag (
- value integer
- );
- INSERT INTO etag (value) VALUES (1);
CREATE TABLE IF NOT EXISTS deployments (
id character varying(36) NOT NULL,
bundle_config_id varchar(36) NOT NULL,
@@ -88,42 +84,6 @@
unsafeDB = db
}
-// call whenever the list of deployments changes
-func incrementETag() error {
-
- stmt, err := getDB().Prepare("UPDATE etag SET value = value+1;")
- if err != nil {
- log.Errorf("prepare update etag failed: %v", err)
- return err
- }
- defer stmt.Close()
-
- _, err = stmt.Exec()
- if err != nil {
- log.Errorf("update etag failed: %v", err)
- return err
- }
-
- log.Debugf("etag incremented")
- return err
-}
-
-func getETag() (string, error) {
-
- var eTag string
- db := getDB()
- row := db.QueryRow("SELECT value FROM etag")
- err := row.Scan(&eTag)
- //err := getDB().QueryRow("SELECT value FROM etag").Scan(&eTag)
- if err != nil {
- log.Errorf("select etag failed: %v", err)
- return "", err
- }
-
- log.Debugf("etag queried: %v", eTag)
- return eTag, err
-}
-
func InsertDeployment(tx *sql.Tx, dep DataDeployment) error {
log.Debugf("insertDeployment: %s", dep.ID)
diff --git a/listener.go b/listener.go
index 7d9b056..24f3d96 100644
--- a/listener.go
+++ b/listener.go
@@ -64,6 +64,7 @@
// ensure that no new database updates are made on old database
dbMux.Lock()
+ defer dbMux.Unlock()
defer tx.Rollback()
for _, table := range snapshot.Tables {
@@ -86,7 +87,6 @@
}
SetDB(db)
- dbMux.Unlock()
// if no tables, this a startup event for an existing DB, start bundle downloads that didn't finish
if len(snapshot.Tables) == 0 {
@@ -109,6 +109,11 @@
log.Panicf("Error processing ChangeList: %v", err)
}
defer tx.Rollback()
+
+ // ensure bundle download and delete updates aren't attempted while in process
+ dbMux.Lock()
+ defer dbMux.Unlock()
+
var bundlesToDelete []string
for _, change := range changes.Changes {
var err error