refactor data.go
diff --git a/data.go b/data.go
index aeb0203..eca1491 100644
--- a/data.go
+++ b/data.go
@@ -516,6 +516,54 @@
return err
}
+func (dbc *dbManager) getClusterCount() (numApidClusters int, err error) {
+ rows, err := dbc.db.Query("SELECT COUNT(*) FROM edgex_apid_cluster")
+ if err != nil {
+ return
+ }
+ defer rows.Close()
+ rows.Next()
+ err = rows.Scan(&numApidClusters)
+ return
+}
+
+func (dbc *dbManager) alterClusterTable() (err error) {
+ _, err = dbc.db.Exec("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''")
+ if err.Error() == "duplicate column name: last_sequence" {
+ return nil
+ }
+ return
+}
+
+func (dbc *dbManager) writeTransaction(changes *common.ChangeList) bool {
+ tx, err := dbc.getDb().Begin()
+ if err != nil {
+ log.Panicf("Error processing ChangeList: %v", err)
+ }
+ defer tx.Rollback()
+ var ok bool
+ for _, change := range changes.Changes {
+ switch change.Operation {
+ case common.Insert:
+ ok = dbc.insert(change.Table, []common.Row{change.NewRow}, tx)
+ case common.Update:
+ ok = dbc.update(change.Table, []common.Row{change.OldRow}, []common.Row{change.NewRow}, tx)
+ case common.Delete:
+ ok = dbc.deleteRowsFromTable(change.Table, []common.Row{change.OldRow}, tx)
+ }
+ if !ok {
+ log.Error("Sql Operation error. Operation rollbacked")
+ return ok
+ }
+ }
+ err = tx.Commit()
+ if err != nil {
+ log.Errorf("Error processing ChangeList: %v", err)
+ return false
+ }
+ return ok
+}
+
/*
* generates a random uuid (mix of timestamp & crypto random string)
*/
diff --git a/data_test.go b/data_test.go
index 11a4a6c..e1bf8dc 100644
--- a/data_test.go
+++ b/data_test.go
@@ -19,37 +19,44 @@
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"sort"
+ "sync"
)
var _ = Describe("data access tests", func() {
-
+ var testCount int
+ var testDbMan *dbManager
BeforeEach(func() {
- db := getDB()
+ testCount += 1
+ testDbMan = &dbManager{
+ dbMux: sync.RWMutex{},
+ }
+
+ db := testDbMan.getDb()
//all tests in this file operate on the api_product table. Create the necessary tables for this here
- getDB().Exec("CREATE TABLE _transicator_tables " +
+ db.Exec("CREATE TABLE _transicator_tables " +
"(tableName varchar not null, columnName varchar not null, " +
"typid integer, primaryKey bool);")
- getDB().Exec("DELETE from _transicator_tables")
- getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','id',2950,1)")
- getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','tenant_id',1043,1)")
- getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','description',1043,0)")
- getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','api_resources',1015,0)")
- getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','approval_type',1043,0)")
- getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','scopes',1015,0)")
- getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','proxies',1015,0)")
- getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','environments',1015,0)")
- getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_at',1114,1)")
- getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_by',1043,0)")
- getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_at',1114,1)")
- getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_by',1043,0)")
- getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','_change_selector',1043,0)")
+ db.Exec("DELETE from _transicator_tables")
+ db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','id',2950,1)")
+ db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','tenant_id',1043,1)")
+ db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','description',1043,0)")
+ db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','api_resources',1015,0)")
+ db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','approval_type',1043,0)")
+ db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','scopes',1015,0)")
+ db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','proxies',1015,0)")
+ db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','environments',1015,0)")
+ db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_at',1114,1)")
+ db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_by',1043,0)")
+ db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_at',1114,1)")
+ db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_by',1043,0)")
+ db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','_change_selector',1043,0)")
- getDB().Exec("CREATE TABLE kms_api_product (id text,tenant_id text,name text, description text, " +
+ db.Exec("CREATE TABLE kms_api_product (id text,tenant_id text,name text, description text, " +
"api_resources text,approval_type text,scopes text,proxies text, environments text," +
"created_at blob, created_by text,updated_at blob,updated_by text,_change_selector text, " +
"primary key (id,tenant_id,created_at,updated_at));")
- getDB().Exec("DELETE from kms_api_product")
+ db.Exec("DELETE from kms_api_product")
setDB(db)
initDB(db)
diff --git a/init.go b/init.go
index 3ad221e..d8226de 100644
--- a/init.go
+++ b/init.go
@@ -22,6 +22,7 @@
"time"
"github.com/30x/apid-core"
+ "sync"
)
const (
@@ -102,7 +103,9 @@
},
}
- dbMan = &dbManager{}
+ dbMan = &dbManager{
+ dbMux: sync.RWMutex{},
+ }
// set up default database
err := dbMan.initDefaultDb()
diff --git a/listener.go b/listener.go
index c269b8f..76ec295 100644
--- a/listener.go
+++ b/listener.go
@@ -52,71 +52,27 @@
func (lm *listenerManager) processSqliteSnapshot(db apid.DB) {
- var numApidClusters int
- apidClusters, err := db.Query("SELECT COUNT(*) FROM edgex_apid_cluster")
- if err != nil {
- log.Panicf("Unable to read database: %s", err.Error())
- }
- apidClusters.Next()
- err = apidClusters.Scan(&numApidClusters)
- if err != nil {
- log.Panicf("Unable to read database: %s", err.Error())
+ if count, err := lm.dbm.getClusterCount(); err != nil || count != 1 {
+ log.Panicf("Illegal state for apid_cluster. Must be a single row. %v", err)
}
- if numApidClusters != 1 {
- log.Panic("Illegal state for apid_cluster. Must be a single row.")
- }
-
- _, err = db.Exec("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''")
- if err != nil {
- if err.Error() == "duplicate column name: last_sequence" {
- return
- } else {
- log.Error("[[" + err.Error() + "]]")
- log.Panicf("Unable to create last_sequence column on DB. Unrecoverable error ", err)
- }
+ if err := lm.dbm.alterClusterTable(); err != nil {
+ log.Panicf("Unable to create last_sequence column on DB. Unrecoverable error %v", err)
}
}
func (lm *listenerManager) processChangeList(changes *common.ChangeList) bool {
- ok := false
-
- tx, err := lm.dbm.getDb().Begin()
- if err != nil {
- log.Panicf("Error processing ChangeList: %v", err)
- return ok
- }
- defer tx.Rollback()
-
log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes))
for _, change := range changes.Changes {
if change.Table == LISTENER_TABLE_APID_CLUSTER {
log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)
}
- switch change.Operation {
- case common.Insert:
- ok = lm.dbm.insert(change.Table, []common.Row{change.NewRow}, tx)
- case common.Update:
- if change.Table == LISTENER_TABLE_DATA_SCOPE {
- log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)
- }
- ok = lm.dbm.update(change.Table, []common.Row{change.OldRow}, []common.Row{change.NewRow}, tx)
- case common.Delete:
- ok = lm.dbm.deleteRowsFromTable(change.Table, []common.Row{change.OldRow}, tx)
- }
- if !ok {
- log.Error("Sql Operation error. Operation rollbacked")
- return ok
+ if change.Operation == common.Update && change.Table == LISTENER_TABLE_DATA_SCOPE {
+ log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)
}
}
- err = tx.Commit()
- if err != nil {
- log.Panicf("Error processing ChangeList: %v", err)
- return false
- }
-
- return ok
+ return lm.dbm.writeTransaction(changes)
}
diff --git a/managerInterfaces.go b/managerInterfaces.go
index 53a4492..e406784 100644
--- a/managerInterfaces.go
+++ b/managerInterfaces.go
@@ -58,6 +58,9 @@
getApidInstanceInfo() (info apidInstanceInfo, err error)
getLastSequence() string
updateLastSequence(lastSequence string) error
+ getClusterCount() (numApidClusters int, err error)
+ alterClusterTable() (err error)
+ writeTransaction(*common.ChangeList) bool
}
type listenerManagerInterface interface {
diff --git a/token.go b/token.go
index 991463c..e21a10d 100644
--- a/token.go
+++ b/token.go
@@ -51,6 +51,9 @@
invalidateDone: make(chan bool),
isClosed: &isClosedInt,
dbm: dbm,
+ proxyServerBaseUri: config.GetString(configProxyServerBaseURI),
+ clientId: config.GetString(configConsumerKey),
+ clientSecret: config.GetString(configConsumerSecret),
}
return t
}
@@ -67,6 +70,9 @@
invalidateDone chan bool
dbm dbManagerInterface
notNewInstanceId bool
+ proxyServerBaseUri string
+ clientId string
+ clientSecret string
}
func (t *tokenManager) start() {
@@ -144,10 +150,10 @@
func (t *tokenManager) retrieveNewToken() {
log.Debug("Getting OAuth token...")
- uriString := config.GetString(configProxyServerBaseURI)
+ uriString := t.proxyServerBaseUri
uri, err := url.Parse(uriString)
if err != nil {
- log.Panicf("unable to parse uri config '%s' value: '%s': %v", configProxyServerBaseURI, uriString, err)
+ log.Panicf("unable to parse uri value: '%s': %v", uriString, err)
}
uri.Path = path.Join(uri.Path, "/accesstoken")
@@ -158,8 +164,8 @@
return func(_ chan bool) error {
form := url.Values{}
form.Set("grant_type", "client_credentials")
- form.Add("client_id", config.GetString(configConsumerKey))
- form.Add("client_secret", config.GetString(configConsumerSecret))
+ form.Add("client_id", t.clientId)
+ form.Add("client_secret", t.clientSecret)
req, err := http.NewRequest("POST", uri.String(), bytes.NewBufferString(form.Encode()))
req.Header.Set("Content-Type", "application/x-www-form-urlencoded; param=value")
req.Header.Set("display_name", apidInfo.InstanceName)
@@ -219,7 +225,6 @@
}
}
t.token = &token
- config.Set(configBearerToken, token.AccessToken)
return nil
}
diff --git a/token_test.go b/token_test.go
index 4a0c3e6..e75b54d 100644
--- a/token_test.go
+++ b/token_test.go
@@ -221,37 +221,57 @@
func (d *dummyDbMan) initDefaultDb() error {
return nil
}
+
func (d *dummyDbMan) setDb(apid.DB) {}
+
func (d *dummyDbMan) getDb() apid.DB {
return nil
}
+
func (d *dummyDbMan) insert(tableName string, rows []common.Row, txn *sql.Tx) bool {
return true
}
+
func (d *dummyDbMan) deleteRowsFromTable(tableName string, rows []common.Row, txn *sql.Tx) bool {
return true
}
+
func (d *dummyDbMan) update(tableName string, oldRows, newRows []common.Row, txn *sql.Tx) bool {
return true
}
+
func (d *dummyDbMan) getPkeysForTable(tableName string) ([]string, error) {
return nil, nil
}
+
func (d *dummyDbMan) findScopesForId(configId string) []string {
return nil
}
+
func (d *dummyDbMan) getDefaultDb() (apid.DB, error) {
return nil, nil
}
+
func (d *dummyDbMan) updateApidInstanceInfo() error {
return nil
}
+
func (d *dummyDbMan) getApidInstanceInfo() (info apidInstanceInfo, err error) {
return
}
+
func (d *dummyDbMan) getLastSequence() string {
return ""
}
+
func (d *dummyDbMan) updateLastSequence(lastSequence string) error {
return nil
}
+
+func (d *dummyDbMan) getClusterCount() (int, error) {
+ return 1, nil
+}
+
+func (d *dummyDbMan) alterClusterTable() error {
+ return nil
+}