add tests for bundle.go, fix bugs
diff --git a/api.go b/api.go index 9275733..5d4f748 100644 --- a/api.go +++ b/api.go
@@ -221,7 +221,7 @@ if err != nil { a.writeInternalError(w, err.Error()) } - w.Header().Set("Content-type", headerSteam) + w.Header().Set("Content-Type", headerSteam) }
diff --git a/api_test.go b/api_test.go index a7323b5..66e790d 100644 --- a/api_test.go +++ b/api_test.go
@@ -24,12 +24,13 @@ . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" mathrand "math/rand" + "os" "strconv" "time" ) const ( - testUrl = "http://127.0.0.1:9000" + apiTestUrl = "http://127.0.0.1:9000" testBlobId = "gcs:SHA-512:39ca7ae89bb9468af34df8bc873748b4035210c91bcc01359c092c1d51364b5f3df06bc69a40621acfaa46791af9ea41bc0f3429a84738ba1a7c8d394859601a" ) @@ -60,7 +61,7 @@ Context("GET /configurations", func() { It("should get empty set if no deployments", func() { // setup http client - uri, err := url.Parse(testUrl) + uri, err := url.Parse(apiTestUrl) Expect(err).Should(Succeed()) uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) @@ -80,13 +81,13 @@ // verify response Expect(len(depRes.ApiDeploymentsResponse)).To(Equal(0)) Expect(depRes.Kind).Should(Equal(kindCollection)) - Expect(depRes.Self).Should(Equal(testUrl + deploymentsEndpoint + strconv.Itoa(testCount))) + Expect(depRes.Self).Should(Equal(apiTestUrl + deploymentsEndpoint + strconv.Itoa(testCount))) }) It("should get correct config format", func() { // setup http client - uri, err := url.Parse(testUrl) + uri, err := url.Parse(apiTestUrl) Expect(err).Should(Succeed()) uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) @@ -116,7 +117,7 @@ It("should get 304 for no change", func() { // setup http client - uri, err := url.Parse(testUrl) + uri, err := url.Parse(apiTestUrl) Expect(err).Should(Succeed()) uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) @@ -149,7 +150,7 @@ start := time.Now() // setup http client - uri, err := url.Parse(testUrl) + uri, err := url.Parse(apiTestUrl) Expect(err).Should(Succeed()) uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) query := uri.Query() @@ -177,7 +178,7 @@ // verify response Expect(len(depRes.ApiDeploymentsResponse)).To(Equal(0)) Expect(depRes.Kind).Should(Equal(kindCollection)) - Expect(depRes.Self).Should(Equal(testUrl + deploymentsEndpoint + strconv.Itoa(testCount))) + Expect(depRes.Self).Should(Equal(apiTestUrl + deploymentsEndpoint + strconv.Itoa(testCount))) }, 2) @@ -186,7 +187,7 @@ isoTime := []string{"", "2017-04-05T04:47:36.462Z", "2017-04-05T04:47:36.462-07:00", "2017-04-05T04:47:36.462Z", "2017-04-05T23:23:38.162Z", "2017-06-22T16:41:02.334Z"} // setup http client - uri, err := url.Parse(testUrl) + uri, err := url.Parse(apiTestUrl) Expect(err).Should(Succeed()) uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) @@ -246,7 +247,7 @@ Context("GET /blobs", func() { It("should get file bytesfrom endpoint", func() { // setup http client - uri, err := url.Parse(testUrl) + uri, err := url.Parse(apiTestUrl) Expect(err).Should(Succeed()) uri.Path = blobEndpointPath + strconv.Itoa(testCount) + "/test" @@ -336,6 +337,7 @@ unreadyBlobIds []string readyDeployments []DataDeployment localFSLocation string + fileResponse chan string } func (d *dummyDbManager) setDbVersion(version string) { @@ -354,7 +356,17 @@ return d.readyDeployments, nil } -func (d *dummyDbManager) updateLocalFsLocation(string, string) error { +func (d *dummyDbManager) updateLocalFsLocation(blobId, localFsLocation string) error { + file, err := os.Open(localFsLocation) + if err != nil { + return err + } + buff := make([]byte, 36) + _, err = file.Read(buff) + if err != nil { + return err + } + d.fileResponse <- string(buff) return nil }
diff --git a/bundle.go b/bundle.go index a35bdd5..62478fa 100644 --- a/bundle.go +++ b/bundle.go
@@ -28,7 +28,7 @@ ) const ( - BLOBSTORE_URI = "/v1/blobs/{BLOB_ID}/signedurl" + blobStoreUri = "/v1/blobs/{blobId}/signedurl" ) type bundleManagerInterface interface { @@ -41,6 +41,7 @@ } type bundleManager struct { + blobServerUrl string dbMan dbManagerInterface apiMan apiManagerInterface concurrentDownloads int @@ -87,11 +88,12 @@ maxBackOff := 5 * time.Minute return &DownloadRequest{ - bm: bm, - blobId: id, - backoffFunc: createBackoff(retryIn, maxBackOff), - markFailedAt: markFailedAt, - connTimeout: bm.bundleDownloadConnTimeout, + blobServerURL: bm.blobServerUrl, + bm: bm, + blobId: id, + backoffFunc: createBackoff(retryIn, maxBackOff), + markFailedAt: markFailedAt, + connTimeout: bm.bundleDownloadConnTimeout, } } @@ -134,11 +136,12 @@ } type DownloadRequest struct { - bm *bundleManager - blobId string - backoffFunc func() - markFailedAt time.Time - connTimeout time.Duration + bm *bundleManager + blobId string + backoffFunc func() + markFailedAt time.Time + connTimeout time.Duration + blobServerURL string } func (r *DownloadRequest) downloadBundle() error { @@ -147,7 +150,7 @@ r.checkTimeout() - downloadedFile, err := downloadFromURI(r.blobId, r.connTimeout) + downloadedFile, err := downloadFromURI(r.blobServerURL, r.blobId, r.connTimeout) if err != nil { log.Errorf("Unable to download blob file blobId=%s err:%v", r.blobId, err) @@ -183,14 +186,14 @@ return path.Join(bundlePath, base64.StdEncoding.EncodeToString([]byte(blobId))) } -func getSignedURL(blobId string, bundleDownloadConnTimeout time.Duration) (string, error) { +func getSignedURL(blobServerURL string, blobId string, bundleDownloadConnTimeout time.Duration) (string, error) { blobUri, err := url.Parse(blobServerURL) if err != nil { log.Panicf("bad url value for config %s: %s", blobUri, err) } - blobUri.Path += strings.Replace(BLOBSTORE_URI, "{BLOB_ID}", blobId, 1) + blobUri.Path += strings.Replace(blobStoreUri, "{blobId}", blobId, 1) parameters := url.Values{} parameters.Add("action", "GET") parameters.Add("key", blobId) @@ -214,12 +217,12 @@ // downloadFromURI involves retrieving the signed URL for the blob, and storing the resource locally // after downloading the resource from GCS (via the signed URL) -func downloadFromURI(blobId string, bundleDownloadConnTimeout time.Duration) (tempFileName string, err error) { +func downloadFromURI(blobServerURL string, blobId string, bundleDownloadConnTimeout time.Duration) (tempFileName string, err error) { var tempFile *os.File log.Debugf("Downloading bundle: %s", blobId) - uri, err := getSignedURL(blobId, bundleDownloadConnTimeout) + uri, err := getSignedURL(blobServerURL, blobId, bundleDownloadConnTimeout) if err != nil { log.Errorf("Unable to get signed URL for blobId {%s}, error : {%v}", blobId, err) return
diff --git a/bundle_test.go b/bundle_test.go index 192ed40..2528d71 100644 --- a/bundle_test.go +++ b/bundle_test.go
@@ -1 +1,131 @@ +// Copyright 2017 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package apiGatewayConfDeploy + +import ( + "net/http" + + "bytes" + "github.com/gorilla/mux" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "io" + "strings" + "time" +) + +const ( + bundleTestUrl = "http://127.0.0.1:9000" + dummySignedEndpoint = "/dummyblob/{blobId}" +) + +var _ = Describe("api", func() { + var testCount int + var testBundleMan *bundleManager + var dummyDbMan *dummyDbManager + var dummyApiMan *dummyApiManager + var blobServer *dummyBlobServer + + var _ = BeforeEach(func() { + testCount += 1 + concurrentDownloads := 5 + downloadQueueSize := 5 + + // init test blob server + if blobServer == nil { + blobServer = &dummyBlobServer{ + serverEndpoint: blobStoreUri, + signedEndpoint: dummySignedEndpoint, + } + blobServer.start() + } + + // init dummy db manager + dummyDbMan = &dummyDbManager{ + fileResponse: make(chan string), + } + + // init dummy api manager + dummyApiMan = &dummyApiManager{} + + // init bundle manager + testBundleMan = &bundleManager{ + blobServerUrl: bundleTestUrl, + dbMan: dummyDbMan, + apiMan: dummyApiMan, + concurrentDownloads: concurrentDownloads, + markDeploymentFailedAfter: 5 * time.Second, + bundleDownloadConnTimeout: 5 * time.Second, + bundleRetryDelay: time.Second, + bundleCleanupDelay: 5 * time.Second, + downloadQueue: make(chan *DownloadRequest, downloadQueueSize), + isClosed: new(int32), + } + testBundleMan.initializeBundleDownloading() + time.Sleep(100 * time.Millisecond) + }) + + var _ = AfterEach(func() { + testBundleMan.Close() + testBundleMan = nil + dummyDbMan = nil + dummyApiMan = nil + }) + + It("should download blob according to id", func() { + id := GenerateUUID() + testBundleMan.enqueueRequest(testBundleMan.makeDownloadRequest(id)) + received := <-dummyDbMan.fileResponse + Expect(received).Should(Equal(id)) + }) +}) + +type dummyApiManager struct { +} + +func (a *dummyApiManager) InitAPI() { +} + +type dummyBlobServer struct { + serverEndpoint string + signedEndpoint string +} + +func (b *dummyBlobServer) start() { + services.API().HandleFunc(b.serverEndpoint, b.returnSigned).Methods("GET") + services.API().HandleFunc(b.signedEndpoint, b.returnBlob).Methods("GET") +} + +// send a dummy uri as response +func (b *dummyBlobServer) returnSigned(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + blobId := vars["blobId"] + + uriString := strings.Replace(bundleTestUrl+b.signedEndpoint, "{blobId}", blobId, 1) + log.Debug("dummyBlobServer returnSigned: " + uriString) + _, err := io.Copy(w, bytes.NewReader([]byte(uriString))) + Expect(err).Should(Succeed()) + w.Header().Set("Content-Type", headerSteam) +} + +// send blobId back as response +func (b *dummyBlobServer) returnBlob(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + blobId := vars["blobId"] + log.Debug("dummyBlobServer returnBlob id=" + blobId) + _, err := io.Copy(w, bytes.NewReader([]byte(blobId))) + Expect(err).Should(Succeed()) + w.Header().Set("Content-Type", headerSteam) +}
diff --git a/data.go b/data.go index 81affe0..80beedd 100644 --- a/data.go +++ b/data.go
@@ -137,7 +137,7 @@ return } -// getDeployments() +// TODO there's a bug in the db statement func (dbc *dbManager) getReadyDeployments() ([]DataDeployment, error) { rows, err := dbc.getDb().Query(`SELECT
diff --git a/init.go b/init.go index fd12337..d70963d 100644 --- a/init.go +++ b/init.go
@@ -48,7 +48,6 @@ bundlePath string debounceDuration time.Duration apiServerBaseURI *url.URL - blobServerURL string apidInstanceID string apidClusterID string ) @@ -139,7 +138,7 @@ // initialize bundle manager - blobServerURL = config.GetString(configBlobServerBaseURI) + blobServerURL := config.GetString(configBlobServerBaseURI) relativeBundlePath := config.GetString(configBundleDirKey) storagePath := config.GetString(configStoragePath) bundlePath = path.Join(storagePath, relativeBundlePath) @@ -150,6 +149,7 @@ concurrentDownloads := config.GetInt(configConcurrentDownloads) downloadQueueSize := config.GetInt(configDownloadQueueSize) bundleMan := &bundleManager{ + blobServerUrl: blobServerURL, dbMan: dbMan, apiMan: apiMan, concurrentDownloads: concurrentDownloads,