add tests for bundle.go, fix bugs
diff --git a/api.go b/api.go
index 9275733..5d4f748 100644
--- a/api.go
+++ b/api.go
@@ -221,7 +221,7 @@
if err != nil {
a.writeInternalError(w, err.Error())
}
- w.Header().Set("Content-type", headerSteam)
+ w.Header().Set("Content-Type", headerSteam)
}
diff --git a/api_test.go b/api_test.go
index a7323b5..66e790d 100644
--- a/api_test.go
+++ b/api_test.go
@@ -24,12 +24,13 @@
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
mathrand "math/rand"
+ "os"
"strconv"
"time"
)
const (
- testUrl = "http://127.0.0.1:9000"
+ apiTestUrl = "http://127.0.0.1:9000"
testBlobId = "gcs:SHA-512:39ca7ae89bb9468af34df8bc873748b4035210c91bcc01359c092c1d51364b5f3df06bc69a40621acfaa46791af9ea41bc0f3429a84738ba1a7c8d394859601a"
)
@@ -60,7 +61,7 @@
Context("GET /configurations", func() {
It("should get empty set if no deployments", func() {
// setup http client
- uri, err := url.Parse(testUrl)
+ uri, err := url.Parse(apiTestUrl)
Expect(err).Should(Succeed())
uri.Path = deploymentsEndpoint + strconv.Itoa(testCount)
@@ -80,13 +81,13 @@
// verify response
Expect(len(depRes.ApiDeploymentsResponse)).To(Equal(0))
Expect(depRes.Kind).Should(Equal(kindCollection))
- Expect(depRes.Self).Should(Equal(testUrl + deploymentsEndpoint + strconv.Itoa(testCount)))
+ Expect(depRes.Self).Should(Equal(apiTestUrl + deploymentsEndpoint + strconv.Itoa(testCount)))
})
It("should get correct config format", func() {
// setup http client
- uri, err := url.Parse(testUrl)
+ uri, err := url.Parse(apiTestUrl)
Expect(err).Should(Succeed())
uri.Path = deploymentsEndpoint + strconv.Itoa(testCount)
@@ -116,7 +117,7 @@
It("should get 304 for no change", func() {
// setup http client
- uri, err := url.Parse(testUrl)
+ uri, err := url.Parse(apiTestUrl)
Expect(err).Should(Succeed())
uri.Path = deploymentsEndpoint + strconv.Itoa(testCount)
@@ -149,7 +150,7 @@
start := time.Now()
// setup http client
- uri, err := url.Parse(testUrl)
+ uri, err := url.Parse(apiTestUrl)
Expect(err).Should(Succeed())
uri.Path = deploymentsEndpoint + strconv.Itoa(testCount)
query := uri.Query()
@@ -177,7 +178,7 @@
// verify response
Expect(len(depRes.ApiDeploymentsResponse)).To(Equal(0))
Expect(depRes.Kind).Should(Equal(kindCollection))
- Expect(depRes.Self).Should(Equal(testUrl + deploymentsEndpoint + strconv.Itoa(testCount)))
+ Expect(depRes.Self).Should(Equal(apiTestUrl + deploymentsEndpoint + strconv.Itoa(testCount)))
}, 2)
@@ -186,7 +187,7 @@
isoTime := []string{"", "2017-04-05T04:47:36.462Z", "2017-04-05T04:47:36.462-07:00", "2017-04-05T04:47:36.462Z", "2017-04-05T23:23:38.162Z", "2017-06-22T16:41:02.334Z"}
// setup http client
- uri, err := url.Parse(testUrl)
+ uri, err := url.Parse(apiTestUrl)
Expect(err).Should(Succeed())
uri.Path = deploymentsEndpoint + strconv.Itoa(testCount)
@@ -246,7 +247,7 @@
Context("GET /blobs", func() {
It("should get file bytesfrom endpoint", func() {
// setup http client
- uri, err := url.Parse(testUrl)
+ uri, err := url.Parse(apiTestUrl)
Expect(err).Should(Succeed())
uri.Path = blobEndpointPath + strconv.Itoa(testCount) + "/test"
@@ -336,6 +337,7 @@
unreadyBlobIds []string
readyDeployments []DataDeployment
localFSLocation string
+ fileResponse chan string
}
func (d *dummyDbManager) setDbVersion(version string) {
@@ -354,7 +356,17 @@
return d.readyDeployments, nil
}
-func (d *dummyDbManager) updateLocalFsLocation(string, string) error {
+func (d *dummyDbManager) updateLocalFsLocation(blobId, localFsLocation string) error {
+ file, err := os.Open(localFsLocation)
+ if err != nil {
+ return err
+ }
+ buff := make([]byte, 36)
+ _, err = file.Read(buff)
+ if err != nil {
+ return err
+ }
+ d.fileResponse <- string(buff)
return nil
}
diff --git a/bundle.go b/bundle.go
index a35bdd5..62478fa 100644
--- a/bundle.go
+++ b/bundle.go
@@ -28,7 +28,7 @@
)
const (
- BLOBSTORE_URI = "/v1/blobs/{BLOB_ID}/signedurl"
+ blobStoreUri = "/v1/blobs/{blobId}/signedurl"
)
type bundleManagerInterface interface {
@@ -41,6 +41,7 @@
}
type bundleManager struct {
+ blobServerUrl string
dbMan dbManagerInterface
apiMan apiManagerInterface
concurrentDownloads int
@@ -87,11 +88,12 @@
maxBackOff := 5 * time.Minute
return &DownloadRequest{
- bm: bm,
- blobId: id,
- backoffFunc: createBackoff(retryIn, maxBackOff),
- markFailedAt: markFailedAt,
- connTimeout: bm.bundleDownloadConnTimeout,
+ blobServerURL: bm.blobServerUrl,
+ bm: bm,
+ blobId: id,
+ backoffFunc: createBackoff(retryIn, maxBackOff),
+ markFailedAt: markFailedAt,
+ connTimeout: bm.bundleDownloadConnTimeout,
}
}
@@ -134,11 +136,12 @@
}
type DownloadRequest struct {
- bm *bundleManager
- blobId string
- backoffFunc func()
- markFailedAt time.Time
- connTimeout time.Duration
+ bm *bundleManager
+ blobId string
+ backoffFunc func()
+ markFailedAt time.Time
+ connTimeout time.Duration
+ blobServerURL string
}
func (r *DownloadRequest) downloadBundle() error {
@@ -147,7 +150,7 @@
r.checkTimeout()
- downloadedFile, err := downloadFromURI(r.blobId, r.connTimeout)
+ downloadedFile, err := downloadFromURI(r.blobServerURL, r.blobId, r.connTimeout)
if err != nil {
log.Errorf("Unable to download blob file blobId=%s err:%v", r.blobId, err)
@@ -183,14 +186,14 @@
return path.Join(bundlePath, base64.StdEncoding.EncodeToString([]byte(blobId)))
}
-func getSignedURL(blobId string, bundleDownloadConnTimeout time.Duration) (string, error) {
+func getSignedURL(blobServerURL string, blobId string, bundleDownloadConnTimeout time.Duration) (string, error) {
blobUri, err := url.Parse(blobServerURL)
if err != nil {
log.Panicf("bad url value for config %s: %s", blobUri, err)
}
- blobUri.Path += strings.Replace(BLOBSTORE_URI, "{BLOB_ID}", blobId, 1)
+ blobUri.Path += strings.Replace(blobStoreUri, "{blobId}", blobId, 1)
parameters := url.Values{}
parameters.Add("action", "GET")
parameters.Add("key", blobId)
@@ -214,12 +217,12 @@
// downloadFromURI involves retrieving the signed URL for the blob, and storing the resource locally
// after downloading the resource from GCS (via the signed URL)
-func downloadFromURI(blobId string, bundleDownloadConnTimeout time.Duration) (tempFileName string, err error) {
+func downloadFromURI(blobServerURL string, blobId string, bundleDownloadConnTimeout time.Duration) (tempFileName string, err error) {
var tempFile *os.File
log.Debugf("Downloading bundle: %s", blobId)
- uri, err := getSignedURL(blobId, bundleDownloadConnTimeout)
+ uri, err := getSignedURL(blobServerURL, blobId, bundleDownloadConnTimeout)
if err != nil {
log.Errorf("Unable to get signed URL for blobId {%s}, error : {%v}", blobId, err)
return
diff --git a/bundle_test.go b/bundle_test.go
index 192ed40..2528d71 100644
--- a/bundle_test.go
+++ b/bundle_test.go
@@ -1 +1,131 @@
+// Copyright 2017 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
package apiGatewayConfDeploy
+
+import (
+ "net/http"
+
+ "bytes"
+ "github.com/gorilla/mux"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+ "io"
+ "strings"
+ "time"
+)
+
+const (
+ bundleTestUrl = "http://127.0.0.1:9000"
+ dummySignedEndpoint = "/dummyblob/{blobId}"
+)
+
+var _ = Describe("api", func() {
+ var testCount int
+ var testBundleMan *bundleManager
+ var dummyDbMan *dummyDbManager
+ var dummyApiMan *dummyApiManager
+ var blobServer *dummyBlobServer
+
+ var _ = BeforeEach(func() {
+ testCount += 1
+ concurrentDownloads := 5
+ downloadQueueSize := 5
+
+ // init test blob server
+ if blobServer == nil {
+ blobServer = &dummyBlobServer{
+ serverEndpoint: blobStoreUri,
+ signedEndpoint: dummySignedEndpoint,
+ }
+ blobServer.start()
+ }
+
+ // init dummy db manager
+ dummyDbMan = &dummyDbManager{
+ fileResponse: make(chan string),
+ }
+
+ // init dummy api manager
+ dummyApiMan = &dummyApiManager{}
+
+ // init bundle manager
+ testBundleMan = &bundleManager{
+ blobServerUrl: bundleTestUrl,
+ dbMan: dummyDbMan,
+ apiMan: dummyApiMan,
+ concurrentDownloads: concurrentDownloads,
+ markDeploymentFailedAfter: 5 * time.Second,
+ bundleDownloadConnTimeout: 5 * time.Second,
+ bundleRetryDelay: time.Second,
+ bundleCleanupDelay: 5 * time.Second,
+ downloadQueue: make(chan *DownloadRequest, downloadQueueSize),
+ isClosed: new(int32),
+ }
+ testBundleMan.initializeBundleDownloading()
+ time.Sleep(100 * time.Millisecond)
+ })
+
+ var _ = AfterEach(func() {
+ testBundleMan.Close()
+ testBundleMan = nil
+ dummyDbMan = nil
+ dummyApiMan = nil
+ })
+
+ It("should download blob according to id", func() {
+ id := GenerateUUID()
+ testBundleMan.enqueueRequest(testBundleMan.makeDownloadRequest(id))
+ received := <-dummyDbMan.fileResponse
+ Expect(received).Should(Equal(id))
+ })
+})
+
+type dummyApiManager struct {
+}
+
+func (a *dummyApiManager) InitAPI() {
+}
+
+type dummyBlobServer struct {
+ serverEndpoint string
+ signedEndpoint string
+}
+
+func (b *dummyBlobServer) start() {
+ services.API().HandleFunc(b.serverEndpoint, b.returnSigned).Methods("GET")
+ services.API().HandleFunc(b.signedEndpoint, b.returnBlob).Methods("GET")
+}
+
+// send a dummy uri as response
+func (b *dummyBlobServer) returnSigned(w http.ResponseWriter, r *http.Request) {
+ vars := mux.Vars(r)
+ blobId := vars["blobId"]
+
+ uriString := strings.Replace(bundleTestUrl+b.signedEndpoint, "{blobId}", blobId, 1)
+ log.Debug("dummyBlobServer returnSigned: " + uriString)
+ _, err := io.Copy(w, bytes.NewReader([]byte(uriString)))
+ Expect(err).Should(Succeed())
+ w.Header().Set("Content-Type", headerSteam)
+}
+
+// send blobId back as response
+func (b *dummyBlobServer) returnBlob(w http.ResponseWriter, r *http.Request) {
+ vars := mux.Vars(r)
+ blobId := vars["blobId"]
+ log.Debug("dummyBlobServer returnBlob id=" + blobId)
+ _, err := io.Copy(w, bytes.NewReader([]byte(blobId)))
+ Expect(err).Should(Succeed())
+ w.Header().Set("Content-Type", headerSteam)
+}
diff --git a/data.go b/data.go
index 81affe0..80beedd 100644
--- a/data.go
+++ b/data.go
@@ -137,7 +137,7 @@
return
}
-// getDeployments()
+// TODO there's a bug in the db statement
func (dbc *dbManager) getReadyDeployments() ([]DataDeployment, error) {
rows, err := dbc.getDb().Query(`SELECT
diff --git a/init.go b/init.go
index fd12337..d70963d 100644
--- a/init.go
+++ b/init.go
@@ -48,7 +48,6 @@
bundlePath string
debounceDuration time.Duration
apiServerBaseURI *url.URL
- blobServerURL string
apidInstanceID string
apidClusterID string
)
@@ -139,7 +138,7 @@
// initialize bundle manager
- blobServerURL = config.GetString(configBlobServerBaseURI)
+ blobServerURL := config.GetString(configBlobServerBaseURI)
relativeBundlePath := config.GetString(configBundleDirKey)
storagePath := config.GetString(configStoragePath)
bundlePath = path.Join(storagePath, relativeBundlePath)
@@ -150,6 +149,7 @@
concurrentDownloads := config.GetInt(configConcurrentDownloads)
downloadQueueSize := config.GetInt(configDownloadQueueSize)
bundleMan := &bundleManager{
+ blobServerUrl: blobServerURL,
dbMan: dbMan,
apiMan: apiMan,
concurrentDownloads: concurrentDownloads,