Bundle download retries, bug fixes, moar testing!
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go index 24a8894..c8be184 100644 --- a/apidGatewayDeploy_suite_test.go +++ b/apidGatewayDeploy_suite_test.go
@@ -11,6 +11,7 @@ "net/http/httptest" "os" "testing" + "time" ) var ( @@ -33,9 +34,17 @@ apid.InitializePlugins() router := apid.API().Router() - // fake bundle repo - router.HandleFunc("/bundle", func(w http.ResponseWriter, req *http.Request) { - w.Write([]byte("bundle stuff")) + // fake unreliable bundle repo + downloadMultiplier = 10 * time.Millisecond + count := 0 + router.HandleFunc("/bundle/{id}", func(w http.ResponseWriter, req *http.Request) { + count++ + if count % 2 == 0 { + w.WriteHeader(500) + return + } + vars := apid.API().Vars(req) + w.Write([]byte("/bundle/" + vars["id"])) }) testServer = httptest.NewServer(router) })
diff --git a/deployments.go b/deployments.go index 5c1b6b2..2f51b09 100644 --- a/deployments.go +++ b/deployments.go
@@ -11,11 +11,17 @@ "encoding/base64" "path" "errors" + "io/ioutil" + "time" +) + +const ( + DOWNLOAD_ATTEMPTS = 3 ) var ( - bundlePath string - gitHubAccessToken string // todo: temporary - should come from Manifest + gitHubAccessToken string // todo: temporary - will not be used + downloadMultiplier = 10 * time.Second ) type systemBundle struct { @@ -68,12 +74,11 @@ } // retrieveBundle retrieves bundle data from a URI -func retrieveBundle(uriString string) (io.ReadCloser, error) { +func getBundleReader(uriString string) (io.ReadCloser, error) { uri, err := url.Parse(uriString) if err != nil { return nil, fmt.Errorf("DownloadFileUrl: Failed to parse urlStr: %s", uriString) - } // todo: temporary - if not a github url, just open it or call GET on it @@ -92,6 +97,9 @@ if err != nil { return nil, err } + if res.StatusCode != 200 { + return nil, fmt.Errorf("Bundle uri %s failed with status %s", uriString, res.StatusCode) + } return res.Body, nil } @@ -99,28 +107,67 @@ return github.GetUrlData(uri, gitHubAccessToken) } -// todo: retry on error // check if already exists and skip func prepareBundle(depID string, bun bundle) error { bundleFile := getBundleFilePath(depID, bun.URI) - out, err := os.Create(bundleFile) + bundleDir := path.Dir(bundleFile) + + downloadBundle := func() (fileName string, err error) { + + log.Debugf("Downloading bundle: %s", bun.URI) + + var tempFile *os.File + tempFile, err = ioutil.TempFile(bundleDir, "download") + if err != nil { + log.Errorf("Unable to create temp file: %v", err) + return + } + fileName = tempFile.Name() + + var bundleReader io.ReadCloser + bundleReader, err = getBundleReader(bun.URI) + if err != nil { + log.Errorf("Unable to retrieve bundle %s: %v", bun.URI, err) + return + } + defer bundleReader.Close() + + _, err = io.Copy(tempFile, bundleReader) + if err != nil { + log.Errorf("Unable to write bundle %s: %v", tempFile, err) + } + + return + } + + // retry + var tempFile string + var err error + for i := 1; i <= DOWNLOAD_ATTEMPTS; i++ { + tempFile, err = downloadBundle() + if err == nil { + break + } + if tempFile != "" { + os.Remove(tempFile) + } + + // simple back-off, we could potentially be more sophisticated + retryIn := time.Duration(i) * downloadMultiplier + log.Debugf("will retry download in %s", retryIn) + time.Sleep(retryIn) + } + if err != nil { - log.Errorf("Unable to create bundle file %s, Err: %s", bundleFile, err) + log.Errorf("failed %s download attempts. aborting.", DOWNLOAD_ATTEMPTS) return err } - defer out.Close() - bundleData, err := retrieveBundle(bun.URI) + err = os.Rename(tempFile, bundleFile) if err != nil { - log.Errorf("Unable to retrieve bundle %s, Err: %s", bun.URI, err) - return err - } - defer bundleData.Close() - - _, err = io.Copy(out, bundleData) - if err != nil { - log.Errorf("Unable to write bundle %s, Err: %s", bundleFile, err) + log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, bundleFile, err) + os.Remove(tempFile) return err } @@ -128,7 +175,7 @@ } func getDeploymentFilesPath(depID string) string { - return bundlePath + "/" + depID + return path.Join(bundlePath, depID) } func getBundleFilePath(depID string, bundleURI string) string { @@ -148,7 +195,7 @@ } deploymentPath := getDeploymentFilesPath(depID) - err = os.Mkdir(deploymentPath, 0700) + err = os.MkdirAll(deploymentPath, 0700) if err != nil { log.Errorf("Deployment dir creation failed: %v", err) return err @@ -156,7 +203,8 @@ // download bundles and store them locally errorsChan := make(chan error, len(dep.Bundles)) - for i, bun := range dep.Bundles { + for i := range dep.Bundles { + bun := dep.Bundles[i] go func() { err := prepareBundle(depID, bun) errorsChan <- err @@ -185,7 +233,7 @@ func parseManifest(manifestString string) (dep deployment, err error) { err = json.Unmarshal([]byte(manifestString), &dep) if err != nil { - log.Errorf("JSON decoding Manifest failed Err: %v", err) + log.Errorf("JSON decoding Manifest failed: %v", err) return }
diff --git a/init.go b/init.go index d6c2298..adaa6e7 100644 --- a/init.go +++ b/init.go
@@ -1,10 +1,10 @@ package apiGatewayDeploy import ( + "database/sql" "github.com/30x/apid" "github.com/30x/apidGatewayDeploy/github" "os" - "database/sql" "path" ) @@ -14,8 +14,9 @@ ) var ( - log apid.LogService - db *sql.DB + log apid.LogService + db *sql.DB + bundlePath string ) func init() { @@ -37,7 +38,7 @@ log.Panicf("Failed bundle directory creation: %v", err) } storagePath := config.GetString("local_storage_path") - bundlePath := path.Join(storagePath, relativeBundlePath) + bundlePath = path.Join(storagePath, relativeBundlePath) log.Infof("Bundle directory path is %s", bundlePath) gitHubAccessToken = config.GetString(configGithubAccessToken)
diff --git a/listener.go b/listener.go index 5fde4b3..c1e1000 100644 --- a/listener.go +++ b/listener.go
@@ -77,40 +77,30 @@ func processNewManifest(row common.Row) error { - var deploymentID, manifest string + var deploymentID, manifestString string err := row.Get("id", &deploymentID) if err != nil { return err } - err = row.Get("manifest_body", &manifest) + err = row.Get("manifest_body", &manifestString) if err != nil { return err } - //err = queueDeployment(deploymentID, manifest) - // - //if err == nil { - // go serviceDeploymentQueue() - //} - bypassQueue(deploymentID, manifest) - - return err -} - - -func bypassQueue(depID, manifestString string) { - manifest, err := parseManifest(manifestString) if err != nil { - return + log.Errorf("error parsing manifest: %v", err) + return err } - err = prepareDeployment(depID, manifest) + err = prepareDeployment(deploymentID, manifest) if err != nil { - log.Errorf("serviceDeploymentQueue prepare deployment failed: %v", depID) - return + log.Errorf("serviceDeploymentQueue prepare deployment failed: %s", deploymentID) + return err } - log.Debugf("Signaling new deployment ready: %s", depID) - incoming <- depID -} \ No newline at end of file + log.Debugf("Signaling new deployment ready: %s", deploymentID) + incoming <- deploymentID + + return nil +}
diff --git a/listener_test.go b/listener_test.go index b72f9d7..28d6ce3 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -7,6 +7,7 @@ . "github.com/onsi/gomega" "net/url" "github.com/apigee-labs/transicator/common" + "io/ioutil" ) var _ = Describe("listener", func() { @@ -17,18 +18,25 @@ uri, err := url.Parse(testServer.URL) Expect(err).ShouldNot(HaveOccurred()) - uri.Path = "/bundle" - bundleUri := uri.String() + uri.Path = "/bundle/1" + bundleUri1 := uri.String() + uri.Path = "/bundle/2" + bundleUri2 := uri.String() dep := deployment{ DeploymentID: deploymentID, System: bundle{ - URI: bundleUri, + URI: "whatever", }, Bundles: []bundle{ { - BundleID: "bun", - URI: bundleUri, + BundleID: "/bundle/1", + URI: bundleUri1, + Scope: "some-scope", + }, + { + BundleID: "/bundle/2", + URI: bundleUri2, Scope: "some-scope", }, }, @@ -60,9 +68,7 @@ return } - depID, err := getCurrentDeploymentID() - Expect(err).ShouldNot(HaveOccurred()) - Expect(depID).Should(Equal(deploymentID)) + testDeployment(dep) close(done) }, @@ -79,7 +85,7 @@ uri, err := url.Parse(testServer.URL) Expect(err).ShouldNot(HaveOccurred()) - uri.Path = "/bundle" + uri.Path = "/bundle/1" bundleUri := uri.String() dep := deployment{ @@ -89,7 +95,7 @@ }, Bundles: []bundle{ { - BundleID: "bun", + BundleID: "/bundle/1", URI: bundleUri, Scope: "some-scope", }, @@ -123,9 +129,7 @@ return } - depID, err := getCurrentDeploymentID() - Expect(err).ShouldNot(HaveOccurred()) - Expect(depID).Should(Equal(deploymentID)) + testDeployment(dep) close(done) }, @@ -149,3 +153,23 @@ func (t *test_handler) Handle(event apid.Event) { t.f(event) } + +func testDeployment(dep deployment) { + + depID, err := getCurrentDeploymentID() + Expect(err).ShouldNot(HaveOccurred()) + Expect(depID).Should(Equal(dep.DeploymentID)) + + deployment, err := getDeployment(depID) + Expect(deployment.Bundles).To(HaveLen(len(dep.Bundles))) + + for _, b := range dep.Bundles { + bundleFile := getBundleFilePath(depID, b.URI) + Expect(err).ShouldNot(HaveOccurred()) + Expect(bundleFile).To(BeARegularFile()) + + bytes, err := ioutil.ReadFile(bundleFile) + Expect(err).ShouldNot(HaveOccurred()) + Expect(string(bytes)).Should(Equal(b.BundleID)) + } +}