Incident: XAPID-999
Ensure the api's follow the latest API spec standard.
The DB query updates for performance.
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 6d364e1..ae319c7 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -20,4 +20,4 @@
All submissions, including submissions by project members, require review. We
use GitHub pull requests for this purpose. Consult
[GitHub Help](https://help.github.com/articles/about-pull-requests/) for more
-information on using pull requests.
\ No newline at end of file
+information on using pull requests.
diff --git a/LICENSE b/LICENSE
index 7a4a3ea..d645695 100644
--- a/LICENSE
+++ b/LICENSE
@@ -199,4 +199,4 @@
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
- limitations under the License.
\ No newline at end of file
+ limitations under the License.
diff --git a/api.go b/api.go
index 68a855d..3b2f6f1 100644
--- a/api.go
+++ b/api.go
@@ -1,12 +1,15 @@
-package apiGatewayDeploy
+package apiGatewayConfDeploy
import (
+ "bytes"
"encoding/json"
+ "github.com/gorilla/mux"
+ "io"
+ "io/ioutil"
"net/http"
"strconv"
"sync/atomic"
"time"
- "github.com/gorilla/mux"
)
// todo: the full set of states should probably be RECEIVED, READY, FAIL, SUCCESS
@@ -49,28 +52,26 @@
}
type ApiDeploymentDetails struct {
- Self string `json:self`
- Name string `json:name`
- Org string `json:org`
- Env string `json:env`
- Scope string `json:scope`
- Type string `json:type`
- BlobURL string `json:bloburl`
- Revision string `json:revision`
- BlobId string `json:blobId`
- ResourceBlobId string `json:resourceBlobId`
- Created string `json:created`
- Updated string `json:updated`
-
+ Self string `json:"self"`
+ Name string `json:"name"`
+ Org string `json:"org"`
+ Env string `json:"env"`
+ Scope string `json:"scope"`
+ Type string `json:"type"`
+ BlobURL string `json:"bloburl"`
+ Revision string `json:"revision"`
+ BlobId string `json:"blobId"`
+ ResourceBlobId string `json:"resourceBlobId"`
+ Created string `json:"created"`
+ Updated string `json:"updated"`
}
type ApiDeploymentResponse struct {
- Kind string `json:kind`
- Self string `json:self`
- ApiDeploymentResponse []ApiDeploymentDetails `json:contents`
+ Kind string `json:"kind"`
+ Self string `json:"self"`
+ ApiDeploymentResponse []ApiDeploymentDetails `json:"contents"`
}
-
const deploymentsEndpoint = "/configurations"
const BlobEndpoint = "/blob/{blobId}"
@@ -94,8 +95,8 @@
log.Debugf("sending %d error to client: %s", status, reason)
}
-func writeDatabaseError(w http.ResponseWriter) {
- writeError(w, http.StatusInternalServerError, API_ERR_INTERNAL, "database error")
+func writeInternalError(w http.ResponseWriter, err string) {
+ writeError(w, http.StatusInternalServerError, API_ERR_INTERNAL, err)
}
func debounce(in chan interface{}, out chan []interface{}, window time.Duration) {
@@ -159,14 +160,23 @@
}
func apiReturnBlobData(w http.ResponseWriter, r *http.Request) {
+
vars := mux.Vars(r)
blobId := vars["blobId"]
- _, err := getLocalFSLocation(blobId)
+ fs, err := getLocalFSLocation(blobId)
if err != nil {
- writeDatabaseError(w)
+ writeInternalError(w, "BlobId "+blobId+" has no mapping blob file")
return
}
-
+ byte, err := ioutil.ReadFile(fs)
+ if err != nil {
+ writeInternalError(w, err.Error())
+ return
+ }
+ _, err = io.Copy(w, bytes.NewReader(byte))
+ if err != nil {
+ writeInternalError(w, err.Error())
+ }
}
@@ -221,7 +231,7 @@
select {
case result := <-newDeploymentsChannel:
if result.err != nil {
- writeDatabaseError(w)
+ writeInternalError(w, "Database error")
} else {
sendDeployments(w, result.deployments, result.eTag)
}
@@ -241,31 +251,41 @@
eTag := getETag()
deployments, err := getReadyDeployments()
if err != nil {
- writeDatabaseError(w)
+ writeInternalError(w, "Database error")
return
}
sendDeployments(w, deployments, eTag)
}
+func get_http_host() string {
+ // apid-core has to set this according to the protocol apid is to be run: http/https
+ proto := config.GetString("protocol_type")
+ if proto == "" {
+ proto = "http"
+ }
+ proto = proto + "://" + config.GetString("api_listen")
+ return proto
+}
+
func sendDeployments(w http.ResponseWriter, dataDeps []DataDeployment, eTag string) {
apiDeps := ApiDeploymentResponse{}
apiDepDetails := []ApiDeploymentDetails{}
apiDeps.Kind = "Collections"
- apiDeps.Self = config.GetString("api_listen") +"/configurations"
+ apiDeps.Self = get_http_host() + "/configurations"
for _, d := range dataDeps {
apiDepDetails = append(apiDepDetails, ApiDeploymentDetails{
- Org: d.OrgID,
- Env: d.EnvID,
- Revision: d.Revision,
- BlobId: d.BlobID,
- ResourceBlobId: d.BlobResourceID,
- Created: d.Created,
- Updated: d.Updated,
- Type: d.Type,
- BlobURL: d.BlobURL,
+ Org: d.OrgID,
+ Env: d.EnvID,
+ Revision: d.Revision,
+ BlobId: d.GWBlobID,
+ ResourceBlobId: d.BlobResourceID,
+ Created: d.Created,
+ Updated: d.Updated,
+ Type: d.Type,
+ BlobURL: d.BlobURL,
})
}
apiDeps.ApiDeploymentResponse = apiDepDetails
@@ -292,4 +312,3 @@
e := atomic.LoadInt64(&eTag)
return strconv.FormatInt(e, 10)
}
-
diff --git a/bundle.go b/bundle.go
index 2cca9af..0378f30 100644
--- a/bundle.go
+++ b/bundle.go
@@ -1,7 +1,6 @@
-package apiGatewayDeploy
+package apiGatewayConfDeploy
import (
-
"encoding/base64"
"fmt"
"io"
@@ -10,12 +9,15 @@
"net/url"
"os"
"path"
+ "strconv"
+ "sync/atomic"
"time"
)
const (
BLOBSTORE_URI = "/v1/blobstore/signeduri"
)
+
var (
markDeploymentFailedAfter time.Duration
bundleDownloadConnTimeout time.Duration
@@ -78,7 +80,12 @@
}
if err == nil {
- err = updatelocal_fs_location(dep.BlobID, r.bundleFile)
+ blobId := atomic.AddInt64(&gwBlobId, 1)
+ blobIds := strconv.FormatInt(blobId, 10)
+ err = updatelocal_fs_location(dep.ID, blobIds, r.bundleFile)
+ if err != nil {
+ dep.GWBlobID = blobIds
+ }
}
if err != nil {
@@ -143,7 +150,6 @@
return string(signedURL), nil
}
-
// downloadFromURI involves retrieving the signed URL for the blob, and storing the resource locally
// after downloading the resource from GCS (via the signed URL)
func downloadFromURI(blobId string) (tempFileName string, err error) {
diff --git a/data.go b/data.go
index 923ad77..a867e7a 100644
--- a/data.go
+++ b/data.go
@@ -1,33 +1,34 @@
-package apiGatewayDeploy
+package apiGatewayConfDeploy
import (
"database/sql"
"sync"
"github.com/30x/apid-core"
-
)
var (
unsafeDB apid.DB
dbMux sync.RWMutex
+ gwBlobId int64
)
type DataDeployment struct {
- ID string
- OrgID string
- EnvID string
- Type string
- Name string
- Revision string
- BlobID string
- BlobResourceID string
- Updated string
- UpdatedBy string
- Created string
- CreatedBy string
- BlobFSLocation string
- BlobURL string
+ ID string
+ OrgID string
+ EnvID string
+ Type string
+ Name string
+ Revision string
+ BlobID string
+ GWBlobID string
+ BlobResourceID string
+ Updated string
+ UpdatedBy string
+ Created string
+ CreatedBy string
+ BlobFSLocation string
+ BlobURL string
}
type SQLExec interface {
@@ -37,7 +38,8 @@
func InitDB(db apid.DB) error {
_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS edgex_blob_available (
- blob_id character varying NOT NULL,
+ gwblobid integer primary key,
+ runtime_meta_id character varying NOT NULL,
local_fs_location character varying NOT NULL,
access_url character varying
);
@@ -72,11 +74,11 @@
db := getDB()
rows, err := db.Query(`
- SELECT id, org_id, env_id, name, revision, project_runtime_blob_metadata.blob_id, resource_blob_id
+ SELECT project_runtime_blob_metadata.id, org_id, env_id, name, revision, blob_id, resource_blob_id
FROM project_runtime_blob_metadata
LEFT JOIN edgex_blob_available
- ON project_runtime_blob_metadata.blob_id = edgex_blob_available.blob_id
- WHERE edgex_blob_available.blob_id IS NULL;
+ ON project_runtime_blob_metadata.id = edgex_blob_available.runtime_meta_id
+ WHERE edgex_blob_available.runtime_meta_id IS NULL;
`)
if err != nil {
@@ -87,7 +89,7 @@
for rows.Next() {
dep := DataDeployment{}
- rows.Scan(&dep.ID, &dep.OrgID, &dep.EnvID, &dep.Name, &dep.Revision, &dep.BlobID,
+ rows.Scan(&dep.ID, &dep.OrgID, &dep.EnvID, &dep.Name, &dep.Revision, &dep.BlobID,
&dep.BlobResourceID)
deployments = append(deployments, dep)
log.Debugf("New configurations to be processed Id {%s}, blobId {%s}", dep.ID, dep.BlobID)
@@ -109,10 +111,10 @@
rows, err := db.Query(`
SELECT a.id, a.org_id, a.env_id, a.name, a.type, a.revision, a.blob_id,
a.resource_blob_id, a.created_at, a.created_by, a.updated_at, a.updated_by,
- b.local_fs_location, b.access_url
+ b.local_fs_location, b.access_url, b.gwblobid
FROM project_runtime_blob_metadata as a
INNER JOIN edgex_blob_available as b
- ON a.blob_id = b.blob_id
+ ON a.id = b.runtime_meta_id
`)
if err != nil {
@@ -125,7 +127,7 @@
dep := DataDeployment{}
rows.Scan(&dep.ID, &dep.OrgID, &dep.EnvID, &dep.Name, &dep.Type, &dep.Revision, &dep.BlobID,
&dep.BlobResourceID, &dep.Created, &dep.CreatedBy, &dep.Updated,
- &dep.UpdatedBy, &dep.BlobFSLocation, &dep.BlobURL)
+ &dep.UpdatedBy, &dep.BlobFSLocation, &dep.BlobURL, &dep.GWBlobID)
deployments = append(deployments, dep)
log.Debugf("New Configurations available Id {%s} BlobId {%s}", dep.ID, dep.BlobID)
}
@@ -137,19 +139,19 @@
}
-func updatelocal_fs_location(depID, local_fs_location string) error {
+func updatelocal_fs_location(depID, bundleId, local_fs_location string) error {
- access_url := config.GetString("api_listen") + "/blob/" + depID
+ access_url := get_http_host() + "/blob/" + bundleId
stmt, err := getDB().Prepare(`
- INSERT INTO edgex_blob_available (blob_id, local_fs_location, access_url)
- VALUES (?, ?, ?)`)
+ INSERT INTO edgex_blob_available (runtime_meta_id, gwblobid, local_fs_location, access_url)
+ VALUES (?, ?, ?, ?)`)
if err != nil {
log.Errorf("PREPARE updatelocal_fs_location failed: %v", err)
return err
}
defer stmt.Close()
- _, err = stmt.Exec(depID, local_fs_location, access_url)
+ _, err = stmt.Exec(depID, bundleId, local_fs_location, access_url)
if err != nil {
log.Errorf("UPDATE edgex_blob_available id {%s} local_fs_location {%s} failed: %v", depID, local_fs_location, err)
return err
@@ -160,17 +162,20 @@
}
-func getLocalFSLocation (blobId string) (locfs string , err error) {
+func getLocalFSLocation(blobId string) (locfs string, err error) {
db := getDB()
-
- rows, err := db.Query("SELECT local_fs_location FROM edgex_blob_available WHERE blob_id = " + blobId)
+ log.Debugf("Getting the blob file for blobId {%s}", blobId)
+ rows, err := db.Query("SELECT local_fs_location FROM edgex_blob_available WHERE gwblobid = \"" + blobId + "\"")
if err != nil {
log.Errorf("SELECT local_fs_location failed %v", err)
return "", err
}
defer rows.Close()
- rows.Scan(&locfs)
+ for rows.Next() {
+ rows.Scan(&locfs)
+ log.Debugf("Got the blob file {%s} for blobId {%s}", locfs, blobId)
+ }
return
}
diff --git a/init.go b/init.go
index b00325a..745197f 100644
--- a/init.go
+++ b/init.go
@@ -1,4 +1,4 @@
-package apiGatewayDeploy
+package apiGatewayConfDeploy
import (
"fmt"
@@ -47,7 +47,7 @@
func initPlugin(s apid.Services) (apid.PluginData, error) {
services = s
- log = services.Log().ForModule("apiGatewayDeploy")
+ log = services.Log().ForModule("apiGatewayConfDeploy")
log.Debug("start init")
config = services.Config()
diff --git a/listener.go b/listener.go
index 0e8a5ea..23cc071 100644
--- a/listener.go
+++ b/listener.go
@@ -1,17 +1,17 @@
-package apiGatewayDeploy
+package apiGatewayConfDeploy
import (
"os"
"time"
+ "database/sql"
"github.com/30x/apid-core"
"github.com/apigee-labs/transicator/common"
- "database/sql"
)
const (
- APIGEE_SYNC_EVENT = "ApigeeSync"
- CONFIG_METADATA_TABLE = "project.runtime_blob_metadata"
+ APIGEE_SYNC_EVENT = "ApigeeSync"
+ CONFIG_METADATA_TABLE = "project.runtime_blob_metadata"
)
func initListener(services apid.Services) {
@@ -94,7 +94,7 @@
change.OldRow.Get("id", &id)
// only need these two fields to delete and determine bundle file
dep := DataDeployment{
- ID: id,
+ ID: id,
}
deletedDeployments = append(deletedDeployments, dep)
default:
diff --git a/pluginData.go b/pluginData.go
index bb85ec3..ce1eace 100644
--- a/pluginData.go
+++ b/pluginData.go
@@ -1,4 +1,4 @@
-package apiGatewayDeploy
+package apiGatewayConfDeploy
import "github.com/30x/apid-core"