The basic functionality of the new generic Configuration deployment plugin.
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3f082d0 --- /dev/null +++ b/.gitignore
@@ -0,0 +1,7 @@ +profile.out +cover.html +coverage.txt +.idea +*.lock +*.iml +vendor/
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..6d364e1 --- /dev/null +++ b/CONTRIBUTING.md
@@ -0,0 +1,23 @@ +# How to Contribute + +We'd love to accept your patches and contributions to this project. There are +just a few small guidelines you need to follow. + +## Contributor License Agreement + +Contributions to this project must be accompanied by a Contributor License +Agreement. You (or your employer) retain the copyright to your contribution, +this simply gives us permission to use and redistribute your contributions as +part of the project. Head over to <https://cla.developers.google.com/> to see +your current agreements on file or to sign a new one. + +You generally only need to submit a CLA once, so if you've already submitted one +(even if it was for a different project), you probably don't need to do it +again. + +## Code reviews + +All submissions, including submissions by project members, require review. We +use GitHub pull requests for this purpose. Consult +[GitHub Help](https://help.github.com/articles/about-pull-requests/) for more +information on using pull requests. \ No newline at end of file
diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..7a4a3ea --- /dev/null +++ b/LICENSE
@@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file
diff --git a/README.md b/README.md new file mode 100644 index 0000000..dd683e9 --- /dev/null +++ b/README.md
@@ -0,0 +1,15 @@ +# apidGatewayConfDeploy + +apidGatewayConfDeploy is a plugin for +[apid](http://github.com/30x/apid). + +Gateways acting as clients will connect to apid; and this plugin will +offer configuration updates that occur in the management. The plugin +will also download the configuration data from GCS securely, and store +it in the local filesystem. They can be fetched by the Gateway via API's +as well. + +## Functional description + +see the file [swagger.yaml](swagger.yaml). +
diff --git a/api.go b/api.go new file mode 100644 index 0000000..68a855d --- /dev/null +++ b/api.go
@@ -0,0 +1,295 @@ +package apiGatewayDeploy + +import ( + "encoding/json" + "net/http" + "strconv" + "sync/atomic" + "time" + "github.com/gorilla/mux" +) + +// todo: the full set of states should probably be RECEIVED, READY, FAIL, SUCCESS +const ( + RESPONSE_STATUS_SUCCESS = "SUCCESS" + RESPONSE_STATUS_FAIL = "FAIL" +) + +const ( + TRACKER_ERR_BUNDLE_DOWNLOAD_TIMEOUT = iota + 1 +) + +const ( + API_ERR_BAD_BLOCK = iota + 1 + API_ERR_INTERNAL +) + +const ( + sqlTimeFormat = "2006-01-02 15:04:05.999 -0700 MST" + iso8601 = "2006-01-02T15:04:05.999Z07:00" + sqliteTimeFormat = "2006-01-02 15:04:05.999-07:00" +) + +type deploymentsResult struct { + deployments []DataDeployment + err error + eTag string +} + +var ( + deploymentsChanged = make(chan interface{}, 5) + addSubscriber = make(chan chan deploymentsResult) + removeSubscriber = make(chan chan deploymentsResult) + eTag int64 +) + +type errorResponse struct { + ErrorCode int `json:"errorCode"` + Reason string `json:"reason"` +} + +type ApiDeploymentDetails struct { + Self string `json:self` + Name string `json:name` + Org string `json:org` + Env string `json:env` + Scope string `json:scope` + Type string `json:type` + BlobURL string `json:bloburl` + Revision string `json:revision` + BlobId string `json:blobId` + ResourceBlobId string `json:resourceBlobId` + Created string `json:created` + Updated string `json:updated` + +} + +type ApiDeploymentResponse struct { + Kind string `json:kind` + Self string `json:self` + ApiDeploymentResponse []ApiDeploymentDetails `json:contents` +} + + +const deploymentsEndpoint = "/configurations" +const BlobEndpoint = "/blob/{blobId}" + +func InitAPI() { + services.API().HandleFunc(deploymentsEndpoint, apiGetCurrentDeployments).Methods("GET") + services.API().HandleFunc(BlobEndpoint, apiReturnBlobData).Methods("GET") +} + +func writeError(w http.ResponseWriter, status int, code int, reason string) { + w.WriteHeader(status) + e := errorResponse{ + ErrorCode: code, + Reason: reason, + } + bytes, err := json.Marshal(e) + if err != nil { + log.Errorf("unable to marshal errorResponse: %v", err) + } else { + w.Write(bytes) + } + log.Debugf("sending %d error to client: %s", status, reason) +} + +func writeDatabaseError(w http.ResponseWriter) { + writeError(w, http.StatusInternalServerError, API_ERR_INTERNAL, "database error") +} + +func debounce(in chan interface{}, out chan []interface{}, window time.Duration) { + send := func(toSend []interface{}) { + if toSend != nil { + log.Debugf("debouncer sending: %v", toSend) + out <- toSend + } + } + var toSend []interface{} + for { + select { + case incoming, ok := <-in: + if ok { + log.Debugf("debouncing %v", incoming) + toSend = append(toSend, incoming) + } else { + send(toSend) + log.Debugf("closing debouncer") + close(out) + return + } + case <-time.After(window): + send(toSend) + toSend = nil + } + } +} + +func distributeEvents() { + subscribers := make(map[chan deploymentsResult]struct{}) + deliverDeployments := make(chan []interface{}, 1) + + go debounce(deploymentsChanged, deliverDeployments, debounceDuration) + + for { + select { + case _, ok := <-deliverDeployments: + if !ok { + return // todo: using this? + } + subs := subscribers + subscribers = make(map[chan deploymentsResult]struct{}) + go func() { + eTag := incrementETag() + deployments, err := getUnreadyDeployments() + log.Debugf("delivering deployments to %d subscribers", len(subs)) + for subscriber := range subs { + log.Debugf("delivering to: %v", subscriber) + subscriber <- deploymentsResult{deployments, err, eTag} + } + }() + case subscriber := <-addSubscriber: + log.Debugf("Add subscriber: %v", subscriber) + subscribers[subscriber] = struct{}{} + case subscriber := <-removeSubscriber: + log.Debugf("Remove subscriber: %v", subscriber) + delete(subscribers, subscriber) + } + } +} + +func apiReturnBlobData(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + blobId := vars["blobId"] + _, err := getLocalFSLocation(blobId) + if err != nil { + writeDatabaseError(w) + return + } + + +} + +func apiGetCurrentDeployments(w http.ResponseWriter, r *http.Request) { + + // If returning without a bundle (immediately or after timeout), status = 404 + // If returning If-None-Match value is equal to current deployment, status = 304 + // If returning a new value, status = 200 + + // If timeout > 0 AND there is no deployment (or new deployment) available (per If-None-Match), then + // block for up to the specified number of seconds until a new deployment becomes available. + b := r.URL.Query().Get("block") + var timeout int + if b != "" { + var err error + timeout, err = strconv.Atoi(b) + if err != nil { + writeError(w, http.StatusBadRequest, API_ERR_BAD_BLOCK, "bad block value, must be number of seconds") + return + } + } + log.Debugf("api timeout: %d", timeout) + + // If If-None-Match header matches the ETag of current bundle list AND if the request does NOT have a 'block' + // query param > 0, the server returns a 304 Not Modified response indicating that the client already has the + // most recent bundle list. + ifNoneMatch := r.Header.Get("If-None-Match") + log.Debugf("if-none-match: %s", ifNoneMatch) + + // send unmodified if matches prior eTag and no timeout + eTag := getETag() + if eTag == ifNoneMatch && timeout == 0 { + w.WriteHeader(http.StatusNotModified) + return + } + + // send results if different eTag + if eTag != ifNoneMatch { + sendReadyDeployments(w) + return + } + + // otherwise, subscribe to any new deployment changes + var newDeploymentsChannel chan deploymentsResult + if timeout > 0 && ifNoneMatch != "" { + newDeploymentsChannel = make(chan deploymentsResult, 1) + addSubscriber <- newDeploymentsChannel + } + + log.Debug("Blocking request... Waiting for new Deployments.") + + select { + case result := <-newDeploymentsChannel: + if result.err != nil { + writeDatabaseError(w) + } else { + sendDeployments(w, result.deployments, result.eTag) + } + + case <-time.After(time.Duration(timeout) * time.Second): + removeSubscriber <- newDeploymentsChannel + log.Debug("Blocking deployment request timed out.") + if ifNoneMatch != "" { + w.WriteHeader(http.StatusNotModified) + } else { + sendReadyDeployments(w) + } + } +} + +func sendReadyDeployments(w http.ResponseWriter) { + eTag := getETag() + deployments, err := getReadyDeployments() + if err != nil { + writeDatabaseError(w) + return + } + sendDeployments(w, deployments, eTag) +} + +func sendDeployments(w http.ResponseWriter, dataDeps []DataDeployment, eTag string) { + + apiDeps := ApiDeploymentResponse{} + apiDepDetails := []ApiDeploymentDetails{} + + apiDeps.Kind = "Collections" + apiDeps.Self = config.GetString("api_listen") +"/configurations" + + for _, d := range dataDeps { + apiDepDetails = append(apiDepDetails, ApiDeploymentDetails{ + Org: d.OrgID, + Env: d.EnvID, + Revision: d.Revision, + BlobId: d.BlobID, + ResourceBlobId: d.BlobResourceID, + Created: d.Created, + Updated: d.Updated, + Type: d.Type, + BlobURL: d.BlobURL, + }) + } + apiDeps.ApiDeploymentResponse = apiDepDetails + + b, err := json.Marshal(apiDeps) + if err != nil { + log.Errorf("unable to marshal deployments: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + log.Debugf("sending deployments %s: %s", eTag, b) + w.Header().Set("ETag", eTag) + w.Write(b) +} + +// call whenever the list of deployments changes +func incrementETag() string { + e := atomic.AddInt64(&eTag, 1) + return strconv.FormatInt(e, 10) +} + +func getETag() string { + e := atomic.LoadInt64(&eTag) + return strconv.FormatInt(e, 10) +} +
diff --git a/bundle.go b/bundle.go new file mode 100644 index 0000000..2cca9af --- /dev/null +++ b/bundle.go
@@ -0,0 +1,260 @@ +package apiGatewayDeploy + +import ( + + "encoding/base64" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "os" + "path" + "time" +) + +const ( + BLOBSTORE_URI = "/v1/blobstore/signeduri" +) +var ( + markDeploymentFailedAfter time.Duration + bundleDownloadConnTimeout time.Duration + bundleRetryDelay = time.Second + downloadQueue = make(chan *DownloadRequest, downloadQueueSize) + workerQueue = make(chan chan *DownloadRequest, concurrentDownloads) +) + +// simple doubling back-off +func createBackoff(retryIn, maxBackOff time.Duration) func() { + return func() { + log.Debugf("backoff called. will retry in %s.", retryIn) + time.Sleep(retryIn) + retryIn = retryIn * time.Duration(2) + if retryIn > maxBackOff { + retryIn = maxBackOff + } + } +} + +func queueDownloadRequest(dep DataDeployment) { + + retryIn := bundleRetryDelay + maxBackOff := 5 * time.Minute + markFailedAt := time.Now().Add(markDeploymentFailedAfter) + req := &DownloadRequest{ + dep: dep, + bundleFile: getBundleFile(dep), + backoffFunc: createBackoff(retryIn, maxBackOff), + markFailedAt: markFailedAt, + } + downloadQueue <- req +} + +type DownloadRequest struct { + dep DataDeployment + bundleFile string + backoffFunc func() + markFailedAt time.Time +} + +func (r *DownloadRequest) downloadBundle() { + + dep := r.dep + log.Debugf("starting bundle download attempt for %s: %s", dep.ID, dep.BlobID) + + r.checkTimeout() + + tempFile, err := downloadFromURI(dep.BlobID) + + if err == nil { + err = os.Rename(tempFile, r.bundleFile) + if err != nil { + log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, r.bundleFile, err) + } + } + + if tempFile != "" { + go safeDelete(tempFile) + } + + if err == nil { + err = updatelocal_fs_location(dep.BlobID, r.bundleFile) + } + + if err != nil { + // add myself back into the queue after back off + go func() { + r.backoffFunc() + downloadQueue <- r + }() + return + } + + log.Debugf("bundle for %s downloaded: %s", dep.ID, dep.BlobID) + + // send deployments to client + deploymentsChanged <- dep.ID +} + +func (r *DownloadRequest) checkTimeout() { + + if !r.markFailedAt.IsZero() { + if time.Now().After(r.markFailedAt) { + r.markFailedAt = time.Time{} + log.Debugf("bundle download timeout. marking deployment %s failed. will keep retrying: %s", + r.dep.ID, r.dep.BlobID) + } + } + +} + +func getBundleFile(dep DataDeployment) string { + + return path.Join(bundlePath, base64.StdEncoding.EncodeToString([]byte(dep.ID))) + +} + +func getSignedURL(blobId string) (string, error) { + + blobUri, err := url.Parse(config.GetString(configBlobServerBaseURI)) + if err != nil { + log.Panicf("bad url value for config %s: %s", blobUri, err) + } + + blobUri.Path += BLOBSTORE_URI + parameters := url.Values{} + parameters.Add("action", "GET") + parameters.Add("key", blobId) + blobUri.RawQuery = parameters.Encode() + + uri := blobUri.String() + + surl, err := getURIReader(uri) + if err != nil { + log.Errorf("Unable to get signed URL from BlobServer %s: %v", uri, err) + return "", err + } + + signedURL, err := ioutil.ReadAll(surl) + if err != nil { + log.Errorf("Invalid response from BlobServer for {%s} error: {%v}", uri, err) + return "", err + } + return string(signedURL), nil +} + + +// downloadFromURI involves retrieving the signed URL for the blob, and storing the resource locally +// after downloading the resource from GCS (via the signed URL) +func downloadFromURI(blobId string) (tempFileName string, err error) { + + var tempFile *os.File + log.Debugf("Downloading bundle: %s", blobId) + + uri, err := getSignedURL(blobId) + if err != nil { + log.Errorf("Unable to get signed URL for blobId {%s}, error : {%v}", blobId, err) + return + } + + tempFile, err = ioutil.TempFile(bundlePath, "download") + if err != nil { + log.Errorf("Unable to create temp file: %v", err) + return + } + defer tempFile.Close() + tempFileName = tempFile.Name() + + var confReader io.ReadCloser + confReader, err = getURIReader(uri) + if err != nil { + log.Errorf("Unable to retrieve bundle %s: %v", uri, err) + return + } + defer confReader.Close() + + _, err = io.Copy(tempFile, confReader) + if err != nil { + log.Errorf("Unable to write bundle %s: %v", tempFileName, err) + return + } + + log.Debugf("Bundle %s downloaded to: %s", uri, tempFileName) + return +} + +// retrieveBundle retrieves bundle data from a URI +func getURIReader(uriString string) (io.ReadCloser, error) { + + client := http.Client{ + Timeout: bundleDownloadConnTimeout, + } + res, err := client.Get(uriString) + if err != nil { + return nil, err + } + if res.StatusCode != 200 { + return nil, fmt.Errorf("GET uri %s failed with status %d", uriString, res.StatusCode) + } + return res.Body, nil +} + +func initializeBundleDownloading() { + + // create workers + for i := 0; i < concurrentDownloads; i++ { + worker := BundleDownloader{ + id: i + 1, + workChan: make(chan *DownloadRequest), + quitChan: make(chan bool), + } + worker.Start() + } + + // run dispatcher + go func() { + for { + select { + case req := <-downloadQueue: + log.Debugf("dispatching downloader for: %s", req.bundleFile) + go func() { + worker := <-workerQueue + log.Debugf("got a worker for: %s", req.bundleFile) + worker <- req + }() + } + } + }() +} + +type BundleDownloader struct { + id int + workChan chan *DownloadRequest + quitChan chan bool +} + +func (w *BundleDownloader) Start() { + go func() { + log.Debugf("started bundle downloader %d", w.id) + for { + // wait for work + workerQueue <- w.workChan + + select { + case req := <-w.workChan: + log.Debugf("starting download %s", req.bundleFile) + req.downloadBundle() + + case <-w.quitChan: + log.Debugf("bundle downloader %d stopped", w.id) + return + } + } + }() +} + +func (w *BundleDownloader) Stop() { + go func() { + w.quitChan <- true + }() +}
diff --git a/data.go b/data.go new file mode 100644 index 0000000..923ad77 --- /dev/null +++ b/data.go
@@ -0,0 +1,176 @@ +package apiGatewayDeploy + +import ( + "database/sql" + "sync" + + "github.com/30x/apid-core" + +) + +var ( + unsafeDB apid.DB + dbMux sync.RWMutex +) + +type DataDeployment struct { + ID string + OrgID string + EnvID string + Type string + Name string + Revision string + BlobID string + BlobResourceID string + Updated string + UpdatedBy string + Created string + CreatedBy string + BlobFSLocation string + BlobURL string +} + +type SQLExec interface { + Exec(query string, args ...interface{}) (sql.Result, error) +} + +func InitDB(db apid.DB) error { + _, err := db.Exec(` + CREATE TABLE IF NOT EXISTS edgex_blob_available ( + blob_id character varying NOT NULL, + local_fs_location character varying NOT NULL, + access_url character varying + ); + `) + if err != nil { + return err + } + + log.Debug("Database tables created.") + return nil +} + +func getDB() apid.DB { + dbMux.RLock() + db := unsafeDB + dbMux.RUnlock() + return db +} + +// caller is responsible for calling dbMux.Lock() and dbMux.Unlock() +func SetDB(db apid.DB) { + if unsafeDB == nil { // init API when DB is initialized + go InitAPI() + } + unsafeDB = db +} + +// getUnreadyDeployments() returns array of resources that are not yet to be processed +func getUnreadyDeployments() (deployments []DataDeployment, err error) { + + err = nil + db := getDB() + + rows, err := db.Query(` + SELECT id, org_id, env_id, name, revision, project_runtime_blob_metadata.blob_id, resource_blob_id + FROM project_runtime_blob_metadata + LEFT JOIN edgex_blob_available + ON project_runtime_blob_metadata.blob_id = edgex_blob_available.blob_id + WHERE edgex_blob_available.blob_id IS NULL; + `) + + if err != nil { + log.Errorf("DB Query for project_runtime_blob_metadata failed %v", err) + return + } + defer rows.Close() + + for rows.Next() { + dep := DataDeployment{} + rows.Scan(&dep.ID, &dep.OrgID, &dep.EnvID, &dep.Name, &dep.Revision, &dep.BlobID, + &dep.BlobResourceID) + deployments = append(deployments, dep) + log.Debugf("New configurations to be processed Id {%s}, blobId {%s}", dep.ID, dep.BlobID) + } + if len(deployments) == 0 { + log.Debug("No new resources found to be processed") + err = sql.ErrNoRows + } + return + +} + +// getDeployments() +func getReadyDeployments() (deployments []DataDeployment, err error) { + + err = nil + db := getDB() + + rows, err := db.Query(` + SELECT a.id, a.org_id, a.env_id, a.name, a.type, a.revision, a.blob_id, + a.resource_blob_id, a.created_at, a.created_by, a.updated_at, a.updated_by, + b.local_fs_location, b.access_url + FROM project_runtime_blob_metadata as a + INNER JOIN edgex_blob_available as b + ON a.blob_id = b.blob_id + `) + + if err != nil { + log.Errorf("DB Query for project_runtime_blob_metadata failed %v", err) + return + } + defer rows.Close() + + for rows.Next() { + dep := DataDeployment{} + rows.Scan(&dep.ID, &dep.OrgID, &dep.EnvID, &dep.Name, &dep.Type, &dep.Revision, &dep.BlobID, + &dep.BlobResourceID, &dep.Created, &dep.CreatedBy, &dep.Updated, + &dep.UpdatedBy, &dep.BlobFSLocation, &dep.BlobURL) + deployments = append(deployments, dep) + log.Debugf("New Configurations available Id {%s} BlobId {%s}", dep.ID, dep.BlobID) + } + if len(deployments) == 0 { + log.Debug("No resources ready to be deployed") + err = sql.ErrNoRows + } + return + +} + +func updatelocal_fs_location(depID, local_fs_location string) error { + + access_url := config.GetString("api_listen") + "/blob/" + depID + stmt, err := getDB().Prepare(` + INSERT INTO edgex_blob_available (blob_id, local_fs_location, access_url) + VALUES (?, ?, ?)`) + if err != nil { + log.Errorf("PREPARE updatelocal_fs_location failed: %v", err) + return err + } + defer stmt.Close() + + _, err = stmt.Exec(depID, local_fs_location, access_url) + if err != nil { + log.Errorf("UPDATE edgex_blob_available id {%s} local_fs_location {%s} failed: %v", depID, local_fs_location, err) + return err + } + + log.Debugf("INSERT edgex_blob_available {%s} local_fs_location {%s} succeeded", depID, local_fs_location) + return nil + +} + +func getLocalFSLocation (blobId string) (locfs string , err error) { + + db := getDB() + + rows, err := db.Query("SELECT local_fs_location FROM edgex_blob_available WHERE blob_id = " + blobId) + if err != nil { + log.Errorf("SELECT local_fs_location failed %v", err) + return "", err + } + + defer rows.Close() + rows.Scan(&locfs) + return +}
diff --git a/init.go b/init.go new file mode 100644 index 0000000..b00325a --- /dev/null +++ b/init.go
@@ -0,0 +1,128 @@ +package apiGatewayDeploy + +import ( + "fmt" + "net/url" + "os" + "path" + "time" + + "github.com/30x/apid-core" +) + +const ( + configHTTProtocol = "apidHTTProtocol" + configBundleDirKey = "gatewaydeploy_bundle_dir" + configDebounceDuration = "gatewaydeploy_debounce_duration" + configBundleCleanupDelay = "gatewaydeploy_bundle_cleanup_delay" + configMarkDeployFailedAfter = "gatewaydeploy_deployment_timeout" + configDownloadConnTimeout = "gatewaydeploy_download_connection_timeout" + configApiServerBaseURI = "apigeesync_proxy_server_base" + configApidInstanceID = "apigeesync_apid_instance_id" + configApidClusterID = "apigeesync_cluster_id" + configConcurrentDownloads = "apigeesync_concurrent_downloads" + configDownloadQueueSize = "apigeesync_download_queue_size" + configBlobServerBaseURI = "apigeesync_blob_server_base" +) + +var ( + services apid.Services + log apid.LogService + data apid.DataService + config apid.ConfigService + bundlePath string + debounceDuration time.Duration + bundleCleanupDelay time.Duration + apiServerBaseURI *url.URL + blobServerURL string + apidInstanceID string + apidClusterID string + downloadQueueSize int + concurrentDownloads int +) + +func init() { + apid.RegisterPlugin(initPlugin) +} + +func initPlugin(s apid.Services) (apid.PluginData, error) { + services = s + log = services.Log().ForModule("apiGatewayDeploy") + log.Debug("start init") + + config = services.Config() + + if !config.IsSet(configApiServerBaseURI) { + return pluginData, fmt.Errorf("Missing required config value: %s", configApiServerBaseURI) + } + + if !config.IsSet(configBlobServerBaseURI) { + return pluginData, fmt.Errorf("Missing required config value: %s", configBlobServerBaseURI) + } + + var err error + apiServerBaseURI, err = url.Parse(config.GetString(configApiServerBaseURI)) + if err != nil { + return pluginData, fmt.Errorf("%s value %s parse err: %v", configApiServerBaseURI, apiServerBaseURI, err) + } + + if !config.IsSet(configApidInstanceID) { + return pluginData, fmt.Errorf("Missing required config value: %s", configApidInstanceID) + } + apidInstanceID = config.GetString(configApidInstanceID) + + if !config.IsSet(configApidClusterID) { + return pluginData, fmt.Errorf("Missing required config value: %s", configApidClusterID) + } + apidClusterID = config.GetString(configApidClusterID) + + config.SetDefault(configBundleDirKey, "bundles") + config.SetDefault(configDebounceDuration, time.Second) + config.SetDefault(configBundleCleanupDelay, time.Minute) + config.SetDefault(configMarkDeployFailedAfter, 5*time.Minute) + config.SetDefault(configDownloadConnTimeout, 5*time.Minute) + config.SetDefault(configConcurrentDownloads, 15) + config.SetDefault(configDownloadQueueSize, 2000) + + debounceDuration = config.GetDuration(configDebounceDuration) + if debounceDuration < time.Millisecond { + return pluginData, fmt.Errorf("%s must be a positive duration", configDebounceDuration) + } + + bundleCleanupDelay = config.GetDuration(configBundleCleanupDelay) + if bundleCleanupDelay < time.Millisecond { + return pluginData, fmt.Errorf("%s must be a positive duration", configBundleCleanupDelay) + } + + markDeploymentFailedAfter = config.GetDuration(configMarkDeployFailedAfter) + if markDeploymentFailedAfter < time.Millisecond { + return pluginData, fmt.Errorf("%s must be a positive duration", configMarkDeployFailedAfter) + } + + bundleDownloadConnTimeout = config.GetDuration(configDownloadConnTimeout) + if bundleDownloadConnTimeout < time.Millisecond { + return pluginData, fmt.Errorf("%s must be a positive duration", configDownloadConnTimeout) + } + + data = services.Data() + blobServerURL = config.GetString(configBlobServerBaseURI) + concurrentDownloads = config.GetInt(configConcurrentDownloads) + downloadQueueSize = config.GetInt(configDownloadQueueSize) + relativeBundlePath := config.GetString(configBundleDirKey) + storagePath := config.GetString("local_storage_path") + bundlePath = path.Join(storagePath, relativeBundlePath) + if err := os.MkdirAll(bundlePath, 0700); err != nil { + return pluginData, fmt.Errorf("Failed bundle directory creation: %v", err) + } + log.Infof("Bundle directory path is %s", bundlePath) + + initializeBundleDownloading() + + go distributeEvents() + + initListener(services) + + log.Debug("end init") + + return pluginData, nil +}
diff --git a/listener.go b/listener.go new file mode 100644 index 0000000..0e8a5ea --- /dev/null +++ b/listener.go
@@ -0,0 +1,152 @@ +package apiGatewayDeploy + +import ( + "os" + "time" + + "github.com/30x/apid-core" + "github.com/apigee-labs/transicator/common" + "database/sql" +) + +const ( + APIGEE_SYNC_EVENT = "ApigeeSync" + CONFIG_METADATA_TABLE = "project.runtime_blob_metadata" +) + +func initListener(services apid.Services) { + services.Events().Listen(APIGEE_SYNC_EVENT, &apigeeSyncHandler{}) +} + +type bundleConfigJson struct { + Name string `json:"name"` + URI string `json:"uri"` + ChecksumType string `json:"checksumType"` + Checksum string `json:"checksum"` +} + +type apigeeSyncHandler struct { +} + +func (h *apigeeSyncHandler) String() string { + return "gatewayDeploy" +} + +func (h *apigeeSyncHandler) Handle(e apid.Event) { + + if changeSet, ok := e.(*common.ChangeList); ok { + processChangeList(changeSet) + } else if snapData, ok := e.(*common.Snapshot); ok { + processSnapshot(snapData) + } else { + log.Debugf("Received invalid event. Ignoring. %v", e) + } +} + +func processSnapshot(snapshot *common.Snapshot) { + + log.Debugf("Snapshot received. Switching to DB version: %s", snapshot.SnapshotInfo) + + db, err := data.DBVersion(snapshot.SnapshotInfo) + if err != nil { + log.Panicf("Unable to access database: %v", err) + } + + // Update the DB pointer + dbMux.Lock() + SetDB(db) + dbMux.Unlock() + + InitDB(db) + startupOnExistingDatabase() + log.Debug("Snapshot processed") +} + +func startupOnExistingDatabase() { + // start bundle downloads that didn't finish + go func() { + deployments, err := getUnreadyDeployments() + + if err != nil && err != sql.ErrNoRows { + log.Panicf("unable to query database for unready deployments: %v", err) + } + log.Debugf("Queuing %d deployments for bundle download", len(deployments)) + for _, dep := range deployments { + queueDownloadRequest(dep) + } + }() +} + +func processChangeList(changes *common.ChangeList) { + + log.Debugf("Processing changes") + // changes have been applied to DB + var insertedDeployments, deletedDeployments []DataDeployment + for _, change := range changes.Changes { + switch change.Table { + case CONFIG_METADATA_TABLE: + switch change.Operation { + case common.Insert: + dep := dataDeploymentFromRow(change.NewRow) + insertedDeployments = append(insertedDeployments, dep) + case common.Delete: + var id string + change.OldRow.Get("id", &id) + // only need these two fields to delete and determine bundle file + dep := DataDeployment{ + ID: id, + } + deletedDeployments = append(deletedDeployments, dep) + default: + log.Errorf("unexpected operation: %s", change.Operation) + } + } + } + + for _, d := range deletedDeployments { + deploymentsChanged <- d.ID + } + + for _, dep := range insertedDeployments { + queueDownloadRequest(dep) + } + + // clean up old bundles + if len(deletedDeployments) > 0 { + log.Debugf("will delete %d old bundles", len(deletedDeployments)) + go func() { + // give clients a minute to avoid conflicts + time.Sleep(bundleCleanupDelay) + for _, dep := range deletedDeployments { + bundleFile := getBundleFile(dep) + log.Debugf("removing old bundle: %v", bundleFile) + // TODO Remove from the Database table edgex_blob_available + safeDelete(bundleFile) + } + }() + } +} + +func dataDeploymentFromRow(row common.Row) (d DataDeployment) { + + row.Get("id", &d.ID) + row.Get("org_id", &d.OrgID) + row.Get("env_id", &d.EnvID) + row.Get("type", &d.Type) + row.Get("name", &d.Name) + row.Get("revision", &d.Revision) + row.Get("blob_id", &d.BlobID) + row.Get("resource_blob_id", &d.BlobResourceID) + row.Get("updated_at", &d.Updated) + row.Get("updated_by", &d.UpdatedBy) + row.Get("created_at", &d.Created) + row.Get("created_by", &d.CreatedBy) + + return +} + +func safeDelete(file string) { + if e := os.Remove(file); e != nil && !os.IsNotExist(e) { + log.Warnf("unable to delete file %s: %v", file, e) + } +}
diff --git a/pluginData.go b/pluginData.go new file mode 100644 index 0000000..bb85ec3 --- /dev/null +++ b/pluginData.go
@@ -0,0 +1,11 @@ +package apiGatewayDeploy + +import "github.com/30x/apid-core" + +var pluginData = apid.PluginData{ + Name: "apidGatewayConfDeploy", + Version: "0.0.1", + ExtraData: map[string]interface{}{ + "schemaVersion": "0.0.1", + }, +}