[XAPID-377] Added POST /analytics API implemention
diff --git a/api.go b/api.go
new file mode 100644
index 0000000..6f29dce
--- /dev/null
+++ b/api.go
@@ -0,0 +1,97 @@
+package apidAnalytics
+
+import (
+ "database/sql"
+ "encoding/json"
+ "github.com/30x/apid"
+ "net/http"
+)
+
+var analyticsBasePath string
+
+type errResponse struct {
+ ErrorCode string `json:"errorCode"`
+ Reason string `json:"reason"`
+}
+
+type dbError struct {
+ reason string
+ errorCode string
+}
+
+type tenant struct {
+ org string
+ env string
+}
+
+func initAPI(services apid.Services) {
+ log.Debug("initialized API's exposed by apidAnalytics plugin")
+ analyticsBasePath = config.GetString(configAnalyticsBasePath)
+ services.API().HandleFunc(analyticsBasePath + "/{bundle_scope_uuid}", saveAnalyticsRecord).Methods("POST")
+}
+
+func saveAnalyticsRecord(w http.ResponseWriter, r *http.Request) {
+
+ db, _ := data.DB()
+ if db == nil {
+ w.WriteHeader(http.StatusServiceUnavailable)
+ w.Write([]byte("Still initializing service"))
+ return
+ }
+
+ vars := apid.API().Vars(r)
+ scopeuuid := vars["bundle_scope_uuid"]
+ tenant, err := getTenantForScope(scopeuuid)
+ if err.errorCode != "" {
+ w.Header().Set("Content-Type", "application/json; charset=UTF-8")
+ switch err.errorCode {
+ case "SEARCH_INTERNAL_ERROR":
+ w.WriteHeader(http.StatusInternalServerError)
+ if err := json.NewEncoder(w).Encode(errResponse{"SEARCH_INTERNAL_ERROR", err.reason}); err != nil {
+ panic(err)
+ }
+ case "UNKNOWN_SCOPE":
+ w.WriteHeader(http.StatusBadRequest)
+ if err := json.NewEncoder(w).Encode(errResponse{"UNKNOWN_SCOPE", err.reason}); err != nil {
+ panic(err)
+ }
+ }
+ } else {
+ message := saveToFile(tenant)
+ w.WriteHeader(http.StatusOK)
+ w.Write([]byte(message))
+ }
+}
+
+func getTenantForScope(scopeuuid string) (tenant, dbError) {
+ // TODO: create a cache during init and refresh it on every failure or listen for snapshot update event
+ var org, env string
+ {
+ db, err := apid.Data().DB()
+ switch {
+ case err != nil:
+ reason := err.Error()
+ errorCode := "SEARCH_INTERNAL_ERROR"
+ return tenant{org, env}, dbError{reason, errorCode}
+ }
+
+ error := db.QueryRow("SELECT env, org FROM DATA_SCOPE WHERE id = ?;", scopeuuid).Scan(&env, &org)
+
+ switch {
+ case error == sql.ErrNoRows:
+ reason := "No tenant found for this scopeuuid: " + scopeuuid
+ errorCode := "UNKNOWN_SCOPE"
+ return tenant{org, env}, dbError{reason, errorCode}
+ case error != nil:
+ reason := error.Error()
+ errorCode := "SEARCH_INTERNAL_ERROR"
+ return tenant{org, env}, dbError{reason, errorCode}
+ }
+ }
+ return tenant{org, env}, dbError{}
+}
+
+func saveToFile(tenant tenant) string {
+ message := "hey " + tenant.org + "~" + tenant.env
+ return message
+}
diff --git a/api_test.go b/api_test.go
new file mode 100644
index 0000000..1c9ac2f
--- /dev/null
+++ b/api_test.go
@@ -0,0 +1,51 @@
+package apidAnalytics
+
+import (
+ "net/http"
+ "net/url"
+ "strings"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
+// BeforeSuite setup and AfterSuite cleanup is in apidAnalytics_suite_test.go
+var _ = Describe("testing saveAnalyticsRecord() directly", func() {
+
+ Context("valid scopeuuid", func() {
+
+ It("should successfully return", func() {
+
+ uri, err := url.Parse(testServer.URL)
+ uri.Path = analyticsBasePath
+
+ v := url.Values{}
+ v.Add("bundle_scope_uuid", "testid")
+
+ client := &http.Client{}
+ req, err := http.NewRequest("POST", uri.String(), strings.NewReader(v.Encode()))
+ res, err := client.Do(req)
+ defer res.Body.Close()
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(res.StatusCode, http.StatusOK)
+ })
+ })
+
+ Context("invalid scopeuuid", func() {
+
+ It("should return bad request", func() {
+ uri, err := url.Parse(testServer.URL)
+ uri.Path = analyticsBasePath
+
+ v := url.Values{}
+ v.Add("bundle_scope_uuid", "wrongId")
+
+ client := &http.Client{}
+ req, err := http.NewRequest("POST", uri.String(), strings.NewReader(v.Encode()))
+ res, err := client.Do(req)
+ defer res.Body.Close()
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(res.StatusCode, http.StatusBadRequest)
+ })
+ })
+})
diff --git a/apidAnalytics_suite_test.go b/apidAnalytics_suite_test.go
new file mode 100644
index 0000000..314defb
--- /dev/null
+++ b/apidAnalytics_suite_test.go
@@ -0,0 +1,123 @@
+package apidAnalytics
+
+import (
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+
+ "github.com/30x/apid"
+ "github.com/30x/apid/factory"
+ "io/ioutil"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "sync"
+ "testing"
+)
+
+var (
+ testTempDir string
+ testServer *httptest.Server
+ unsafeDB apid.DB
+ dbMux sync.RWMutex
+)
+
+func TestApidAnalytics(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "ApidAnalytics Suite")
+}
+
+var _ = BeforeSuite(func() {
+ apid.Initialize(factory.DefaultServicesFactory())
+
+ config := apid.Config()
+
+ var err error
+ testTempDir, err = ioutil.TempDir("", "api_test")
+ Expect(err).NotTo(HaveOccurred())
+
+ config.Set("data_path", testTempDir)
+ config.Set(uapEndpoint, "http://localhost:9000") // dummy value
+
+ apid.InitializePlugins()
+
+ db, err := apid.Data().DB()
+ Expect(err).NotTo(HaveOccurred())
+ setDB(db)
+ createApidClusterTables(db)
+ insertTestData(db)
+ testServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+ if req.URL.Path == analyticsBasePathDefault {
+ saveAnalyticsRecord(w, req)
+ }
+ }))
+})
+
+func setDB(db apid.DB) {
+ dbMux.Lock()
+ unsafeDB = db
+ dbMux.Unlock()
+}
+
+func createApidClusterTables(db apid.DB) {
+ _, err := db.Exec(`
+CREATE TABLE apid_cluster (
+ id text,
+ instance_id text,
+ name text,
+ description text,
+ umbrella_org_app_name text,
+ created int64,
+ created_by text,
+ updated int64,
+ updated_by text,
+ _change_selector text,
+ snapshotInfo text,
+ lastSequence text,
+ PRIMARY KEY (id)
+);
+CREATE TABLE data_scope (
+ id text,
+ apid_cluster_id text,
+ scope text,
+ org text,
+ env text,
+ created int64,
+ created_by text,
+ updated int64,
+ updated_by text,
+ _change_selector text,
+ PRIMARY KEY (id)
+);
+`)
+ if err != nil {
+ log.Panic("Unable to initialize DB", err)
+ }
+}
+
+func insertTestData(db apid.DB) {
+
+ txn, err := db.Begin()
+ Expect(err).ShouldNot(HaveOccurred())
+
+ txn.Exec("INSERT INTO DATA_SCOPE (id, _change_selector, apid_cluster_id, scope, org, env) "+
+ "VALUES"+
+ "($1,$2,$3,$4,$5,$6)",
+ "testid",
+ "some_cluster_id",
+ "some_cluster_id",
+ "tenant_id_xxxx",
+ "testorg",
+ "testenv",
+ )
+ txn.Commit()
+ var count int64
+ db.QueryRow("select count(*) from data_scope").Scan(&count)
+}
+
+var _ = AfterSuite(func() {
+ apid.Events().Close()
+ if testServer != nil {
+ testServer.Close()
+ }
+ os.RemoveAll(testTempDir)
+})
diff --git a/apidAnalytics_test.go b/apidAnalytics_test.go
new file mode 100644
index 0000000..97050f4
--- /dev/null
+++ b/apidAnalytics_test.go
@@ -0,0 +1,25 @@
+package apidAnalytics
+
+import (
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
+var _ = Describe("test getTenantForScope()", func() {
+ Context("get tenant for valid scopeuuid", func() {
+ It("should return testorg and testenv", func() {
+ tenant, dbError := getTenantForScope("testid")
+ Expect(dbError.reason).To(Equal(""))
+ Expect(tenant.org).To(Equal("testorg"))
+ Expect(tenant.env).To(Equal("testenv"))
+ })
+ })
+
+ Context("get tenant for invalid scopeuuid", func() {
+ It("should return empty tenant and a db error", func() {
+ tenant, dbError := getTenantForScope("wrongid")
+ Expect(tenant.org).To(Equal(""))
+ Expect(dbError.errorCode).To(Equal("UNKNOWN_SCOPE"))
+ })
+ })
+})
\ No newline at end of file
diff --git a/cmd/apidAnalytics/main.go b/cmd/apidAnalytics/main.go
new file mode 100644
index 0000000..10689f1
--- /dev/null
+++ b/cmd/apidAnalytics/main.go
@@ -0,0 +1,29 @@
+package main
+
+import (
+ "github.com/30x/apid"
+ "github.com/30x/apid/factory"
+
+ _ "github.com/30x/apidAnalytics"
+)
+
+func main() {
+ // initialize apid using default services
+ apid.Initialize(factory.DefaultServicesFactory())
+
+ log := apid.Log()
+
+ // this will call all initialization functions on all registered plugins
+ apid.InitializePlugins()
+
+ // print the base url to the console
+ config := apid.Config()
+ basePath := config.GetString("analyticsBasePath")
+ port := config.GetString("api_port")
+ log.Printf("Analytics API is at: http://localhost:%s%s", port, basePath)
+
+ // start client API listener
+ api := apid.API()
+ err := api.Listen() // doesn't return if no error
+ log.Fatalf("Error. Is something already running on port %d? %s", port, err)
+}
diff --git a/glide.yaml b/glide.yaml
new file mode 100644
index 0000000..4f63803
--- /dev/null
+++ b/glide.yaml
@@ -0,0 +1,8 @@
+package: github.com/30x/apidAnalytics
+
+import:
+- package: github.com/30x/apid
+ version: master
+testImport:
+- package: github.com/onsi/ginkgo/ginkgo
+- package: github.com/onsi/gomega
diff --git a/init.go b/init.go
new file mode 100644
index 0000000..9f1d284
--- /dev/null
+++ b/init.go
@@ -0,0 +1,137 @@
+package apidAnalytics
+
+import (
+ "fmt"
+ "github.com/30x/apid"
+ "os"
+ "path/filepath"
+)
+
+// TODO: figure out how to get these from a apid config file vs constant values
+const (
+ configAnalyticsBasePath = "apidanalytics_base_path" // config
+ analyticsBasePathDefault = "/analytics"
+
+ configAnalyticsDataPath = "apidanalytics_data_path" // config
+ analyticsDataPathDefault = "/ax"
+
+ analyticsCollectionInterval = "apidanalytics_collection_interval" // config in seconds
+ analyticsCollectionIntervalDefault = "120"
+
+ analyticsCollectionNoFiles = "apidanalytics_collection_no_files" // config
+ analyticsCollectionNoFilesDefault = "1"
+
+ analyticsUploadInterval = "apidanalytics_upload_interval" // config in seconds
+ analyticsUploadIntervalDefault = "5"
+
+ uapEndpoint = "apidanalytics_uap_endpoint" // config
+
+ uapRepo = "apidanalytics_uap_repo" // config
+ uapRepoDefault = "edge"
+
+ uapDataset = "apidanalytics_uap_dataset" // config
+ uapDatasetDefault = "api"
+
+ maxRetries = 3
+)
+
+// keep track of the services that this plugin will use
+// note: services would also be available directly via the package global "apid" (eg. `apid.Log()`)
+var (
+ log apid.LogService
+ config apid.ConfigService
+ data apid.DataService
+
+ localAnalyticsBaseDir string
+ localAnalyticsTempDir string
+ localAnalyticsStagingDir string
+ localAnalyticsFailedDir string
+ localAnalyticsRecoveredDir string
+)
+
+// apid.RegisterPlugin() is required to be called in init()
+func init() {
+ apid.RegisterPlugin(initPlugin)
+}
+
+// initPlugin will be called by apid to initialize
+func initPlugin(services apid.Services) (apid.PluginData, error) {
+
+ // set a logger that is annotated for this plugin
+ log = services.Log().ForModule("analytics")
+ log.Debug("start init for apidAnalytics plugin")
+
+ // set configuration
+ err := setConfig(services)
+ if err != nil {
+ return pluginData, fmt.Errorf("Missing required config value: %s: ", err)
+ }
+
+ for _, key := range []string{uapEndpoint} {
+ if !config.IsSet(key) {
+ return pluginData, fmt.Errorf("Missing required config value: %s", key)
+ }
+ }
+
+ directories := []string{localAnalyticsBaseDir, localAnalyticsTempDir, localAnalyticsStagingDir, localAnalyticsFailedDir, localAnalyticsRecoveredDir}
+ err = createDirectories(directories)
+
+ if err != nil {
+ return pluginData, fmt.Errorf("Cannot create required local directories %s: ", err)
+ }
+
+ data = services.Data()
+
+ // TODO: perform crash recovery
+ initUploadManager()
+ initAPI(services)
+
+ log.Debug("end init for apidAnalytics plugin")
+ return pluginData, nil
+}
+
+func setConfig(services apid.Services) error {
+ config = services.Config()
+
+ // set plugin config defaults
+ config.SetDefault(configAnalyticsBasePath, analyticsBasePathDefault)
+ config.SetDefault(configAnalyticsDataPath, analyticsDataPathDefault)
+
+ if !config.IsSet("local_storage_path") {
+ return fmt.Errorf("Missing required config value: local_storage_path")
+ }
+
+ // set local directory paths that will be used to buffer analytics data on disk
+ localAnalyticsBaseDir = filepath.Join(config.GetString("local_storage_path"), config.GetString(configAnalyticsDataPath))
+ localAnalyticsTempDir = filepath.Join(localAnalyticsBaseDir, "tmp")
+ localAnalyticsStagingDir = filepath.Join(localAnalyticsBaseDir, "staging")
+ localAnalyticsFailedDir = filepath.Join(localAnalyticsBaseDir, "failed")
+ localAnalyticsRecoveredDir = filepath.Join(localAnalyticsBaseDir, "recovered")
+
+ // set default config for collection interval and number of files per interval
+ config.SetDefault(analyticsCollectionInterval, analyticsCollectionIntervalDefault)
+ config.SetDefault(analyticsCollectionNoFiles, analyticsCollectionNoFilesDefault)
+
+ // set default config for upload interval
+ config.SetDefault(analyticsUploadInterval, analyticsUploadIntervalDefault)
+
+ // set defaults for uap related properties
+ config.SetDefault(uapRepo, uapRepoDefault)
+ config.SetDefault(uapDataset, uapDatasetDefault)
+
+ return nil
+}
+
+// create all missing directories if required
+func createDirectories(directories []string) error {
+ for _, path := range directories {
+ if _, err := os.Stat(path); os.IsNotExist(err) {
+ error := os.Mkdir(path, os.ModePerm)
+ if error != nil {
+ return error
+ }
+ log.Infof("created directory %s: ", path)
+ }
+ }
+ return nil
+}
diff --git a/pluginData.go b/pluginData.go
new file mode 100644
index 0000000..af0faa6
--- /dev/null
+++ b/pluginData.go
@@ -0,0 +1,11 @@
+package apidAnalytics
+
+import "github.com/30x/apid"
+
+var pluginData = apid.PluginData{
+ Name: "apidAnalytics",
+ Version: "0.0.1",
+ ExtraData: map[string]interface{}{
+ "schemaVersion": "0.0.1",
+ },
+}
diff --git a/uploadManager.go b/uploadManager.go
new file mode 100644
index 0000000..43290d2
--- /dev/null
+++ b/uploadManager.go
@@ -0,0 +1,68 @@
+package apidAnalytics
+
+import (
+ _ "fmt"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "time"
+)
+
+var (
+ retriesMap map[string]int
+)
+
+//TODO: make sure that this instance gets initialized only once since we dont want multiple upload manager tickers running
+func initUploadManager() {
+
+ retriesMap = make(map[string]int)
+
+ // TODO: add a way to make sure that this go routine is always running
+ go func() {
+ ticker := time.NewTicker(time.Millisecond * config.GetDuration(analyticsUploadInterval) * 1000)
+ log.Debugf("Intialized upload manager to check for staging directory")
+ defer ticker.Stop() // Ticker will keep running till go routine is running i.e. till application is running
+
+ for t := range ticker.C {
+ files, err := ioutil.ReadDir(localAnalyticsStagingDir)
+
+ if err != nil {
+ log.Errorf("Cannot read directory %s: ", localAnalyticsStagingDir)
+ }
+
+ for _, file := range files {
+ log.Debugf("t: %s , file: %s", t, file.Name())
+ if file.IsDir() {
+ handleUploadDirStatus(file, uploadDir(file))
+ }
+ }
+ }
+ }()
+
+}
+
+func uploadDir(file os.FileInfo) bool {
+ // TODO: handle upload to UAP file by file
+ return false
+}
+
+func handleUploadDirStatus(file os.FileInfo, status bool) {
+ completePath := filepath.Join(localAnalyticsStagingDir, file.Name())
+ if status {
+ os.RemoveAll(completePath)
+ // remove key if exists from retry map after a successful upload
+ delete(retriesMap, file.Name())
+ } else {
+ retriesMap[file.Name()] = retriesMap[file.Name()] + 1
+ if retriesMap[file.Name()] > maxRetries {
+ log.Errorf("Max Retires exceeded for folder: %s", completePath)
+ failedDirPath := filepath.Join(localAnalyticsFailedDir, file.Name())
+ err := os.Rename(completePath, failedDirPath)
+ if err != nil {
+ log.Errorf("Cannot move directory :%s to failed folder", file.Name())
+ }
+ // remove key from retry map once it reaches allowed max failed attempts
+ delete(retriesMap, file.Name())
+ }
+ }
+}