Merge pull request #6 from 30x/uploader_testcases

Uploader testcases
diff --git a/apidAnalytics_suite_test.go b/apidAnalytics_suite_test.go
index 345b4b8..590de46 100644
--- a/apidAnalytics_suite_test.go
+++ b/apidAnalytics_suite_test.go
@@ -4,6 +4,7 @@
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
 
+	"encoding/json"
 	"github.com/30x/apid"
 	"github.com/30x/apid/factory"
 	"io/ioutil"
@@ -38,11 +39,7 @@
 
 	db, err := apid.Data().DB()
 	Expect(err).NotTo(HaveOccurred())
-
-	createApidClusterTables(db)
-	createTables(db)
-	insertTestData(db)
-	setDB(db)
+	initDb(db)
 
 	// required config uapServerBase is not set, thus init should panic
 	Expect(apid.InitializePlugins).To(Panic())
@@ -50,20 +47,75 @@
 	config.Set(uapServerBase, "http://localhost:9000") // dummy value
 	Expect(apid.InitializePlugins).ToNot(Panic())
 
+	// create initial cache for tenant and developer info
 	config.Set(useCaching, true)
+
 	createTenantCache()
 	Expect(len(tenantCache)).To(Equal(1))
 
 	createDeveloperInfoCache()
 	Expect(len(developerInfoCache)).To(Equal(1))
 
+	// Analytics POST API
 	router := apid.API().Router()
 	router.HandleFunc(analyticsBasePath+"/{bundle_scope_uuid}", func(w http.ResponseWriter, req *http.Request) {
 		saveAnalyticsRecord(w, req)
 	}).Methods("POST")
+
+	// Mock UAP collection endpoint
+	router.HandleFunc("/analytics", func(w http.ResponseWriter, req *http.Request) {
+		mockUAPCollection(w, req)
+	}).Methods("GET")
+
+	// fake AWS S3
+	router.HandleFunc("/upload", func(w http.ResponseWriter, req *http.Request) {
+		mockFinalDatastore(w, req)
+	}).Methods("PUT")
+
 	testServer = httptest.NewServer(router)
+
+	// ser serverbased to local so that responses can be mocked
+	config.Set(uapServerBase, testServer.URL)
 })
 
