format response
diff --git a/api.go b/api.go
index 5f61078..5054d98 100644
--- a/api.go
+++ b/api.go
@@ -1,3 +1,16 @@
+// Copyright 2017 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
package apiGatewayConfDeploy
import (
@@ -31,6 +44,7 @@
sqlTimeFormat = "2006-01-02 15:04:05.999 -0700 MST"
iso8601 = "2006-01-02T15:04:05.999Z07:00"
sqliteTimeFormat = "2006-01-02 15:04:05.999-07:00"
+ changeTimeFormat = "2006-01-02 15:04:05.999"
)
type deploymentsResult struct {
@@ -54,27 +68,29 @@
type ApiDeploymentDetails struct {
Self string `json:"self"`
Name string `json:"name"`
- Org string `json:"org"`
- Env string `json:"env"`
Type string `json:"type"`
- BlobURL string `json:"bloburl"`
+ Org string `json:"organization"`
+ Env string `json:"environment"`
+ Scope string `json:"scope"`
Revision string `json:"revision"`
BlobId string `json:"blobId"`
+ BlobURL string `json:"bloburl"`
ResourceBlobId string `json:"resourceBlobId"`
Created string `json:"created"`
Updated string `json:"updated"`
}
type ApiDeploymentResponse struct {
- Kind string `json:"kind"`
- Self string `json:"self"`
- ApiDeploymentResponse []ApiDeploymentDetails `json:"contents"`
+ Kind string `json:"kind"`
+ Self string `json:"self"`
+ ApiDeploymentsResponse []ApiDeploymentDetails `json:"contents"`
}
const deploymentsEndpoint = "/configurations"
const BlobEndpoint = "/blob/{blobId}"
func InitAPI() {
+ log.Debug("API endpoints initialized")
services.API().HandleFunc(deploymentsEndpoint, apiGetCurrentDeployments).Methods("GET")
services.API().HandleFunc(BlobEndpoint, apiReturnBlobData).Methods("GET")
}
@@ -126,7 +142,7 @@
}
func distributeEvents() {
- subscribers := make(map[chan deploymentsResult]struct{})
+ subscribers := make(map[chan deploymentsResult]bool)
deliverDeployments := make(chan []interface{}, 1)
go debounce(deploymentsChanged, deliverDeployments, debounceDuration)
@@ -138,10 +154,10 @@
return // todo: using this?
}
subs := subscribers
- subscribers = make(map[chan deploymentsResult]struct{})
+ subscribers = make(map[chan deploymentsResult]bool)
go func() {
eTag := incrementETag()
- deployments, err := getUnreadyDeployments()
+ deployments, err := dbMan.getUnreadyDeployments()
log.Debugf("delivering deployments to %d subscribers", len(subs))
for subscriber := range subs {
log.Debugf("delivering to: %v", subscriber)
@@ -150,7 +166,7 @@
}()
case subscriber := <-addSubscriber:
log.Debugf("Add subscriber: %v", subscriber)
- subscribers[subscriber] = struct{}{}
+ subscribers[subscriber] = true
case subscriber := <-removeSubscriber:
log.Debugf("Remove subscriber: %v", subscriber)
delete(subscribers, subscriber)
@@ -162,7 +178,7 @@
vars := mux.Vars(r)
blobId := vars["blobId"]
- fs, err := getLocalFSLocation(blobId)
+ fs, err := dbMan.getLocalFSLocation(blobId)
if err != nil {
writeInternalError(w, "BlobId "+blobId+" has no mapping blob file")
return
@@ -248,7 +264,7 @@
func sendReadyDeployments(w http.ResponseWriter) {
eTag := getETag()
- deployments, err := getReadyDeployments()
+ deployments, err := dbMan.getReadyDeployments()
if err != nil {
writeInternalError(w, "Database error")
return
@@ -256,7 +272,7 @@
sendDeployments(w, deployments, eTag)
}
-func get_http_host() string {
+func getHttpHost() string {
// apid-core has to set this according to the protocol apid is to be run: http/https
proto := config.GetString("protocol_type")
if proto == "" {
@@ -269,25 +285,28 @@
func sendDeployments(w http.ResponseWriter, dataDeps []DataDeployment, eTag string) {
apiDeps := ApiDeploymentResponse{}
- apiDepDetails := []ApiDeploymentDetails{}
+ apiDepDetails := make([]ApiDeploymentDetails, 0)
apiDeps.Kind = "Collections"
- apiDeps.Self = get_http_host() + "/configurations"
+ apiDeps.Self = getHttpHost() + "/configurations"
for _, d := range dataDeps {
apiDepDetails = append(apiDepDetails, ApiDeploymentDetails{
+ Self: apiDeps.Self + "/" + d.ID,
+ Name: d.Name,
+ Type: d.Type,
Org: d.OrgID,
Env: d.EnvID,
+ Scope: getDeploymentScope(),
Revision: d.Revision,
BlobId: d.GWBlobID,
- ResourceBlobId: d.BlobResourceID,
- Created: d.Created,
- Updated: d.Updated,
- Type: d.Type,
BlobURL: d.BlobURL,
+ ResourceBlobId: d.BlobResourceID,
+ Created: convertTime(d.Created),
+ Updated: convertTime(d.Updated),
})
}
- apiDeps.ApiDeploymentResponse = apiDepDetails
+ apiDeps.ApiDeploymentsResponse = apiDepDetails
b, err := json.Marshal(apiDeps)
if err != nil {
@@ -311,3 +330,24 @@
e := atomic.LoadInt64(&eTag)
return strconv.FormatInt(e, 10)
}
+
+// TODO
+func getDeploymentScope() string{
+ return ""
+}
+
+func convertTime(t string) string {
+ if t == "" {
+ return ""
+ }
+ formats := []string{sqliteTimeFormat, sqlTimeFormat, iso8601, time.RFC3339, changeTimeFormat}
+ for _, f := range formats {
+ timestamp, err := time.Parse(f, t)
+ if err == nil {
+ return timestamp.Format(iso8601)
+ }
+ }
+ log.Error("convertTime: Unsupported time format: " + t)
+ return t
+}
+
diff --git a/api_test.go b/api_test.go
new file mode 100644
index 0000000..fba8e0c
--- /dev/null
+++ b/api_test.go
@@ -0,0 +1,237 @@
+// Copyright 2017 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package apiGatewayConfDeploy
+import (
+ "encoding/json"
+ "io/ioutil"
+ "net/http"
+ "net/url"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
+
+
+
+
+var _ = Describe("api", func() {
+ Context("GET /deployments", func() {
+
+ It("should get empty set if no deployments", func() {
+
+ uri, err := url.Parse(apiServerBaseURI)
+ Expect(err).Should(Succeed())
+ uri.Path = deploymentsEndpoint
+ res, err := http.Get(uri.String())
+ Expect(err).Should(Succeed())
+ defer res.Body.Close()
+
+ Expect(res.StatusCode).Should(Equal(http.StatusOK))
+
+ var depRes ApiDeploymentResponse
+ body, err := ioutil.ReadAll(res.Body)
+ Expect(err).ShouldNot(HaveOccurred())
+ json.Unmarshal(body, &depRes)
+
+ log.Error(depRes)
+
+ //Expect(len(depRes)).To(Equal(0))
+ //Expect(string(body)).Should(Equal("[]"))
+ })
+ /*
+ It("should debounce requests", func(done Done) {
+ var in = make(chan interface{})
+ var out = make(chan []interface{})
+
+ go debounce(in, out, 3*time.Millisecond)
+
+ go func() {
+ defer GinkgoRecover()
+
+ received, ok := <-out
+ Expect(ok).To(BeTrue())
+ Expect(len(received)).To(Equal(2))
+
+ close(in)
+ received, ok = <-out
+ Expect(ok).To(BeFalse())
+
+ close(done)
+ }()
+
+ in <- "x"
+ in <- "y"
+ })
+
+ It("should get current deployments", func() {
+
+ deploymentID := "api_get_current"
+ insertTestDeployment(testServer, deploymentID)
+
+ uri, err := url.Parse(testServer.URL)
+ uri.Path = deploymentsEndpoint
+
+ res, err := http.Get(uri.String())
+ Expect(err).ShouldNot(HaveOccurred())
+ defer res.Body.Close()
+
+ Expect(res.StatusCode).Should(Equal(http.StatusOK))
+
+ var depRes ApiDeploymentResponse
+ body, err := ioutil.ReadAll(res.Body)
+ Expect(err).ShouldNot(HaveOccurred())
+ json.Unmarshal(body, &depRes)
+
+ Expect(len(depRes)).To(Equal(1))
+
+ dep := depRes[0]
+
+ Expect(dep.ID).To(Equal(deploymentID))
+ Expect(dep.ScopeId).To(Equal(deploymentID))
+ Expect(dep.DisplayName).To(Equal(deploymentID))
+
+ var config bundleConfigJson
+
+ err = json.Unmarshal(dep.ConfigJson, &config)
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(config.Name).To(Equal("/bundles/1"))
+
+ err = json.Unmarshal(dep.BundleConfigJson, &config)
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(config.Name).To(Equal("/bundles/1"))
+ })
+
+ It("should get 304 for no change", func() {
+
+ deploymentID := "api_no_change"
+ insertTestDeployment(testServer, deploymentID)
+
+ uri, err := url.Parse(testServer.URL)
+ uri.Path = deploymentsEndpoint
+ res, err := http.Get(uri.String())
+ Expect(err).ShouldNot(HaveOccurred())
+ defer res.Body.Close()
+ Expect(res.Header.Get("etag")).ShouldNot(BeEmpty())
+
+ req, err := http.NewRequest("GET", uri.String(), nil)
+ req.Header.Add("Content-Type", "application/json")
+ req.Header.Add("If-None-Match", res.Header.Get("etag"))
+
+ res, err = http.DefaultClient.Do(req)
+ Expect(err).ShouldNot(HaveOccurred())
+ defer res.Body.Close()
+ Expect(res.StatusCode).To(Equal(http.StatusNotModified))
+ })
+
+ It("should get empty set after blocking if no deployments", func() {
+
+ uri, err := url.Parse(testServer.URL)
+ uri.Path = deploymentsEndpoint
+
+ query := uri.Query()
+ query.Add("block", "1")
+ uri.RawQuery = query.Encode()
+
+ res, err := http.Get(uri.String())
+ Expect(err).ShouldNot(HaveOccurred())
+ defer res.Body.Close()
+
+ var depRes ApiDeploymentResponse
+ body, err := ioutil.ReadAll(res.Body)
+ Expect(err).ShouldNot(HaveOccurred())
+ json.Unmarshal(body, &depRes)
+
+ Expect(res.StatusCode).Should(Equal(http.StatusOK))
+ Expect(string(body)).Should(Equal("[]"))
+ })
+
+ It("should get new deployment set after blocking", func(done Done) {
+
+ deploymentID := "api_get_current_blocking"
+ insertTestDeployment(testServer, deploymentID)
+ uri, err := url.Parse(testServer.URL)
+ uri.Path = deploymentsEndpoint
+ res, err := http.Get(uri.String())
+ Expect(err).ShouldNot(HaveOccurred())
+ defer res.Body.Close()
+ eTag := res.Header.Get("etag")
+ Expect(eTag).ShouldNot(BeEmpty())
+
+ deploymentID = "api_get_current_blocking2"
+ go func() {
+ defer GinkgoRecover()
+
+ query := uri.Query()
+ query.Add("block", "1")
+ uri.RawQuery = query.Encode()
+ req, err := http.NewRequest("GET", uri.String(), nil)
+ req.Header.Add("Content-Type", "application/json")
+ req.Header.Add("If-None-Match", eTag)
+
+ res, err := http.DefaultClient.Do(req)
+ Expect(err).ShouldNot(HaveOccurred())
+ defer res.Body.Close()
+ Expect(res.StatusCode).To(Equal(http.StatusOK))
+
+ Expect(res.Header.Get("etag")).ShouldNot(BeEmpty())
+ Expect(res.Header.Get("etag")).ShouldNot(Equal(eTag))
+
+ var depRes ApiDeploymentResponse
+ body, err := ioutil.ReadAll(res.Body)
+ Expect(err).ShouldNot(HaveOccurred())
+ json.Unmarshal(body, &depRes)
+
+ Expect(len(depRes)).To(Equal(2))
+
+ dep := depRes[1]
+
+ Expect(dep.ID).To(Equal(deploymentID))
+ Expect(dep.ScopeId).To(Equal(deploymentID))
+ Expect(dep.DisplayName).To(Equal(deploymentID))
+
+ close(done)
+ }()
+
+ time.Sleep(250 * time.Millisecond) // give api call above time to block
+ insertTestDeployment(testServer, deploymentID)
+ deploymentsChanged <- deploymentID
+ })
+
+ It("should get 304 after blocking if no new deployment", func() {
+
+ deploymentID := "api_no_change_blocking"
+ insertTestDeployment(testServer, deploymentID)
+ uri, err := url.Parse(testServer.URL)
+ uri.Path = deploymentsEndpoint
+ res, err := http.Get(uri.String())
+ Expect(err).ShouldNot(HaveOccurred())
+ defer res.Body.Close()
+ Expect(res.Header.Get("etag")).ShouldNot(BeEmpty())
+
+ query := uri.Query()
+ query.Add("block", "1")
+ uri.RawQuery = query.Encode()
+ req, err := http.NewRequest("GET", uri.String(), nil)
+ req.Header.Add("Content-Type", "application/json")
+ req.Header.Add("If-None-Match", res.Header.Get("etag"))
+
+ res, err = http.DefaultClient.Do(req)
+ Expect(err).ShouldNot(HaveOccurred())
+ defer res.Body.Close()
+ Expect(res.StatusCode).To(Equal(http.StatusNotModified))
+ })
+ */
+ })
+})
\ No newline at end of file
diff --git a/apidGatewayConfDeploy_suite_test.go b/apidGatewayConfDeploy_suite_test.go
new file mode 100644
index 0000000..9cc294a
--- /dev/null
+++ b/apidGatewayConfDeploy_suite_test.go
@@ -0,0 +1,46 @@
+package apiGatewayConfDeploy
+
+import (
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+ "github.com/30x/apid-core"
+ "github.com/30x/apid-core/factory"
+ "io/ioutil"
+ "net/http/httptest"
+ "time"
+ "os"
+ "net/url"
+)
+
+
+var (
+ tmpDir string
+)
+
+var _ = BeforeSuite(func() {
+ apid.Initialize(factory.DefaultServicesFactory())
+ config := apid.Config()
+ var err error
+ tmpDir, err = ioutil.TempDir("", "api_test")
+ Expect(err).NotTo(HaveOccurred())
+
+ config.Set("local_storage_path", tmpDir)
+ config.Set(configApidInstanceID, "INSTANCE_ID")
+ config.Set(configApidClusterID, "CLUSTER_ID")
+ config.Set(configApiServerBaseURI, "http://localhost")
+ config.Set(configDebounceDuration, "1ms")
+ apid.InitializePlugins("")
+
+
+ bundleCleanupDelay = time.Millisecond
+ bundleRetryDelay = 10 * time.Millisecond
+ markDeploymentFailedAfter = 50 * time.Millisecond
+ concurrentDownloads = 1
+ downloadQueueSize = 1
+
+})
+
+var _ = AfterSuite(func() {
+ apid.Events().Close()
+ os.RemoveAll(tmpDir)
+})
\ No newline at end of file
diff --git a/bundle.go b/bundle.go
index 0378f30..ee36cbb 100644
--- a/bundle.go
+++ b/bundle.go
@@ -1,3 +1,16 @@
+// Copyright 2017 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
package apiGatewayConfDeploy
import (
@@ -82,7 +95,7 @@
if err == nil {
blobId := atomic.AddInt64(&gwBlobId, 1)
blobIds := strconv.FormatInt(blobId, 10)
- err = updatelocal_fs_location(dep.ID, blobIds, r.bundleFile)
+ err = dbMan.updateLocalFsLocation(dep.ID, blobIds, r.bundleFile)
if err != nil {
dep.GWBlobID = blobIds
}
diff --git a/data.go b/data.go
index a867e7a..3ec5eca 100644
--- a/data.go
+++ b/data.go
@@ -1,3 +1,16 @@
+// Copyright 2017 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
package apiGatewayConfDeploy
import (
@@ -8,8 +21,7 @@
)
var (
- unsafeDB apid.DB
- dbMux sync.RWMutex
+ dbMan dbManagerInterface
gwBlobId int64
)
@@ -35,8 +47,41 @@
Exec(query string, args ...interface{}) (sql.Result, error)
}
-func InitDB(db apid.DB) error {
- _, err := db.Exec(`
+
+type dbManagerInterface interface {
+ setDbVersion(string)
+ initDb() error
+ getUnreadyDeployments() ([]DataDeployment, error)
+ getReadyDeployments() ([]DataDeployment, error)
+ updateLocalFsLocation(string, string, string) error
+ getLocalFSLocation(string) (string, error)
+}
+
+
+type dbManager struct {
+ data apid.DataService
+ db apid.DB
+ dbMux sync.RWMutex
+}
+
+func (dbc *dbManager) setDbVersion(version string) {
+ db, err := dbc.data.DBVersion(version)
+ if err != nil {
+ log.Panicf("Unable to access database: %v", err)
+ }
+ dbc.dbMux.Lock()
+ dbc.db = db
+ dbc.dbMux.Unlock()
+}
+
+func (dbc *dbManager) getDb() apid.DB {
+ dbc.dbMux.RLock()
+ defer dbc.dbMux.RUnlock()
+ return dbc.db
+}
+
+func (dbc *dbManager) initDb() error{
+ _, err := dbc.getDb().Exec(`
CREATE TABLE IF NOT EXISTS edgex_blob_available (
gwblobid integer primary key,
runtime_meta_id character varying NOT NULL,
@@ -52,28 +97,11 @@
return nil
}
-func getDB() apid.DB {
- dbMux.RLock()
- db := unsafeDB
- dbMux.RUnlock()
- return db
-}
-
-// caller is responsible for calling dbMux.Lock() and dbMux.Unlock()
-func SetDB(db apid.DB) {
- if unsafeDB == nil { // init API when DB is initialized
- go InitAPI()
- }
- unsafeDB = db
-}
-
// getUnreadyDeployments() returns array of resources that are not yet to be processed
-func getUnreadyDeployments() (deployments []DataDeployment, err error) {
+func (dbc *dbManager) getUnreadyDeployments() (deployments []DataDeployment, err error) {
- err = nil
- db := getDB()
- rows, err := db.Query(`
+ rows, err := dbc.getDb().Query(`
SELECT project_runtime_blob_metadata.id, org_id, env_id, name, revision, blob_id, resource_blob_id
FROM project_runtime_blob_metadata
LEFT JOIN edgex_blob_available
@@ -103,12 +131,10 @@
}
// getDeployments()
-func getReadyDeployments() (deployments []DataDeployment, err error) {
+func (dbc *dbManager) getReadyDeployments() (deployments []DataDeployment, err error) {
- err = nil
- db := getDB()
- rows, err := db.Query(`
+ rows, err := dbc.getDb().Query(`
SELECT a.id, a.org_id, a.env_id, a.name, a.type, a.revision, a.blob_id,
a.resource_blob_id, a.created_at, a.created_by, a.updated_at, a.updated_by,
b.local_fs_location, b.access_url, b.gwblobid
@@ -139,10 +165,10 @@
}
-func updatelocal_fs_location(depID, bundleId, local_fs_location string) error {
+func (dbc *dbManager) updateLocalFsLocation(depID, bundleId, localFsLocation string) error {
- access_url := get_http_host() + "/blob/" + bundleId
- stmt, err := getDB().Prepare(`
+ access_url := getHttpHost() + "/blob/" + bundleId
+ stmt, err := dbc.getDb().Prepare(`
INSERT INTO edgex_blob_available (runtime_meta_id, gwblobid, local_fs_location, access_url)
VALUES (?, ?, ?, ?)`)
if err != nil {
@@ -151,22 +177,21 @@
}
defer stmt.Close()
- _, err = stmt.Exec(depID, bundleId, local_fs_location, access_url)
+ _, err = stmt.Exec(depID, bundleId, localFsLocation, access_url)
if err != nil {
- log.Errorf("UPDATE edgex_blob_available id {%s} local_fs_location {%s} failed: %v", depID, local_fs_location, err)
+ log.Errorf("UPDATE edgex_blob_available id {%s} local_fs_location {%s} failed: %v", depID, localFsLocation, err)
return err
}
- log.Debugf("INSERT edgex_blob_available {%s} local_fs_location {%s} succeeded", depID, local_fs_location)
+ log.Debugf("INSERT edgex_blob_available {%s} local_fs_location {%s} succeeded", depID, localFsLocation)
return nil
}
-func getLocalFSLocation(blobId string) (locfs string, err error) {
+func (dbc *dbManager) getLocalFSLocation(blobId string) (locfs string, err error) {
- db := getDB()
log.Debugf("Getting the blob file for blobId {%s}", blobId)
- rows, err := db.Query("SELECT local_fs_location FROM edgex_blob_available WHERE gwblobid = \"" + blobId + "\"")
+ rows, err := dbc.getDb().Query("SELECT local_fs_location FROM edgex_blob_available WHERE gwblobid = \"" + blobId + "\"")
if err != nil {
log.Errorf("SELECT local_fs_location failed %v", err)
return "", err
diff --git a/init.go b/init.go
index 745197f..7c9b530 100644
--- a/init.go
+++ b/init.go
@@ -1,3 +1,16 @@
+// Copyright 2017 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
package apiGatewayConfDeploy
import (
@@ -8,6 +21,7 @@
"time"
"github.com/30x/apid-core"
+ "sync"
)
const (
@@ -28,7 +42,6 @@
var (
services apid.Services
log apid.LogService
- data apid.DataService
config apid.ConfigService
bundlePath string
debounceDuration time.Duration
@@ -104,7 +117,11 @@
return pluginData, fmt.Errorf("%s must be a positive duration", configDownloadConnTimeout)
}
- data = services.Data()
+ dbMan = &dbManager{
+ data: services.Data(),
+ dbMux: sync.RWMutex{},
+ }
+
blobServerURL = config.GetString(configBlobServerBaseURI)
concurrentDownloads = config.GetInt(configConcurrentDownloads)
downloadQueueSize = config.GetInt(configDownloadQueueSize)
diff --git a/listener.go b/listener.go
index 23cc071..d30aabb 100644
--- a/listener.go
+++ b/listener.go
@@ -1,3 +1,16 @@
+// Copyright 2017 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
package apiGatewayConfDeploy
import (
@@ -14,6 +27,8 @@
CONFIG_METADATA_TABLE = "project.runtime_blob_metadata"
)
+var apiInitialized bool
+
func initListener(services apid.Services) {
services.Events().Listen(APIGEE_SYNC_EVENT, &apigeeSyncHandler{})
}
@@ -47,25 +62,19 @@
log.Debugf("Snapshot received. Switching to DB version: %s", snapshot.SnapshotInfo)
- db, err := data.DBVersion(snapshot.SnapshotInfo)
- if err != nil {
- log.Panicf("Unable to access database: %v", err)
- }
+ dbMan.setDbVersion(snapshot.SnapshotInfo)
- // Update the DB pointer
- dbMux.Lock()
- SetDB(db)
- dbMux.Unlock()
-
- InitDB(db)
startupOnExistingDatabase()
+ if !apiInitialized {
+ InitAPI()
+ }
log.Debug("Snapshot processed")
}
func startupOnExistingDatabase() {
// start bundle downloads that didn't finish
go func() {
- deployments, err := getUnreadyDeployments()
+ deployments, err := dbMan.getUnreadyDeployments()
if err != nil && err != sql.ErrNoRows {
log.Panicf("unable to query database for unready deployments: %v", err)
diff --git a/pluginData.go b/pluginData.go
index ce1eace..8815183 100644
--- a/pluginData.go
+++ b/pluginData.go
@@ -1,3 +1,16 @@
+// Copyright 2017 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
package apiGatewayConfDeploy
import "github.com/30x/apid-core"