Barebones Generic gateway deploy.
diff --git a/.travis.yml b/.travis.yml
deleted file mode 100644
index a902a1c..0000000
--- a/.travis.yml
+++ /dev/null
@@ -1,15 +0,0 @@
-language: go
-
-go:
- - 1.7.x
-
-before_install:
- - sudo add-apt-repository ppa:masterminds/glide -y
- - sudo apt-get update -q
- - sudo apt-get install glide -y
-
-install:
- - glide install
-
-script:
- - go test $(glide novendor)
diff --git a/api.go b/api.go
index 676d6fc..d68a498 100644
--- a/api.go
+++ b/api.go
@@ -48,16 +48,11 @@
}
type ApiDeployment struct {
- ID string `json:"id"`
+ Org string `json:"org"`
+ Env string `json:"env"`
ScopeId string `json:"scopeId"`
- Created string `json:"created"`
- CreatedBy string `json:"createdBy"`
- Updated string `json:"updated"`
- UpdatedBy string `json:"updatedBy"`
- ConfigJson json.RawMessage `json:"configuration"`
- BundleConfigJson json.RawMessage `json:"bundleConfiguration"`
- DisplayName string `json:"displayName"`
- URI string `json:"uri"`
+ Type int `json:"type"`
+ BlobURL string `json:"url"`
}
// sent to client
@@ -65,10 +60,12 @@
-const deploymentsEndpoint = "/deployments"
+const deploymentsEndpoint = "/configurations"
+const BlobEndpoint = "/blob/{blobId}"
func InitAPI() {
services.API().HandleFunc(deploymentsEndpoint, apiGetCurrentDeployments).Methods("GET")
+ services.API().HandleFunc(BlobEndpoint, apiReturnBlobData).Methods("GET")
}
func writeError(w http.ResponseWriter, status int, code int, reason string) {
@@ -133,7 +130,7 @@
subscribers = make(map[chan deploymentsResult]struct{})
go func() {
eTag := incrementETag()
- deployments, err := getReadyDeployments()
+ deployments, err := getUnreadyDeployments()
log.Debugf("delivering deployments to %d subscribers", len(subs))
for subscriber := range subs {
log.Debugf("delivering to: %v", subscriber)
@@ -150,6 +147,10 @@
}
}
+func apiReturnBlobData(w http.ResponseWriter, r *http.Request) {
+
+}
+
func apiGetCurrentDeployments(w http.ResponseWriter, r *http.Request) {
// If returning without a bundle (immediately or after timeout), status = 404
@@ -233,16 +234,11 @@
for _, d := range dataDeps {
apiDeps = append(apiDeps, ApiDeployment{
- ID: d.ID,
- ScopeId: d.DataScopeID,
- Created: convertTime(d.Created),
- CreatedBy: d.CreatedBy,
- Updated: convertTime(d.Updated),
- UpdatedBy: d.UpdatedBy,
- BundleConfigJson: []byte(d.BundleConfigJSON),
- ConfigJson: []byte(d.ConfigJSON),
- DisplayName: d.BundleName,
- URI: d.LocalBundleURI,
+ ScopeId: d.DataScopeID,
+ Org: d.OrgID,
+ Env: d.EnvID,
+ Type: d.Type,
+ BlobURL: d.BlobURL,
})
}
@@ -269,17 +265,3 @@
return strconv.FormatInt(e, 10)
}
-func convertTime(t string) string {
- if t == "" {
- return ""
- }
- formats := []string{sqliteTimeFormat, sqlTimeFormat, iso8601, time.RFC3339}
- for _, f := range formats {
- timestamp, err := time.Parse(f, t)
- if err == nil {
- return timestamp.Format(iso8601)
- }
- }
- log.Panic("convertTime: Unsupported time format: " + t)
- return ""
-}
diff --git a/api_test.go b/api_test.go
deleted file mode 100644
index 1a9f81b..0000000
--- a/api_test.go
+++ /dev/null
@@ -1,273 +0,0 @@
-package apiGatewayDeploy
-
-import (
- "bytes"
- "encoding/json"
- "io/ioutil"
- "net/http"
- "net/http/httptest"
- "net/url"
- "time"
-
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
- "strconv"
-)
-
-var _ = Describe("api", func() {
-
- Context("GET /deployments", func() {
-
- It("should get empty set if no deployments", func() {
-
- uri, err := url.Parse(testServer.URL)
- uri.Path = deploymentsEndpoint
-
- res, err := http.Get(uri.String())
- Expect(err).ShouldNot(HaveOccurred())
- defer res.Body.Close()
-
- Expect(res.StatusCode).Should(Equal(http.StatusOK))
-
- var depRes ApiDeploymentResponse
- body, err := ioutil.ReadAll(res.Body)
- Expect(err).ShouldNot(HaveOccurred())
- json.Unmarshal(body, &depRes)
-
- Expect(len(depRes)).To(Equal(0))
- Expect(string(body)).Should(Equal("[]"))
- })
-
- It("should debounce requests", func(done Done) {
- var in = make(chan interface{})
- var out = make(chan []interface{})
-
- go debounce(in, out, 3*time.Millisecond)
-
- go func() {
- defer GinkgoRecover()
-
- received, ok := <-out
- Expect(ok).To(BeTrue())
- Expect(len(received)).To(Equal(2))
-
- close(in)
- received, ok = <-out
- Expect(ok).To(BeFalse())
-
- close(done)
- }()
-
- in <- "x"
- in <- "y"
- })
-
- It("should get current deployments", func() {
-
- deploymentID := "api_get_current"
- insertTestDeployment(testServer, deploymentID)
-
- uri, err := url.Parse(testServer.URL)
- uri.Path = deploymentsEndpoint
-
- res, err := http.Get(uri.String())
- Expect(err).ShouldNot(HaveOccurred())
- defer res.Body.Close()
-
- Expect(res.StatusCode).Should(Equal(http.StatusOK))
-
- var depRes ApiDeploymentResponse
- body, err := ioutil.ReadAll(res.Body)
- Expect(err).ShouldNot(HaveOccurred())
- json.Unmarshal(body, &depRes)
-
- Expect(len(depRes)).To(Equal(1))
-
- dep := depRes[0]
-
- Expect(dep.ID).To(Equal(deploymentID))
- Expect(dep.ScopeId).To(Equal(deploymentID))
- Expect(dep.DisplayName).To(Equal(deploymentID))
-
- var config bundleConfigJson
-
- err = json.Unmarshal(dep.ConfigJson, &config)
- Expect(err).ShouldNot(HaveOccurred())
- Expect(config.Name).To(Equal("/bundles/1"))
-
- err = json.Unmarshal(dep.BundleConfigJson, &config)
- Expect(err).ShouldNot(HaveOccurred())
- Expect(config.Name).To(Equal("/bundles/1"))
- })
-
- It("should get 304 for no change", func() {
-
- deploymentID := "api_no_change"
- insertTestDeployment(testServer, deploymentID)
-
- uri, err := url.Parse(testServer.URL)
- uri.Path = deploymentsEndpoint
- res, err := http.Get(uri.String())
- Expect(err).ShouldNot(HaveOccurred())
- defer res.Body.Close()
- Expect(res.Header.Get("etag")).ShouldNot(BeEmpty())
-
- req, err := http.NewRequest("GET", uri.String(), nil)
- req.Header.Add("Content-Type", "application/json")
- req.Header.Add("If-None-Match", res.Header.Get("etag"))
-
- res, err = http.DefaultClient.Do(req)
- Expect(err).ShouldNot(HaveOccurred())
- defer res.Body.Close()
- Expect(res.StatusCode).To(Equal(http.StatusNotModified))
- })
-
- It("should get empty set after blocking if no deployments", func() {
-
- uri, err := url.Parse(testServer.URL)
- uri.Path = deploymentsEndpoint
-
- query := uri.Query()
- query.Add("block", "1")
- uri.RawQuery = query.Encode()
-
- res, err := http.Get(uri.String())
- Expect(err).ShouldNot(HaveOccurred())
- defer res.Body.Close()
-
- var depRes ApiDeploymentResponse
- body, err := ioutil.ReadAll(res.Body)
- Expect(err).ShouldNot(HaveOccurred())
- json.Unmarshal(body, &depRes)
-
- Expect(res.StatusCode).Should(Equal(http.StatusOK))
- Expect(string(body)).Should(Equal("[]"))
- })
-
- It("should get new deployment set after blocking", func(done Done) {
-
- deploymentID := "api_get_current_blocking"
- insertTestDeployment(testServer, deploymentID)
- uri, err := url.Parse(testServer.URL)
- uri.Path = deploymentsEndpoint
- res, err := http.Get(uri.String())
- Expect(err).ShouldNot(HaveOccurred())
- defer res.Body.Close()
- eTag := res.Header.Get("etag")
- Expect(eTag).ShouldNot(BeEmpty())
-
- deploymentID = "api_get_current_blocking2"
- go func() {
- defer GinkgoRecover()
-
- query := uri.Query()
- query.Add("block", "1")
- uri.RawQuery = query.Encode()
- req, err := http.NewRequest("GET", uri.String(), nil)
- req.Header.Add("Content-Type", "application/json")
- req.Header.Add("If-None-Match", eTag)
-
- res, err := http.DefaultClient.Do(req)
- Expect(err).ShouldNot(HaveOccurred())
- defer res.Body.Close()
- Expect(res.StatusCode).To(Equal(http.StatusOK))
-
- Expect(res.Header.Get("etag")).ShouldNot(BeEmpty())
- Expect(res.Header.Get("etag")).ShouldNot(Equal(eTag))
-
- var depRes ApiDeploymentResponse
- body, err := ioutil.ReadAll(res.Body)
- Expect(err).ShouldNot(HaveOccurred())
- json.Unmarshal(body, &depRes)
-
- Expect(len(depRes)).To(Equal(2))
-
- dep := depRes[1]
-
- Expect(dep.ID).To(Equal(deploymentID))
- Expect(dep.ScopeId).To(Equal(deploymentID))
- Expect(dep.DisplayName).To(Equal(deploymentID))
-
- close(done)
- }()
-
- time.Sleep(250 * time.Millisecond) // give api call above time to block
- insertTestDeployment(testServer, deploymentID)
- deploymentsChanged <- deploymentID
- })
-
- It("should get 304 after blocking if no new deployment", func() {
-
- deploymentID := "api_no_change_blocking"
- insertTestDeployment(testServer, deploymentID)
- uri, err := url.Parse(testServer.URL)
- uri.Path = deploymentsEndpoint
- res, err := http.Get(uri.String())
- Expect(err).ShouldNot(HaveOccurred())
- defer res.Body.Close()
- Expect(res.Header.Get("etag")).ShouldNot(BeEmpty())
-
- query := uri.Query()
- query.Add("block", "1")
- uri.RawQuery = query.Encode()
- req, err := http.NewRequest("GET", uri.String(), nil)
- req.Header.Add("Content-Type", "application/json")
- req.Header.Add("If-None-Match", res.Header.Get("etag"))
-
- res, err = http.DefaultClient.Do(req)
- Expect(err).ShouldNot(HaveOccurred())
- defer res.Body.Close()
- Expect(res.StatusCode).To(Equal(http.StatusNotModified))
- })
- })
-
-})
-
-func insertTestDeployment(testServer *httptest.Server, deploymentID string) {
-
- uri, err := url.Parse(testServer.URL)
- Expect(err).ShouldNot(HaveOccurred())
-
- uri.Path = "/bundles/1"
- bundleUri := uri.String()
- bundle := bundleConfigJson{
- Name: uri.Path,
- URI: bundleUri,
- ChecksumType: "crc32",
- }
- bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
- bundleJson, err := json.Marshal(bundle)
- Expect(err).ShouldNot(HaveOccurred())
-
- tx, err := getDB().Begin()
- Expect(err).ShouldNot(HaveOccurred())
-
- dep := DataDeployment{
- ID: deploymentID,
- BundleConfigID: deploymentID,
- ApidClusterID: deploymentID,
- DataScopeID: deploymentID,
- BundleConfigJSON: string(bundleJson),
- ConfigJSON: string(bundleJson),
- Created: "",
- CreatedBy: "",
- Updated: "",
- UpdatedBy: "",
- BundleName: deploymentID,
- BundleURI: bundle.URI,
- BundleChecksum: bundle.Checksum,
- BundleChecksumType: bundle.ChecksumType,
- LocalBundleURI: "x",
- DeployStatus: "",
- DeployErrorCode: 0,
- DeployErrorMessage: "",
- }
-
- err = InsertDeployment(tx, dep)
- Expect(err).ShouldNot(HaveOccurred())
-
- err = tx.Commit()
- Expect(err).ShouldNot(HaveOccurred())
-}
-
-
diff --git a/apidGatewayDeploy-api.yaml b/apidGatewayDeploy-api.yaml
deleted file mode 100644
index c26d47f..0000000
--- a/apidGatewayDeploy-api.yaml
+++ /dev/null
@@ -1,231 +0,0 @@
-swagger: '2.0'
-info:
- version: 0.0.1
- title: Edge X Apid Gateway Deploy
- contact:
- name: Apigee, Inc.
- url: http://www.apigee.com/
- email: sales@apigee.com
- license:
- name: Apache 2.0
- url: https://www.apache.org/licenses/LICENSE-2.0
-basePath: /deployments
-schemes:
- - http
-consumes:
- - application/json
-produces:
- - application/json
-paths:
- /:
- get:
- description: Retrieve current deployment system and bundles to install.
- parameters:
- - name: If-None-Match
- in: header
- type: string
- description: "If request If-None-Match header matches the ETag of deployment, the server returns a 304 Not Modified response indicating that the client already has the most recent bundle list."
- - name: block
- in: query
- type: integer
- description: 'If block > 0 AND if there is no new bundle list available, then block for up to the specified number of seconds until a new bundle list becomes available. If no new deployment becomes available, then return 304 Not Modified if If-None-Match is specified.'
- responses:
- '200':
- headers:
- ETag:
- description: "Client should reuse ETag value in If-None-Match header of the next GET request."
- type: string
- description: The deployment system and bundles to install.
- examples:
- application/json: [
- {
- "id": "1234567890",
- "displayName": "abc123",
- "created":"1481917061",
- "updated":"1481917061",
- "createdBy":"mdobs",
- "updatedBy":"mdobs",
- "scopeId": "ABCDEF",
- "uri": "file:///tmp/F1ERRO/0c9853d1ad9b7ec9f7d16ed16ada1be4/archive/369a01f320f926cd8ffac5dfda83b1d8a2129ab3.zip",
- "configurationJson": {
- "PropA": "scope1bundle1propA",
- "PropSCOPE_LEVEL": "aaa",
- "PropCLUSTER_LEVEL": "xxx"
- }
- },
- {
- "id": "1234567891",
- "displayName": "abc123_2",
- "created":"1481917061",
- "updated":"1481917061",
- "createdBy":"mdobs",
- "updatedBy":"mdobs",
- "scopeId": "ABCDEF",
- "uri": "file:///tmp/F1ERRO/0c9853d1ad9b7ec9f7d16ed16ada1be4/archive/369a01f320f926cd8ffac5dfda83b1d8a2129ab3.zip",
- "configurationJson": {
- "PropA": "scope1bundle2propA",
- "PropSCOPE_LEVEL": "aaa",
- "PropCLUSTER_LEVEL": "xxx"
- }
- },
- {
- "id": "1234567892",
- "displayName": "abc1234",
- "created":"1481917061",
- "updated":"1481917061",
- "createdBy":"mdobs",
- "updatedBy":"mdobs",
- "scopeId": "ABCDEF",
- "uri": "file:///tmp/F1ERRO/0c9853d1ad9b7ec9f7d16ed16ada1be4/archive/369a01f320f926cd8ffac5dfda83b1d8a2129ab3.zip",
- "configurationJson": {
- "PropA": "scope1bundlepropA",
- "PropSCOPE_LEVEL": "aaa",
- "PropCLUSTER_LEVEL": "xxx"
- }
- },
- {
- "id": "1234567893",
- "displayName": "abc123",
- "created":"1481917061",
- "updated":"1481917061",
- "createdBy":"mdobs",
- "updatedBy":"mdobs",
- "scopeId": "EFGHIJK",
- "uri": "file:///tmp/F1ERRO/0c9853d1ad9b7ec9f7d16ed16ada1be4/archive/369a01f320f926cd8ffac5dfda83b1d8a2129ab3.zip",
- "configurationJson": {
- "PropA": "scope2bundle1propA",
- "PropSCOPE_LEVEL": "bbb",
- "PropCLUSTER_LEVEL": "xxx"
- }
- },
- {
- "id": "1234567894",
- "displayName": "abc123_2",
- "created":"1481917061",
- "updated":"1481917061",
- "createdBy":"fierrom",
- "updatedBy":"fierrom",
- "scopeId": "EFGHIJK",
- "uri": "file:///tmp/F1ERRO/0c9853d1ad9b7ec9f7d16ed16ada1be4/archive/369a01f320f926cd8ffac5dfda83b1d8a2129ab3.zip",
- "configurationJson": {
- "PropA": "scope2bundle2propA",
- "PropSCOPE_LEVEL": "bbb",
- "PropCLUSTER_LEVEL": "xxx"
- }
- }
-]
- schema:
- $ref: '#/definitions/DeploymentResponse'
- '304':
- description: Deployment not modified.
- put:
- description: Save results of deployment
- parameters:
- - name: _
- in: body
- required: true
- description: Success or failure response
- schema:
- $ref: '#/definitions/DeploymentResult'
- responses:
- '200':
- description: OK
- default:
- description: Error response
- schema:
- $ref: '#/definitions/ErrorResponse'
-
-definitions:
-
- ErrorResponse:
- required:
- - errorCode
- - reason
- properties:
- errorCode:
- type: number
- reason:
- type: string
- example: {
- "errorCode": 601,
- "reason": "Something's wrong"
- }
-
- DeploymentResponse:
- type: array
- items:
- $ref: '#/definitions/DeploymentBundle'
-
- DeploymentBundle:
- type: object
- required:
- - id
- - scopeId
- - createdBy
- - created
- - updatedBy
- - updated
- - displayName
- - uri
- - configurationJson
- properties:
- id:
- type: string
- scopeId:
- type: string
- createdBy:
- type: string
- created:
- type: number
- updatedBy:
- type: string
- updated:
- type: number
- displayName:
- type: string
- uri:
- type: string
- configurationJson:
- type: object
-
- DeploymentResult:
- type: array
- items:
- $ref: '#/definitions/DeploymentBundleResult'
- example: [
- {
- "id": "1234567890",
- "status": "SUCCESS"
- },
- {
- "id": "1234567890",
- "status": "SUCCESS"
- },
- {
- "id": "1234567890",
- "status": "SUCCESS"
- }
- ]
-
- DeploymentBundleResult:
- type: object
- required:
- - id
- - status
- properties:
- id:
- type: string
- message:
- type: string
- errorCode:
- type: number
- status:
- type: string
- enum:
- - "SUCCESS"
- - "FAIL"
- description: Status of SUCCESS or FAIL plus error
- example: {
- "id": 1234567890,
- "status": "SUCCESS"
- }
\ No newline at end of file
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go
deleted file mode 100644
index 236fe51..0000000
--- a/apidGatewayDeploy_suite_test.go
+++ /dev/null
@@ -1,138 +0,0 @@
-package apiGatewayDeploy
-
-import (
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
-
- "encoding/hex"
-
- "github.com/30x/apid-core"
- "github.com/30x/apid-core/factory"
-
- "io/ioutil"
- "net/http"
- "net/http/httptest"
- "net/url"
- "os"
- "testing"
- "time"
-)
-
-var (
- tmpDir string
- testServer *httptest.Server
- testLastTrackerVars map[string]string
- testLastTrackerBody []byte
-)
-
-var _ = BeforeSuite(func() {
- apid.Initialize(factory.DefaultServicesFactory())
-
- config := apid.Config()
-
- var err error
- tmpDir, err = ioutil.TempDir("", "api_test")
- Expect(err).NotTo(HaveOccurred())
-
- config.Set("local_storage_path", tmpDir)
- config.Set(configApidInstanceID, "INSTANCE_ID")
- config.Set(configApidClusterID, "CLUSTER_ID")
- config.Set(configApiServerBaseURI, "http://localhost")
- config.Set(configDebounceDuration, "1ms")
-
- apid.InitializePlugins("")
-
- // init full DB
- db, err := data.DB()
- Expect(err).NotTo(HaveOccurred())
- err = InitDBFullColumns(db)
- Expect(err).NotTo(HaveOccurred())
- SetDB(db)
-
- bundleCleanupDelay = time.Millisecond
- bundleRetryDelay = 10 * time.Millisecond
- markDeploymentFailedAfter = 50 * time.Millisecond
- concurrentDownloads = 1
- downloadQueueSize = 1
-
- router := apid.API().Router()
- // fake an unreliable bundle repo
- count := 1
- failedOnce := false
- router.HandleFunc("/bundles/failonce", func(w http.ResponseWriter, req *http.Request) {
- if failedOnce {
- vars := apid.API().Vars(req)
- w.Write([]byte("/bundles/" + vars["id"]))
- } else {
- failedOnce = true
- w.WriteHeader(500)
- }
- }).Methods("GET")
-
- router.HandleFunc("/bundles/{id}", func(w http.ResponseWriter, req *http.Request) {
- count++
- vars := apid.API().Vars(req)
- if count%2 == 0 && vars["id"] != "checksum" {
- w.WriteHeader(500)
- return
- }
- if vars["id"] == "longfail" {
- time.Sleep(markDeploymentFailedAfter + (250 * time.Millisecond))
- }
- w.Write([]byte("/bundles/" + vars["id"]))
-
- }).Methods("GET")
-
- // fake an unreliable APID tracker
- router.HandleFunc("/clusters/{clusterID}/apids/{instanceID}/deployments",
- func(w http.ResponseWriter, req *http.Request) {
- count++
- if count%2 == 0 {
- w.WriteHeader(500)
- return
- }
-
- testLastTrackerVars = apid.API().Vars(req)
- testLastTrackerBody, err = ioutil.ReadAll(req.Body)
- Expect(err).ToNot(HaveOccurred())
-
- w.Write([]byte("OK"))
-
- }).Methods("PUT")
- testServer = httptest.NewServer(router)
-})
-
-var _ = AfterSuite(func() {
- apid.Events().Close()
- if testServer != nil {
- testServer.Close()
- }
- os.RemoveAll(tmpDir)
-})
-
-var _ = BeforeEach(func() {
- var err error
- apiServerBaseURI, err = url.Parse(testServer.URL)
- Expect(err).NotTo(HaveOccurred())
-
- _, err = getDB().Exec("DELETE FROM edgex_deployment")
- Expect(err).ShouldNot(HaveOccurred())
-
- _, err = getDB().Exec("UPDATE etag SET value=1")
-})
-
-func TestApidGatewayDeploy(t *testing.T) {
- RegisterFailHandler(Fail)
- RunSpecs(t, "ApidGatewayDeploy Suite")
-}
-
-func testGetChecksum(hashType, uri string) string {
- url, err := url.Parse(uri)
- Expect(err).NotTo(HaveOccurred())
-
- hashWriter, err := getHashWriter(hashType)
- Expect(err).NotTo(HaveOccurred())
-
- hashWriter.Write([]byte(url.Path))
- return hex.EncodeToString(hashWriter.Sum(nil))
-}
diff --git a/bundle.go b/bundle.go
index 5a2c977..8bd756c 100644
--- a/bundle.go
+++ b/bundle.go
@@ -1,22 +1,15 @@
package apiGatewayDeploy
import (
- "crypto/md5"
- "crypto/sha256"
- "crypto/sha512"
+
"encoding/base64"
- "encoding/hex"
- "errors"
"fmt"
- "hash"
- "hash/crc32"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"path"
- "strings"
"time"
)
@@ -42,19 +35,11 @@
func queueDownloadRequest(dep DataDeployment) {
- hashWriter, err := getHashWriter(dep.BundleChecksumType)
- if err != nil {
- msg := fmt.Sprintf("invalid bundle checksum type: %s for deployment: %s", dep.BundleChecksumType, dep.ID)
- log.Error(msg)
- return
- }
-
retryIn := bundleRetryDelay
maxBackOff := 5 * time.Minute
markFailedAt := time.Now().Add(markDeploymentFailedAfter)
req := &DownloadRequest{
dep: dep,
- hashWriter: hashWriter,
bundleFile: getBundleFile(dep),
backoffFunc: createBackoff(retryIn, maxBackOff),
markFailedAt: markFailedAt,
@@ -64,7 +49,6 @@
type DownloadRequest struct {
dep DataDeployment
- hashWriter hash.Hash
bundleFile string
backoffFunc func()
markFailedAt time.Time
@@ -73,18 +57,11 @@
func (r *DownloadRequest) downloadBundle() {
dep := r.dep
- log.Debugf("starting bundle download attempt for %s: %s", dep.ID, dep.BundleURI)
-
- deployments, err := getDeployments("WHERE id=$1", dep.ID)
- if err == nil && len(deployments) == 0 {
- log.Debugf("never mind, deployment %s was deleted", dep.ID)
- return
- }
+ log.Debugf("starting bundle download attempt for %s: %s", dep.ID, dep.BlobID)
r.checkTimeout()
- r.hashWriter.Reset()
- tempFile, err := downloadFromURI(dep.BundleURI, r.hashWriter, dep.BundleChecksum)
+ tempFile, err := downloadFromURI(dep.BlobID)
if err == nil {
err = os.Rename(tempFile, r.bundleFile)
@@ -98,7 +75,7 @@
}
if err == nil {
- err = updateLocalBundleURI(dep.ID, r.bundleFile)
+ err = updatelocal_fs_location(dep.BlobID, r.bundleFile)
}
if err != nil {
@@ -110,7 +87,7 @@
return
}
- log.Debugf("bundle for %s downloaded: %s", dep.ID, dep.BundleURI)
+ log.Debugf("bundle for %s downloaded: %s", dep.ID, dep.BlobID)
// send deployments to client
deploymentsChanged <- dep.ID
@@ -122,25 +99,59 @@
if time.Now().After(r.markFailedAt) {
r.markFailedAt = time.Time{}
log.Debugf("bundle download timeout. marking deployment %s failed. will keep retrying: %s",
- r.dep.ID, r.dep.BundleURI)
+ r.dep.ID, r.dep.BlobID)
}
}
}
func getBundleFile(dep DataDeployment) string {
- // the content of the URI is unfortunately not guaranteed not to change, so I can't just use dep.BundleURI
+ // the content of the URI is unfortunately not guaranteed not to change, so I can't just use dep.BlobID
// unfortunately, this also means that a bundle cache isn't especially relevant
fileName := dep.DataScopeID + "_" + dep.ID
return path.Join(bundlePath, base64.StdEncoding.EncodeToString([]byte(fileName)))
}
-func downloadFromURI(uri string, hashWriter hash.Hash, expectedHash string) (tempFileName string, err error) {
+func getSignedURL(blobId string) (string, error) {
- log.Debugf("Downloading bundle: %s", uri)
+ blobUri, err := url.Parse(config.GetString(configBlobServerBaseURI))
+ if err != nil {
+ log.Panicf("bad url value for config %s: %s", blobUri, err)
+ }
+
+ //TODO : Just a temp Hack
+ blobUri.Path = path.Join(blobUri.Path, "/v1/blobstore/signeduri?action=GET&key=" + blobId)
+ uri := blobUri.String()
+
+ surl, err := getURIReader(uri)
+ if err != nil {
+ log.Errorf("Unable to get signed URL from BlobServer %s: %v", uri, err)
+ return "", err
+ }
+
+ signedURL, err := ioutil.ReadAll(surl)
+ if err != nil {
+ log.Errorf("Invalid response from BlobServer for {%s} error: {%v}", uri, err)
+ return "", err
+ }
+ return string(signedURL), nil
+}
+
+
+// downloadFromURI involves retrieving the signed URL for the blob, and storing the resource locally
+// after downloading the resource from GCS (via the signed URL)
+func downloadFromURI(blobId string) (tempFileName string, err error) {
var tempFile *os.File
+ log.Debugf("Downloading bundle: %s", blobId)
+
+ uri, err := getSignedURL(blobId)
+ if err != nil {
+ log.Errorf("Unable to get signed URL for blobId {%s}, error : {%v}", blobId, err)
+ return
+ }
+
tempFile, err = ioutil.TempFile(bundlePath, "download")
if err != nil {
log.Errorf("Unable to create temp file: %v", err)
@@ -149,55 +160,27 @@
defer tempFile.Close()
tempFileName = tempFile.Name()
- var bundleReader io.ReadCloser
- bundleReader, err = getURIFileReader(uri)
+ var confReader io.ReadCloser
+ confReader, err = getURIReader(uri)
if err != nil {
log.Errorf("Unable to retrieve bundle %s: %v", uri, err)
return
}
- defer bundleReader.Close()
+ defer confReader.Close()
- // track checksum
- teedReader := io.TeeReader(bundleReader, hashWriter)
-
- _, err = io.Copy(tempFile, teedReader)
+ _, err = io.Copy(tempFile, confReader)
if err != nil {
log.Errorf("Unable to write bundle %s: %v", tempFileName, err)
return
}
- // check checksum
- checksum := hex.EncodeToString(hashWriter.Sum(nil))
- if checksum != expectedHash {
- err = errors.New(fmt.Sprintf("Bad checksum on %s. calculated: %s, given: %s", tempFileName, checksum, expectedHash))
- log.Error(err.Error())
- return
- }
-
log.Debugf("Bundle %s downloaded to: %s", uri, tempFileName)
return
}
// retrieveBundle retrieves bundle data from a URI
-func getURIFileReader(uriString string) (io.ReadCloser, error) {
+func getURIReader(uriString string) (io.ReadCloser, error) {
- uri, err := url.Parse(uriString)
- if err != nil {
- return nil, fmt.Errorf("DownloadFileUrl: Failed to parse urlStr: %s", uriString)
- }
-
- // todo: add authentication - TBD?
-
- // assume it's a file if no scheme - todo: remove file support?
- if uri.Scheme == "" || uri.Scheme == "file" {
- f, err := os.Open(uri.Path)
- if err != nil {
- return nil, err
- }
- return f, nil
- }
-
- // GET the contents at uriString
client := http.Client{
Timeout: bundleDownloadConnTimeout,
}
@@ -206,42 +189,11 @@
return nil, err
}
if res.StatusCode != 200 {
- return nil, fmt.Errorf("Bundle uri %s failed with status %d", uriString, res.StatusCode)
+ return nil, fmt.Errorf("GET uri %s failed with status %d", uriString, res.StatusCode)
}
return res.Body, nil
}
-func getHashWriter(hashType string) (hash.Hash, error) {
-
- var hashWriter hash.Hash
-
- switch strings.ToLower(hashType) {
- case "":
- hashWriter = fakeHash{md5.New()}
- case "md5":
- hashWriter = md5.New()
- case "crc32":
- hashWriter = crc32.NewIEEE()
- case "sha256":
- hashWriter = sha256.New()
- case "sha512":
- hashWriter = sha512.New()
- default:
- return nil, errors.New(
- fmt.Sprintf("invalid checksumType: %s. valid types: md5, crc32, sha256, sha512", hashType))
- }
-
- return hashWriter, nil
-}
-
-type fakeHash struct {
- hash.Hash
-}
-
-func (f fakeHash) Sum(b []byte) []byte {
- return []byte("")
-}
-
func initializeBundleDownloading() {
// create workers
diff --git a/bundle_test.go b/bundle_test.go
deleted file mode 100644
index 46e7a74..0000000
--- a/bundle_test.go
+++ /dev/null
@@ -1,338 +0,0 @@
-package apiGatewayDeploy
-
-import (
- "encoding/json"
- "net/url"
- "time"
-
- "net/http"
- "net/http/httptest"
-
- "io/ioutil"
-
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
-)
-
-var _ = Describe("bundle", func() {
-
- Context("download", func() {
-
- It("should timeout connection and retry", func() {
- defer func() {
- bundleDownloadConnTimeout = time.Second
- }()
- bundleDownloadConnTimeout = 100 * time.Millisecond
- firstTime := true
- ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- if firstTime {
- firstTime = false
- time.Sleep(1 * time.Second)
- w.WriteHeader(500)
- } else {
- w.Write([]byte("/bundles/longfail"))
- }
- }))
- defer ts.Close()
-
- uri, err := url.Parse(ts.URL)
- Expect(err).ShouldNot(HaveOccurred())
- uri.Path = "/bundles/longfail"
-
- tx, err := getDB().Begin()
- Expect(err).ShouldNot(HaveOccurred())
-
- deploymentID := "bundle_download_fail"
- dep := DataDeployment{
- ID: deploymentID,
- DataScopeID: deploymentID,
- BundleURI: uri.String(),
- BundleChecksum: testGetChecksum("crc32", uri.String()),
- BundleChecksumType: "crc32",
- }
-
- err = InsertDeployment(tx, dep)
- Expect(err).ShouldNot(HaveOccurred())
-
- err = tx.Commit()
- Expect(err).ShouldNot(HaveOccurred())
-
- queueDownloadRequest(dep)
-
- var listener = make(chan deploymentsResult)
- addSubscriber <- listener
- result := <-listener
-
- Expect(result.err).NotTo(HaveOccurred())
- Expect(len(result.deployments)).To(Equal(1))
- d := result.deployments[0]
- Expect(d.ID).To(Equal(deploymentID))
- })
-
- It("should timeout deployment, mark status as failed, then finish", func() {
-
- proceed := make(chan bool)
- failedOnce := false
- ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- if failedOnce {
- proceed <- true
- time.Sleep(markDeploymentFailedAfter)
- w.Write([]byte("/bundles/longfail"))
- } else {
- failedOnce = true
- time.Sleep(markDeploymentFailedAfter)
- w.WriteHeader(500)
- }
- }))
- defer ts.Close()
-
- deploymentID := "bundle_download_fail"
-
- uri, err := url.Parse(ts.URL)
- Expect(err).ShouldNot(HaveOccurred())
-
- uri.Path = "/bundles/longfail"
- bundleUri := uri.String()
- bundle := bundleConfigJson{
- Name: uri.Path,
- URI: bundleUri,
- ChecksumType: "crc32",
- }
- bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
- bundleJson, err := json.Marshal(bundle)
- Expect(err).ShouldNot(HaveOccurred())
-
- tx, err := getDB().Begin()
- Expect(err).ShouldNot(HaveOccurred())
-
- dep := DataDeployment{
- ID: deploymentID,
- BundleConfigID: deploymentID,
- ApidClusterID: deploymentID,
- DataScopeID: deploymentID,
- BundleConfigJSON: string(bundleJson),
- ConfigJSON: string(bundleJson),
- Created: "",
- CreatedBy: "",
- Updated: "",
- UpdatedBy: "",
- BundleName: deploymentID,
- BundleURI: bundle.URI,
- BundleChecksum: bundle.Checksum,
- BundleChecksumType: bundle.ChecksumType,
- LocalBundleURI: "",
- DeployStatus: "",
- DeployErrorCode: 0,
- DeployErrorMessage: "",
- }
-
- err = InsertDeployment(tx, dep)
- Expect(err).ShouldNot(HaveOccurred())
-
- err = tx.Commit()
- Expect(err).ShouldNot(HaveOccurred())
-
- trackerHit := false
- tracker := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- defer GinkgoRecover()
-
- b, err := ioutil.ReadAll(r.Body)
- Expect(err).ShouldNot(HaveOccurred())
- var received apiDeploymentResults
- json.Unmarshal(b, &received)
-
- expected := apiDeploymentResults{
- {
- ID: deploymentID,
- Status: RESPONSE_STATUS_FAIL,
- ErrorCode: TRACKER_ERR_BUNDLE_DOWNLOAD_TIMEOUT,
- Message: "bundle download failed",
- },
- }
- Expect(received).To(Equal(expected))
- trackerHit = true
- w.Write([]byte("OK"))
- }))
- defer tracker.Close()
- apiServerBaseURI, err = url.Parse(tracker.URL)
- Expect(err).ShouldNot(HaveOccurred())
-
- queueDownloadRequest(dep)
-
- <-proceed
-
- // get error state deployment
- deployments, err := getDeployments("WHERE id=$1", deploymentID)
- Expect(err).ShouldNot(HaveOccurred())
-
- Expect(len(deployments)).To(Equal(1))
- d := deployments[0]
-
- Expect(d.ID).To(Equal(deploymentID))
- Expect(d.DeployStatus).To(Equal(RESPONSE_STATUS_FAIL))
- Expect(d.DeployErrorCode).To(Equal(TRACKER_ERR_BUNDLE_DOWNLOAD_TIMEOUT))
- Expect(d.DeployErrorMessage).ToNot(BeEmpty())
- Expect(d.LocalBundleURI).To(BeEmpty())
-
- Expect(trackerHit).To(BeTrue())
-
- var listener = make(chan deploymentsResult)
- addSubscriber <- listener
- <-listener
-
- // get finished deployment
- // still in error state (let client update), but with valid local bundle
- deployments, err = getDeployments("WHERE id=$1", deploymentID)
- Expect(err).ShouldNot(HaveOccurred())
-
- Expect(len(deployments)).To(Equal(1))
- d = deployments[0]
-
- Expect(d.ID).To(Equal(deploymentID))
- Expect(d.DeployStatus).To(Equal(RESPONSE_STATUS_FAIL))
- Expect(d.DeployErrorCode).To(Equal(TRACKER_ERR_BUNDLE_DOWNLOAD_TIMEOUT))
- Expect(d.DeployErrorMessage).ToNot(BeEmpty())
- Expect(d.LocalBundleURI).To(BeAnExistingFile())
- })
-
- It("should not continue attempts if deployment has been deleted", func() {
-
- deploymentID := "bundle_download_deployment_deleted"
-
- uri, err := url.Parse(testServer.URL)
- Expect(err).ShouldNot(HaveOccurred())
-
- uri.Path = "/bundles/failonce"
- bundleUri := uri.String()
- bundle := bundleConfigJson{
- Name: uri.Path,
- URI: bundleUri,
- ChecksumType: "crc32",
- }
- bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
- bundleJson, err := json.Marshal(bundle)
- Expect(err).ShouldNot(HaveOccurred())
-
- tx, err := getDB().Begin()
- Expect(err).ShouldNot(HaveOccurred())
-
- dep := DataDeployment{
- ID: deploymentID,
- BundleConfigID: deploymentID,
- ApidClusterID: deploymentID,
- DataScopeID: deploymentID,
- BundleConfigJSON: string(bundleJson),
- ConfigJSON: string(bundleJson),
- Created: "",
- CreatedBy: "",
- Updated: "",
- UpdatedBy: "",
- BundleName: deploymentID,
- BundleURI: bundle.URI,
- BundleChecksum: bundle.Checksum,
- BundleChecksumType: bundle.ChecksumType,
- LocalBundleURI: "",
- DeployStatus: "",
- DeployErrorCode: 0,
- DeployErrorMessage: "",
- }
-
- err = InsertDeployment(tx, dep)
- Expect(err).ShouldNot(HaveOccurred())
-
- err = tx.Commit()
- Expect(err).ShouldNot(HaveOccurred())
-
- queueDownloadRequest(dep)
-
- // skip first try
- time.Sleep(bundleRetryDelay)
-
- // delete deployment
- tx, err = getDB().Begin()
- Expect(err).ShouldNot(HaveOccurred())
- deleteDeployment(tx, dep.ID)
- err = tx.Commit()
- Expect(err).ShouldNot(HaveOccurred())
-
- // wait for final
- time.Sleep(bundleRetryDelay)
-
- // No way to test this programmatically currently
- // search logs for "never mind, deployment bundle_download_deployment_deleted was deleted"
- })
- })
-
- Context("checksums", func() {
-
- It("should download with empty checksumType", func() {
- checksumDownloadValid("")
- })
-
- It("should fail download with bad empty checksumType", func() {
- checksumDownloadInvalid("")
- })
-
- It("should fail download with bad md5", func() {
- checksumDownloadInvalid("md5")
- })
-
- It("should download with good md5", func() {
- checksumDownloadValid("md5")
- })
-
- It("should fail download with bad md5", func() {
- checksumDownloadInvalid("md5")
- })
-
- It("should download with good crc32", func() {
- checksumDownloadValid("crc32")
- })
-
- It("should fail download with bad crc32", func() {
- checksumDownloadInvalid("crc32")
- })
-
- It("should download with good sha256", func() {
- checksumDownloadValid("sha256")
- })
-
- It("should fail download with bad sha256", func() {
- checksumDownloadInvalid("sha256")
- })
-
- It("should download correctly with sha512", func() {
- checksumDownloadValid("sha512")
- })
-
- It("should fail download with bad sha512", func() {
- checksumDownloadInvalid("sha512")
- })
- })
-})
-
-func checksumDownloadValid(checksumType string) {
- defer GinkgoRecover()
-
- uri, err := url.Parse(testServer.URL)
- Expect(err).ShouldNot(HaveOccurred())
- uri.Path = "/bundles/checksum"
- checksum := testGetChecksum(checksumType, uri.String())
- hash, err := getHashWriter(checksumType)
- Expect(err).NotTo(HaveOccurred())
- _, err = downloadFromURI(uri.String(), hash, checksum)
- Expect(err).NotTo(HaveOccurred())
-}
-
-func checksumDownloadInvalid(checksumType string) {
- defer GinkgoRecover()
-
- uri, err := url.Parse(testServer.URL)
- Expect(err).ShouldNot(HaveOccurred())
- uri.Path = "/bundles/checksum"
- checksum := "invalidchecksum"
- hash, err := getHashWriter(checksumType)
- Expect(err).NotTo(HaveOccurred())
- _, err = downloadFromURI(uri.String(), hash, checksum)
- Expect(err).To(HaveOccurred())
-}
diff --git a/cmd/apidGatewayDeploy/deployments.json b/cmd/apidGatewayDeploy/deployments.json
deleted file mode 100644
index 72efd9f..0000000
--- a/cmd/apidGatewayDeploy/deployments.json
+++ /dev/null
@@ -1,24 +0,0 @@
-[
- {
- "id": "api_no_change_blocking",
- "scopeId": "api_no_change_blocking",
- "created": "",
- "createdBy": "",
- "updated": "",
- "updatedBy": "",
- "configuration": {
- "name": "/bundles/1",
- "uri": "http://127.0.0.1:53589/bundles/1",
- "checksumType": "crc-32",
- "checksum": "c65a0f2a"
- },
- "bundleConfiguration": {
- "name": "/bundles/1",
- "uri": "http://127.0.0.1:53589/bundles/1",
- "checksumType": "crc-32",
- "checksum": "c65a0f2a"
- },
- "displayName": "api_no_change_blocking",
- "uri": "x"
- }
-]
\ No newline at end of file
diff --git a/cmd/apidGatewayDeploy/main.go b/cmd/apidGatewayDeploy/main.go
deleted file mode 100644
index 3939944..0000000
--- a/cmd/apidGatewayDeploy/main.go
+++ /dev/null
@@ -1,132 +0,0 @@
-package main
-
-import (
- "encoding/json"
- "flag"
- "github.com/30x/apid-core"
- "github.com/30x/apid-core/factory"
- "github.com/30x/apidGatewayDeploy"
- _ "github.com/30x/apidGatewayDeploy"
- "io/ioutil"
- "os"
-)
-
-func main() {
- deploymentsFlag := flag.String("deployments", "", "file path to a deployments file (for testing)")
- flag.Parse()
- deploymentsFile := *deploymentsFlag
-
- apid.Initialize(factory.DefaultServicesFactory())
-
- log := apid.Log()
- log.Debug("initializing...")
-
- configService := apid.Config()
-
- var deployments apiGatewayDeploy.ApiDeploymentResponse
- if deploymentsFile != "" {
- log.Printf("Running in temp dir using deployments file: %s", deploymentsFile)
- tmpDir, err := ioutil.TempDir("", "apidGatewayDeploy")
- if err != nil {
- log.Panicf("ERROR: Unable to create temp dir", err)
- }
- defer os.RemoveAll(tmpDir)
-
- configService.Set("data_path", tmpDir)
-
- if deploymentsFile != "" {
- bytes, err := ioutil.ReadFile(deploymentsFile)
- if err != nil {
- log.Errorf("ERROR: Unable to read bundle config file at %s", deploymentsFile)
- return
-
- }
-
- err = json.Unmarshal(bytes, &deployments)
- if err != nil {
- log.Errorf("ERROR: Unable to parse deployments %v", err)
- return
- }
- }
- }
-
- apid.InitializePlugins("")
-
- insertTestDeployments(deployments)
-
- // print the base url to the console
- basePath := "/deployments"
- port := configService.GetString("api_port")
- log.Print()
- log.Printf("API is at: http://localhost:%s%s", port, basePath)
- log.Print()
-
- // start client API listener
- api := apid.API()
- err := api.Listen()
- if err != nil {
- log.Print(err)
- }
-}
-
-func insertTestDeployments(deployments apiGatewayDeploy.ApiDeploymentResponse) error {
-
- if len(deployments) == 0 {
- return nil
- }
-
- log := apid.Log()
-
- db, err := apid.Data().DB()
- if err != nil {
- return err
- }
- apiGatewayDeploy.SetDB(db)
-
- err = apiGatewayDeploy.InitDB(db)
- if err != nil {
- return err
- }
-
- tx, err := db.Begin()
- if err != nil {
- return err
- }
-
- for _, ad := range deployments {
-
- dep := apiGatewayDeploy.DataDeployment{
- ID: ad.ID,
- BundleConfigID: ad.ID,
- ApidClusterID: ad.ID,
- DataScopeID: ad.ScopeId,
- BundleConfigJSON: string(ad.BundleConfigJson),
- ConfigJSON: string(ad.ConfigJson),
- Created: "",
- CreatedBy: "",
- Updated: "",
- UpdatedBy: "",
- BundleName: ad.DisplayName,
- BundleURI: ad.URI,
- BundleChecksum: "",
- BundleChecksumType: "",
- LocalBundleURI: ad.URI,
- }
-
- err = apiGatewayDeploy.InsertDeployment(tx, dep)
- if err != nil {
- log.Error("Unable to insert deployment")
- return err
- }
-
- }
-
- err = tx.Commit()
- if err != nil {
- return err
- }
-
- apiGatewayDeploy.InitAPI()
-
- return nil
-}
diff --git a/cover.sh b/cover.sh
deleted file mode 100755
index a378f75..0000000
--- a/cover.sh
+++ /dev/null
@@ -1,13 +0,0 @@
-#!/usr/bin/env bash
-
-set -e
-echo "mode: atomic" > coverage.txt
-
-for d in $(go list ./... | grep -v vendor); do
- go test -coverprofile=profile.out -covermode=atomic $d
- if [ -f profile.out ]; then
- tail -n +2 profile.out >> coverage.txt
- rm profile.out
- fi
-done
-go tool cover -html=coverage.txt -o cover.html
diff --git a/data.go b/data.go
index b09c0cb..d261023 100644
--- a/data.go
+++ b/data.go
@@ -4,9 +4,8 @@
"database/sql"
"sync"
- "encoding/json"
"github.com/30x/apid-core"
- "strings"
+
)
var (
@@ -16,76 +15,32 @@
type DataDeployment struct {
ID string
- BundleConfigID string
- ApidClusterID string
+ OrgID string
+ EnvID string
DataScopeID string
- BundleConfigJSON string
- ConfigJSON string
- Created string
- CreatedBy string
+ Type int
+ Name string
+ Revision string
+ BlobID string
+ BlobResourceID string
Updated string
UpdatedBy string
- BundleName string
- BundleURI string
- LocalBundleURI string
- BundleChecksum string
- BundleChecksumType string
- DeployStatus string
- DeployErrorCode int
- DeployErrorMessage string
+ Created string
+ CreatedBy string
+ BlobFSLocation string
+ BlobURL string
}
type SQLExec interface {
Exec(query string, args ...interface{}) (sql.Result, error)
}
-func InitDBFullColumns(db apid.DB) error {
- _, err := db.Exec(`
- CREATE TABLE IF NOT EXISTS edgex_deployment (
- id character varying(36) NOT NULL,
- bundle_config_id varchar(36) NOT NULL,
- apid_cluster_id varchar(36) NOT NULL,
- data_scope_id varchar(36) NOT NULL,
- bundle_config_json text NOT NULL,
- config_json text NOT NULL,
- created timestamp without time zone,
- created_by text,
- updated timestamp without time zone,
- updated_by text,
- bundle_config_name text,
- bundle_uri text,
- local_bundle_uri text,
- bundle_checksum text,
- bundle_checksum_type text,
- deploy_status string,
- deploy_error_code int,
- deploy_error_message text,
- PRIMARY KEY (id)
- );
- `)
- if err != nil {
- return err
- }
-
- log.Debug("Database tables created.")
- return nil
-}
-
func InitDB(db apid.DB) error {
_, err := db.Exec(`
- CREATE TABLE IF NOT EXISTS edgex_deployment (
- id character varying(36) NOT NULL,
- bundle_config_id varchar(36) NOT NULL,
- apid_cluster_id varchar(36) NOT NULL,
- data_scope_id varchar(36) NOT NULL,
- bundle_config_json text NOT NULL,
- config_json text NOT NULL,
- created timestamp without time zone,
- created_by text,
- updated timestamp without time zone,
- updated_by text,
- bundle_config_name text,
- PRIMARY KEY (id)
+ CREATE TABLE IF NOT EXISTS edgex_blob_available (
+ blob_id character varying NOT NULL,
+ local_fs_location character varying NOT NULL,
+ access_url character varying
);
`)
if err != nil {
@@ -96,31 +51,6 @@
return nil
}
-func alterTable(db apid.DB) error {
- queries := []string{
- "ALTER TABLE edgex_deployment ADD COLUMN bundle_uri text;",
- "ALTER TABLE edgex_deployment ADD COLUMN local_bundle_uri text;",
- "ALTER TABLE edgex_deployment ADD COLUMN bundle_checksum text;",
- "ALTER TABLE edgex_deployment ADD COLUMN bundle_checksum_type text;",
- "ALTER TABLE edgex_deployment ADD COLUMN deploy_status string;",
- "ALTER TABLE edgex_deployment ADD COLUMN deploy_error_code int;",
- "ALTER TABLE edgex_deployment ADD COLUMN deploy_error_message text;",
- }
-
- for _, query := range queries {
- _, err := db.Exec(query)
- if err != nil {
- if strings.Contains(err.Error(), "duplicate column name") {
- log.Warnf("AlterTable warning: %s", err)
- } else {
- return err
- }
- }
- }
- log.Debug("Database table altered.")
- return nil
-}
-
func getDB() apid.DB {
dbMux.RLock()
db := unsafeDB
@@ -136,232 +66,113 @@
unsafeDB = db
}
-func InsertDeployment(tx *sql.Tx, dep DataDeployment) error {
- return insertDeployments(tx, []DataDeployment{dep})
-}
-
-func insertDeployments(tx *sql.Tx, deps []DataDeployment) error {
-
- log.Debugf("inserting %d edgex_deployment", len(deps))
-
- stmt, err := tx.Prepare(`
- INSERT INTO edgex_deployment
- (id, bundle_config_id, apid_cluster_id, data_scope_id,
- bundle_config_json, config_json, created, created_by,
- updated, updated_by, bundle_config_name, bundle_uri, local_bundle_uri,
- bundle_checksum, bundle_checksum_type, deploy_status,
- deploy_error_code, deploy_error_message)
- VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18);
- `)
- if err != nil {
- log.Errorf("prepare insert into edgex_deployment failed: %v", err)
- return err
- }
- defer stmt.Close()
-
- for _, dep := range deps {
- log.Debugf("insertDeployment: %s", dep.ID)
-
- _, err = stmt.Exec(
- dep.ID, dep.BundleConfigID, dep.ApidClusterID, dep.DataScopeID,
- dep.BundleConfigJSON, dep.ConfigJSON, dep.Created, dep.CreatedBy,
- dep.Updated, dep.UpdatedBy, dep.BundleName, dep.BundleURI,
- dep.LocalBundleURI, dep.BundleChecksum, dep.BundleChecksumType, dep.DeployStatus,
- dep.DeployErrorCode, dep.DeployErrorMessage)
- if err != nil {
- log.Errorf("insert into edgex_deployment %s failed: %v", dep.ID, err)
- return err
- }
- }
-
- log.Debug("inserting edgex_deployment succeeded")
- return err
-}
-
-func updateDeploymentsColumns(tx *sql.Tx, deps []DataDeployment) error {
-
- log.Debugf("updating %d edgex_deployment", len(deps))
-
- stmt, err := tx.Prepare(`
- UPDATE edgex_deployment SET
- (bundle_uri, local_bundle_uri,
- bundle_checksum, bundle_checksum_type, deploy_status,
- deploy_error_code, deploy_error_message)
- = ($1,$2,$3,$4,$5,$6,$7) WHERE id = $8
- `)
- if err != nil {
- log.Errorf("prepare update edgex_deployment failed: %v", err)
- return err
- }
- defer stmt.Close()
-
- for _, dep := range deps {
- log.Debugf("updateDeploymentsColumns: processing deployment %s, %v", dep.ID, dep.BundleURI)
-
- _, err = stmt.Exec(
- dep.BundleURI,
- dep.LocalBundleURI, dep.BundleChecksum, dep.BundleChecksumType, dep.DeployStatus,
- dep.DeployErrorCode, dep.DeployErrorMessage, dep.ID)
- if err != nil {
- log.Errorf("updateDeploymentsColumns of edgex_deployment %s failed: %v", dep.ID, err)
- return err
- }
- }
-
- log.Debug("updateDeploymentsColumns of edgex_deployment succeeded")
- return err
-}
-
-func getDeploymentsToUpdate(db apid.DB) (deployments []DataDeployment, err error) {
- deployments, err = getDeployments("WHERE bundle_uri IS NULL AND local_bundle_uri IS NULL AND deploy_status IS NULL")
- if err != nil {
- log.Errorf("getDeployments in getDeploymentsToUpdate failed: %v", err)
- return
- }
- var bc bundleConfigJson
- for i, _ := range deployments {
- log.Debugf("getDeploymentsToUpdate: processing deployment %v, %v", deployments[i].ID, deployments[i].BundleConfigJSON)
- json.Unmarshal([]byte(deployments[i].BundleConfigJSON), &bc)
- if err != nil {
- log.Errorf("JSON decoding Manifest failed: %v", err)
- return
- }
- deployments[i].BundleName = bc.Name
- deployments[i].BundleURI = bc.URI
- deployments[i].BundleChecksumType = bc.ChecksumType
- deployments[i].BundleChecksum = bc.Checksum
-
- log.Debugf("Unmarshal: %v", deployments[i].BundleURI)
- }
- return
-}
-
-func deleteDeployment(tx *sql.Tx, depID string) error {
-
- log.Debugf("deleteDeployment: %s", depID)
-
- stmt, err := tx.Prepare("DELETE FROM edgex_deployment where id = $1;")
- if err != nil {
- log.Errorf("prepare delete from edgex_deployment %s failed: %v", depID, err)
- return err
- }
- defer stmt.Close()
-
- _, err = stmt.Exec(depID)
- if err != nil {
- log.Errorf("delete from edgex_deployment %s failed: %v", depID, err)
- return err
- }
-
- log.Debugf("deleteDeployment %s succeeded", depID)
- return err
-}
-
-// getReadyDeployments() returns array of deployments that are ready to deploy
-func getReadyDeployments() (deployments []DataDeployment, err error) {
- return getDeployments("WHERE local_bundle_uri != $1", "")
-}
-
-// getUnreadyDeployments() returns array of deployments that are not yet ready to deploy
+// getUnreadyDeployments() returns array of resources that are not yet to be processed
func getUnreadyDeployments() (deployments []DataDeployment, err error) {
- return getDeployments("WHERE local_bundle_uri = $1", "")
-}
-// getDeployments() accepts a "WHERE ..." clause and optional parameters and returns the list of deployments
-func getDeployments(where string, a ...interface{}) (deployments []DataDeployment, err error) {
+ err = nil
db := getDB()
- var stmt *sql.Stmt
- stmt, err = db.Prepare(`
- SELECT id, bundle_config_id, apid_cluster_id, data_scope_id,
- bundle_config_json, config_json, created, created_by,
- updated, updated_by, bundle_config_name, bundle_uri,
- local_bundle_uri, bundle_checksum, bundle_checksum_type, deploy_status,
- deploy_error_code, deploy_error_message
- FROM edgex_deployment
- ` + where)
+ rows, err := db.Query(`
+ SELECT id, org_id, env_id, name, revision, project_runtime_blob_metadata.blob_id, resource_blob_id
+ FROM project_runtime_blob_metadata
+ LEFT JOIN edgex_blob_available
+ ON project_runtime_blob_metadata.blob_id = edgex_blob_available.blob_id
+ WHERE edgex_blob_available.blob_id IS NULL;
+ `)
+
if err != nil {
- return
- }
- defer stmt.Close()
- var rows *sql.Rows
- rows, err = stmt.Query(a...)
- if err != nil {
- if err == sql.ErrNoRows {
- return
- }
- log.Errorf("Error querying edgex_deployment: %v", err)
+ log.Errorf("DB Query for project_runtime_blob_metadata failed %v", err)
return
}
defer rows.Close()
- deployments = dataDeploymentsFromRows(rows)
-
- return
-}
-
-func dataDeploymentsFromRows(rows *sql.Rows) (deployments []DataDeployment) {
for rows.Next() {
dep := DataDeployment{}
- rows.Scan(&dep.ID, &dep.BundleConfigID, &dep.ApidClusterID, &dep.DataScopeID,
- &dep.BundleConfigJSON, &dep.ConfigJSON, &dep.Created, &dep.CreatedBy,
- &dep.Updated, &dep.UpdatedBy, &dep.BundleName, &dep.BundleURI,
- &dep.LocalBundleURI, &dep.BundleChecksum, &dep.BundleChecksumType, &dep.DeployStatus,
- &dep.DeployErrorCode, &dep.DeployErrorMessage,
- )
+ rows.Scan(&dep.ID, &dep.OrgID, &dep.EnvID, &dep.Name, &dep.Revision, &dep.BlobID,
+ &dep.BlobResourceID)
deployments = append(deployments, dep)
+ log.Debugf("New configurations to be processed Id {%s}, blobId {%s}", dep.ID, dep.BlobID)
+ }
+ if len(deployments) == 0 {
+ log.Debug("No new resources found to be processed")
+ err = sql.ErrNoRows
}
return
+
}
+// getDeployments()
+func getReadyDeployments() (deployments []DataDeployment, err error) {
+ err = nil
+ db := getDB()
-func updateLocalBundleURI(depID, localBundleUri string) error {
-
- stmt, err := getDB().Prepare("UPDATE edgex_deployment SET local_bundle_uri=$1 WHERE id=$2;")
- if err != nil {
- log.Errorf("prepare updateLocalBundleURI failed: %v", err)
- return err
- }
- defer stmt.Close()
-
- _, err = stmt.Exec(localBundleUri, depID)
- if err != nil {
- log.Errorf("update edgex_deployment %s localBundleUri to %s failed: %v", depID, localBundleUri, err)
- return err
- }
-
- log.Debugf("update edgex_deployment %s localBundleUri to %s succeeded", depID, localBundleUri)
-
- return nil
-}
-
-func InsertTestDeployment(tx *sql.Tx, dep DataDeployment) error {
-
- stmt, err := tx.Prepare(`
- INSERT INTO edgex_deployment
- (id, bundle_config_id, apid_cluster_id, data_scope_id,
- bundle_config_json, config_json, created, created_by,
- updated, updated_by, bundle_config_name)
- VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11);
+ rows, err := db.Query(`
+ SELECT a.id, a.org_id, a.env_id, a.name, a.revision, a.blob_id,
+ a.resource_blob_id, a.created_at, a.created_by, a.updated_at, a.updated_by,
+ b.local_fs_location, b.access_url
+ FROM project_runtime_blob_metadata as a
+ INNER JOIN edgex_blob_available as b
+ ON a.blob_id = b.blob_id
`)
+
if err != nil {
- log.Errorf("prepare insert into edgex_deployment failed: %v", err)
+ log.Errorf("DB Query for project_runtime_blob_metadata failed %v", err)
+ return
+ }
+ defer rows.Close()
+
+ for rows.Next() {
+ dep := DataDeployment{}
+ rows.Scan(&dep.ID, &dep.OrgID, &dep.EnvID, &dep.Name, &dep.Revision, &dep.BlobID,
+ &dep.BlobResourceID, &dep.Created, &dep.CreatedBy, &dep.Updated,
+ &dep.UpdatedBy, &dep.BlobFSLocation, &dep.BlobURL)
+ deployments = append(deployments, dep)
+ log.Debugf("New Configurations available Id {%s} BlobId {%s}", dep.ID, dep.BlobID)
+ }
+ if len(deployments) == 0 {
+ log.Debug("No resources ready to be deployed")
+ err = sql.ErrNoRows
+ }
+ return
+
+}
+
+func updatelocal_fs_location(depID, local_fs_location string) error {
+
+ // TODO DEMO ONLY
+ access_url := "http://localhost:9090/blob/" + depID
+ stmt, err := getDB().Prepare(`
+ INSERT INTO edgex_blob_available (blob_id, local_fs_location, access_url)
+ VALUES (?, ?, ?)`)
+ if err != nil {
+ log.Errorf("PREPARE updatelocal_fs_location failed: %v", err)
return err
}
defer stmt.Close()
- log.Debugf("InsertTestDeployment: %s", dep.ID)
-
- _, err = stmt.Exec(
- dep.ID, dep.BundleConfigID, dep.ApidClusterID, dep.DataScopeID,
- dep.BundleConfigJSON, dep.ConfigJSON, dep.Created, dep.CreatedBy,
- dep.Updated, dep.UpdatedBy, dep.BundleName)
+ _, err = stmt.Exec(depID, local_fs_location, access_url)
if err != nil {
- log.Errorf("insert into edgex_deployment %s failed: %v", dep.ID, err)
+ log.Errorf("UPDATE edgex_blob_available id {%s} local_fs_location {%s} failed: %v", depID, local_fs_location, err)
return err
}
- log.Debug("InsertTestDeployment edgex_deployment succeeded")
- return err
+ log.Debugf("INSERT edgex_blob_available {%s} local_fs_location {%s} succeeded", depID, local_fs_location)
+ return nil
+
+}
+
+func getLocalFSLocation (blobId string) (locfs string , err error) {
+
+ db := getDB()
+
+ rows, err := db.Query("SELECT local_fs_location FROM edgex_blob_available WHERE blob_id = " + blobId)
+ if err != nil {
+ log.Errorf("SELECT local_fs_location failed %v", err)
+ return "", err
+ }
+
+ defer rows.Close()
+ rows.Scan(&locfs)
+ return
}
diff --git a/glide.yaml b/glide.yaml
deleted file mode 100644
index c36bfa1..0000000
--- a/glide.yaml
+++ /dev/null
@@ -1,11 +0,0 @@
-package: github.com/30x/apidGatewayDeploy
-import:
-- package: github.com/30x/apid-core
- version: master
-- package: github.com/apigee-labs/transicator/common
- version: master
-testImport:
-- package: github.com/onsi/ginkgo
- version: master
-- package: github.com/onsi/gomega
- version: master
diff --git a/init.go b/init.go
index 98b9346..96b1bde 100644
--- a/init.go
+++ b/init.go
@@ -21,16 +21,19 @@
configApidClusterID = "apigeesync_cluster_id"
configConcurrentDownloads = "apigeesync_concurrent_downloads"
configDownloadQueueSize = "apigeesync_download_queue_size"
+ configBlobServerBaseURI = "apigeesync_blob_server_base"
)
var (
services apid.Services
log apid.LogService
data apid.DataService
+ config apid.ConfigService
bundlePath string
debounceDuration time.Duration
bundleCleanupDelay time.Duration
apiServerBaseURI *url.URL
+ blobServerURL string
apidInstanceID string
apidClusterID string
downloadQueueSize int
@@ -46,11 +49,16 @@
log = services.Log().ForModule("apiGatewayDeploy")
log.Debug("start init")
- config := services.Config()
+ config = services.Config()
if !config.IsSet(configApiServerBaseURI) {
return pluginData, fmt.Errorf("Missing required config value: %s", configApiServerBaseURI)
}
+
+ if !config.IsSet(configBlobServerBaseURI) {
+ return pluginData, fmt.Errorf("Missing required config value: %s", configBlobServerBaseURI)
+ }
+
var err error
apiServerBaseURI, err = url.Parse(config.GetString(configApiServerBaseURI))
if err != nil {
@@ -96,7 +104,7 @@
}
data = services.Data()
-
+ blobServerURL = config.GetString(configBlobServerBaseURI)
concurrentDownloads = config.GetInt(configConcurrentDownloads)
downloadQueueSize = config.GetInt(configDownloadQueueSize)
relativeBundlePath := config.GetString(configBundleDirKey)
diff --git a/listener.go b/listener.go
index e9cd113..95d20f4 100644
--- a/listener.go
+++ b/listener.go
@@ -1,12 +1,12 @@
package apiGatewayDeploy
import (
- "encoding/json"
"os"
"time"
"github.com/30x/apid-core"
"github.com/apigee-labs/transicator/common"
+ "database/sql"
)
const (
@@ -52,35 +52,12 @@
log.Panicf("Unable to access database: %v", err)
}
- // alter table
- err = alterTable(db)
- if err != nil {
- log.Panicf("Alter table failed: %v", err)
- }
- // ensure that no new database updates are made on old database
+ // Update the DB pointer
dbMux.Lock()
SetDB(db)
dbMux.Unlock()
- // update deployments
- deps, err := getDeploymentsToUpdate(db)
- if err != nil {
- log.Panicf("Unable to getDeploymentsToUpdate: %v", err)
- }
- tx, err := db.Begin()
- if err != nil {
- log.Panicf("Error starting transaction: %v", err)
- }
- defer tx.Rollback()
- err = updateDeploymentsColumns(tx, deps)
- if err != nil {
- log.Panicf("updateDeploymentsColumns failed: %v", err)
- }
- err = tx.Commit()
- if err != nil {
- log.Panicf("Error committing Snapshot update: %v", err)
- }
-
+ InitDB(db)
startupOnExistingDatabase()
log.Debug("Snapshot processed")
}
@@ -89,7 +66,8 @@
// start bundle downloads that didn't finish
go func() {
deployments, err := getUnreadyDeployments()
- if err != nil {
+
+ if err != nil && err != sql.ErrNoRows {
log.Panicf("unable to query database for unready deployments: %v", err)
}
log.Debugf("Queuing %d deployments for bundle download", len(deployments))
@@ -156,27 +134,18 @@
func dataDeploymentFromRow(row common.Row) (d DataDeployment, err error) {
row.Get("id", &d.ID)
- row.Get("bundle_config_id", &d.BundleConfigID)
- row.Get("apid_cluster_id", &d.ApidClusterID)
+ row.Get("org_id", &d.OrgID)
+ row.Get("env_id", &d.EnvID)
row.Get("data_scope_id", &d.DataScopeID)
- row.Get("bundle_config_json", &d.BundleConfigJSON)
- row.Get("config_json", &d.ConfigJSON)
- row.Get("created", &d.Created)
- row.Get("created_by", &d.CreatedBy)
- row.Get("updated", &d.Updated)
- row.Get("updated_by", &d.UpdatedBy)
-
- var bc bundleConfigJson
- json.Unmarshal([]byte(d.BundleConfigJSON), &bc)
- if err != nil {
- log.Errorf("JSON decoding Manifest failed: %v", err)
- return
- }
-
- d.BundleName = bc.Name
- d.BundleURI = bc.URI
- d.BundleChecksumType = bc.ChecksumType
- d.BundleChecksum = bc.Checksum
+ row.Get("bundle_config_json", &d.Type)
+ row.Get("config_json", &d.Name)
+ row.Get("created", &d.Revision)
+ row.Get("created_by", &d.BlobID)
+ row.Get("created_by", &d.BlobResourceID)
+ row.Get("created_by", &d.Updated)
+ row.Get("created_by", &d.UpdatedBy)
+ row.Get("created_by", &d.Created)
+ row.Get("updated", &d.CreatedBy)
return
}
diff --git a/listener_test.go b/listener_test.go
deleted file mode 100644
index c253eb3..0000000
--- a/listener_test.go
+++ /dev/null
@@ -1,343 +0,0 @@
-package apiGatewayDeploy
-
-import (
- "encoding/json"
- "net/url"
-
- "net/http/httptest"
-
- "net/http"
-
- "fmt"
- "github.com/30x/apid-core"
- "github.com/apigee-labs/transicator/common"
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
-)
-
-var _ = Describe("listener", func() {
-
- Context("ApigeeSync snapshot event", func() {
-
- /*
- * Note that the test snapshot should not be empty.
- * If it's empty, you can't use deploymentsResult chan to mark the end of processing,
- * so the threads generated by startupOnExistingDatabase() will mess up later tests
- */
- It("should set DB to appropriate version", func(done Done) {
- saveDB := getDB()
- deploymentID := "set_version_test"
- snapshot, dep := createSnapshotDeployment(deploymentID, "test_version")
-
- db, err := data.DBVersion(snapshot.SnapshotInfo)
- Expect(err).ShouldNot(HaveOccurred())
-
- err = InitDB(db)
- Expect(err).ShouldNot(HaveOccurred())
-
- insertDeploymentToDb(dep, db)
- expectedDB, err := data.DBVersion(snapshot.SnapshotInfo)
- Expect(err).NotTo(HaveOccurred())
-
- var listener = make(chan deploymentsResult)
- addSubscriber <- listener
-
- apid.Events().Emit(APIGEE_SYNC_EVENT, &snapshot)
-
- result := <-listener
- Expect(result.err).ShouldNot(HaveOccurred())
-
- // DB should have been set
- Expect(getDB() == expectedDB).Should(BeTrue())
-
- SetDB(saveDB)
- close(done)
- })
-
- It("should process unready on existing db startup event", func(done Done) {
-
- saveDB := getDB()
-
- deploymentID := "startup_test"
-
- snapshot, dep := createSnapshotDeployment(deploymentID, "test_unready")
-
- db, err := data.DBVersion(snapshot.SnapshotInfo)
- Expect(err).ShouldNot(HaveOccurred())
-
- err = InitDB(db)
- Expect(err).ShouldNot(HaveOccurred())
-
- insertDeploymentToDb(dep, db)
-
- var listener = make(chan deploymentsResult)
- addSubscriber <- listener
-
- apid.Events().Emit(APIGEE_SYNC_EVENT, &snapshot)
-
- result := <-listener
- Expect(result.err).ShouldNot(HaveOccurred())
-
- Expect(len(result.deployments)).To(Equal(1))
- d := result.deployments[0]
-
- Expect(d.ID).To(Equal(deploymentID))
-
- SetDB(saveDB)
- close(done)
- })
-
- It("should send deployment statuses on existing db startup event", func(done Done) {
-
- saveDB := getDB()
-
- successDep := DataDeployment{
- ID: "success",
- LocalBundleURI: "x",
- DeployStatus: RESPONSE_STATUS_SUCCESS,
- DeployErrorCode: 1,
- DeployErrorMessage: "message",
- }
-
- failDep := DataDeployment{
- ID: "fail",
- LocalBundleURI: "x",
- DeployStatus: RESPONSE_STATUS_FAIL,
- DeployErrorCode: 1,
- DeployErrorMessage: "message",
- }
-
- blankDep := DataDeployment{
- ID: "blank",
- LocalBundleURI: "x",
- }
-
- ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- defer GinkgoRecover()
-
- var results apiDeploymentResults
- err := json.NewDecoder(r.Body).Decode(&results)
- Expect(err).ToNot(HaveOccurred())
-
- Expect(results).To(HaveLen(2))
-
- Expect(results).To(ContainElement(apiDeploymentResult{
- ID: successDep.ID,
- Status: successDep.DeployStatus,
- ErrorCode: successDep.DeployErrorCode,
- Message: successDep.DeployErrorMessage,
- }))
- Expect(results).To(ContainElement(apiDeploymentResult{
- ID: failDep.ID,
- Status: failDep.DeployStatus,
- ErrorCode: failDep.DeployErrorCode,
- Message: failDep.DeployErrorMessage,
- }))
-
- SetDB(saveDB)
- close(done)
- }))
-
- var err error
- apiServerBaseURI, err = url.Parse(ts.URL)
- Expect(err).NotTo(HaveOccurred())
-
- // init without info == startup on existing DB
- var snapshot = common.Snapshot{
- SnapshotInfo: "test",
- Tables: []common.Table{},
- }
-
- db, err := data.DBVersion(snapshot.SnapshotInfo)
- Expect(err).NotTo(HaveOccurred())
-
- err = InitDBFullColumns(db)
- Expect(err).NotTo(HaveOccurred())
-
- tx, err := db.Begin()
- Expect(err).ShouldNot(HaveOccurred())
-
- err = InsertDeployment(tx, successDep)
- Expect(err).ShouldNot(HaveOccurred())
- err = InsertDeployment(tx, failDep)
- Expect(err).ShouldNot(HaveOccurred())
- err = InsertDeployment(tx, blankDep)
- Expect(err).ShouldNot(HaveOccurred())
-
- err = tx.Commit()
- Expect(err).ShouldNot(HaveOccurred())
-
- apid.Events().Emit(APIGEE_SYNC_EVENT, &snapshot)
- })
- })
-
- Context("ApigeeSync change event", func() {
-
- It("inserting event should deliver the deployment to subscribers", func(done Done) {
-
- deploymentID := "add_test_1"
-
- event, dep := createChangeDeployment(deploymentID)
-
- // insert full deployment columns
- tx, err := getDB().Begin()
- Expect(err).ShouldNot(HaveOccurred())
- err = InsertDeployment(tx, dep)
- Expect(err).ShouldNot(HaveOccurred())
- err = tx.Commit()
- Expect(err).ShouldNot(HaveOccurred())
-
- var listener = make(chan deploymentsResult)
- addSubscriber <- listener
-
- apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
-
- // wait for event to propagate
- result := <-listener
- Expect(result.err).ShouldNot(HaveOccurred())
-
- deployments, err := getReadyDeployments()
- Expect(err).ShouldNot(HaveOccurred())
-
- Expect(len(deployments)).To(Equal(1))
- d := deployments[0]
-
- Expect(d.ID).To(Equal(deploymentID))
- Expect(d.BundleName).To(Equal(dep.BundleName))
- Expect(d.BundleURI).To(Equal(dep.BundleURI))
-
- close(done)
- })
-
- It("delete event should deliver to subscribers", func(done Done) {
-
- deploymentID := "delete_test_1"
-
- // insert deployment
- event, dep := createChangeDeployment(deploymentID)
-
- // insert full deployment columns
- tx, err := getDB().Begin()
- Expect(err).ShouldNot(HaveOccurred())
- err = InsertDeployment(tx, dep)
- Expect(err).ShouldNot(HaveOccurred())
- err = tx.Commit()
- Expect(err).ShouldNot(HaveOccurred())
-
- listener := make(chan deploymentsResult)
- addSubscriber <- listener
- apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
- // wait for event to propagate
- result := <-listener
- Expect(result.err).ShouldNot(HaveOccurred())
-
- // delete deployment
- deletDeploymentFromDb(dep, getDB())
- row := common.Row{}
- row["id"] = &common.ColumnVal{Value: deploymentID}
- event = common.ChangeList{
- Changes: []common.Change{
- {
- Operation: common.Delete,
- Table: DEPLOYMENT_TABLE,
- OldRow: row,
- },
- },
- }
-
- listener = make(chan deploymentsResult)
- addSubscriber <- listener
- apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
- result = <-listener
- Expect(result.err).ShouldNot(HaveOccurred())
- Expect(len(result.deployments)).To(Equal(0))
- close(done)
- })
- })
-})
-
-func createChangeDeployment(deploymentID string) (common.ChangeList, DataDeployment) {
- uri, err := url.Parse(testServer.URL)
- Expect(err).ShouldNot(HaveOccurred())
-
- uri.Path = "/bundles/1"
- bundleUri := uri.String()
- bundle := bundleConfigJson{
- Name: uri.Path,
- URI: bundleUri,
- ChecksumType: "crc32",
- }
- bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
- bundle1Json, err := json.Marshal(bundle)
- Expect(err).ShouldNot(HaveOccurred())
-
- row := common.Row{}
- row["id"] = &common.ColumnVal{Value: deploymentID}
- row["bundle_config_json"] = &common.ColumnVal{Value: string(bundle1Json)}
-
- changeList := common.ChangeList{
- Changes: []common.Change{
- {
- Operation: common.Insert,
- Table: DEPLOYMENT_TABLE,
- NewRow: row,
- },
- },
- }
- dep, err := dataDeploymentFromRow(changeList.Changes[0].NewRow)
- return changeList, dep
-}
-
-func insertDeploymentToDb(dep DataDeployment, db apid.DB) {
- tx, err := db.Begin()
- Expect(err).ShouldNot(HaveOccurred())
- defer tx.Rollback()
- err = InsertTestDeployment(tx, dep)
- Expect(err).ShouldNot(HaveOccurred())
- err = tx.Commit()
- Expect(err).ShouldNot(HaveOccurred())
-}
-
-func deletDeploymentFromDb(dep DataDeployment, db apid.DB) {
- tx, err := db.Begin()
- Expect(err).ShouldNot(HaveOccurred())
- defer tx.Rollback()
- err = deleteDeployment(tx, dep.ID)
- Expect(err).ShouldNot(HaveOccurred())
- err = tx.Commit()
- Expect(err).ShouldNot(HaveOccurred())
-}
-
-func createSnapshotDeployment(deploymentID string, snapInfo string) (common.Snapshot, DataDeployment) {
- uri, err := url.Parse(testServer.URL)
- Expect(err).ShouldNot(HaveOccurred())
-
- uri.Path = "/bundles/1"
- bundleUri := uri.String()
- bundle := bundleConfigJson{
- Name: uri.Path,
- URI: bundleUri,
- ChecksumType: "crc32",
- }
- bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
-
- jsonBytes, err := json.Marshal(bundle)
- Expect(err).ShouldNot(HaveOccurred())
- fmt.Println("JSON :" + string(jsonBytes))
-
- dep := DataDeployment{
- ID: deploymentID,
- DataScopeID: deploymentID,
- BundleURI: bundle.URI,
- BundleChecksum: bundle.Checksum,
- BundleChecksumType: bundle.ChecksumType,
- BundleConfigJSON: string(jsonBytes),
- }
-
- // init without info == startup on existing DB
- var snapshot = common.Snapshot{
- SnapshotInfo: snapInfo,
- Tables: []common.Table{},
- }
- return snapshot, dep
-}