reformat api.go, listener.go and data.go, rewrite bundle.go
diff --git a/api.go b/api.go
index 5aea7f2..3d4fe23 100644
--- a/api.go
+++ b/api.go
@@ -47,19 +47,16 @@
changeTimeFormat = "2006-01-02 15:04:05.999"
)
+const (
+ kindCollection = "Collection"
+)
+
type deploymentsResult struct {
deployments []DataDeployment
err error
eTag string
}
-var (
- deploymentsChanged = make(chan interface{}, 5)
- addSubscriber = make(chan chan deploymentsResult)
- removeSubscriber = make(chan chan deploymentsResult)
- eTag int64
-)
-
type errorResponse struct {
ErrorCode int `json:"errorCode"`
Reason string `json:"reason"`
@@ -87,15 +84,36 @@
}
const deploymentsEndpoint = "/configurations"
-const BlobEndpoint = "/blob/{blobId}"
+const blobEndpointPath = "/blob"
+const blobEndpoint = blobEndpointPath + "/{blobId}"
-func InitAPI() {
- log.Debug("API endpoints initialized")
- services.API().HandleFunc(deploymentsEndpoint, apiGetCurrentDeployments).Methods("GET")
- services.API().HandleFunc(BlobEndpoint, apiReturnBlobData).Methods("GET")
+type apiManagerInterface interface {
+ InitAPI()
+ addChangedDeployment(string)
+ distributeEvents()
}
-func writeError(w http.ResponseWriter, status int, code int, reason string) {
+type apiManager struct {
+ dbMan dbManagerInterface
+ deploymentsEndpoint string
+ blobEndpoint string
+ eTag int64
+ deploymentsChanged chan interface{}
+ addSubscriber chan chan deploymentsResult
+ removeSubscriber chan chan deploymentsResult
+}
+
+func (a *apiManager) InitAPI() {
+ log.Debug("API endpoints initialized")
+ services.API().HandleFunc(a.deploymentsEndpoint, a.apiGetCurrentDeployments).Methods("GET")
+ services.API().HandleFunc(a.blobEndpoint, a.apiReturnBlobData).Methods("GET")
+}
+
+func (a *apiManager) addChangedDeployment(id string) {
+ a.deploymentsChanged <- id
+}
+
+func (a *apiManager) writeError(w http.ResponseWriter, status int, code int, reason string) {
w.WriteHeader(status)
e := errorResponse{
ErrorCode: code,
@@ -110,11 +128,11 @@
log.Debugf("sending %d error to client: %s", status, reason)
}
-func writeInternalError(w http.ResponseWriter, err string) {
- writeError(w, http.StatusInternalServerError, API_ERR_INTERNAL, err)
+func (a *apiManager) writeInternalError(w http.ResponseWriter, err string) {
+ a.writeError(w, http.StatusInternalServerError, API_ERR_INTERNAL, err)
}
-func debounce(in chan interface{}, out chan []interface{}, window time.Duration) {
+func (a *apiManager) debounce(in chan interface{}, out chan []interface{}, window time.Duration) {
send := func(toSend []interface{}) {
if toSend != nil {
log.Debugf("debouncer sending: %v", toSend)
@@ -141,11 +159,11 @@
}
}
-func distributeEvents() {
+func (a *apiManager) distributeEvents() {
subscribers := make(map[chan deploymentsResult]bool)
deliverDeployments := make(chan []interface{}, 1)
- go debounce(deploymentsChanged, deliverDeployments, debounceDuration)
+ go a.debounce(a.deploymentsChanged, deliverDeployments, debounceDuration)
for {
select {
@@ -156,46 +174,46 @@
subs := subscribers
subscribers = make(map[chan deploymentsResult]bool)
go func() {
- eTag := incrementETag()
- deployments, err := dbMan.getUnreadyDeployments()
+ eTag := a.incrementETag()
+ deployments, err := a.dbMan.getUnreadyDeployments()
log.Debugf("delivering deployments to %d subscribers", len(subs))
for subscriber := range subs {
log.Debugf("delivering to: %v", subscriber)
subscriber <- deploymentsResult{deployments, err, eTag}
}
}()
- case subscriber := <-addSubscriber:
+ case subscriber := <-a.addSubscriber:
log.Debugf("Add subscriber: %v", subscriber)
subscribers[subscriber] = true
- case subscriber := <-removeSubscriber:
+ case subscriber := <-a.removeSubscriber:
log.Debugf("Remove subscriber: %v", subscriber)
delete(subscribers, subscriber)
}
}
}
-func apiReturnBlobData(w http.ResponseWriter, r *http.Request) {
+func (a *apiManager) apiReturnBlobData(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
blobId := vars["blobId"]
- fs, err := dbMan.getLocalFSLocation(blobId)
+ fs, err := a.dbMan.getLocalFSLocation(blobId)
if err != nil {
- writeInternalError(w, "BlobId "+blobId+" has no mapping blob file")
+ a.writeInternalError(w, "BlobId "+blobId+" has no mapping blob file")
return
}
byte, err := ioutil.ReadFile(fs)
if err != nil {
- writeInternalError(w, err.Error())
+ a.writeInternalError(w, err.Error())
return
}
_, err = io.Copy(w, bytes.NewReader(byte))
if err != nil {
- writeInternalError(w, err.Error())
+ a.writeInternalError(w, err.Error())
}
}
-func apiGetCurrentDeployments(w http.ResponseWriter, r *http.Request) {
+func (a *apiManager) apiGetCurrentDeployments(w http.ResponseWriter, r *http.Request) {
// If returning without a bundle (immediately or after timeout), status = 404
// If returning If-None-Match value is equal to current deployment, status = 304
@@ -209,7 +227,7 @@
var err error
timeout, err = strconv.Atoi(b)
if err != nil {
- writeError(w, http.StatusBadRequest, API_ERR_BAD_BLOCK, "bad block value, must be number of seconds")
+ a.writeError(w, http.StatusBadRequest, API_ERR_BAD_BLOCK, "bad block value, must be number of seconds")
return
}
}
@@ -222,7 +240,7 @@
log.Debugf("if-none-match: %s", ifNoneMatch)
// send unmodified if matches prior eTag and no timeout
- eTag := getETag()
+ eTag := a.getETag()
if eTag == ifNoneMatch && timeout == 0 {
w.WriteHeader(http.StatusNotModified)
return
@@ -230,7 +248,7 @@
// send results if different eTag
if eTag != ifNoneMatch {
- sendReadyDeployments(w)
+ a.sendReadyDeployments(w)
return
}
@@ -238,7 +256,7 @@
var newDeploymentsChannel chan deploymentsResult
if timeout > 0 && ifNoneMatch != "" {
newDeploymentsChannel = make(chan deploymentsResult, 1)
- addSubscriber <- newDeploymentsChannel
+ a.addSubscriber <- newDeploymentsChannel
}
log.Debug("Blocking request... Waiting for new Deployments.")
@@ -246,49 +264,39 @@
select {
case result := <-newDeploymentsChannel:
if result.err != nil {
- writeInternalError(w, "Database error")
+ a.writeInternalError(w, "Database error")
} else {
- sendDeployments(w, result.deployments, result.eTag)
+ a.sendDeployments(w, result.deployments, result.eTag)
}
case <-time.After(time.Duration(timeout) * time.Second):
- removeSubscriber <- newDeploymentsChannel
+ a.removeSubscriber <- newDeploymentsChannel
log.Debug("Blocking deployment request timed out.")
if ifNoneMatch != "" {
w.WriteHeader(http.StatusNotModified)
} else {
- sendReadyDeployments(w)
+ a.sendReadyDeployments(w)
}
}
}
-func sendReadyDeployments(w http.ResponseWriter) {
- eTag := getETag()
- deployments, err := dbMan.getReadyDeployments()
+func (a *apiManager) sendReadyDeployments(w http.ResponseWriter) {
+ eTag := a.getETag()
+ deployments, err := a.dbMan.getReadyDeployments()
if err != nil {
- writeInternalError(w, "Database error")
+ a.writeInternalError(w, "Database error")
return
}
- sendDeployments(w, deployments, eTag)
+ a.sendDeployments(w, deployments, eTag)
}
-func getHttpHost() string {
- // apid-core has to set this according to the protocol apid is to be run: http/https
- proto := config.GetString("protocol_type")
- if proto == "" {
- proto = "http"
- }
- proto = proto + "://" + config.GetString("api_listen")
- return proto
-}
-
-func sendDeployments(w http.ResponseWriter, dataDeps []DataDeployment, eTag string) {
+func (a *apiManager) sendDeployments(w http.ResponseWriter, dataDeps []DataDeployment, eTag string) {
apiDeps := ApiDeploymentResponse{}
apiDepDetails := make([]ApiDeploymentDetails, 0)
- apiDeps.Kind = "Collections"
- apiDeps.Self = getHttpHost() + "/configurations"
+ apiDeps.Kind = kindCollection
+ apiDeps.Self = getHttpHost() + a.deploymentsEndpoint
for _, d := range dataDeps {
apiDepDetails = append(apiDepDetails, ApiDeploymentDetails{
@@ -297,7 +305,7 @@
Type: d.Type,
Org: d.OrgID,
Env: d.EnvID,
- Scope: getDeploymentScope(),
+ Scope: a.getDeploymentScope(),
Revision: d.Revision,
BlobId: d.GWBlobID,
BlobURL: d.BlobURL,
@@ -321,18 +329,18 @@
}
// call whenever the list of deployments changes
-func incrementETag() string {
- e := atomic.AddInt64(&eTag, 1)
+func (a *apiManager) incrementETag() string {
+ e := atomic.AddInt64(&a.eTag, 1)
return strconv.FormatInt(e, 10)
}
-func getETag() string {
- e := atomic.LoadInt64(&eTag)
+func (a *apiManager) getETag() string {
+ e := atomic.LoadInt64(&a.eTag)
return strconv.FormatInt(e, 10)
}
// TODO
-func getDeploymentScope() string {
+func (a *apiManager) getDeploymentScope() string {
return ""
}
@@ -350,3 +358,13 @@
log.Error("convertTime: Unsupported time format: " + t)
return t
}
+
+func getHttpHost() string {
+ // apid-core has to set this according to the protocol apid is to be run: http/https
+ proto := config.GetString("protocol_type")
+ if proto == "" {
+ proto = "http"
+ }
+ proto = proto + "://" + config.GetString("api_listen")
+ return proto
+}
diff --git a/api_test.go b/api_test.go
index 150601d..aceb65d 100644
--- a/api_test.go
+++ b/api_test.go
@@ -21,18 +21,46 @@
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
+ "strconv"
+ "time"
+)
+
+const (
+ testUrl = "http://127.0.0.1:9000"
)
var _ = Describe("api", func() {
Context("GET /deployments", func() {
+ var testCount int
+ var testApiMan *apiManager
+
+ var _ = BeforeEach(func() {
+ testCount += 1
+ dbMan := &dummyDbMan{}
+ testApiMan = &apiManager{
+ dbMan: dbMan,
+ deploymentsEndpoint: deploymentsEndpoint + strconv.Itoa(testCount),
+ blobEndpoint: blobEndpointPath + strconv.Itoa(testCount) + "/{blobId}",
+ eTag: int64(testCount * 10),
+ deploymentsChanged: make(chan interface{}, 5),
+ addSubscriber: make(chan chan deploymentsResult),
+ removeSubscriber: make(chan chan deploymentsResult),
+ }
+ testApiMan.InitAPI()
+ time.Sleep(100 * time.Millisecond)
+ })
+
+ var _ = AfterEach(func() {
+ testApiMan = nil
+ })
It("should get empty set if no deployments", func() {
- //only called once
- InitAPI()
+ uri, err := url.Parse(testUrl)
+ Expect(err).Should(Succeed())
+ uri.Path = deploymentsEndpoint + strconv.Itoa(testCount)
+ log.Debug("uri string: " + uri.String())
+ log.Debug("port: " + config.GetString("api_port"))
- var uri url.URL
- uri = *apiServerBaseURI
- uri.Path = deploymentsEndpoint
res, err := http.Get(uri.String())
Expect(err).Should(Succeed())
defer res.Body.Close()
@@ -44,15 +72,17 @@
Expect(err).ShouldNot(HaveOccurred())
json.Unmarshal(body, &depRes)
- log.Error(depRes)
+ Expect(len(depRes.ApiDeploymentsResponse)).To(Equal(0))
+ Expect(depRes.Kind).Should(Equal(kindCollection))
+ Expect(depRes.Self).Should(Equal(testUrl + deploymentsEndpoint + strconv.Itoa(testCount)))
- //Expect(len(depRes)).To(Equal(0))
- //Expect(string(body)).Should(Equal("[]"))
})
})
})
type dummyDbMan struct {
+ unreadyDeployments []DataDeployment
+ readyDeployments []DataDeployment
}
func (d *dummyDbMan) setDbVersion(version string) {
@@ -64,7 +94,7 @@
}
func (d *dummyDbMan) getUnreadyDeployments() ([]DataDeployment, error) {
- return nil, nil
+ return d.unreadyDeployments, nil
}
func (d *dummyDbMan) getReadyDeployments() ([]DataDeployment, error) {
diff --git a/apidGatewayConfDeploy_suite_test.go b/apidGatewayConfDeploy_suite_test.go
index 165c3a5..daa2496 100644
--- a/apidGatewayConfDeploy_suite_test.go
+++ b/apidGatewayConfDeploy_suite_test.go
@@ -15,6 +15,12 @@
tmpDir string
)
+const (
+ configLevel = "log_level"
+ localStoragePathKey = "local_storage_path"
+ configBlobServerPort = "5555"
+)
+
var _ = BeforeSuite(func() {
apid.Initialize(factory.DefaultServicesFactory())
config := apid.Config()
@@ -22,14 +28,20 @@
tmpDir, err = ioutil.TempDir("", "api_test")
Expect(err).NotTo(HaveOccurred())
- config.Set("local_storage_path", tmpDir)
+ config.Set(configLevel, "debug")
+ config.Set(configBlobServerBaseURI, "http://localhost:"+configBlobServerPort)
+ config.Set(localStoragePathKey, tmpDir)
config.Set(configApidInstanceID, "INSTANCE_ID")
config.Set(configApidClusterID, "CLUSTER_ID")
config.Set(configApiServerBaseURI, "http://localhost")
config.Set(configDebounceDuration, "1ms")
config.Set(configDownloadQueueSize, 1)
config.Set(configBundleCleanupDelay, time.Millisecond)
-})
+ apid.InitializePlugins("0.0.0")
+ go apid.API().Listen()
+ time.Sleep(1 * time.Second)
+ log.Debug("initialized")
+}, 2)
var _ = AfterSuite(func() {
apid.Events().Close()
diff --git a/bundle.go b/bundle.go
index 86fa7c8..d104722 100644
--- a/bundle.go
+++ b/bundle.go
@@ -31,10 +31,6 @@
BLOBSTORE_URI = "/v1/blobstore/signeduri"
)
-var (
- bundleMan bundleManagerInterface
-)
-
type bundleManagerInterface interface {
initializeBundleDownloading()
queueDownloadRequest(*DataDeployment)
@@ -44,6 +40,8 @@
}
type bundleManager struct {
+ dbMan dbManagerInterface
+ apiMan apiManagerInterface
concurrentDownloads int
markDeploymentFailedAfter time.Duration
bundleDownloadConnTimeout time.Duration
@@ -63,6 +61,7 @@
worker := BundleDownloader{
id: i + 1,
workChan: make(chan *DownloadRequest),
+ bm: bm,
}
bm.workers[i] = &worker
worker.Start()
@@ -75,6 +74,7 @@
maxBackOff := 5 * time.Minute
markFailedAt := time.Now().Add(bm.markDeploymentFailedAfter)
req := &DownloadRequest{
+ bm: bm,
dep: dep,
bundleFile: getBundleFile(dep),
backoffFunc: createBackoff(retryIn, maxBackOff),
@@ -117,6 +117,7 @@
}
type DownloadRequest struct {
+ bm *bundleManager
dep *DataDeployment
bundleFile string
backoffFunc func()
@@ -152,7 +153,7 @@
blobId := atomic.AddInt64(&gwBlobId, 1)
blobIds := strconv.FormatInt(blobId, 10)
- err = dbMan.updateLocalFsLocation(dep.ID, blobIds, r.bundleFile)
+ err = r.bm.dbMan.updateLocalFsLocation(dep.ID, blobIds, r.bundleFile)
if err != nil {
return err
}
@@ -161,7 +162,7 @@
log.Debugf("bundle for depId=%s downloaded: blobId=%s", dep.ID, dep.BlobID)
// send deployments to client
- deploymentsChanged <- dep.ID
+ r.bm.apiMan.addChangedDeployment(dep.ID)
return nil
}
diff --git a/data.go b/data.go
index 0be62ca..267ed15 100644
--- a/data.go
+++ b/data.go
@@ -21,7 +21,6 @@
)
var (
- dbMan dbManagerInterface
gwBlobId int64
)
@@ -163,7 +162,7 @@
func (dbc *dbManager) updateLocalFsLocation(depID, bundleId, localFsLocation string) error {
- access_url := getHttpHost() + "/blob/" + bundleId
+ access_url := getHttpHost() + blobEndpointPath + "/" + bundleId
stmt, err := dbc.getDb().Prepare(`
INSERT INTO edgex_blob_available (runtime_meta_id, gwblobid, local_fs_location, access_url)
VALUES (?, ?, ?, ?)`)
diff --git a/init.go b/init.go
index 552ad3a..393e2a5 100644
--- a/init.go
+++ b/init.go
@@ -114,23 +114,27 @@
return pluginData, fmt.Errorf("%s must be a positive duration", configDownloadConnTimeout)
}
- concurrentDownloads := config.GetInt(configConcurrentDownloads)
- downloadQueueSize := config.GetInt(configDownloadQueueSize)
- bundleMan = &bundleManager{
- concurrentDownloads: concurrentDownloads,
- markDeploymentFailedAfter: markDeploymentFailedAfter,
- bundleDownloadConnTimeout: bundleDownloadConnTimeout,
- bundleRetryDelay: time.Second,
- bundleCleanupDelay: bundleCleanupDelay,
- downloadQueue: make(chan *DownloadRequest, downloadQueueSize),
- isClosed: new(int32),
- }
+ // initialize db manager
- dbMan = &dbManager{
+ dbMan := &dbManager{
data: services.Data(),
dbMux: sync.RWMutex{},
}
+ // initialize api manager
+
+ apiMan := &apiManager{
+ dbMan: dbMan,
+ deploymentsEndpoint: deploymentsEndpoint,
+ blobEndpoint: blobEndpoint,
+ eTag: 0,
+ deploymentsChanged: make(chan interface{}, 5),
+ addSubscriber: make(chan chan deploymentsResult),
+ removeSubscriber: make(chan chan deploymentsResult),
+ }
+
+ // initialize bundle manager
+
blobServerURL = config.GetString(configBlobServerBaseURI)
relativeBundlePath := config.GetString(configBundleDirKey)
storagePath := config.GetString("local_storage_path")
@@ -139,14 +143,30 @@
return pluginData, fmt.Errorf("Failed bundle directory creation: %v", err)
}
log.Infof("Bundle directory path is %s", bundlePath)
+ concurrentDownloads := config.GetInt(configConcurrentDownloads)
+ downloadQueueSize := config.GetInt(configDownloadQueueSize)
+ bundleMan := &bundleManager{
+ dbMan: dbMan,
+ apiMan: apiMan,
+ concurrentDownloads: concurrentDownloads,
+ markDeploymentFailedAfter: markDeploymentFailedAfter,
+ bundleDownloadConnTimeout: bundleDownloadConnTimeout,
+ bundleRetryDelay: time.Second,
+ bundleCleanupDelay: bundleCleanupDelay,
+ downloadQueue: make(chan *DownloadRequest, downloadQueueSize),
+ isClosed: new(int32),
+ }
bundleMan.initializeBundleDownloading()
+ go apiMan.distributeEvents()
- go distributeEvents()
-
- initListener(services)
+ initListener(services, dbMan, apiMan, bundleMan)
log.Debug("end init")
return pluginData, nil
}
+
+func setServices() {
+
+}
diff --git a/listener.go b/listener.go
index 77c30a5..12f5740 100644
--- a/listener.go
+++ b/listener.go
@@ -28,8 +28,14 @@
var apiInitialized bool
-func initListener(services apid.Services) {
- services.Events().Listen(APIGEE_SYNC_EVENT, &apigeeSyncHandler{})
+func initListener(services apid.Services, dbMan dbManagerInterface, apiMan apiManagerInterface, bundleMan bundleManagerInterface) {
+ handler := &apigeeSyncHandler{
+ dbMan: dbMan,
+ apiMan: apiMan,
+ bundleMan: bundleMan,
+ }
+
+ services.Events().Listen(APIGEE_SYNC_EVENT, handler)
}
type bundleConfigJson struct {
@@ -40,6 +46,9 @@
}
type apigeeSyncHandler struct {
+ dbMan dbManagerInterface
+ apiMan apiManagerInterface
+ bundleMan bundleManagerInterface
}
func (h *apigeeSyncHandler) String() string {
@@ -49,43 +58,43 @@
func (h *apigeeSyncHandler) Handle(e apid.Event) {
if changeSet, ok := e.(*common.ChangeList); ok {
- processChangeList(changeSet)
+ h.processChangeList(changeSet)
} else if snapData, ok := e.(*common.Snapshot); ok {
- processSnapshot(snapData)
+ h.processSnapshot(snapData)
} else {
log.Debugf("Received invalid event. Ignoring. %v", e)
}
}
-func processSnapshot(snapshot *common.Snapshot) {
+func (h *apigeeSyncHandler) processSnapshot(snapshot *common.Snapshot) {
log.Debugf("Snapshot received. Switching to DB version: %s", snapshot.SnapshotInfo)
- dbMan.setDbVersion(snapshot.SnapshotInfo)
+ h.dbMan.setDbVersion(snapshot.SnapshotInfo)
- startupOnExistingDatabase()
+ h.startupOnExistingDatabase()
if !apiInitialized {
- InitAPI()
+ h.apiMan.InitAPI()
}
log.Debug("Snapshot processed")
}
-func startupOnExistingDatabase() {
+func (h *apigeeSyncHandler) startupOnExistingDatabase() {
// start bundle downloads that didn't finish
go func() {
- deployments, err := dbMan.getUnreadyDeployments()
+ deployments, err := h.dbMan.getUnreadyDeployments()
if err != nil && err != sql.ErrNoRows {
log.Panicf("unable to query database for unready deployments: %v", err)
}
log.Debugf("Queuing %d deployments for bundle download", len(deployments))
for _, dep := range deployments {
- go bundleMan.queueDownloadRequest(&dep)
+ go h.bundleMan.queueDownloadRequest(&dep)
}
}()
}
-func processChangeList(changes *common.ChangeList) {
+func (h *apigeeSyncHandler) processChangeList(changes *common.ChangeList) {
log.Debugf("Processing changes")
// changes have been applied to DB
@@ -112,17 +121,17 @@
}
for _, d := range deletedDeployments {
- deploymentsChanged <- d.ID
+ h.apiMan.addChangedDeployment(d.ID)
}
for _, dep := range insertedDeployments {
- go bundleMan.queueDownloadRequest(&dep)
+ go h.bundleMan.queueDownloadRequest(&dep)
}
// clean up old bundles
if len(deletedDeployments) > 0 {
log.Debugf("will delete %d old bundles", len(deletedDeployments))
- bundleMan.deleteBundles(deletedDeployments)
+ h.bundleMan.deleteBundles(deletedDeployments)
}
}