add tracking apis
diff --git a/api.go b/api.go index f676aaa..98c7e16 100644 --- a/api.go +++ b/api.go
@@ -22,6 +22,7 @@ "io/ioutil" "net/http" "net/url" + "regexp" "strconv" "sync/atomic" "time" @@ -38,14 +39,19 @@ ) const ( - deploymentsEndpoint = "/configurations" - blobEndpointPath = "/blobs" - blobEndpoint = blobEndpointPath + "/{blobId}" + deploymentsEndpoint = "/configurations" + blobEndpointPath = "/blobs" + blobEndpoint = blobEndpointPath + "/{blobId}" + configStatusEndpoint = "/configstatus" + heartbeatEndpoint = "/heartbeat/{uuid}" + registerEndpoint = "/register/{uuid}" ) const ( API_ERR_BAD_BLOCK = iota + 1 API_ERR_INTERNAL + API_ERR_INVALID_PARAMETERS + API_ERR_FROM_TRACKER ) const ( @@ -94,6 +100,12 @@ ApiDeploymentsResponse []ApiDeploymentDetails `json:"contents"` } +type PutConfigStatusResponse struct { + Kind string + Self string + //Contents [] +} + //TODO add support for block and subscriber type apiManagerInterface interface { InitAPI() @@ -102,14 +114,18 @@ } type apiManager struct { - dbMan dbManagerInterface - deploymentsEndpoint string - blobEndpoint string - eTag int64 - deploymentsChanged chan interface{} - addSubscriber chan chan deploymentsResult - removeSubscriber chan chan deploymentsResult - apiInitialized bool + dbMan dbManagerInterface + trackerCl trackerClientInterface + deploymentsEndpoint string + blobEndpoint string + configStatusEndpoint string + heartbeatEndpoint string + registerEndpoint string + eTag int64 + deploymentsChanged chan interface{} + addSubscriber chan chan deploymentsResult + removeSubscriber chan chan deploymentsResult + apiInitialized bool } func (a *apiManager) InitAPI() { @@ -118,6 +134,9 @@ } services.API().HandleFunc(a.deploymentsEndpoint, a.apiGetCurrentDeployments).Methods("GET") services.API().HandleFunc(a.blobEndpoint, a.apiReturnBlobData).Methods("GET") + services.API().HandleFunc(a.configStatusEndpoint, a.apiPutConfigStatus).Methods("PUT") + services.API().HandleFunc(a.heartbeatEndpoint, a.apiPutHeartbeat).Methods("PUT") + services.API().HandleFunc(a.registerEndpoint, a.apiPostRegister).Methods("POST") a.apiInitialized = true log.Debug("API endpoints initialized") } @@ -346,6 +365,105 @@ w.Write(b) } +func (a *apiManager) apiPostRegister(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + uuid := vars["uuid"] + if !isValidUuid(uuid) { + a.writeError(w, http.StatusBadRequest, API_ERR_INVALID_PARAMETERS, "Bad/Missing gateway uuid") + return + } + created := r.Header.Get("created") + if created == "" { + a.writeError(w, http.StatusBadRequest, API_ERR_INVALID_PARAMETERS, "Bad/Missing created") + return + } + name := r.Header.Get("name") + if name == "" { + a.writeError(w, http.StatusBadRequest, API_ERR_INVALID_PARAMETERS, "Bad/Missing name") + return + } + + pod := r.Header.Get("pod") + podType := r.Header.Get("podtype") + serviceType := r.Header.Get("type") + + trackerResp := a.trackerCl.postRegister(uuid, pod, created, name, podType, serviceType) + switch trackerResp.code { + case http.StatusOK: + a.writePostRegisterResp(w, trackerResp) + default: + log.Infof("putConfigStatus code: %v Reason: %v", trackerResp.code, trackerResp.errString) + a.writeError(w, trackerResp.code, API_ERR_FROM_TRACKER, trackerResp.errString) + } + +} + +func (a *apiManager) apiPutConfigStatus(w http.ResponseWriter, r *http.Request) { + + configId := r.URL.Query().Get("configid") + if configId == "" { + a.writeError(w, http.StatusBadRequest, API_ERR_INVALID_PARAMETERS, "Bad/Missing configId") + return + } + uuid := r.Header.Get("uuid") + if !isValidUuid(uuid) { + a.writeError(w, http.StatusBadRequest, API_ERR_INVALID_PARAMETERS, "Bad/Missing gateway uuid") + return + } + status := r.Header.Get("status") + if status == "" { + a.writeError(w, http.StatusBadRequest, API_ERR_INVALID_PARAMETERS, "Bad/Missing status") + return + } + created := r.Header.Get("created") + if created == "" { + a.writeError(w, http.StatusBadRequest, API_ERR_INVALID_PARAMETERS, "Bad/Missing created") + return + } + trackerResp := a.trackerCl.putConfigStatus(configId, status, uuid, created) + switch trackerResp.code { + case http.StatusOK: + a.writeConfigStatusResp(w, trackerResp) + default: + log.Infof("putConfigStatus code: %v Reason: %v", trackerResp.code, trackerResp.errString) + a.writeError(w, trackerResp.code, API_ERR_FROM_TRACKER, trackerResp.errString) + } +} + +func (a *apiManager) apiPutHeartbeat(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + uuid := vars["uuid"] + if !isValidUuid(uuid) { + a.writeError(w, http.StatusBadRequest, API_ERR_INVALID_PARAMETERS, "Bad/Missing gateway uuid") + return + } + updated := r.Header.Get("updated") + if updated == "" { + a.writeError(w, http.StatusBadRequest, API_ERR_INVALID_PARAMETERS, "Bad/Missing updated") + return + } + trackerResp := a.trackerCl.putHeartbeat(uuid, updated) + switch trackerResp.code { + case http.StatusOK: + a.writePutHeartbeatResp(w, trackerResp) + default: + log.Infof("putConfigStatus code: %v Reason: %v", trackerResp.code, trackerResp.errString) + a.writeError(w, trackerResp.code, API_ERR_FROM_TRACKER, trackerResp.errString) + } +} + +func (a *apiManager) writeConfigStatusResp(w http.ResponseWriter, tr trackerResponse) { + w.Header().Add("Content-type", tr.contentType) +} + +func (a *apiManager) writePostRegisterResp(w http.ResponseWriter, tr trackerResponse) { + w.Header().Add("Content-type", tr.contentType) +} + +func (a *apiManager) writePutHeartbeatResp(w http.ResponseWriter, tr trackerResponse) { + w.Header().Add("Content-type", tr.contentType) +} + // call whenever the list of deployments changes func (a *apiManager) incrementETag() string { e := atomic.AddInt64(&a.eTag, 1) @@ -394,3 +512,11 @@ proto = proto + "://" + config.GetString(configAPIListen) return proto } + +func isValidUuid(uuid string) bool { + r, err := regexp.Compile("^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-4[a-fA-F0-9]{3}-[8|9|aA|bB][a-fA-F0-9]{3}-[a-fA-F0-9]{12}$") + if err != nil { + return false + } + return r.MatchString(uuid) +}
diff --git a/clients.go b/clients.go new file mode 100644 index 0000000..c260b98 --- /dev/null +++ b/clients.go
@@ -0,0 +1,153 @@ +package apiGatewayConfDeploy + +import ( + "io/ioutil" + "net/http" + "net/url" +) + +const ( + trackerConfigStatusEndpoint = "/serviceconfigstatus" + trackerHeartbeatEndpoint = "/serviceheartbeat/{uuid}" + trackerRegisterEndpoint = "/serviceregister/{uuid}" +) + +type trackerClientInterface interface { + putConfigStatus(configId, status, uuid, created string) trackerResponse + postRegister(uuid, pod, created, name, podType, serviceType string) trackerResponse + putHeartbeat(uuid, updated string) trackerResponse +} + +type trackerResponse struct { + code int + contentType string + errString string +} + +type trackerClient struct { + trackerBaseUrl string + clusterId string + httpclient *http.Client +} + +func (t *trackerClient) putConfigStatus(configId, status, uuid, created string) trackerResponse { + uri, err := url.Parse(t.trackerBaseUrl + trackerConfigStatusEndpoint) + if err != nil { + log.Errorf("putConfigStatus failed to parse tracker uri: %v", err) + return (internalError(err)) + } + req, err := http.NewRequest("PUT", uri, nil) + req.Header.Add("configid", configId) + req.Header.Add("Authorization", getAuthToken()) + req.Header.Add("status", status) + req.Header.Add("uuid", uuid) + req.Header.Add("created", created) + req.Header.Add("clusterid", t.clusterId) + + r, err := t.httpclient.Do(req) + if err != nil { + log.Errorf("trackerClient communication error: %v", err) + return (internalError(err)) + } + defer r.Body.Close() + res := trackerResponse{} + switch r.StatusCode { + case http.StatusOK: + res.code = r.StatusCode + res.contentType = r.Header.Get("Content-type") + default: + res.code = r.StatusCode + res.contentType = r.Header.Get("Content-type") + body, err := ioutil.ReadAll(r.Body) + if err != nil { + return (internalError(err)) + } + res.errString = string(body) + } + return res +} + +func (t *trackerClient) postRegister(uuid, pod, created, name, podType, serviceType string) trackerResponse { + uri, err := url.Parse(t.trackerBaseUrl + trackerRegisterEndpoint) + if err != nil { + log.Errorf("postRegister failed to parse tracker uri: %v", err) + return (internalError(err)) + } + req, err := http.NewRequest("PUT", uri, nil) + req.Header.Add("uuid", uuid) + req.Header.Add("Authorization", getAuthToken()) + req.Header.Add("pod", pod) + req.Header.Add("podtype", podType) + req.Header.Add("created", created) + req.Header.Add("name", name) + req.Header.Add("type", serviceType) + req.Header.Add("clusterid", t.clusterId) + + r, err := t.httpclient.Do(req) + if err != nil { + log.Errorf("trackerClient communication error: %v", err) + return (internalError(err)) + } + defer r.Body.Close() + res := trackerResponse{} + switch r.StatusCode { + case http.StatusOK: + res.code = r.StatusCode + res.contentType = r.Header.Get("Content-type") + default: + res.code = r.StatusCode + res.contentType = r.Header.Get("Content-type") + body, err := ioutil.ReadAll(r.Body) + if err != nil { + return (internalError(err)) + } + res.errString = string(body) + } + return res +} + +func (t *trackerClient) putHeartbeat(uuid, updated string) trackerResponse { + uri, err := url.Parse(t.trackerBaseUrl + trackerHeartbeatEndpoint) + if err != nil { + log.Errorf("putHeartbeat failed to parse tracker uri: %v", err) + return (internalError(err)) + } + req, err := http.NewRequest("PUT", uri, nil) + req.Header.Add("uuid", uuid) + req.Header.Add("Authorization", getAuthToken()) + req.Header.Add("updated", updated) + req.Header.Add("clusterid", t.clusterId) + + r, err := t.httpclient.Do(req) + if err != nil { + log.Errorf("trackerClient communication error: %v", err) + return (internalError(err)) + } + defer r.Body.Close() + res := trackerResponse{} + switch r.StatusCode { + case http.StatusOK: + res.code = r.StatusCode + res.contentType = r.Header.Get("Content-type") + default: + res.code = r.StatusCode + res.contentType = r.Header.Get("Content-type") + body, err := ioutil.ReadAll(r.Body) + if err != nil { + return (internalError(err)) + } + res.errString = string(body) + } + return res +} + +func internalError(err error) (res trackerResponse) { + res.code = http.StatusInternalServerError + res.errString = err.Error() + return res +} + +func getAuthToken() string { + //TODO + return "Bearer " +}
diff --git a/init.go b/init.go index 1c4e16d..919e7a6 100644 --- a/init.go +++ b/init.go
@@ -54,6 +54,7 @@ debounceDuration time.Duration apiServerBaseURI *url.URL eventHandler *apigeeSyncHandler + apidClusterId string ) func init() { @@ -75,6 +76,10 @@ return pluginData, fmt.Errorf("Missing required config value: %s", configBlobServerBaseURI) } + if !config.IsSet(configApidClusterID) { + return pluginData, fmt.Errorf("Missing required config value: %s", configApidClusterID) + } + var err error apiServerBaseURI, err = url.Parse(config.GetString(configApiServerBaseURI)) if err != nil { @@ -122,6 +127,7 @@ return nil }, } + apidClusterId = config.GetString(configApidClusterID) // initialize db manager