reformat api.go, listener.go and data.go, rewrite bundle.go
diff --git a/api.go b/api.go index 5aea7f2..3d4fe23 100644 --- a/api.go +++ b/api.go
@@ -47,19 +47,16 @@ changeTimeFormat = "2006-01-02 15:04:05.999" ) +const ( + kindCollection = "Collection" +) + type deploymentsResult struct { deployments []DataDeployment err error eTag string } -var ( - deploymentsChanged = make(chan interface{}, 5) - addSubscriber = make(chan chan deploymentsResult) - removeSubscriber = make(chan chan deploymentsResult) - eTag int64 -) - type errorResponse struct { ErrorCode int `json:"errorCode"` Reason string `json:"reason"` @@ -87,15 +84,36 @@ } const deploymentsEndpoint = "/configurations" -const BlobEndpoint = "/blob/{blobId}" +const blobEndpointPath = "/blob" +const blobEndpoint = blobEndpointPath + "/{blobId}" -func InitAPI() { - log.Debug("API endpoints initialized") - services.API().HandleFunc(deploymentsEndpoint, apiGetCurrentDeployments).Methods("GET") - services.API().HandleFunc(BlobEndpoint, apiReturnBlobData).Methods("GET") +type apiManagerInterface interface { + InitAPI() + addChangedDeployment(string) + distributeEvents() } -func writeError(w http.ResponseWriter, status int, code int, reason string) { +type apiManager struct { + dbMan dbManagerInterface + deploymentsEndpoint string + blobEndpoint string + eTag int64 + deploymentsChanged chan interface{} + addSubscriber chan chan deploymentsResult + removeSubscriber chan chan deploymentsResult +} + +func (a *apiManager) InitAPI() { + log.Debug("API endpoints initialized") + services.API().HandleFunc(a.deploymentsEndpoint, a.apiGetCurrentDeployments).Methods("GET") + services.API().HandleFunc(a.blobEndpoint, a.apiReturnBlobData).Methods("GET") +} + +func (a *apiManager) addChangedDeployment(id string) { + a.deploymentsChanged <- id +} + +func (a *apiManager) writeError(w http.ResponseWriter, status int, code int, reason string) { w.WriteHeader(status) e := errorResponse{ ErrorCode: code, @@ -110,11 +128,11 @@ log.Debugf("sending %d error to client: %s", status, reason) } -func writeInternalError(w http.ResponseWriter, err string) { - writeError(w, http.StatusInternalServerError, API_ERR_INTERNAL, err) +func (a *apiManager) writeInternalError(w http.ResponseWriter, err string) { + a.writeError(w, http.StatusInternalServerError, API_ERR_INTERNAL, err) } -func debounce(in chan interface{}, out chan []interface{}, window time.Duration) { +func (a *apiManager) debounce(in chan interface{}, out chan []interface{}, window time.Duration) { send := func(toSend []interface{}) { if toSend != nil { log.Debugf("debouncer sending: %v", toSend) @@ -141,11 +159,11 @@ } } -func distributeEvents() { +func (a *apiManager) distributeEvents() { subscribers := make(map[chan deploymentsResult]bool) deliverDeployments := make(chan []interface{}, 1) - go debounce(deploymentsChanged, deliverDeployments, debounceDuration) + go a.debounce(a.deploymentsChanged, deliverDeployments, debounceDuration) for { select { @@ -156,46 +174,46 @@ subs := subscribers subscribers = make(map[chan deploymentsResult]bool) go func() { - eTag := incrementETag() - deployments, err := dbMan.getUnreadyDeployments() + eTag := a.incrementETag() + deployments, err := a.dbMan.getUnreadyDeployments() log.Debugf("delivering deployments to %d subscribers", len(subs)) for subscriber := range subs { log.Debugf("delivering to: %v", subscriber) subscriber <- deploymentsResult{deployments, err, eTag} } }() - case subscriber := <-addSubscriber: + case subscriber := <-a.addSubscriber: log.Debugf("Add subscriber: %v", subscriber) subscribers[subscriber] = true - case subscriber := <-removeSubscriber: + case subscriber := <-a.removeSubscriber: log.Debugf("Remove subscriber: %v", subscriber) delete(subscribers, subscriber) } } } -func apiReturnBlobData(w http.ResponseWriter, r *http.Request) { +func (a *apiManager) apiReturnBlobData(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) blobId := vars["blobId"] - fs, err := dbMan.getLocalFSLocation(blobId) + fs, err := a.dbMan.getLocalFSLocation(blobId) if err != nil { - writeInternalError(w, "BlobId "+blobId+" has no mapping blob file") + a.writeInternalError(w, "BlobId "+blobId+" has no mapping blob file") return } byte, err := ioutil.ReadFile(fs) if err != nil { - writeInternalError(w, err.Error()) + a.writeInternalError(w, err.Error()) return } _, err = io.Copy(w, bytes.NewReader(byte)) if err != nil { - writeInternalError(w, err.Error()) + a.writeInternalError(w, err.Error()) } } -func apiGetCurrentDeployments(w http.ResponseWriter, r *http.Request) { +func (a *apiManager) apiGetCurrentDeployments(w http.ResponseWriter, r *http.Request) { // If returning without a bundle (immediately or after timeout), status = 404 // If returning If-None-Match value is equal to current deployment, status = 304 @@ -209,7 +227,7 @@ var err error timeout, err = strconv.Atoi(b) if err != nil { - writeError(w, http.StatusBadRequest, API_ERR_BAD_BLOCK, "bad block value, must be number of seconds") + a.writeError(w, http.StatusBadRequest, API_ERR_BAD_BLOCK, "bad block value, must be number of seconds") return } } @@ -222,7 +240,7 @@ log.Debugf("if-none-match: %s", ifNoneMatch) // send unmodified if matches prior eTag and no timeout - eTag := getETag() + eTag := a.getETag() if eTag == ifNoneMatch && timeout == 0 { w.WriteHeader(http.StatusNotModified) return @@ -230,7 +248,7 @@ // send results if different eTag if eTag != ifNoneMatch { - sendReadyDeployments(w) + a.sendReadyDeployments(w) return } @@ -238,7 +256,7 @@ var newDeploymentsChannel chan deploymentsResult if timeout > 0 && ifNoneMatch != "" { newDeploymentsChannel = make(chan deploymentsResult, 1) - addSubscriber <- newDeploymentsChannel + a.addSubscriber <- newDeploymentsChannel } log.Debug("Blocking request... Waiting for new Deployments.") @@ -246,49 +264,39 @@ select { case result := <-newDeploymentsChannel: if result.err != nil { - writeInternalError(w, "Database error") + a.writeInternalError(w, "Database error") } else { - sendDeployments(w, result.deployments, result.eTag) + a.sendDeployments(w, result.deployments, result.eTag) } case <-time.After(time.Duration(timeout) * time.Second): - removeSubscriber <- newDeploymentsChannel + a.removeSubscriber <- newDeploymentsChannel log.Debug("Blocking deployment request timed out.") if ifNoneMatch != "" { w.WriteHeader(http.StatusNotModified) } else { - sendReadyDeployments(w) + a.sendReadyDeployments(w) } } } -func sendReadyDeployments(w http.ResponseWriter) { - eTag := getETag() - deployments, err := dbMan.getReadyDeployments() +func (a *apiManager) sendReadyDeployments(w http.ResponseWriter) { + eTag := a.getETag() + deployments, err := a.dbMan.getReadyDeployments() if err != nil { - writeInternalError(w, "Database error") + a.writeInternalError(w, "Database error") return } - sendDeployments(w, deployments, eTag) + a.sendDeployments(w, deployments, eTag) } -func getHttpHost() string { - // apid-core has to set this according to the protocol apid is to be run: http/https - proto := config.GetString("protocol_type") - if proto == "" { - proto = "http" - } - proto = proto + "://" + config.GetString("api_listen") - return proto -} - -func sendDeployments(w http.ResponseWriter, dataDeps []DataDeployment, eTag string) { +func (a *apiManager) sendDeployments(w http.ResponseWriter, dataDeps []DataDeployment, eTag string) { apiDeps := ApiDeploymentResponse{} apiDepDetails := make([]ApiDeploymentDetails, 0) - apiDeps.Kind = "Collections" - apiDeps.Self = getHttpHost() + "/configurations" + apiDeps.Kind = kindCollection + apiDeps.Self = getHttpHost() + a.deploymentsEndpoint for _, d := range dataDeps { apiDepDetails = append(apiDepDetails, ApiDeploymentDetails{ @@ -297,7 +305,7 @@ Type: d.Type, Org: d.OrgID, Env: d.EnvID, - Scope: getDeploymentScope(), + Scope: a.getDeploymentScope(), Revision: d.Revision, BlobId: d.GWBlobID, BlobURL: d.BlobURL, @@ -321,18 +329,18 @@ } // call whenever the list of deployments changes -func incrementETag() string { - e := atomic.AddInt64(&eTag, 1) +func (a *apiManager) incrementETag() string { + e := atomic.AddInt64(&a.eTag, 1) return strconv.FormatInt(e, 10) } -func getETag() string { - e := atomic.LoadInt64(&eTag) +func (a *apiManager) getETag() string { + e := atomic.LoadInt64(&a.eTag) return strconv.FormatInt(e, 10) } // TODO -func getDeploymentScope() string { +func (a *apiManager) getDeploymentScope() string { return "" } @@ -350,3 +358,13 @@ log.Error("convertTime: Unsupported time format: " + t) return t } + +func getHttpHost() string { + // apid-core has to set this according to the protocol apid is to be run: http/https + proto := config.GetString("protocol_type") + if proto == "" { + proto = "http" + } + proto = proto + "://" + config.GetString("api_listen") + return proto +}
diff --git a/api_test.go b/api_test.go index 150601d..aceb65d 100644 --- a/api_test.go +++ b/api_test.go
@@ -21,18 +21,46 @@ . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "strconv" + "time" +) + +const ( + testUrl = "http://127.0.0.1:9000" ) var _ = Describe("api", func() { Context("GET /deployments", func() { + var testCount int + var testApiMan *apiManager + + var _ = BeforeEach(func() { + testCount += 1 + dbMan := &dummyDbMan{} + testApiMan = &apiManager{ + dbMan: dbMan, + deploymentsEndpoint: deploymentsEndpoint + strconv.Itoa(testCount), + blobEndpoint: blobEndpointPath + strconv.Itoa(testCount) + "/{blobId}", + eTag: int64(testCount * 10), + deploymentsChanged: make(chan interface{}, 5), + addSubscriber: make(chan chan deploymentsResult), + removeSubscriber: make(chan chan deploymentsResult), + } + testApiMan.InitAPI() + time.Sleep(100 * time.Millisecond) + }) + + var _ = AfterEach(func() { + testApiMan = nil + }) It("should get empty set if no deployments", func() { - //only called once - InitAPI() + uri, err := url.Parse(testUrl) + Expect(err).Should(Succeed()) + uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) + log.Debug("uri string: " + uri.String()) + log.Debug("port: " + config.GetString("api_port")) - var uri url.URL - uri = *apiServerBaseURI - uri.Path = deploymentsEndpoint res, err := http.Get(uri.String()) Expect(err).Should(Succeed()) defer res.Body.Close() @@ -44,15 +72,17 @@ Expect(err).ShouldNot(HaveOccurred()) json.Unmarshal(body, &depRes) - log.Error(depRes) + Expect(len(depRes.ApiDeploymentsResponse)).To(Equal(0)) + Expect(depRes.Kind).Should(Equal(kindCollection)) + Expect(depRes.Self).Should(Equal(testUrl + deploymentsEndpoint + strconv.Itoa(testCount))) - //Expect(len(depRes)).To(Equal(0)) - //Expect(string(body)).Should(Equal("[]")) }) }) }) type dummyDbMan struct { + unreadyDeployments []DataDeployment + readyDeployments []DataDeployment } func (d *dummyDbMan) setDbVersion(version string) { @@ -64,7 +94,7 @@ } func (d *dummyDbMan) getUnreadyDeployments() ([]DataDeployment, error) { - return nil, nil + return d.unreadyDeployments, nil } func (d *dummyDbMan) getReadyDeployments() ([]DataDeployment, error) {
diff --git a/apidGatewayConfDeploy_suite_test.go b/apidGatewayConfDeploy_suite_test.go index 165c3a5..daa2496 100644 --- a/apidGatewayConfDeploy_suite_test.go +++ b/apidGatewayConfDeploy_suite_test.go
@@ -15,6 +15,12 @@ tmpDir string ) +const ( + configLevel = "log_level" + localStoragePathKey = "local_storage_path" + configBlobServerPort = "5555" +) + var _ = BeforeSuite(func() { apid.Initialize(factory.DefaultServicesFactory()) config := apid.Config() @@ -22,14 +28,20 @@ tmpDir, err = ioutil.TempDir("", "api_test") Expect(err).NotTo(HaveOccurred()) - config.Set("local_storage_path", tmpDir) + config.Set(configLevel, "debug") + config.Set(configBlobServerBaseURI, "http://localhost:"+configBlobServerPort) + config.Set(localStoragePathKey, tmpDir) config.Set(configApidInstanceID, "INSTANCE_ID") config.Set(configApidClusterID, "CLUSTER_ID") config.Set(configApiServerBaseURI, "http://localhost") config.Set(configDebounceDuration, "1ms") config.Set(configDownloadQueueSize, 1) config.Set(configBundleCleanupDelay, time.Millisecond) -}) + apid.InitializePlugins("0.0.0") + go apid.API().Listen() + time.Sleep(1 * time.Second) + log.Debug("initialized") +}, 2) var _ = AfterSuite(func() { apid.Events().Close()
diff --git a/bundle.go b/bundle.go index 86fa7c8..d104722 100644 --- a/bundle.go +++ b/bundle.go
@@ -31,10 +31,6 @@ BLOBSTORE_URI = "/v1/blobstore/signeduri" ) -var ( - bundleMan bundleManagerInterface -) - type bundleManagerInterface interface { initializeBundleDownloading() queueDownloadRequest(*DataDeployment) @@ -44,6 +40,8 @@ } type bundleManager struct { + dbMan dbManagerInterface + apiMan apiManagerInterface concurrentDownloads int markDeploymentFailedAfter time.Duration bundleDownloadConnTimeout time.Duration @@ -63,6 +61,7 @@ worker := BundleDownloader{ id: i + 1, workChan: make(chan *DownloadRequest), + bm: bm, } bm.workers[i] = &worker worker.Start() @@ -75,6 +74,7 @@ maxBackOff := 5 * time.Minute markFailedAt := time.Now().Add(bm.markDeploymentFailedAfter) req := &DownloadRequest{ + bm: bm, dep: dep, bundleFile: getBundleFile(dep), backoffFunc: createBackoff(retryIn, maxBackOff), @@ -117,6 +117,7 @@ } type DownloadRequest struct { + bm *bundleManager dep *DataDeployment bundleFile string backoffFunc func() @@ -152,7 +153,7 @@ blobId := atomic.AddInt64(&gwBlobId, 1) blobIds := strconv.FormatInt(blobId, 10) - err = dbMan.updateLocalFsLocation(dep.ID, blobIds, r.bundleFile) + err = r.bm.dbMan.updateLocalFsLocation(dep.ID, blobIds, r.bundleFile) if err != nil { return err } @@ -161,7 +162,7 @@ log.Debugf("bundle for depId=%s downloaded: blobId=%s", dep.ID, dep.BlobID) // send deployments to client - deploymentsChanged <- dep.ID + r.bm.apiMan.addChangedDeployment(dep.ID) return nil }
diff --git a/data.go b/data.go index 0be62ca..267ed15 100644 --- a/data.go +++ b/data.go
@@ -21,7 +21,6 @@ ) var ( - dbMan dbManagerInterface gwBlobId int64 ) @@ -163,7 +162,7 @@ func (dbc *dbManager) updateLocalFsLocation(depID, bundleId, localFsLocation string) error { - access_url := getHttpHost() + "/blob/" + bundleId + access_url := getHttpHost() + blobEndpointPath + "/" + bundleId stmt, err := dbc.getDb().Prepare(` INSERT INTO edgex_blob_available (runtime_meta_id, gwblobid, local_fs_location, access_url) VALUES (?, ?, ?, ?)`)
diff --git a/init.go b/init.go index 552ad3a..393e2a5 100644 --- a/init.go +++ b/init.go
@@ -114,23 +114,27 @@ return pluginData, fmt.Errorf("%s must be a positive duration", configDownloadConnTimeout) } - concurrentDownloads := config.GetInt(configConcurrentDownloads) - downloadQueueSize := config.GetInt(configDownloadQueueSize) - bundleMan = &bundleManager{ - concurrentDownloads: concurrentDownloads, - markDeploymentFailedAfter: markDeploymentFailedAfter, - bundleDownloadConnTimeout: bundleDownloadConnTimeout, - bundleRetryDelay: time.Second, - bundleCleanupDelay: bundleCleanupDelay, - downloadQueue: make(chan *DownloadRequest, downloadQueueSize), - isClosed: new(int32), - } + // initialize db manager - dbMan = &dbManager{ + dbMan := &dbManager{ data: services.Data(), dbMux: sync.RWMutex{}, } + // initialize api manager + + apiMan := &apiManager{ + dbMan: dbMan, + deploymentsEndpoint: deploymentsEndpoint, + blobEndpoint: blobEndpoint, + eTag: 0, + deploymentsChanged: make(chan interface{}, 5), + addSubscriber: make(chan chan deploymentsResult), + removeSubscriber: make(chan chan deploymentsResult), + } + + // initialize bundle manager + blobServerURL = config.GetString(configBlobServerBaseURI) relativeBundlePath := config.GetString(configBundleDirKey) storagePath := config.GetString("local_storage_path") @@ -139,14 +143,30 @@ return pluginData, fmt.Errorf("Failed bundle directory creation: %v", err) } log.Infof("Bundle directory path is %s", bundlePath) + concurrentDownloads := config.GetInt(configConcurrentDownloads) + downloadQueueSize := config.GetInt(configDownloadQueueSize) + bundleMan := &bundleManager{ + dbMan: dbMan, + apiMan: apiMan, + concurrentDownloads: concurrentDownloads, + markDeploymentFailedAfter: markDeploymentFailedAfter, + bundleDownloadConnTimeout: bundleDownloadConnTimeout, + bundleRetryDelay: time.Second, + bundleCleanupDelay: bundleCleanupDelay, + downloadQueue: make(chan *DownloadRequest, downloadQueueSize), + isClosed: new(int32), + } bundleMan.initializeBundleDownloading() + go apiMan.distributeEvents() - go distributeEvents() - - initListener(services) + initListener(services, dbMan, apiMan, bundleMan) log.Debug("end init") return pluginData, nil } + +func setServices() { + +}
diff --git a/listener.go b/listener.go index 77c30a5..12f5740 100644 --- a/listener.go +++ b/listener.go
@@ -28,8 +28,14 @@ var apiInitialized bool -func initListener(services apid.Services) { - services.Events().Listen(APIGEE_SYNC_EVENT, &apigeeSyncHandler{}) +func initListener(services apid.Services, dbMan dbManagerInterface, apiMan apiManagerInterface, bundleMan bundleManagerInterface) { + handler := &apigeeSyncHandler{ + dbMan: dbMan, + apiMan: apiMan, + bundleMan: bundleMan, + } + + services.Events().Listen(APIGEE_SYNC_EVENT, handler) } type bundleConfigJson struct { @@ -40,6 +46,9 @@ } type apigeeSyncHandler struct { + dbMan dbManagerInterface + apiMan apiManagerInterface + bundleMan bundleManagerInterface } func (h *apigeeSyncHandler) String() string { @@ -49,43 +58,43 @@ func (h *apigeeSyncHandler) Handle(e apid.Event) { if changeSet, ok := e.(*common.ChangeList); ok { - processChangeList(changeSet) + h.processChangeList(changeSet) } else if snapData, ok := e.(*common.Snapshot); ok { - processSnapshot(snapData) + h.processSnapshot(snapData) } else { log.Debugf("Received invalid event. Ignoring. %v", e) } } -func processSnapshot(snapshot *common.Snapshot) { +func (h *apigeeSyncHandler) processSnapshot(snapshot *common.Snapshot) { log.Debugf("Snapshot received. Switching to DB version: %s", snapshot.SnapshotInfo) - dbMan.setDbVersion(snapshot.SnapshotInfo) + h.dbMan.setDbVersion(snapshot.SnapshotInfo) - startupOnExistingDatabase() + h.startupOnExistingDatabase() if !apiInitialized { - InitAPI() + h.apiMan.InitAPI() } log.Debug("Snapshot processed") } -func startupOnExistingDatabase() { +func (h *apigeeSyncHandler) startupOnExistingDatabase() { // start bundle downloads that didn't finish go func() { - deployments, err := dbMan.getUnreadyDeployments() + deployments, err := h.dbMan.getUnreadyDeployments() if err != nil && err != sql.ErrNoRows { log.Panicf("unable to query database for unready deployments: %v", err) } log.Debugf("Queuing %d deployments for bundle download", len(deployments)) for _, dep := range deployments { - go bundleMan.queueDownloadRequest(&dep) + go h.bundleMan.queueDownloadRequest(&dep) } }() } -func processChangeList(changes *common.ChangeList) { +func (h *apigeeSyncHandler) processChangeList(changes *common.ChangeList) { log.Debugf("Processing changes") // changes have been applied to DB @@ -112,17 +121,17 @@ } for _, d := range deletedDeployments { - deploymentsChanged <- d.ID + h.apiMan.addChangedDeployment(d.ID) } for _, dep := range insertedDeployments { - go bundleMan.queueDownloadRequest(&dep) + go h.bundleMan.queueDownloadRequest(&dep) } // clean up old bundles if len(deletedDeployments) > 0 { log.Debugf("will delete %d old bundles", len(deletedDeployments)) - bundleMan.deleteBundles(deletedDeployments) + h.bundleMan.deleteBundles(deletedDeployments) } }