+func mockUAPCollection(w http.ResponseWriter, req *http.Request) {
+	if req.Header.Get("Authorization") == "" {
+		w.WriteHeader(http.StatusUnauthorized)
+	} else {
+		if req.URL.Query().Get("tenant") == "testorg~testenv" {
+			w.WriteHeader(http.StatusOK)
+
+			body := make(map[string]interface{})
+			body["url"] = testServer.URL + "/upload?awskey=xxxx"
+			bytes, _ := json.Marshal(body)
+			w.Write(bytes)
+		} else {
+			w.WriteHeader(http.StatusNotFound)
+		}
+	}
+}
+
+func mockFinalDatastore(w http.ResponseWriter, req *http.Request) {
+	status := req.URL.Query().Get("expected_status")
+	switch status {
+	case "ok":
+		w.WriteHeader(http.StatusOK)
+	case "serverError":
+		w.WriteHeader(http.StatusInternalServerError)
+	case "forbidden":
+		w.WriteHeader(http.StatusForbidden)
+	default:
+		w.WriteHeader(http.StatusOK)
+	}
+}
+
+func initDb(db apid.DB) {
+	createApidClusterTables(db)
+	createTables(db)
+	insertTestData(db)
+	setDB(db)
+}
+
 func createTables(db apid.DB) {
 	_, err := db.Exec(`
 		CREATE TABLE IF NOT EXISTS api_product (
diff --git a/upload_manager_test.go b/upload_manager_test.go
new file mode 100644
index 0000000..e0033e9
--- /dev/null
+++ b/upload_manager_test.go
@@ -0,0 +1,111 @@
+package apidAnalytics
+
+import (
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"strconv"
+)
+
+var _ = Describe("test handleUploadDirStatus()", func() {
+	Context("successful upload", func() {
+		It("should delete dir from staging and remove entry from map", func() {
+			dirName := "testorg~testenv~20160101530000"
+			dirPath := filepath.Join(localAnalyticsStagingDir, dirName)
+
+			err := os.Mkdir(dirPath, os.ModePerm)
+			Expect(err).ShouldNot(HaveOccurred())
+
+			info, e := os.Stat(dirPath)
+			Expect(e).ShouldNot(HaveOccurred())
+			handleUploadDirStatus(info, true)
+
+			status, _ := exists(dirPath)
+			Expect(status).To(BeFalse())
+			_, exists := retriesMap[dirName]
+			Expect(exists).To(BeFalse())
+		})
+	})
+	Context("unsuccessful upload", func() {
+		It("retry thrice before moving to failed", func() {
+			dirName := "testorg~testenv~20160101530000"
+			dirPath := filepath.Join(localAnalyticsStagingDir, dirName)
+
+			err := os.Mkdir(dirPath, os.ModePerm)
+			Expect(err).ShouldNot(HaveOccurred())
+
+			info, e := os.Stat(dirPath)
+			Expect(e).ShouldNot(HaveOccurred())
+
+			// Retry thrice
+			for i := 1; i < maxRetries; i++ {
+				handleUploadDirStatus(info, false)
+
+				status, _ := exists(dirPath)
+				Expect(status).To(BeTrue())
+
+				cnt, exists := retriesMap[dirName]
+				Expect(exists).To(BeTrue())
+				Expect(cnt).To(Equal(i))
+			}
+
+			// after final retry, it should be moved to failed
+			handleUploadDirStatus(info, false)
+
+			failedPath := filepath.Join(localAnalyticsFailedDir, dirName)
+
+			status, _ := exists(failedPath)
+			Expect(status).To(BeTrue())
+
+			_, exists := retriesMap[dirName]
+			Expect(exists).To(BeFalse())
+		})
+	})
+})
+
+var _ = Describe("test retryFailedUploads()", func() {
+	Context("previously failed directories in failed folder", func() {
+		It("should be moved to staging directory", func() {
+			dirName := "testorg~testenv~20160101830000"
+			dirPath := filepath.Join(localAnalyticsFailedDir, dirName)
+
+			err := os.Mkdir(dirPath, os.ModePerm)
+			Expect(err).ShouldNot(HaveOccurred())
+
+			retryFailedUploads()
+
+			stagingPath := filepath.Join(localAnalyticsStagingDir, dirName)
+
+			// move from failed to staging directory
+			status, _ := exists(dirPath)
+			Expect(status).To(BeFalse())
+
+			status, _ = exists(stagingPath)
+			Expect(status).To(BeTrue())
+		})
+		It("if multiple folders, then move only configured batch at a time", func() {
+			for i := 1; i < (retryFailedDirBatchSize * 2); i++ {
+				dirName := "testorg~testenv" + strconv.Itoa(i) + "~2016010183000"
+				dirPath := filepath.Join(localAnalyticsFailedDir, dirName)
+				err := os.Mkdir(dirPath, os.ModePerm)
+				Expect(err).ShouldNot(HaveOccurred())
+			}
+
+			// before count failed
+			dirs, _ := ioutil.ReadDir(localAnalyticsFailedDir)
+			failedDirCntBefore := len(dirs)
+
+			retryFailedUploads()
+
+			// after count failed
+			dirs, _ = ioutil.ReadDir(localAnalyticsFailedDir)
+			failedDirCntAfter := len(dirs)
+
+			Expect(failedDirCntBefore - failedDirCntAfter).
+				To(Equal(retryFailedDirBatchSize))
+
+		})
+	})
+})
diff --git a/uploader_test.go b/uploader_test.go
new file mode 100644
index 0000000..b4a6d84
--- /dev/null
+++ b/uploader_test.go
@@ -0,0 +1,146 @@
+package apidAnalytics
+
+import (
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	"os"
+	"path/filepath"
+)
+
+var _ = Describe("test uploadFile()", func() {
+	It("should return status based on tenant", func() {
+		fakeDir := filepath.Join(localAnalyticsStagingDir, "testorg~testenv~20060102150405")
+		fp := filepath.Join(fakeDir, "fakefile.txt.gz")
+		os.Mkdir(fakeDir, os.ModePerm)
+		os.Create(fp)
+
+		By("valid tenant")
+		tenant := "testorg~testenv"
+		relativeFilePath := "/date=2006-01-02/time=15-04/fakefile.txt.gz"
+
+		status, err := uploadFile(tenant, relativeFilePath, fp)
+		Expect(err).ShouldNot(HaveOccurred())
+		Expect(status).To(BeTrue())
+
+		By("invalid tenant")
+		tenant = "o~e"
+		relativeFilePath = "/date=2006-01-02/time=15-04/fakefile.txt.gz"
+
+		status, err = uploadFile(tenant, relativeFilePath, fp)
+		Expect(err).Should(HaveOccurred())
+		Expect(status).To(BeFalse())
+	})
+})
+
+var _ = Describe("test uploadDir()", func() {
+	Context("valid tenant", func() {
+		It("should return true and delete the file", func() {
+			fakeDir := filepath.Join(localAnalyticsStagingDir, "testorg~testenv~20060102150605")
+			fp := filepath.Join(fakeDir, "fakefile.txt.gz")
+			os.Mkdir(fakeDir, os.ModePerm)
+			os.Create(fp)
+
+			dir, _ := os.Stat(fakeDir)
+
+			status := uploadDir(dir)
+			Expect(status).To(BeTrue())
+
+			e, _ := exists(fp)
+			Expect(e).To(BeFalse())
+		})
+	})
+	Context("invalid tenant", func() {
+		It("should return false and file should not be deleted", func() {
+			fakeDir := filepath.Join(localAnalyticsStagingDir, "o~e~20060102150605")
+			fp := filepath.Join(fakeDir, "fakefile.txt.gz")
+			os.Mkdir(fakeDir, os.ModePerm)
+			os.Create(fp)
+
+			dir, _ := os.Stat(fakeDir)
+
+			status := uploadDir(dir)
+			Expect(status).To(BeFalse())
+
+			e, _ := exists(fp)
+			Expect(e).To(BeTrue())
+		})
+	})
+})
+
+var _ = Describe("test getSignedUrl()", func() {
+	Context("invalid tenant", func() {
+		It("should return error", func() {
+			tenant := "org~env"
+			relativeFilePath := "/date=2016-01-01/time=22-45/a.txt.gz"
+
+			_, err := getSignedUrl(tenant, relativeFilePath)
+			Expect(err).Should(HaveOccurred())
+			Expect(err.Error()).Should(ContainSubstring("404 Not Found"))
+		})
+	})
+	Context("valid tenant", func() {
+		It("should return signed url", func() {
+			tenant := "testorg~testenv"
+			relativeFilePath := "/date=2016-01-01/time=22-45/a.txt.gz"
+
+			url, err := getSignedUrl(tenant, relativeFilePath)
+			Expect(err).ShouldNot(HaveOccurred())
+			Expect(url).ShouldNot(Equal(""))
+			log.Debugf(url)
+		})
+	})
+})
+
+var _ = Describe("test uploadFileToDatastore()", func() {
+	It("should return status based on response from mocked datastore", func() {
+		fakeDir := filepath.Join(localAnalyticsStagingDir, "d1~e1~20060102150405")
+		fp := filepath.Join(fakeDir, "fakefile.txt")
+		os.Mkdir(fakeDir, os.ModePerm)
+		os.Create(fp)
+
+		By("trying to upload not existng file")
+		signedUrl := testServer.URL + "/upload?expected_status=ok"
+		status, err := uploadFileToDatastore("nofile", signedUrl)
+		Expect(err).Should(HaveOccurred())
+		Expect(status).To(BeFalse())
+
+		By("successful file upload")
+		signedUrl = testServer.URL + "/upload?expected_status=ok"
+		status, err = uploadFileToDatastore(fp, signedUrl)
+		Expect(err).ShouldNot(HaveOccurred())
+		Expect(status).To(BeTrue())
+
+		By("trying to upload file after signed url has expired")
+		signedUrl = testServer.URL + "/upload?expected_status=forbidden"
+		status, err = uploadFileToDatastore(fp, signedUrl)
+		Expect(status).To(BeFalse())
+		Expect(err).Should(HaveOccurred())
+		Expect(err.Error()).Should(ContainSubstring("403 Forbidden"))
+
+		By("internal server error from datastore")
+		signedUrl = testServer.URL + "/upload?expected_status=serverError"
+		status, err = uploadFileToDatastore(fp, signedUrl)
+		Expect(status).To(BeFalse())
+		Expect(err).Should(HaveOccurred())
+		Expect(err.Error()).Should(
+			ContainSubstring("500 Internal Server Error"))
+
+	})
+})
+
+var _ = Describe("test splitDirName()", func() {
+	It("should return tenant and timestamp", func() {
+		dirname := "o1~e1~20060102150405"
+		tenant, timestamp := splitDirName(dirname)
+		Expect(tenant).To(Equal("o1~e1"))
+		Expect(timestamp).To(Equal("20060102150405"))
+	})
+})
+
+var _ = Describe("test getDateFromDirTimestamp()", func() {
+	It("should return date/time formatted timestamp", func() {
+		timestamp := "20060102150405"
+		dateHourTS := getDateFromDirTimestamp(timestamp)
+		Expect(dateHourTS).To(Equal("date=2006-01-02/time=15-04"))
+	})
+})