Barebones Generic gateway deploy.
diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index a902a1c..0000000 --- a/.travis.yml +++ /dev/null
@@ -1,15 +0,0 @@ -language: go - -go: - - 1.7.x - -before_install: - - sudo add-apt-repository ppa:masterminds/glide -y - - sudo apt-get update -q - - sudo apt-get install glide -y - -install: - - glide install - -script: - - go test $(glide novendor)
diff --git a/api.go b/api.go index 676d6fc..d68a498 100644 --- a/api.go +++ b/api.go
@@ -48,16 +48,11 @@ } type ApiDeployment struct { - ID string `json:"id"` + Org string `json:"org"` + Env string `json:"env"` ScopeId string `json:"scopeId"` - Created string `json:"created"` - CreatedBy string `json:"createdBy"` - Updated string `json:"updated"` - UpdatedBy string `json:"updatedBy"` - ConfigJson json.RawMessage `json:"configuration"` - BundleConfigJson json.RawMessage `json:"bundleConfiguration"` - DisplayName string `json:"displayName"` - URI string `json:"uri"` + Type int `json:"type"` + BlobURL string `json:"url"` } // sent to client @@ -65,10 +60,12 @@ -const deploymentsEndpoint = "/deployments" +const deploymentsEndpoint = "/configurations" +const BlobEndpoint = "/blob/{blobId}" func InitAPI() { services.API().HandleFunc(deploymentsEndpoint, apiGetCurrentDeployments).Methods("GET") + services.API().HandleFunc(BlobEndpoint, apiReturnBlobData).Methods("GET") } func writeError(w http.ResponseWriter, status int, code int, reason string) { @@ -133,7 +130,7 @@ subscribers = make(map[chan deploymentsResult]struct{}) go func() { eTag := incrementETag() - deployments, err := getReadyDeployments() + deployments, err := getUnreadyDeployments() log.Debugf("delivering deployments to %d subscribers", len(subs)) for subscriber := range subs { log.Debugf("delivering to: %v", subscriber) @@ -150,6 +147,10 @@ } } +func apiReturnBlobData(w http.ResponseWriter, r *http.Request) { + +} + func apiGetCurrentDeployments(w http.ResponseWriter, r *http.Request) { // If returning without a bundle (immediately or after timeout), status = 404 @@ -233,16 +234,11 @@ for _, d := range dataDeps { apiDeps = append(apiDeps, ApiDeployment{ - ID: d.ID, - ScopeId: d.DataScopeID, - Created: convertTime(d.Created), - CreatedBy: d.CreatedBy, - Updated: convertTime(d.Updated), - UpdatedBy: d.UpdatedBy, - BundleConfigJson: []byte(d.BundleConfigJSON), - ConfigJson: []byte(d.ConfigJSON), - DisplayName: d.BundleName, - URI: d.LocalBundleURI, + ScopeId: d.DataScopeID, + Org: d.OrgID, + Env: d.EnvID, + Type: d.Type, + BlobURL: d.BlobURL, }) } @@ -269,17 +265,3 @@ return strconv.FormatInt(e, 10) } -func convertTime(t string) string { - if t == "" { - return "" - } - formats := []string{sqliteTimeFormat, sqlTimeFormat, iso8601, time.RFC3339} - for _, f := range formats { - timestamp, err := time.Parse(f, t) - if err == nil { - return timestamp.Format(iso8601) - } - } - log.Panic("convertTime: Unsupported time format: " + t) - return "" -}
diff --git a/api_test.go b/api_test.go deleted file mode 100644 index 1a9f81b..0000000 --- a/api_test.go +++ /dev/null
@@ -1,273 +0,0 @@ -package apiGatewayDeploy - -import ( - "bytes" - "encoding/json" - "io/ioutil" - "net/http" - "net/http/httptest" - "net/url" - "time" - - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - "strconv" -) - -var _ = Describe("api", func() { - - Context("GET /deployments", func() { - - It("should get empty set if no deployments", func() { - - uri, err := url.Parse(testServer.URL) - uri.Path = deploymentsEndpoint - - res, err := http.Get(uri.String()) - Expect(err).ShouldNot(HaveOccurred()) - defer res.Body.Close() - - Expect(res.StatusCode).Should(Equal(http.StatusOK)) - - var depRes ApiDeploymentResponse - body, err := ioutil.ReadAll(res.Body) - Expect(err).ShouldNot(HaveOccurred()) - json.Unmarshal(body, &depRes) - - Expect(len(depRes)).To(Equal(0)) - Expect(string(body)).Should(Equal("[]")) - }) - - It("should debounce requests", func(done Done) { - var in = make(chan interface{}) - var out = make(chan []interface{}) - - go debounce(in, out, 3*time.Millisecond) - - go func() { - defer GinkgoRecover() - - received, ok := <-out - Expect(ok).To(BeTrue()) - Expect(len(received)).To(Equal(2)) - - close(in) - received, ok = <-out - Expect(ok).To(BeFalse()) - - close(done) - }() - - in <- "x" - in <- "y" - }) - - It("should get current deployments", func() { - - deploymentID := "api_get_current" - insertTestDeployment(testServer, deploymentID) - - uri, err := url.Parse(testServer.URL) - uri.Path = deploymentsEndpoint - - res, err := http.Get(uri.String()) - Expect(err).ShouldNot(HaveOccurred()) - defer res.Body.Close() - - Expect(res.StatusCode).Should(Equal(http.StatusOK)) - - var depRes ApiDeploymentResponse - body, err := ioutil.ReadAll(res.Body) - Expect(err).ShouldNot(HaveOccurred()) - json.Unmarshal(body, &depRes) - - Expect(len(depRes)).To(Equal(1)) - - dep := depRes[0] - - Expect(dep.ID).To(Equal(deploymentID)) - Expect(dep.ScopeId).To(Equal(deploymentID)) - Expect(dep.DisplayName).To(Equal(deploymentID)) - - var config bundleConfigJson - - err = json.Unmarshal(dep.ConfigJson, &config) - Expect(err).ShouldNot(HaveOccurred()) - Expect(config.Name).To(Equal("/bundles/1")) - - err = json.Unmarshal(dep.BundleConfigJson, &config) - Expect(err).ShouldNot(HaveOccurred()) - Expect(config.Name).To(Equal("/bundles/1")) - }) - - It("should get 304 for no change", func() { - - deploymentID := "api_no_change" - insertTestDeployment(testServer, deploymentID) - - uri, err := url.Parse(testServer.URL) - uri.Path = deploymentsEndpoint - res, err := http.Get(uri.String()) - Expect(err).ShouldNot(HaveOccurred()) - defer res.Body.Close() - Expect(res.Header.Get("etag")).ShouldNot(BeEmpty()) - - req, err := http.NewRequest("GET", uri.String(), nil) - req.Header.Add("Content-Type", "application/json") - req.Header.Add("If-None-Match", res.Header.Get("etag")) - - res, err = http.DefaultClient.Do(req) - Expect(err).ShouldNot(HaveOccurred()) - defer res.Body.Close() - Expect(res.StatusCode).To(Equal(http.StatusNotModified)) - }) - - It("should get empty set after blocking if no deployments", func() { - - uri, err := url.Parse(testServer.URL) - uri.Path = deploymentsEndpoint - - query := uri.Query() - query.Add("block", "1") - uri.RawQuery = query.Encode() - - res, err := http.Get(uri.String()) - Expect(err).ShouldNot(HaveOccurred()) - defer res.Body.Close() - - var depRes ApiDeploymentResponse - body, err := ioutil.ReadAll(res.Body) - Expect(err).ShouldNot(HaveOccurred()) - json.Unmarshal(body, &depRes) - - Expect(res.StatusCode).Should(Equal(http.StatusOK)) - Expect(string(body)).Should(Equal("[]")) - }) - - It("should get new deployment set after blocking", func(done Done) { - - deploymentID := "api_get_current_blocking" - insertTestDeployment(testServer, deploymentID) - uri, err := url.Parse(testServer.URL) - uri.Path = deploymentsEndpoint - res, err := http.Get(uri.String()) - Expect(err).ShouldNot(HaveOccurred()) - defer res.Body.Close() - eTag := res.Header.Get("etag") - Expect(eTag).ShouldNot(BeEmpty()) - - deploymentID = "api_get_current_blocking2" - go func() { - defer GinkgoRecover() - - query := uri.Query() - query.Add("block", "1") - uri.RawQuery = query.Encode() - req, err := http.NewRequest("GET", uri.String(), nil) - req.Header.Add("Content-Type", "application/json") - req.Header.Add("If-None-Match", eTag) - - res, err := http.DefaultClient.Do(req) - Expect(err).ShouldNot(HaveOccurred()) - defer res.Body.Close() - Expect(res.StatusCode).To(Equal(http.StatusOK)) - - Expect(res.Header.Get("etag")).ShouldNot(BeEmpty()) - Expect(res.Header.Get("etag")).ShouldNot(Equal(eTag)) - - var depRes ApiDeploymentResponse - body, err := ioutil.ReadAll(res.Body) - Expect(err).ShouldNot(HaveOccurred()) - json.Unmarshal(body, &depRes) - - Expect(len(depRes)).To(Equal(2)) - - dep := depRes[1] - - Expect(dep.ID).To(Equal(deploymentID)) - Expect(dep.ScopeId).To(Equal(deploymentID)) - Expect(dep.DisplayName).To(Equal(deploymentID)) - - close(done) - }() - - time.Sleep(250 * time.Millisecond) // give api call above time to block - insertTestDeployment(testServer, deploymentID) - deploymentsChanged <- deploymentID - }) - - It("should get 304 after blocking if no new deployment", func() { - - deploymentID := "api_no_change_blocking" - insertTestDeployment(testServer, deploymentID) - uri, err := url.Parse(testServer.URL) - uri.Path = deploymentsEndpoint - res, err := http.Get(uri.String()) - Expect(err).ShouldNot(HaveOccurred()) - defer res.Body.Close() - Expect(res.Header.Get("etag")).ShouldNot(BeEmpty()) - - query := uri.Query() - query.Add("block", "1") - uri.RawQuery = query.Encode() - req, err := http.NewRequest("GET", uri.String(), nil) - req.Header.Add("Content-Type", "application/json") - req.Header.Add("If-None-Match", res.Header.Get("etag")) - - res, err = http.DefaultClient.Do(req) - Expect(err).ShouldNot(HaveOccurred()) - defer res.Body.Close() - Expect(res.StatusCode).To(Equal(http.StatusNotModified)) - }) - }) - -}) - -func insertTestDeployment(testServer *httptest.Server, deploymentID string) { - - uri, err := url.Parse(testServer.URL) - Expect(err).ShouldNot(HaveOccurred()) - - uri.Path = "/bundles/1" - bundleUri := uri.String() - bundle := bundleConfigJson{ - Name: uri.Path, - URI: bundleUri, - ChecksumType: "crc32", - } - bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri) - bundleJson, err := json.Marshal(bundle) - Expect(err).ShouldNot(HaveOccurred()) - - tx, err := getDB().Begin() - Expect(err).ShouldNot(HaveOccurred()) - - dep := DataDeployment{ - ID: deploymentID, - BundleConfigID: deploymentID, - ApidClusterID: deploymentID, - DataScopeID: deploymentID, - BundleConfigJSON: string(bundleJson), - ConfigJSON: string(bundleJson), - Created: "", - CreatedBy: "", - Updated: "", - UpdatedBy: "", - BundleName: deploymentID, - BundleURI: bundle.URI, - BundleChecksum: bundle.Checksum, - BundleChecksumType: bundle.ChecksumType, - LocalBundleURI: "x", - DeployStatus: "", - DeployErrorCode: 0, - DeployErrorMessage: "", - } - - err = InsertDeployment(tx, dep) - Expect(err).ShouldNot(HaveOccurred()) - - err = tx.Commit() - Expect(err).ShouldNot(HaveOccurred()) -} - -
diff --git a/apidGatewayDeploy-api.yaml b/apidGatewayDeploy-api.yaml deleted file mode 100644 index c26d47f..0000000 --- a/apidGatewayDeploy-api.yaml +++ /dev/null
@@ -1,231 +0,0 @@ -swagger: '2.0' -info: - version: 0.0.1 - title: Edge X Apid Gateway Deploy - contact: - name: Apigee, Inc. - url: http://www.apigee.com/ - email: sales@apigee.com - license: - name: Apache 2.0 - url: https://www.apache.org/licenses/LICENSE-2.0 -basePath: /deployments -schemes: - - http -consumes: - - application/json -produces: - - application/json -paths: - /: - get: - description: Retrieve current deployment system and bundles to install. - parameters: - - name: If-None-Match - in: header - type: string - description: "If request If-None-Match header matches the ETag of deployment, the server returns a 304 Not Modified response indicating that the client already has the most recent bundle list." - - name: block - in: query - type: integer - description: 'If block > 0 AND if there is no new bundle list available, then block for up to the specified number of seconds until a new bundle list becomes available. If no new deployment becomes available, then return 304 Not Modified if If-None-Match is specified.' - responses: - '200': - headers: - ETag: - description: "Client should reuse ETag value in If-None-Match header of the next GET request." - type: string - description: The deployment system and bundles to install. - examples: - application/json: [ - { - "id": "1234567890", - "displayName": "abc123", - "created":"1481917061", - "updated":"1481917061", - "createdBy":"mdobs", - "updatedBy":"mdobs", - "scopeId": "ABCDEF", - "uri": "file:///tmp/F1ERRO/0c9853d1ad9b7ec9f7d16ed16ada1be4/archive/369a01f320f926cd8ffac5dfda83b1d8a2129ab3.zip", - "configurationJson": { - "PropA": "scope1bundle1propA", - "PropSCOPE_LEVEL": "aaa", - "PropCLUSTER_LEVEL": "xxx" - } - }, - { - "id": "1234567891", - "displayName": "abc123_2", - "created":"1481917061", - "updated":"1481917061", - "createdBy":"mdobs", - "updatedBy":"mdobs", - "scopeId": "ABCDEF", - "uri": "file:///tmp/F1ERRO/0c9853d1ad9b7ec9f7d16ed16ada1be4/archive/369a01f320f926cd8ffac5dfda83b1d8a2129ab3.zip", - "configurationJson": { - "PropA": "scope1bundle2propA", - "PropSCOPE_LEVEL": "aaa", - "PropCLUSTER_LEVEL": "xxx" - } - }, - { - "id": "1234567892", - "displayName": "abc1234", - "created":"1481917061", - "updated":"1481917061", - "createdBy":"mdobs", - "updatedBy":"mdobs", - "scopeId": "ABCDEF", - "uri": "file:///tmp/F1ERRO/0c9853d1ad9b7ec9f7d16ed16ada1be4/archive/369a01f320f926cd8ffac5dfda83b1d8a2129ab3.zip", - "configurationJson": { - "PropA": "scope1bundlepropA", - "PropSCOPE_LEVEL": "aaa", - "PropCLUSTER_LEVEL": "xxx" - } - }, - { - "id": "1234567893", - "displayName": "abc123", - "created":"1481917061", - "updated":"1481917061", - "createdBy":"mdobs", - "updatedBy":"mdobs", - "scopeId": "EFGHIJK", - "uri": "file:///tmp/F1ERRO/0c9853d1ad9b7ec9f7d16ed16ada1be4/archive/369a01f320f926cd8ffac5dfda83b1d8a2129ab3.zip", - "configurationJson": { - "PropA": "scope2bundle1propA", - "PropSCOPE_LEVEL": "bbb", - "PropCLUSTER_LEVEL": "xxx" - } - }, - { - "id": "1234567894", - "displayName": "abc123_2", - "created":"1481917061", - "updated":"1481917061", - "createdBy":"fierrom", - "updatedBy":"fierrom", - "scopeId": "EFGHIJK", - "uri": "file:///tmp/F1ERRO/0c9853d1ad9b7ec9f7d16ed16ada1be4/archive/369a01f320f926cd8ffac5dfda83b1d8a2129ab3.zip", - "configurationJson": { - "PropA": "scope2bundle2propA", - "PropSCOPE_LEVEL": "bbb", - "PropCLUSTER_LEVEL": "xxx" - } - } -] - schema: - $ref: '#/definitions/DeploymentResponse' - '304': - description: Deployment not modified. - put: - description: Save results of deployment - parameters: - - name: _ - in: body - required: true - description: Success or failure response - schema: - $ref: '#/definitions/DeploymentResult' - responses: - '200': - description: OK - default: - description: Error response - schema: - $ref: '#/definitions/ErrorResponse' - -definitions: - - ErrorResponse: - required: - - errorCode - - reason - properties: - errorCode: - type: number - reason: - type: string - example: { - "errorCode": 601, - "reason": "Something's wrong" - } - - DeploymentResponse: - type: array - items: - $ref: '#/definitions/DeploymentBundle' - - DeploymentBundle: - type: object - required: - - id - - scopeId - - createdBy - - created - - updatedBy - - updated - - displayName - - uri - - configurationJson - properties: - id: - type: string - scopeId: - type: string - createdBy: - type: string - created: - type: number - updatedBy: - type: string - updated: - type: number - displayName: - type: string - uri: - type: string - configurationJson: - type: object - - DeploymentResult: - type: array - items: - $ref: '#/definitions/DeploymentBundleResult' - example: [ - { - "id": "1234567890", - "status": "SUCCESS" - }, - { - "id": "1234567890", - "status": "SUCCESS" - }, - { - "id": "1234567890", - "status": "SUCCESS" - } - ] - - DeploymentBundleResult: - type: object - required: - - id - - status - properties: - id: - type: string - message: - type: string - errorCode: - type: number - status: - type: string - enum: - - "SUCCESS" - - "FAIL" - description: Status of SUCCESS or FAIL plus error - example: { - "id": 1234567890, - "status": "SUCCESS" - } \ No newline at end of file
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go deleted file mode 100644 index 236fe51..0000000 --- a/apidGatewayDeploy_suite_test.go +++ /dev/null
@@ -1,138 +0,0 @@ -package apiGatewayDeploy - -import ( - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "encoding/hex" - - "github.com/30x/apid-core" - "github.com/30x/apid-core/factory" - - "io/ioutil" - "net/http" - "net/http/httptest" - "net/url" - "os" - "testing" - "time" -) - -var ( - tmpDir string - testServer *httptest.Server - testLastTrackerVars map[string]string - testLastTrackerBody []byte -) - -var _ = BeforeSuite(func() { - apid.Initialize(factory.DefaultServicesFactory()) - - config := apid.Config() - - var err error - tmpDir, err = ioutil.TempDir("", "api_test") - Expect(err).NotTo(HaveOccurred()) - - config.Set("local_storage_path", tmpDir) - config.Set(configApidInstanceID, "INSTANCE_ID") - config.Set(configApidClusterID, "CLUSTER_ID") - config.Set(configApiServerBaseURI, "http://localhost") - config.Set(configDebounceDuration, "1ms") - - apid.InitializePlugins("") - - // init full DB - db, err := data.DB() - Expect(err).NotTo(HaveOccurred()) - err = InitDBFullColumns(db) - Expect(err).NotTo(HaveOccurred()) - SetDB(db) - - bundleCleanupDelay = time.Millisecond - bundleRetryDelay = 10 * time.Millisecond - markDeploymentFailedAfter = 50 * time.Millisecond - concurrentDownloads = 1 - downloadQueueSize = 1 - - router := apid.API().Router() - // fake an unreliable bundle repo - count := 1 - failedOnce := false - router.HandleFunc("/bundles/failonce", func(w http.ResponseWriter, req *http.Request) { - if failedOnce { - vars := apid.API().Vars(req) - w.Write([]byte("/bundles/" + vars["id"])) - } else { - failedOnce = true - w.WriteHeader(500) - } - }).Methods("GET") - - router.HandleFunc("/bundles/{id}", func(w http.ResponseWriter, req *http.Request) { - count++ - vars := apid.API().Vars(req) - if count%2 == 0 && vars["id"] != "checksum" { - w.WriteHeader(500) - return - } - if vars["id"] == "longfail" { - time.Sleep(markDeploymentFailedAfter + (250 * time.Millisecond)) - } - w.Write([]byte("/bundles/" + vars["id"])) - - }).Methods("GET") - - // fake an unreliable APID tracker - router.HandleFunc("/clusters/{clusterID}/apids/{instanceID}/deployments", - func(w http.ResponseWriter, req *http.Request) { - count++ - if count%2 == 0 { - w.WriteHeader(500) - return - } - - testLastTrackerVars = apid.API().Vars(req) - testLastTrackerBody, err = ioutil.ReadAll(req.Body) - Expect(err).ToNot(HaveOccurred()) - - w.Write([]byte("OK")) - - }).Methods("PUT") - testServer = httptest.NewServer(router) -}) - -var _ = AfterSuite(func() { - apid.Events().Close() - if testServer != nil { - testServer.Close() - } - os.RemoveAll(tmpDir) -}) - -var _ = BeforeEach(func() { - var err error - apiServerBaseURI, err = url.Parse(testServer.URL) - Expect(err).NotTo(HaveOccurred()) - - _, err = getDB().Exec("DELETE FROM edgex_deployment") - Expect(err).ShouldNot(HaveOccurred()) - - _, err = getDB().Exec("UPDATE etag SET value=1") -}) - -func TestApidGatewayDeploy(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "ApidGatewayDeploy Suite") -} - -func testGetChecksum(hashType, uri string) string { - url, err := url.Parse(uri) - Expect(err).NotTo(HaveOccurred()) - - hashWriter, err := getHashWriter(hashType) - Expect(err).NotTo(HaveOccurred()) - - hashWriter.Write([]byte(url.Path)) - return hex.EncodeToString(hashWriter.Sum(nil)) -}
diff --git a/bundle.go b/bundle.go index 5a2c977..8bd756c 100644 --- a/bundle.go +++ b/bundle.go
@@ -1,22 +1,15 @@ package apiGatewayDeploy import ( - "crypto/md5" - "crypto/sha256" - "crypto/sha512" + "encoding/base64" - "encoding/hex" - "errors" "fmt" - "hash" - "hash/crc32" "io" "io/ioutil" "net/http" "net/url" "os" "path" - "strings" "time" ) @@ -42,19 +35,11 @@ func queueDownloadRequest(dep DataDeployment) { - hashWriter, err := getHashWriter(dep.BundleChecksumType) - if err != nil { - msg := fmt.Sprintf("invalid bundle checksum type: %s for deployment: %s", dep.BundleChecksumType, dep.ID) - log.Error(msg) - return - } - retryIn := bundleRetryDelay maxBackOff := 5 * time.Minute markFailedAt := time.Now().Add(markDeploymentFailedAfter) req := &DownloadRequest{ dep: dep, - hashWriter: hashWriter, bundleFile: getBundleFile(dep), backoffFunc: createBackoff(retryIn, maxBackOff), markFailedAt: markFailedAt, @@ -64,7 +49,6 @@ type DownloadRequest struct { dep DataDeployment - hashWriter hash.Hash bundleFile string backoffFunc func() markFailedAt time.Time @@ -73,18 +57,11 @@ func (r *DownloadRequest) downloadBundle() { dep := r.dep - log.Debugf("starting bundle download attempt for %s: %s", dep.ID, dep.BundleURI) - - deployments, err := getDeployments("WHERE id=$1", dep.ID) - if err == nil && len(deployments) == 0 { - log.Debugf("never mind, deployment %s was deleted", dep.ID) - return - } + log.Debugf("starting bundle download attempt for %s: %s", dep.ID, dep.BlobID) r.checkTimeout() - r.hashWriter.Reset() - tempFile, err := downloadFromURI(dep.BundleURI, r.hashWriter, dep.BundleChecksum) + tempFile, err := downloadFromURI(dep.BlobID) if err == nil { err = os.Rename(tempFile, r.bundleFile) @@ -98,7 +75,7 @@ } if err == nil { - err = updateLocalBundleURI(dep.ID, r.bundleFile) + err = updatelocal_fs_location(dep.BlobID, r.bundleFile) } if err != nil { @@ -110,7 +87,7 @@ return } - log.Debugf("bundle for %s downloaded: %s", dep.ID, dep.BundleURI) + log.Debugf("bundle for %s downloaded: %s", dep.ID, dep.BlobID) // send deployments to client deploymentsChanged <- dep.ID @@ -122,25 +99,59 @@ if time.Now().After(r.markFailedAt) { r.markFailedAt = time.Time{} log.Debugf("bundle download timeout. marking deployment %s failed. will keep retrying: %s", - r.dep.ID, r.dep.BundleURI) + r.dep.ID, r.dep.BlobID) } } } func getBundleFile(dep DataDeployment) string { - // the content of the URI is unfortunately not guaranteed not to change, so I can't just use dep.BundleURI + // the content of the URI is unfortunately not guaranteed not to change, so I can't just use dep.BlobID // unfortunately, this also means that a bundle cache isn't especially relevant fileName := dep.DataScopeID + "_" + dep.ID return path.Join(bundlePath, base64.StdEncoding.EncodeToString([]byte(fileName))) } -func downloadFromURI(uri string, hashWriter hash.Hash, expectedHash string) (tempFileName string, err error) { +func getSignedURL(blobId string) (string, error) { - log.Debugf("Downloading bundle: %s", uri) + blobUri, err := url.Parse(config.GetString(configBlobServerBaseURI)) + if err != nil { + log.Panicf("bad url value for config %s: %s", blobUri, err) + } + + //TODO : Just a temp Hack + blobUri.Path = path.Join(blobUri.Path, "/v1/blobstore/signeduri?action=GET&key=" + blobId) + uri := blobUri.String() + + surl, err := getURIReader(uri) + if err != nil { + log.Errorf("Unable to get signed URL from BlobServer %s: %v", uri, err) + return "", err + } + + signedURL, err := ioutil.ReadAll(surl) + if err != nil { + log.Errorf("Invalid response from BlobServer for {%s} error: {%v}", uri, err) + return "", err + } + return string(signedURL), nil +} + + +// downloadFromURI involves retrieving the signed URL for the blob, and storing the resource locally +// after downloading the resource from GCS (via the signed URL) +func downloadFromURI(blobId string) (tempFileName string, err error) { var tempFile *os.File + log.Debugf("Downloading bundle: %s", blobId) + + uri, err := getSignedURL(blobId) + if err != nil { + log.Errorf("Unable to get signed URL for blobId {%s}, error : {%v}", blobId, err) + return + } + tempFile, err = ioutil.TempFile(bundlePath, "download") if err != nil { log.Errorf("Unable to create temp file: %v", err) @@ -149,55 +160,27 @@ defer tempFile.Close() tempFileName = tempFile.Name() - var bundleReader io.ReadCloser - bundleReader, err = getURIFileReader(uri) + var confReader io.ReadCloser + confReader, err = getURIReader(uri) if err != nil { log.Errorf("Unable to retrieve bundle %s: %v", uri, err) return } - defer bundleReader.Close() + defer confReader.Close() - // track checksum - teedReader := io.TeeReader(bundleReader, hashWriter) - - _, err = io.Copy(tempFile, teedReader) + _, err = io.Copy(tempFile, confReader) if err != nil { log.Errorf("Unable to write bundle %s: %v", tempFileName, err) return } - // check checksum - checksum := hex.EncodeToString(hashWriter.Sum(nil)) - if checksum != expectedHash { - err = errors.New(fmt.Sprintf("Bad checksum on %s. calculated: %s, given: %s", tempFileName, checksum, expectedHash)) - log.Error(err.Error()) - return - } - log.Debugf("Bundle %s downloaded to: %s", uri, tempFileName) return } // retrieveBundle retrieves bundle data from a URI -func getURIFileReader(uriString string) (io.ReadCloser, error) { +func getURIReader(uriString string) (io.ReadCloser, error) { - uri, err := url.Parse(uriString) - if err != nil { - return nil, fmt.Errorf("DownloadFileUrl: Failed to parse urlStr: %s", uriString) - } - - // todo: add authentication - TBD? - - // assume it's a file if no scheme - todo: remove file support? - if uri.Scheme == "" || uri.Scheme == "file" { - f, err := os.Open(uri.Path) - if err != nil { - return nil, err - } - return f, nil - } - - // GET the contents at uriString client := http.Client{ Timeout: bundleDownloadConnTimeout, } @@ -206,42 +189,11 @@ return nil, err } if res.StatusCode != 200 { - return nil, fmt.Errorf("Bundle uri %s failed with status %d", uriString, res.StatusCode) + return nil, fmt.Errorf("GET uri %s failed with status %d", uriString, res.StatusCode) } return res.Body, nil } -func getHashWriter(hashType string) (hash.Hash, error) { - - var hashWriter hash.Hash - - switch strings.ToLower(hashType) { - case "": - hashWriter = fakeHash{md5.New()} - case "md5": - hashWriter = md5.New() - case "crc32": - hashWriter = crc32.NewIEEE() - case "sha256": - hashWriter = sha256.New() - case "sha512": - hashWriter = sha512.New() - default: - return nil, errors.New( - fmt.Sprintf("invalid checksumType: %s. valid types: md5, crc32, sha256, sha512", hashType)) - } - - return hashWriter, nil -} - -type fakeHash struct { - hash.Hash -} - -func (f fakeHash) Sum(b []byte) []byte { - return []byte("") -} - func initializeBundleDownloading() { // create workers
diff --git a/bundle_test.go b/bundle_test.go deleted file mode 100644 index 46e7a74..0000000 --- a/bundle_test.go +++ /dev/null
@@ -1,338 +0,0 @@ -package apiGatewayDeploy - -import ( - "encoding/json" - "net/url" - "time" - - "net/http" - "net/http/httptest" - - "io/ioutil" - - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" -) - -var _ = Describe("bundle", func() { - - Context("download", func() { - - It("should timeout connection and retry", func() { - defer func() { - bundleDownloadConnTimeout = time.Second - }() - bundleDownloadConnTimeout = 100 * time.Millisecond - firstTime := true - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if firstTime { - firstTime = false - time.Sleep(1 * time.Second) - w.WriteHeader(500) - } else { - w.Write([]byte("/bundles/longfail")) - } - })) - defer ts.Close() - - uri, err := url.Parse(ts.URL) - Expect(err).ShouldNot(HaveOccurred()) - uri.Path = "/bundles/longfail" - - tx, err := getDB().Begin() - Expect(err).ShouldNot(HaveOccurred()) - - deploymentID := "bundle_download_fail" - dep := DataDeployment{ - ID: deploymentID, - DataScopeID: deploymentID, - BundleURI: uri.String(), - BundleChecksum: testGetChecksum("crc32", uri.String()), - BundleChecksumType: "crc32", - } - - err = InsertDeployment(tx, dep) - Expect(err).ShouldNot(HaveOccurred()) - - err = tx.Commit() - Expect(err).ShouldNot(HaveOccurred()) - - queueDownloadRequest(dep) - - var listener = make(chan deploymentsResult) - addSubscriber <- listener - result := <-listener - - Expect(result.err).NotTo(HaveOccurred()) - Expect(len(result.deployments)).To(Equal(1)) - d := result.deployments[0] - Expect(d.ID).To(Equal(deploymentID)) - }) - - It("should timeout deployment, mark status as failed, then finish", func() { - - proceed := make(chan bool) - failedOnce := false - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if failedOnce { - proceed <- true - time.Sleep(markDeploymentFailedAfter) - w.Write([]byte("/bundles/longfail")) - } else { - failedOnce = true - time.Sleep(markDeploymentFailedAfter) - w.WriteHeader(500) - } - })) - defer ts.Close() - - deploymentID := "bundle_download_fail" - - uri, err := url.Parse(ts.URL) - Expect(err).ShouldNot(HaveOccurred()) - - uri.Path = "/bundles/longfail" - bundleUri := uri.String() - bundle := bundleConfigJson{ - Name: uri.Path, - URI: bundleUri, - ChecksumType: "crc32", - } - bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri) - bundleJson, err := json.Marshal(bundle) - Expect(err).ShouldNot(HaveOccurred()) - - tx, err := getDB().Begin() - Expect(err).ShouldNot(HaveOccurred()) - - dep := DataDeployment{ - ID: deploymentID, - BundleConfigID: deploymentID, - ApidClusterID: deploymentID, - DataScopeID: deploymentID, - BundleConfigJSON: string(bundleJson), - ConfigJSON: string(bundleJson), - Created: "", - CreatedBy: "", - Updated: "", - UpdatedBy: "", - BundleName: deploymentID, - BundleURI: bundle.URI, - BundleChecksum: bundle.Checksum, - BundleChecksumType: bundle.ChecksumType, - LocalBundleURI: "", - DeployStatus: "", - DeployErrorCode: 0, - DeployErrorMessage: "", - } - - err = InsertDeployment(tx, dep) - Expect(err).ShouldNot(HaveOccurred()) - - err = tx.Commit() - Expect(err).ShouldNot(HaveOccurred()) - - trackerHit := false - tracker := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - defer GinkgoRecover() - - b, err := ioutil.ReadAll(r.Body) - Expect(err).ShouldNot(HaveOccurred()) - var received apiDeploymentResults - json.Unmarshal(b, &received) - - expected := apiDeploymentResults{ - { - ID: deploymentID, - Status: RESPONSE_STATUS_FAIL, - ErrorCode: TRACKER_ERR_BUNDLE_DOWNLOAD_TIMEOUT, - Message: "bundle download failed", - }, - } - Expect(received).To(Equal(expected)) - trackerHit = true - w.Write([]byte("OK")) - })) - defer tracker.Close() - apiServerBaseURI, err = url.Parse(tracker.URL) - Expect(err).ShouldNot(HaveOccurred()) - - queueDownloadRequest(dep) - - <-proceed - - // get error state deployment - deployments, err := getDeployments("WHERE id=$1", deploymentID) - Expect(err).ShouldNot(HaveOccurred()) - - Expect(len(deployments)).To(Equal(1)) - d := deployments[0] - - Expect(d.ID).To(Equal(deploymentID)) - Expect(d.DeployStatus).To(Equal(RESPONSE_STATUS_FAIL)) - Expect(d.DeployErrorCode).To(Equal(TRACKER_ERR_BUNDLE_DOWNLOAD_TIMEOUT)) - Expect(d.DeployErrorMessage).ToNot(BeEmpty()) - Expect(d.LocalBundleURI).To(BeEmpty()) - - Expect(trackerHit).To(BeTrue()) - - var listener = make(chan deploymentsResult) - addSubscriber <- listener - <-listener - - // get finished deployment - // still in error state (let client update), but with valid local bundle - deployments, err = getDeployments("WHERE id=$1", deploymentID) - Expect(err).ShouldNot(HaveOccurred()) - - Expect(len(deployments)).To(Equal(1)) - d = deployments[0] - - Expect(d.ID).To(Equal(deploymentID)) - Expect(d.DeployStatus).To(Equal(RESPONSE_STATUS_FAIL)) - Expect(d.DeployErrorCode).To(Equal(TRACKER_ERR_BUNDLE_DOWNLOAD_TIMEOUT)) - Expect(d.DeployErrorMessage).ToNot(BeEmpty()) - Expect(d.LocalBundleURI).To(BeAnExistingFile()) - }) - - It("should not continue attempts if deployment has been deleted", func() { - - deploymentID := "bundle_download_deployment_deleted" - - uri, err := url.Parse(testServer.URL) - Expect(err).ShouldNot(HaveOccurred()) - - uri.Path = "/bundles/failonce" - bundleUri := uri.String() - bundle := bundleConfigJson{ - Name: uri.Path, - URI: bundleUri, - ChecksumType: "crc32", - } - bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri) - bundleJson, err := json.Marshal(bundle) - Expect(err).ShouldNot(HaveOccurred()) - - tx, err := getDB().Begin() - Expect(err).ShouldNot(HaveOccurred()) - - dep := DataDeployment{ - ID: deploymentID, - BundleConfigID: deploymentID, - ApidClusterID: deploymentID, - DataScopeID: deploymentID, - BundleConfigJSON: string(bundleJson), - ConfigJSON: string(bundleJson), - Created: "", - CreatedBy: "", - Updated: "", - UpdatedBy: "", - BundleName: deploymentID, - BundleURI: bundle.URI, - BundleChecksum: bundle.Checksum, - BundleChecksumType: bundle.ChecksumType, - LocalBundleURI: "", - DeployStatus: "", - DeployErrorCode: 0, - DeployErrorMessage: "", - } - - err = InsertDeployment(tx, dep) - Expect(err).ShouldNot(HaveOccurred()) - - err = tx.Commit() - Expect(err).ShouldNot(HaveOccurred()) - - queueDownloadRequest(dep) - - // skip first try - time.Sleep(bundleRetryDelay) - - // delete deployment - tx, err = getDB().Begin() - Expect(err).ShouldNot(HaveOccurred()) - deleteDeployment(tx, dep.ID) - err = tx.Commit() - Expect(err).ShouldNot(HaveOccurred()) - - // wait for final - time.Sleep(bundleRetryDelay) - - // No way to test this programmatically currently - // search logs for "never mind, deployment bundle_download_deployment_deleted was deleted" - }) - }) - - Context("checksums", func() { - - It("should download with empty checksumType", func() { - checksumDownloadValid("") - }) - - It("should fail download with bad empty checksumType", func() { - checksumDownloadInvalid("") - }) - - It("should fail download with bad md5", func() { - checksumDownloadInvalid("md5") - }) - - It("should download with good md5", func() { - checksumDownloadValid("md5") - }) - - It("should fail download with bad md5", func() { - checksumDownloadInvalid("md5") - }) - - It("should download with good crc32", func() { - checksumDownloadValid("crc32") - }) - - It("should fail download with bad crc32", func() { - checksumDownloadInvalid("crc32") - }) - - It("should download with good sha256", func() { - checksumDownloadValid("sha256") - }) - - It("should fail download with bad sha256", func() { - checksumDownloadInvalid("sha256") - }) - - It("should download correctly with sha512", func() { - checksumDownloadValid("sha512") - }) - - It("should fail download with bad sha512", func() { - checksumDownloadInvalid("sha512") - }) - }) -}) - -func checksumDownloadValid(checksumType string) { - defer GinkgoRecover() - - uri, err := url.Parse(testServer.URL) - Expect(err).ShouldNot(HaveOccurred()) - uri.Path = "/bundles/checksum" - checksum := testGetChecksum(checksumType, uri.String()) - hash, err := getHashWriter(checksumType) - Expect(err).NotTo(HaveOccurred()) - _, err = downloadFromURI(uri.String(), hash, checksum) - Expect(err).NotTo(HaveOccurred()) -} - -func checksumDownloadInvalid(checksumType string) { - defer GinkgoRecover() - - uri, err := url.Parse(testServer.URL) - Expect(err).ShouldNot(HaveOccurred()) - uri.Path = "/bundles/checksum" - checksum := "invalidchecksum" - hash, err := getHashWriter(checksumType) - Expect(err).NotTo(HaveOccurred()) - _, err = downloadFromURI(uri.String(), hash, checksum) - Expect(err).To(HaveOccurred()) -}
diff --git a/cmd/apidGatewayDeploy/deployments.json b/cmd/apidGatewayDeploy/deployments.json deleted file mode 100644 index 72efd9f..0000000 --- a/cmd/apidGatewayDeploy/deployments.json +++ /dev/null
@@ -1,24 +0,0 @@ -[ - { - "id": "api_no_change_blocking", - "scopeId": "api_no_change_blocking", - "created": "", - "createdBy": "", - "updated": "", - "updatedBy": "", - "configuration": { - "name": "/bundles/1", - "uri": "http://127.0.0.1:53589/bundles/1", - "checksumType": "crc-32", - "checksum": "c65a0f2a" - }, - "bundleConfiguration": { - "name": "/bundles/1", - "uri": "http://127.0.0.1:53589/bundles/1", - "checksumType": "crc-32", - "checksum": "c65a0f2a" - }, - "displayName": "api_no_change_blocking", - "uri": "x" - } -] \ No newline at end of file
diff --git a/cmd/apidGatewayDeploy/main.go b/cmd/apidGatewayDeploy/main.go deleted file mode 100644 index 3939944..0000000 --- a/cmd/apidGatewayDeploy/main.go +++ /dev/null
@@ -1,132 +0,0 @@ -package main - -import ( - "encoding/json" - "flag" - "github.com/30x/apid-core" - "github.com/30x/apid-core/factory" - "github.com/30x/apidGatewayDeploy" - _ "github.com/30x/apidGatewayDeploy" - "io/ioutil" - "os" -) - -func main() { - deploymentsFlag := flag.String("deployments", "", "file path to a deployments file (for testing)") - flag.Parse() - deploymentsFile := *deploymentsFlag - - apid.Initialize(factory.DefaultServicesFactory()) - - log := apid.Log() - log.Debug("initializing...") - - configService := apid.Config() - - var deployments apiGatewayDeploy.ApiDeploymentResponse - if deploymentsFile != "" { - log.Printf("Running in temp dir using deployments file: %s", deploymentsFile) - tmpDir, err := ioutil.TempDir("", "apidGatewayDeploy") - if err != nil { - log.Panicf("ERROR: Unable to create temp dir", err) - } - defer os.RemoveAll(tmpDir) - - configService.Set("data_path", tmpDir) - - if deploymentsFile != "" { - bytes, err := ioutil.ReadFile(deploymentsFile) - if err != nil { - log.Errorf("ERROR: Unable to read bundle config file at %s", deploymentsFile) - return - - } - - err = json.Unmarshal(bytes, &deployments) - if err != nil { - log.Errorf("ERROR: Unable to parse deployments %v", err) - return - } - } - } - - apid.InitializePlugins("") - - insertTestDeployments(deployments) - - // print the base url to the console - basePath := "/deployments" - port := configService.GetString("api_port") - log.Print() - log.Printf("API is at: http://localhost:%s%s", port, basePath) - log.Print() - - // start client API listener - api := apid.API() - err := api.Listen() - if err != nil { - log.Print(err) - } -} - -func insertTestDeployments(deployments apiGatewayDeploy.ApiDeploymentResponse) error { - - if len(deployments) == 0 { - return nil - } - - log := apid.Log() - - db, err := apid.Data().DB() - if err != nil { - return err - } - apiGatewayDeploy.SetDB(db) - - err = apiGatewayDeploy.InitDB(db) - if err != nil { - return err - } - - tx, err := db.Begin() - if err != nil { - return err - } - - for _, ad := range deployments { - - dep := apiGatewayDeploy.DataDeployment{ - ID: ad.ID, - BundleConfigID: ad.ID, - ApidClusterID: ad.ID, - DataScopeID: ad.ScopeId, - BundleConfigJSON: string(ad.BundleConfigJson), - ConfigJSON: string(ad.ConfigJson), - Created: "", - CreatedBy: "", - Updated: "", - UpdatedBy: "", - BundleName: ad.DisplayName, - BundleURI: ad.URI, - BundleChecksum: "", - BundleChecksumType: "", - LocalBundleURI: ad.URI, - } - - err = apiGatewayDeploy.InsertDeployment(tx, dep) - if err != nil { - log.Error("Unable to insert deployment") - return err - } - - } - - err = tx.Commit() - if err != nil { - return err - } - - apiGatewayDeploy.InitAPI() - - return nil -}
diff --git a/cover.sh b/cover.sh deleted file mode 100755 index a378f75..0000000 --- a/cover.sh +++ /dev/null
@@ -1,13 +0,0 @@ -#!/usr/bin/env bash - -set -e -echo "mode: atomic" > coverage.txt - -for d in $(go list ./... | grep -v vendor); do - go test -coverprofile=profile.out -covermode=atomic $d - if [ -f profile.out ]; then - tail -n +2 profile.out >> coverage.txt - rm profile.out - fi -done -go tool cover -html=coverage.txt -o cover.html
diff --git a/data.go b/data.go index b09c0cb..d261023 100644 --- a/data.go +++ b/data.go
@@ -4,9 +4,8 @@ "database/sql" "sync" - "encoding/json" "github.com/30x/apid-core" - "strings" + ) var ( @@ -16,76 +15,32 @@ type DataDeployment struct { ID string - BundleConfigID string - ApidClusterID string + OrgID string + EnvID string DataScopeID string - BundleConfigJSON string - ConfigJSON string - Created string - CreatedBy string + Type int + Name string + Revision string + BlobID string + BlobResourceID string Updated string UpdatedBy string - BundleName string - BundleURI string - LocalBundleURI string - BundleChecksum string - BundleChecksumType string - DeployStatus string - DeployErrorCode int - DeployErrorMessage string + Created string + CreatedBy string + BlobFSLocation string + BlobURL string } type SQLExec interface { Exec(query string, args ...interface{}) (sql.Result, error) } -func InitDBFullColumns(db apid.DB) error { - _, err := db.Exec(` - CREATE TABLE IF NOT EXISTS edgex_deployment ( - id character varying(36) NOT NULL, - bundle_config_id varchar(36) NOT NULL, - apid_cluster_id varchar(36) NOT NULL, - data_scope_id varchar(36) NOT NULL, - bundle_config_json text NOT NULL, - config_json text NOT NULL, - created timestamp without time zone, - created_by text, - updated timestamp without time zone, - updated_by text, - bundle_config_name text, - bundle_uri text, - local_bundle_uri text, - bundle_checksum text, - bundle_checksum_type text, - deploy_status string, - deploy_error_code int, - deploy_error_message text, - PRIMARY KEY (id) - ); - `) - if err != nil { - return err - } - - log.Debug("Database tables created.") - return nil -} - func InitDB(db apid.DB) error { _, err := db.Exec(` - CREATE TABLE IF NOT EXISTS edgex_deployment ( - id character varying(36) NOT NULL, - bundle_config_id varchar(36) NOT NULL, - apid_cluster_id varchar(36) NOT NULL, - data_scope_id varchar(36) NOT NULL, - bundle_config_json text NOT NULL, - config_json text NOT NULL, - created timestamp without time zone, - created_by text, - updated timestamp without time zone, - updated_by text, - bundle_config_name text, - PRIMARY KEY (id) + CREATE TABLE IF NOT EXISTS edgex_blob_available ( + blob_id character varying NOT NULL, + local_fs_location character varying NOT NULL, + access_url character varying ); `) if err != nil { @@ -96,31 +51,6 @@ return nil } -func alterTable(db apid.DB) error { - queries := []string{ - "ALTER TABLE edgex_deployment ADD COLUMN bundle_uri text;", - "ALTER TABLE edgex_deployment ADD COLUMN local_bundle_uri text;", - "ALTER TABLE edgex_deployment ADD COLUMN bundle_checksum text;", - "ALTER TABLE edgex_deployment ADD COLUMN bundle_checksum_type text;", - "ALTER TABLE edgex_deployment ADD COLUMN deploy_status string;", - "ALTER TABLE edgex_deployment ADD COLUMN deploy_error_code int;", - "ALTER TABLE edgex_deployment ADD COLUMN deploy_error_message text;", - } - - for _, query := range queries { - _, err := db.Exec(query) - if err != nil { - if strings.Contains(err.Error(), "duplicate column name") { - log.Warnf("AlterTable warning: %s", err) - } else { - return err - } - } - } - log.Debug("Database table altered.") - return nil -} - func getDB() apid.DB { dbMux.RLock() db := unsafeDB @@ -136,232 +66,113 @@ unsafeDB = db } -func InsertDeployment(tx *sql.Tx, dep DataDeployment) error { - return insertDeployments(tx, []DataDeployment{dep}) -} - -func insertDeployments(tx *sql.Tx, deps []DataDeployment) error { - - log.Debugf("inserting %d edgex_deployment", len(deps)) - - stmt, err := tx.Prepare(` - INSERT INTO edgex_deployment - (id, bundle_config_id, apid_cluster_id, data_scope_id, - bundle_config_json, config_json, created, created_by, - updated, updated_by, bundle_config_name, bundle_uri, local_bundle_uri, - bundle_checksum, bundle_checksum_type, deploy_status, - deploy_error_code, deploy_error_message) - VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18); - `) - if err != nil { - log.Errorf("prepare insert into edgex_deployment failed: %v", err) - return err - } - defer stmt.Close() - - for _, dep := range deps { - log.Debugf("insertDeployment: %s", dep.ID) - - _, err = stmt.Exec( - dep.ID, dep.BundleConfigID, dep.ApidClusterID, dep.DataScopeID, - dep.BundleConfigJSON, dep.ConfigJSON, dep.Created, dep.CreatedBy, - dep.Updated, dep.UpdatedBy, dep.BundleName, dep.BundleURI, - dep.LocalBundleURI, dep.BundleChecksum, dep.BundleChecksumType, dep.DeployStatus, - dep.DeployErrorCode, dep.DeployErrorMessage) - if err != nil { - log.Errorf("insert into edgex_deployment %s failed: %v", dep.ID, err) - return err - } - } - - log.Debug("inserting edgex_deployment succeeded") - return err -} - -func updateDeploymentsColumns(tx *sql.Tx, deps []DataDeployment) error { - - log.Debugf("updating %d edgex_deployment", len(deps)) - - stmt, err := tx.Prepare(` - UPDATE edgex_deployment SET - (bundle_uri, local_bundle_uri, - bundle_checksum, bundle_checksum_type, deploy_status, - deploy_error_code, deploy_error_message) - = ($1,$2,$3,$4,$5,$6,$7) WHERE id = $8 - `) - if err != nil { - log.Errorf("prepare update edgex_deployment failed: %v", err) - return err - } - defer stmt.Close() - - for _, dep := range deps { - log.Debugf("updateDeploymentsColumns: processing deployment %s, %v", dep.ID, dep.BundleURI) - - _, err = stmt.Exec( - dep.BundleURI, - dep.LocalBundleURI, dep.BundleChecksum, dep.BundleChecksumType, dep.DeployStatus, - dep.DeployErrorCode, dep.DeployErrorMessage, dep.ID) - if err != nil { - log.Errorf("updateDeploymentsColumns of edgex_deployment %s failed: %v", dep.ID, err) - return err - } - } - - log.Debug("updateDeploymentsColumns of edgex_deployment succeeded") - return err -} - -func getDeploymentsToUpdate(db apid.DB) (deployments []DataDeployment, err error) { - deployments, err = getDeployments("WHERE bundle_uri IS NULL AND local_bundle_uri IS NULL AND deploy_status IS NULL") - if err != nil { - log.Errorf("getDeployments in getDeploymentsToUpdate failed: %v", err) - return - } - var bc bundleConfigJson - for i, _ := range deployments { - log.Debugf("getDeploymentsToUpdate: processing deployment %v, %v", deployments[i].ID, deployments[i].BundleConfigJSON) - json.Unmarshal([]byte(deployments[i].BundleConfigJSON), &bc) - if err != nil { - log.Errorf("JSON decoding Manifest failed: %v", err) - return - } - deployments[i].BundleName = bc.Name - deployments[i].BundleURI = bc.URI - deployments[i].BundleChecksumType = bc.ChecksumType - deployments[i].BundleChecksum = bc.Checksum - - log.Debugf("Unmarshal: %v", deployments[i].BundleURI) - } - return -} - -func deleteDeployment(tx *sql.Tx, depID string) error { - - log.Debugf("deleteDeployment: %s", depID) - - stmt, err := tx.Prepare("DELETE FROM edgex_deployment where id = $1;") - if err != nil { - log.Errorf("prepare delete from edgex_deployment %s failed: %v", depID, err) - return err - } - defer stmt.Close() - - _, err = stmt.Exec(depID) - if err != nil { - log.Errorf("delete from edgex_deployment %s failed: %v", depID, err) - return err - } - - log.Debugf("deleteDeployment %s succeeded", depID) - return err -} - -// getReadyDeployments() returns array of deployments that are ready to deploy -func getReadyDeployments() (deployments []DataDeployment, err error) { - return getDeployments("WHERE local_bundle_uri != $1", "") -} - -// getUnreadyDeployments() returns array of deployments that are not yet ready to deploy +// getUnreadyDeployments() returns array of resources that are not yet to be processed func getUnreadyDeployments() (deployments []DataDeployment, err error) { - return getDeployments("WHERE local_bundle_uri = $1", "") -} -// getDeployments() accepts a "WHERE ..." clause and optional parameters and returns the list of deployments -func getDeployments(where string, a ...interface{}) (deployments []DataDeployment, err error) { + err = nil db := getDB() - var stmt *sql.Stmt - stmt, err = db.Prepare(` - SELECT id, bundle_config_id, apid_cluster_id, data_scope_id, - bundle_config_json, config_json, created, created_by, - updated, updated_by, bundle_config_name, bundle_uri, - local_bundle_uri, bundle_checksum, bundle_checksum_type, deploy_status, - deploy_error_code, deploy_error_message - FROM edgex_deployment - ` + where) + rows, err := db.Query(` + SELECT id, org_id, env_id, name, revision, project_runtime_blob_metadata.blob_id, resource_blob_id + FROM project_runtime_blob_metadata + LEFT JOIN edgex_blob_available + ON project_runtime_blob_metadata.blob_id = edgex_blob_available.blob_id + WHERE edgex_blob_available.blob_id IS NULL; + `) + if err != nil { - return - } - defer stmt.Close() - var rows *sql.Rows - rows, err = stmt.Query(a...) - if err != nil { - if err == sql.ErrNoRows { - return - } - log.Errorf("Error querying edgex_deployment: %v", err) + log.Errorf("DB Query for project_runtime_blob_metadata failed %v", err) return } defer rows.Close() - deployments = dataDeploymentsFromRows(rows) - - return -} - -func dataDeploymentsFromRows(rows *sql.Rows) (deployments []DataDeployment) { for rows.Next() { dep := DataDeployment{} - rows.Scan(&dep.ID, &dep.BundleConfigID, &dep.ApidClusterID, &dep.DataScopeID, - &dep.BundleConfigJSON, &dep.ConfigJSON, &dep.Created, &dep.CreatedBy, - &dep.Updated, &dep.UpdatedBy, &dep.BundleName, &dep.BundleURI, - &dep.LocalBundleURI, &dep.BundleChecksum, &dep.BundleChecksumType, &dep.DeployStatus, - &dep.DeployErrorCode, &dep.DeployErrorMessage, - ) + rows.Scan(&dep.ID, &dep.OrgID, &dep.EnvID, &dep.Name, &dep.Revision, &dep.BlobID, + &dep.BlobResourceID) deployments = append(deployments, dep) + log.Debugf("New configurations to be processed Id {%s}, blobId {%s}", dep.ID, dep.BlobID) + } + if len(deployments) == 0 { + log.Debug("No new resources found to be processed") + err = sql.ErrNoRows } return + } +// getDeployments() +func getReadyDeployments() (deployments []DataDeployment, err error) { + err = nil + db := getDB() -func updateLocalBundleURI(depID, localBundleUri string) error { - - stmt, err := getDB().Prepare("UPDATE edgex_deployment SET local_bundle_uri=$1 WHERE id=$2;") - if err != nil { - log.Errorf("prepare updateLocalBundleURI failed: %v", err) - return err - } - defer stmt.Close() - - _, err = stmt.Exec(localBundleUri, depID) - if err != nil { - log.Errorf("update edgex_deployment %s localBundleUri to %s failed: %v", depID, localBundleUri, err) - return err - } - - log.Debugf("update edgex_deployment %s localBundleUri to %s succeeded", depID, localBundleUri) - - return nil -} - -func InsertTestDeployment(tx *sql.Tx, dep DataDeployment) error { - - stmt, err := tx.Prepare(` - INSERT INTO edgex_deployment - (id, bundle_config_id, apid_cluster_id, data_scope_id, - bundle_config_json, config_json, created, created_by, - updated, updated_by, bundle_config_name) - VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11); + rows, err := db.Query(` + SELECT a.id, a.org_id, a.env_id, a.name, a.revision, a.blob_id, + a.resource_blob_id, a.created_at, a.created_by, a.updated_at, a.updated_by, + b.local_fs_location, b.access_url + FROM project_runtime_blob_metadata as a + INNER JOIN edgex_blob_available as b + ON a.blob_id = b.blob_id `) + if err != nil { - log.Errorf("prepare insert into edgex_deployment failed: %v", err) + log.Errorf("DB Query for project_runtime_blob_metadata failed %v", err) + return + } + defer rows.Close() + + for rows.Next() { + dep := DataDeployment{} + rows.Scan(&dep.ID, &dep.OrgID, &dep.EnvID, &dep.Name, &dep.Revision, &dep.BlobID, + &dep.BlobResourceID, &dep.Created, &dep.CreatedBy, &dep.Updated, + &dep.UpdatedBy, &dep.BlobFSLocation, &dep.BlobURL) + deployments = append(deployments, dep) + log.Debugf("New Configurations available Id {%s} BlobId {%s}", dep.ID, dep.BlobID) + } + if len(deployments) == 0 { + log.Debug("No resources ready to be deployed") + err = sql.ErrNoRows + } + return + +} + +func updatelocal_fs_location(depID, local_fs_location string) error { + + // TODO DEMO ONLY + access_url := "http://localhost:9090/blob/" + depID + stmt, err := getDB().Prepare(` + INSERT INTO edgex_blob_available (blob_id, local_fs_location, access_url) + VALUES (?, ?, ?)`) + if err != nil { + log.Errorf("PREPARE updatelocal_fs_location failed: %v", err) return err } defer stmt.Close() - log.Debugf("InsertTestDeployment: %s", dep.ID) - - _, err = stmt.Exec( - dep.ID, dep.BundleConfigID, dep.ApidClusterID, dep.DataScopeID, - dep.BundleConfigJSON, dep.ConfigJSON, dep.Created, dep.CreatedBy, - dep.Updated, dep.UpdatedBy, dep.BundleName) + _, err = stmt.Exec(depID, local_fs_location, access_url) if err != nil { - log.Errorf("insert into edgex_deployment %s failed: %v", dep.ID, err) + log.Errorf("UPDATE edgex_blob_available id {%s} local_fs_location {%s} failed: %v", depID, local_fs_location, err) return err } - log.Debug("InsertTestDeployment edgex_deployment succeeded") - return err + log.Debugf("INSERT edgex_blob_available {%s} local_fs_location {%s} succeeded", depID, local_fs_location) + return nil + +} + +func getLocalFSLocation (blobId string) (locfs string , err error) { + + db := getDB() + + rows, err := db.Query("SELECT local_fs_location FROM edgex_blob_available WHERE blob_id = " + blobId) + if err != nil { + log.Errorf("SELECT local_fs_location failed %v", err) + return "", err + } + + defer rows.Close() + rows.Scan(&locfs) + return }
diff --git a/glide.yaml b/glide.yaml deleted file mode 100644 index c36bfa1..0000000 --- a/glide.yaml +++ /dev/null
@@ -1,11 +0,0 @@ -package: github.com/30x/apidGatewayDeploy -import: -- package: github.com/30x/apid-core - version: master -- package: github.com/apigee-labs/transicator/common - version: master -testImport: -- package: github.com/onsi/ginkgo - version: master -- package: github.com/onsi/gomega - version: master
diff --git a/init.go b/init.go index 98b9346..96b1bde 100644 --- a/init.go +++ b/init.go
@@ -21,16 +21,19 @@ configApidClusterID = "apigeesync_cluster_id" configConcurrentDownloads = "apigeesync_concurrent_downloads" configDownloadQueueSize = "apigeesync_download_queue_size" + configBlobServerBaseURI = "apigeesync_blob_server_base" ) var ( services apid.Services log apid.LogService data apid.DataService + config apid.ConfigService bundlePath string debounceDuration time.Duration bundleCleanupDelay time.Duration apiServerBaseURI *url.URL + blobServerURL string apidInstanceID string apidClusterID string downloadQueueSize int @@ -46,11 +49,16 @@ log = services.Log().ForModule("apiGatewayDeploy") log.Debug("start init") - config := services.Config() + config = services.Config() if !config.IsSet(configApiServerBaseURI) { return pluginData, fmt.Errorf("Missing required config value: %s", configApiServerBaseURI) } + + if !config.IsSet(configBlobServerBaseURI) { + return pluginData, fmt.Errorf("Missing required config value: %s", configBlobServerBaseURI) + } + var err error apiServerBaseURI, err = url.Parse(config.GetString(configApiServerBaseURI)) if err != nil { @@ -96,7 +104,7 @@ } data = services.Data() - + blobServerURL = config.GetString(configBlobServerBaseURI) concurrentDownloads = config.GetInt(configConcurrentDownloads) downloadQueueSize = config.GetInt(configDownloadQueueSize) relativeBundlePath := config.GetString(configBundleDirKey)
diff --git a/listener.go b/listener.go index e9cd113..95d20f4 100644 --- a/listener.go +++ b/listener.go
@@ -1,12 +1,12 @@ package apiGatewayDeploy import ( - "encoding/json" "os" "time" "github.com/30x/apid-core" "github.com/apigee-labs/transicator/common" + "database/sql" ) const ( @@ -52,35 +52,12 @@ log.Panicf("Unable to access database: %v", err) } - // alter table - err = alterTable(db) - if err != nil { - log.Panicf("Alter table failed: %v", err) - } - // ensure that no new database updates are made on old database + // Update the DB pointer dbMux.Lock() SetDB(db) dbMux.Unlock() - // update deployments - deps, err := getDeploymentsToUpdate(db) - if err != nil { - log.Panicf("Unable to getDeploymentsToUpdate: %v", err) - } - tx, err := db.Begin() - if err != nil { - log.Panicf("Error starting transaction: %v", err) - } - defer tx.Rollback() - err = updateDeploymentsColumns(tx, deps) - if err != nil { - log.Panicf("updateDeploymentsColumns failed: %v", err) - } - err = tx.Commit() - if err != nil { - log.Panicf("Error committing Snapshot update: %v", err) - } - + InitDB(db) startupOnExistingDatabase() log.Debug("Snapshot processed") } @@ -89,7 +66,8 @@ // start bundle downloads that didn't finish go func() { deployments, err := getUnreadyDeployments() - if err != nil { + + if err != nil && err != sql.ErrNoRows { log.Panicf("unable to query database for unready deployments: %v", err) } log.Debugf("Queuing %d deployments for bundle download", len(deployments)) @@ -156,27 +134,18 @@ func dataDeploymentFromRow(row common.Row) (d DataDeployment, err error) { row.Get("id", &d.ID) - row.Get("bundle_config_id", &d.BundleConfigID) - row.Get("apid_cluster_id", &d.ApidClusterID) + row.Get("org_id", &d.OrgID) + row.Get("env_id", &d.EnvID) row.Get("data_scope_id", &d.DataScopeID) - row.Get("bundle_config_json", &d.BundleConfigJSON) - row.Get("config_json", &d.ConfigJSON) - row.Get("created", &d.Created) - row.Get("created_by", &d.CreatedBy) - row.Get("updated", &d.Updated) - row.Get("updated_by", &d.UpdatedBy) - - var bc bundleConfigJson - json.Unmarshal([]byte(d.BundleConfigJSON), &bc) - if err != nil { - log.Errorf("JSON decoding Manifest failed: %v", err) - return - } - - d.BundleName = bc.Name - d.BundleURI = bc.URI - d.BundleChecksumType = bc.ChecksumType - d.BundleChecksum = bc.Checksum + row.Get("bundle_config_json", &d.Type) + row.Get("config_json", &d.Name) + row.Get("created", &d.Revision) + row.Get("created_by", &d.BlobID) + row.Get("created_by", &d.BlobResourceID) + row.Get("created_by", &d.Updated) + row.Get("created_by", &d.UpdatedBy) + row.Get("created_by", &d.Created) + row.Get("updated", &d.CreatedBy) return }
diff --git a/listener_test.go b/listener_test.go deleted file mode 100644 index c253eb3..0000000 --- a/listener_test.go +++ /dev/null
@@ -1,343 +0,0 @@ -package apiGatewayDeploy - -import ( - "encoding/json" - "net/url" - - "net/http/httptest" - - "net/http" - - "fmt" - "github.com/30x/apid-core" - "github.com/apigee-labs/transicator/common" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" -) - -var _ = Describe("listener", func() { - - Context("ApigeeSync snapshot event", func() { - - /* - * Note that the test snapshot should not be empty. - * If it's empty, you can't use deploymentsResult chan to mark the end of processing, - * so the threads generated by startupOnExistingDatabase() will mess up later tests - */ - It("should set DB to appropriate version", func(done Done) { - saveDB := getDB() - deploymentID := "set_version_test" - snapshot, dep := createSnapshotDeployment(deploymentID, "test_version") - - db, err := data.DBVersion(snapshot.SnapshotInfo) - Expect(err).ShouldNot(HaveOccurred()) - - err = InitDB(db) - Expect(err).ShouldNot(HaveOccurred()) - - insertDeploymentToDb(dep, db) - expectedDB, err := data.DBVersion(snapshot.SnapshotInfo) - Expect(err).NotTo(HaveOccurred()) - - var listener = make(chan deploymentsResult) - addSubscriber <- listener - - apid.Events().Emit(APIGEE_SYNC_EVENT, &snapshot) - - result := <-listener - Expect(result.err).ShouldNot(HaveOccurred()) - - // DB should have been set - Expect(getDB() == expectedDB).Should(BeTrue()) - - SetDB(saveDB) - close(done) - }) - - It("should process unready on existing db startup event", func(done Done) { - - saveDB := getDB() - - deploymentID := "startup_test" - - snapshot, dep := createSnapshotDeployment(deploymentID, "test_unready") - - db, err := data.DBVersion(snapshot.SnapshotInfo) - Expect(err).ShouldNot(HaveOccurred()) - - err = InitDB(db) - Expect(err).ShouldNot(HaveOccurred()) - - insertDeploymentToDb(dep, db) - - var listener = make(chan deploymentsResult) - addSubscriber <- listener - - apid.Events().Emit(APIGEE_SYNC_EVENT, &snapshot) - - result := <-listener - Expect(result.err).ShouldNot(HaveOccurred()) - - Expect(len(result.deployments)).To(Equal(1)) - d := result.deployments[0] - - Expect(d.ID).To(Equal(deploymentID)) - - SetDB(saveDB) - close(done) - }) - - It("should send deployment statuses on existing db startup event", func(done Done) { - - saveDB := getDB() - - successDep := DataDeployment{ - ID: "success", - LocalBundleURI: "x", - DeployStatus: RESPONSE_STATUS_SUCCESS, - DeployErrorCode: 1, - DeployErrorMessage: "message", - } - - failDep := DataDeployment{ - ID: "fail", - LocalBundleURI: "x", - DeployStatus: RESPONSE_STATUS_FAIL, - DeployErrorCode: 1, - DeployErrorMessage: "message", - } - - blankDep := DataDeployment{ - ID: "blank", - LocalBundleURI: "x", - } - - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - defer GinkgoRecover() - - var results apiDeploymentResults - err := json.NewDecoder(r.Body).Decode(&results) - Expect(err).ToNot(HaveOccurred()) - - Expect(results).To(HaveLen(2)) - - Expect(results).To(ContainElement(apiDeploymentResult{ - ID: successDep.ID, - Status: successDep.DeployStatus, - ErrorCode: successDep.DeployErrorCode, - Message: successDep.DeployErrorMessage, - })) - Expect(results).To(ContainElement(apiDeploymentResult{ - ID: failDep.ID, - Status: failDep.DeployStatus, - ErrorCode: failDep.DeployErrorCode, - Message: failDep.DeployErrorMessage, - })) - - SetDB(saveDB) - close(done) - })) - - var err error - apiServerBaseURI, err = url.Parse(ts.URL) - Expect(err).NotTo(HaveOccurred()) - - // init without info == startup on existing DB - var snapshot = common.Snapshot{ - SnapshotInfo: "test", - Tables: []common.Table{}, - } - - db, err := data.DBVersion(snapshot.SnapshotInfo) - Expect(err).NotTo(HaveOccurred()) - - err = InitDBFullColumns(db) - Expect(err).NotTo(HaveOccurred()) - - tx, err := db.Begin() - Expect(err).ShouldNot(HaveOccurred()) - - err = InsertDeployment(tx, successDep) - Expect(err).ShouldNot(HaveOccurred()) - err = InsertDeployment(tx, failDep) - Expect(err).ShouldNot(HaveOccurred()) - err = InsertDeployment(tx, blankDep) - Expect(err).ShouldNot(HaveOccurred()) - - err = tx.Commit() - Expect(err).ShouldNot(HaveOccurred()) - - apid.Events().Emit(APIGEE_SYNC_EVENT, &snapshot) - }) - }) - - Context("ApigeeSync change event", func() { - - It("inserting event should deliver the deployment to subscribers", func(done Done) { - - deploymentID := "add_test_1" - - event, dep := createChangeDeployment(deploymentID) - - // insert full deployment columns - tx, err := getDB().Begin() - Expect(err).ShouldNot(HaveOccurred()) - err = InsertDeployment(tx, dep) - Expect(err).ShouldNot(HaveOccurred()) - err = tx.Commit() - Expect(err).ShouldNot(HaveOccurred()) - - var listener = make(chan deploymentsResult) - addSubscriber <- listener - - apid.Events().Emit(APIGEE_SYNC_EVENT, &event) - - // wait for event to propagate - result := <-listener - Expect(result.err).ShouldNot(HaveOccurred()) - - deployments, err := getReadyDeployments() - Expect(err).ShouldNot(HaveOccurred()) - - Expect(len(deployments)).To(Equal(1)) - d := deployments[0] - - Expect(d.ID).To(Equal(deploymentID)) - Expect(d.BundleName).To(Equal(dep.BundleName)) - Expect(d.BundleURI).To(Equal(dep.BundleURI)) - - close(done) - }) - - It("delete event should deliver to subscribers", func(done Done) { - - deploymentID := "delete_test_1" - - // insert deployment - event, dep := createChangeDeployment(deploymentID) - - // insert full deployment columns - tx, err := getDB().Begin() - Expect(err).ShouldNot(HaveOccurred()) - err = InsertDeployment(tx, dep) - Expect(err).ShouldNot(HaveOccurred()) - err = tx.Commit() - Expect(err).ShouldNot(HaveOccurred()) - - listener := make(chan deploymentsResult) - addSubscriber <- listener - apid.Events().Emit(APIGEE_SYNC_EVENT, &event) - // wait for event to propagate - result := <-listener - Expect(result.err).ShouldNot(HaveOccurred()) - - // delete deployment - deletDeploymentFromDb(dep, getDB()) - row := common.Row{} - row["id"] = &common.ColumnVal{Value: deploymentID} - event = common.ChangeList{ - Changes: []common.Change{ - { - Operation: common.Delete, - Table: DEPLOYMENT_TABLE, - OldRow: row, - }, - }, - } - - listener = make(chan deploymentsResult) - addSubscriber <- listener - apid.Events().Emit(APIGEE_SYNC_EVENT, &event) - result = <-listener - Expect(result.err).ShouldNot(HaveOccurred()) - Expect(len(result.deployments)).To(Equal(0)) - close(done) - }) - }) -}) - -func createChangeDeployment(deploymentID string) (common.ChangeList, DataDeployment) { - uri, err := url.Parse(testServer.URL) - Expect(err).ShouldNot(HaveOccurred()) - - uri.Path = "/bundles/1" - bundleUri := uri.String() - bundle := bundleConfigJson{ - Name: uri.Path, - URI: bundleUri, - ChecksumType: "crc32", - } - bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri) - bundle1Json, err := json.Marshal(bundle) - Expect(err).ShouldNot(HaveOccurred()) - - row := common.Row{} - row["id"] = &common.ColumnVal{Value: deploymentID} - row["bundle_config_json"] = &common.ColumnVal{Value: string(bundle1Json)} - - changeList := common.ChangeList{ - Changes: []common.Change{ - { - Operation: common.Insert, - Table: DEPLOYMENT_TABLE, - NewRow: row, - }, - }, - } - dep, err := dataDeploymentFromRow(changeList.Changes[0].NewRow) - return changeList, dep -} - -func insertDeploymentToDb(dep DataDeployment, db apid.DB) { - tx, err := db.Begin() - Expect(err).ShouldNot(HaveOccurred()) - defer tx.Rollback() - err = InsertTestDeployment(tx, dep) - Expect(err).ShouldNot(HaveOccurred()) - err = tx.Commit() - Expect(err).ShouldNot(HaveOccurred()) -} - -func deletDeploymentFromDb(dep DataDeployment, db apid.DB) { - tx, err := db.Begin() - Expect(err).ShouldNot(HaveOccurred()) - defer tx.Rollback() - err = deleteDeployment(tx, dep.ID) - Expect(err).ShouldNot(HaveOccurred()) - err = tx.Commit() - Expect(err).ShouldNot(HaveOccurred()) -} - -func createSnapshotDeployment(deploymentID string, snapInfo string) (common.Snapshot, DataDeployment) { - uri, err := url.Parse(testServer.URL) - Expect(err).ShouldNot(HaveOccurred()) - - uri.Path = "/bundles/1" - bundleUri := uri.String() - bundle := bundleConfigJson{ - Name: uri.Path, - URI: bundleUri, - ChecksumType: "crc32", - } - bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri) - - jsonBytes, err := json.Marshal(bundle) - Expect(err).ShouldNot(HaveOccurred()) - fmt.Println("JSON :" + string(jsonBytes)) - - dep := DataDeployment{ - ID: deploymentID, - DataScopeID: deploymentID, - BundleURI: bundle.URI, - BundleChecksum: bundle.Checksum, - BundleChecksumType: bundle.ChecksumType, - BundleConfigJSON: string(jsonBytes), - } - - // init without info == startup on existing DB - var snapshot = common.Snapshot{ - SnapshotInfo: snapInfo, - Tables: []common.Table{}, - } - return snapshot, dep -}