[XAPID-377] Added POST /analytics API implemention
diff --git a/api.go b/api.go new file mode 100644 index 0000000..6f29dce --- /dev/null +++ b/api.go
@@ -0,0 +1,97 @@ +package apidAnalytics + +import ( + "database/sql" + "encoding/json" + "github.com/30x/apid" + "net/http" +) + +var analyticsBasePath string + +type errResponse struct { + ErrorCode string `json:"errorCode"` + Reason string `json:"reason"` +} + +type dbError struct { + reason string + errorCode string +} + +type tenant struct { + org string + env string +} + +func initAPI(services apid.Services) { + log.Debug("initialized API's exposed by apidAnalytics plugin") + analyticsBasePath = config.GetString(configAnalyticsBasePath) + services.API().HandleFunc(analyticsBasePath + "/{bundle_scope_uuid}", saveAnalyticsRecord).Methods("POST") +} + +func saveAnalyticsRecord(w http.ResponseWriter, r *http.Request) { + + db, _ := data.DB() + if db == nil { + w.WriteHeader(http.StatusServiceUnavailable) + w.Write([]byte("Still initializing service")) + return + } + + vars := apid.API().Vars(r) + scopeuuid := vars["bundle_scope_uuid"] + tenant, err := getTenantForScope(scopeuuid) + if err.errorCode != "" { + w.Header().Set("Content-Type", "application/json; charset=UTF-8") + switch err.errorCode { + case "SEARCH_INTERNAL_ERROR": + w.WriteHeader(http.StatusInternalServerError) + if err := json.NewEncoder(w).Encode(errResponse{"SEARCH_INTERNAL_ERROR", err.reason}); err != nil { + panic(err) + } + case "UNKNOWN_SCOPE": + w.WriteHeader(http.StatusBadRequest) + if err := json.NewEncoder(w).Encode(errResponse{"UNKNOWN_SCOPE", err.reason}); err != nil { + panic(err) + } + } + } else { + message := saveToFile(tenant) + w.WriteHeader(http.StatusOK) + w.Write([]byte(message)) + } +} + +func getTenantForScope(scopeuuid string) (tenant, dbError) { + // TODO: create a cache during init and refresh it on every failure or listen for snapshot update event + var org, env string + { + db, err := apid.Data().DB() + switch { + case err != nil: + reason := err.Error() + errorCode := "SEARCH_INTERNAL_ERROR" + return tenant{org, env}, dbError{reason, errorCode} + } + + error := db.QueryRow("SELECT env, org FROM DATA_SCOPE WHERE id = ?;", scopeuuid).Scan(&env, &org) + + switch { + case error == sql.ErrNoRows: + reason := "No tenant found for this scopeuuid: " + scopeuuid + errorCode := "UNKNOWN_SCOPE" + return tenant{org, env}, dbError{reason, errorCode} + case error != nil: + reason := error.Error() + errorCode := "SEARCH_INTERNAL_ERROR" + return tenant{org, env}, dbError{reason, errorCode} + } + } + return tenant{org, env}, dbError{} +} + +func saveToFile(tenant tenant) string { + message := "hey " + tenant.org + "~" + tenant.env + return message +}
diff --git a/api_test.go b/api_test.go new file mode 100644 index 0000000..1c9ac2f --- /dev/null +++ b/api_test.go
@@ -0,0 +1,51 @@ +package apidAnalytics + +import ( + "net/http" + "net/url" + "strings" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +// BeforeSuite setup and AfterSuite cleanup is in apidAnalytics_suite_test.go +var _ = Describe("testing saveAnalyticsRecord() directly", func() { + + Context("valid scopeuuid", func() { + + It("should successfully return", func() { + + uri, err := url.Parse(testServer.URL) + uri.Path = analyticsBasePath + + v := url.Values{} + v.Add("bundle_scope_uuid", "testid") + + client := &http.Client{} + req, err := http.NewRequest("POST", uri.String(), strings.NewReader(v.Encode())) + res, err := client.Do(req) + defer res.Body.Close() + Expect(err).ShouldNot(HaveOccurred()) + Expect(res.StatusCode, http.StatusOK) + }) + }) + + Context("invalid scopeuuid", func() { + + It("should return bad request", func() { + uri, err := url.Parse(testServer.URL) + uri.Path = analyticsBasePath + + v := url.Values{} + v.Add("bundle_scope_uuid", "wrongId") + + client := &http.Client{} + req, err := http.NewRequest("POST", uri.String(), strings.NewReader(v.Encode())) + res, err := client.Do(req) + defer res.Body.Close() + Expect(err).ShouldNot(HaveOccurred()) + Expect(res.StatusCode, http.StatusBadRequest) + }) + }) +})
diff --git a/apidAnalytics_suite_test.go b/apidAnalytics_suite_test.go new file mode 100644 index 0000000..314defb --- /dev/null +++ b/apidAnalytics_suite_test.go
@@ -0,0 +1,123 @@ +package apidAnalytics + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/30x/apid" + "github.com/30x/apid/factory" + "io/ioutil" + "net/http" + "net/http/httptest" + "os" + "sync" + "testing" +) + +var ( + testTempDir string + testServer *httptest.Server + unsafeDB apid.DB + dbMux sync.RWMutex +) + +func TestApidAnalytics(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "ApidAnalytics Suite") +} + +var _ = BeforeSuite(func() { + apid.Initialize(factory.DefaultServicesFactory()) + + config := apid.Config() + + var err error + testTempDir, err = ioutil.TempDir("", "api_test") + Expect(err).NotTo(HaveOccurred()) + + config.Set("data_path", testTempDir) + config.Set(uapEndpoint, "http://localhost:9000") // dummy value + + apid.InitializePlugins() + + db, err := apid.Data().DB() + Expect(err).NotTo(HaveOccurred()) + setDB(db) + createApidClusterTables(db) + insertTestData(db) + testServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if req.URL.Path == analyticsBasePathDefault { + saveAnalyticsRecord(w, req) + } + })) +}) + +func setDB(db apid.DB) { + dbMux.Lock() + unsafeDB = db + dbMux.Unlock() +} + +func createApidClusterTables(db apid.DB) { + _, err := db.Exec(` +CREATE TABLE apid_cluster ( + id text, + instance_id text, + name text, + description text, + umbrella_org_app_name text, + created int64, + created_by text, + updated int64, + updated_by text, + _change_selector text, + snapshotInfo text, + lastSequence text, + PRIMARY KEY (id) +); +CREATE TABLE data_scope ( + id text, + apid_cluster_id text, + scope text, + org text, + env text, + created int64, + created_by text, + updated int64, + updated_by text, + _change_selector text, + PRIMARY KEY (id) +); +`) + if err != nil { + log.Panic("Unable to initialize DB", err) + } +} + +func insertTestData(db apid.DB) { + + txn, err := db.Begin() + Expect(err).ShouldNot(HaveOccurred()) + + txn.Exec("INSERT INTO DATA_SCOPE (id, _change_selector, apid_cluster_id, scope, org, env) "+ + "VALUES"+ + "($1,$2,$3,$4,$5,$6)", + "testid", + "some_cluster_id", + "some_cluster_id", + "tenant_id_xxxx", + "testorg", + "testenv", + ) + txn.Commit() + var count int64 + db.QueryRow("select count(*) from data_scope").Scan(&count) +} + +var _ = AfterSuite(func() { + apid.Events().Close() + if testServer != nil { + testServer.Close() + } + os.RemoveAll(testTempDir) +})
diff --git a/apidAnalytics_test.go b/apidAnalytics_test.go new file mode 100644 index 0000000..97050f4 --- /dev/null +++ b/apidAnalytics_test.go
@@ -0,0 +1,25 @@ +package apidAnalytics + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("test getTenantForScope()", func() { + Context("get tenant for valid scopeuuid", func() { + It("should return testorg and testenv", func() { + tenant, dbError := getTenantForScope("testid") + Expect(dbError.reason).To(Equal("")) + Expect(tenant.org).To(Equal("testorg")) + Expect(tenant.env).To(Equal("testenv")) + }) + }) + + Context("get tenant for invalid scopeuuid", func() { + It("should return empty tenant and a db error", func() { + tenant, dbError := getTenantForScope("wrongid") + Expect(tenant.org).To(Equal("")) + Expect(dbError.errorCode).To(Equal("UNKNOWN_SCOPE")) + }) + }) +}) \ No newline at end of file
diff --git a/cmd/apidAnalytics/main.go b/cmd/apidAnalytics/main.go new file mode 100644 index 0000000..10689f1 --- /dev/null +++ b/cmd/apidAnalytics/main.go
@@ -0,0 +1,29 @@ +package main + +import ( + "github.com/30x/apid" + "github.com/30x/apid/factory" + + _ "github.com/30x/apidAnalytics" +) + +func main() { + // initialize apid using default services + apid.Initialize(factory.DefaultServicesFactory()) + + log := apid.Log() + + // this will call all initialization functions on all registered plugins + apid.InitializePlugins() + + // print the base url to the console + config := apid.Config() + basePath := config.GetString("analyticsBasePath") + port := config.GetString("api_port") + log.Printf("Analytics API is at: http://localhost:%s%s", port, basePath) + + // start client API listener + api := apid.API() + err := api.Listen() // doesn't return if no error + log.Fatalf("Error. Is something already running on port %d? %s", port, err) +}
diff --git a/glide.yaml b/glide.yaml new file mode 100644 index 0000000..4f63803 --- /dev/null +++ b/glide.yaml
@@ -0,0 +1,8 @@ +package: github.com/30x/apidAnalytics + +import: +- package: github.com/30x/apid + version: master +testImport: +- package: github.com/onsi/ginkgo/ginkgo +- package: github.com/onsi/gomega
diff --git a/init.go b/init.go new file mode 100644 index 0000000..9f1d284 --- /dev/null +++ b/init.go
@@ -0,0 +1,137 @@ +package apidAnalytics + +import ( + "fmt" + "github.com/30x/apid" + "os" + "path/filepath" +) + +// TODO: figure out how to get these from a apid config file vs constant values +const ( + configAnalyticsBasePath = "apidanalytics_base_path" // config + analyticsBasePathDefault = "/analytics" + + configAnalyticsDataPath = "apidanalytics_data_path" // config + analyticsDataPathDefault = "/ax" + + analyticsCollectionInterval = "apidanalytics_collection_interval" // config in seconds + analyticsCollectionIntervalDefault = "120" + + analyticsCollectionNoFiles = "apidanalytics_collection_no_files" // config + analyticsCollectionNoFilesDefault = "1" + + analyticsUploadInterval = "apidanalytics_upload_interval" // config in seconds + analyticsUploadIntervalDefault = "5" + + uapEndpoint = "apidanalytics_uap_endpoint" // config + + uapRepo = "apidanalytics_uap_repo" // config + uapRepoDefault = "edge" + + uapDataset = "apidanalytics_uap_dataset" // config + uapDatasetDefault = "api" + + maxRetries = 3 +) + +// keep track of the services that this plugin will use +// note: services would also be available directly via the package global "apid" (eg. `apid.Log()`) +var ( + log apid.LogService + config apid.ConfigService + data apid.DataService + + localAnalyticsBaseDir string + localAnalyticsTempDir string + localAnalyticsStagingDir string + localAnalyticsFailedDir string + localAnalyticsRecoveredDir string +) + +// apid.RegisterPlugin() is required to be called in init() +func init() { + apid.RegisterPlugin(initPlugin) +} + +// initPlugin will be called by apid to initialize +func initPlugin(services apid.Services) (apid.PluginData, error) { + + // set a logger that is annotated for this plugin + log = services.Log().ForModule("analytics") + log.Debug("start init for apidAnalytics plugin") + + // set configuration + err := setConfig(services) + if err != nil { + return pluginData, fmt.Errorf("Missing required config value: %s: ", err) + } + + for _, key := range []string{uapEndpoint} { + if !config.IsSet(key) { + return pluginData, fmt.Errorf("Missing required config value: %s", key) + } + } + + directories := []string{localAnalyticsBaseDir, localAnalyticsTempDir, localAnalyticsStagingDir, localAnalyticsFailedDir, localAnalyticsRecoveredDir} + err = createDirectories(directories) + + if err != nil { + return pluginData, fmt.Errorf("Cannot create required local directories %s: ", err) + } + + data = services.Data() + + // TODO: perform crash recovery + initUploadManager() + initAPI(services) + + log.Debug("end init for apidAnalytics plugin") + return pluginData, nil +} + +func setConfig(services apid.Services) error { + config = services.Config() + + // set plugin config defaults + config.SetDefault(configAnalyticsBasePath, analyticsBasePathDefault) + config.SetDefault(configAnalyticsDataPath, analyticsDataPathDefault) + + if !config.IsSet("local_storage_path") { + return fmt.Errorf("Missing required config value: local_storage_path") + } + + // set local directory paths that will be used to buffer analytics data on disk + localAnalyticsBaseDir = filepath.Join(config.GetString("local_storage_path"), config.GetString(configAnalyticsDataPath)) + localAnalyticsTempDir = filepath.Join(localAnalyticsBaseDir, "tmp") + localAnalyticsStagingDir = filepath.Join(localAnalyticsBaseDir, "staging") + localAnalyticsFailedDir = filepath.Join(localAnalyticsBaseDir, "failed") + localAnalyticsRecoveredDir = filepath.Join(localAnalyticsBaseDir, "recovered") + + // set default config for collection interval and number of files per interval + config.SetDefault(analyticsCollectionInterval, analyticsCollectionIntervalDefault) + config.SetDefault(analyticsCollectionNoFiles, analyticsCollectionNoFilesDefault) + + // set default config for upload interval + config.SetDefault(analyticsUploadInterval, analyticsUploadIntervalDefault) + + // set defaults for uap related properties + config.SetDefault(uapRepo, uapRepoDefault) + config.SetDefault(uapDataset, uapDatasetDefault) + + return nil +} + +// create all missing directories if required +func createDirectories(directories []string) error { + for _, path := range directories { + if _, err := os.Stat(path); os.IsNotExist(err) { + error := os.Mkdir(path, os.ModePerm) + if error != nil { + return error + } + log.Infof("created directory %s: ", path) + } + } + return nil +}
diff --git a/pluginData.go b/pluginData.go new file mode 100644 index 0000000..af0faa6 --- /dev/null +++ b/pluginData.go
@@ -0,0 +1,11 @@ +package apidAnalytics + +import "github.com/30x/apid" + +var pluginData = apid.PluginData{ + Name: "apidAnalytics", + Version: "0.0.1", + ExtraData: map[string]interface{}{ + "schemaVersion": "0.0.1", + }, +}
diff --git a/uploadManager.go b/uploadManager.go new file mode 100644 index 0000000..43290d2 --- /dev/null +++ b/uploadManager.go
@@ -0,0 +1,68 @@ +package apidAnalytics + +import ( + _ "fmt" + "io/ioutil" + "os" + "path/filepath" + "time" +) + +var ( + retriesMap map[string]int +) + +//TODO: make sure that this instance gets initialized only once since we dont want multiple upload manager tickers running +func initUploadManager() { + + retriesMap = make(map[string]int) + + // TODO: add a way to make sure that this go routine is always running + go func() { + ticker := time.NewTicker(time.Millisecond * config.GetDuration(analyticsUploadInterval) * 1000) + log.Debugf("Intialized upload manager to check for staging directory") + defer ticker.Stop() // Ticker will keep running till go routine is running i.e. till application is running + + for t := range ticker.C { + files, err := ioutil.ReadDir(localAnalyticsStagingDir) + + if err != nil { + log.Errorf("Cannot read directory %s: ", localAnalyticsStagingDir) + } + + for _, file := range files { + log.Debugf("t: %s , file: %s", t, file.Name()) + if file.IsDir() { + handleUploadDirStatus(file, uploadDir(file)) + } + } + } + }() + +} + +func uploadDir(file os.FileInfo) bool { + // TODO: handle upload to UAP file by file + return false +} + +func handleUploadDirStatus(file os.FileInfo, status bool) { + completePath := filepath.Join(localAnalyticsStagingDir, file.Name()) + if status { + os.RemoveAll(completePath) + // remove key if exists from retry map after a successful upload + delete(retriesMap, file.Name()) + } else { + retriesMap[file.Name()] = retriesMap[file.Name()] + 1 + if retriesMap[file.Name()] > maxRetries { + log.Errorf("Max Retires exceeded for folder: %s", completePath) + failedDirPath := filepath.Join(localAnalyticsFailedDir, file.Name()) + err := os.Rename(completePath, failedDirPath) + if err != nil { + log.Errorf("Cannot move directory :%s to failed folder", file.Name()) + } + // remove key from retry map once it reaches allowed max failed attempts + delete(retriesMap, file.Name()) + } + } +}