convert postgres time format to ISO8601 format at ApigeeSync, so Sqlite will store ISO8601 time format
diff --git a/data.go b/data.go
index 48f1ff7..bf2d475 100644
--- a/data.go
+++ b/data.go
@@ -10,6 +10,11 @@
"github.com/30x/apid-core"
)
+const (
+ sqlTimeFormat = "2006-01-02 15:04:05.999 -0700 MST"
+ iso8601 = "2006-01-02T15:04:05.999Z07:00"
+)
+
var (
unsafeDB apid.DB
dbMux sync.RWMutex
diff --git a/listener.go b/listener.go
index 6c4b1ef..8ad3642 100644
--- a/listener.go
+++ b/listener.go
@@ -3,6 +3,7 @@
import (
"github.com/30x/apid-core"
"github.com/apigee-labs/transicator/common"
+ "time"
)
const (
@@ -160,6 +161,23 @@
row.Get("updated_by", &dac.UpdatedBy)
row.Get("description", &dac.Description)
+ // convert timestamp to ISO8601
+ if dac.Created != "" {
+ created, err := time.Parse(sqlTimeFormat, dac.Created)
+ if err != nil {
+ log.Panic(err)
+ }
+ dac.Created = created.Format(iso8601)
+ }
+
+ if dac.Updated != "" {
+ updated, err := time.Parse(sqlTimeFormat, dac.Updated)
+ if err != nil {
+ log.Panic(err)
+ }
+ dac.Updated = updated.Format(iso8601)
+ }
+
return dac
}
@@ -177,5 +195,21 @@
row.Get("updated", &ds.Updated)
row.Get("updated_by", &ds.UpdatedBy)
+ // convert timestamp to ISO8601
+ if ds.Created != "" {
+ created, err := time.Parse(sqlTimeFormat, ds.Created)
+ if err != nil {
+ log.Panic(err)
+ }
+ ds.Created = created.Format(iso8601)
+ }
+
+ if ds.Updated != "" {
+ updated, err := time.Parse(sqlTimeFormat, ds.Updated)
+ if err != nil {
+ log.Panic(err)
+ }
+ ds.Updated = updated.Format(iso8601)
+ }
return ds
}
diff --git a/listener_test.go b/listener_test.go
index b5fea9b..2fca58e 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -11,6 +11,110 @@
handler := handler{}
var saveLastSnapshot string
+ validSnapshot := common.Snapshot{
+ SnapshotInfo: "test_snapshot_valid",
+ Tables: []common.Table{
+ {
+ Name: LISTENER_TABLE_APID_CLUSTER,
+ Rows: []common.Row{
+ {
+ "id": &common.ColumnVal{Value: "i"},
+ "name": &common.ColumnVal{Value: "n"},
+ "umbrella_org_app_name": &common.ColumnVal{Value: "o"},
+ "created": &common.ColumnVal{Value: ""},
+ "created_by": &common.ColumnVal{Value: "c"},
+ "updated": &common.ColumnVal{Value: ""},
+ "updated_by": &common.ColumnVal{Value: "u"},
+ "description": &common.ColumnVal{Value: "d"},
+ },
+ },
+ },
+ {
+ Name: LISTENER_TABLE_DATA_SCOPE,
+ Rows: []common.Row{
+ {
+ "id": &common.ColumnVal{Value: "i"},
+ "apid_cluster_id": &common.ColumnVal{Value: "a"},
+ "scope": &common.ColumnVal{Value: "s1"},
+ "org": &common.ColumnVal{Value: "o"},
+ "env": &common.ColumnVal{Value: "e1"},
+ "created": &common.ColumnVal{Value: ""},
+ "created_by": &common.ColumnVal{Value: "c"},
+ "updated": &common.ColumnVal{Value: ""},
+ "updated_by": &common.ColumnVal{Value: "u"},
+ },
+ },
+ },
+ {
+ Name: LISTENER_TABLE_DATA_SCOPE,
+ Rows: []common.Row{
+ {
+ "id": &common.ColumnVal{Value: "j"},
+ "apid_cluster_id": &common.ColumnVal{Value: "a"},
+ "scope": &common.ColumnVal{Value: "s1"},
+ "org": &common.ColumnVal{Value: "o"},
+ "env": &common.ColumnVal{Value: "e2"},
+ "created": &common.ColumnVal{Value: ""},
+ "created_by": &common.ColumnVal{Value: "c"},
+ "updated": &common.ColumnVal{Value: ""},
+ "updated_by": &common.ColumnVal{Value: "u"},
+ },
+ },
+ },
+ {
+ Name: LISTENER_TABLE_DATA_SCOPE,
+ Rows: []common.Row{
+ {
+ "id": &common.ColumnVal{Value: "k"},
+ "apid_cluster_id": &common.ColumnVal{Value: "a"},
+ "scope": &common.ColumnVal{Value: "s2"},
+ "org": &common.ColumnVal{Value: "o"},
+ "env": &common.ColumnVal{Value: "e3"},
+ "created": &common.ColumnVal{Value: ""},
+ "created_by": &common.ColumnVal{Value: "c"},
+ "updated": &common.ColumnVal{Value: ""},
+ "updated_by": &common.ColumnVal{Value: "u"},
+ },
+ },
+ },
+ },
+ }
+
+ insertChangeList := common.ChangeList{
+ LastSequence: "test",
+ Changes: []common.Change{
+ {
+ Operation: common.Insert,
+ Table: LISTENER_TABLE_DATA_SCOPE,
+ NewRow: common.Row{
+ "id": &common.ColumnVal{Value: "i"},
+ "apid_cluster_id": &common.ColumnVal{Value: "a"},
+ "scope": &common.ColumnVal{Value: "s1"},
+ "org": &common.ColumnVal{Value: "o"},
+ "env": &common.ColumnVal{Value: "e"},
+ "created": &common.ColumnVal{Value: ""},
+ "created_by": &common.ColumnVal{Value: "c"},
+ "updated": &common.ColumnVal{Value: ""},
+ "updated_by": &common.ColumnVal{Value: "u"},
+ },
+ },
+ {
+ Operation: common.Insert,
+ Table: LISTENER_TABLE_DATA_SCOPE,
+ NewRow: common.Row{
+ "id": &common.ColumnVal{Value: "j"},
+ "apid_cluster_id": &common.ColumnVal{Value: "a"},
+ "scope": &common.ColumnVal{Value: "s2"},
+ "org": &common.ColumnVal{Value: "o"},
+ "env": &common.ColumnVal{Value: "e"},
+ "created": &common.ColumnVal{Value: ""},
+ "created_by": &common.ColumnVal{Value: "c"},
+ "updated": &common.ColumnVal{Value: ""},
+ "updated_by": &common.ColumnVal{Value: "u"},
+ },
+ },
+ },
+ }
Context("ApigeeSync snapshot event", func() {
@@ -68,74 +172,7 @@
It("should process a valid Snapshot", func() {
- event := common.Snapshot{
- SnapshotInfo: "test_snapshot_valid",
- Tables: []common.Table{
- {
- Name: LISTENER_TABLE_APID_CLUSTER,
- Rows: []common.Row{
- {
- "id": &common.ColumnVal{Value: "i"},
- "name": &common.ColumnVal{Value: "n"},
- "umbrella_org_app_name": &common.ColumnVal{Value: "o"},
- "created": &common.ColumnVal{Value: "c"},
- "created_by": &common.ColumnVal{Value: "c"},
- "updated": &common.ColumnVal{Value: "u"},
- "updated_by": &common.ColumnVal{Value: "u"},
- "description": &common.ColumnVal{Value: "d"},
- },
- },
- },
- {
- Name: LISTENER_TABLE_DATA_SCOPE,
- Rows: []common.Row{
- {
- "id": &common.ColumnVal{Value: "i"},
- "apid_cluster_id": &common.ColumnVal{Value: "a"},
- "scope": &common.ColumnVal{Value: "s1"},
- "org": &common.ColumnVal{Value: "o"},
- "env": &common.ColumnVal{Value: "e1"},
- "created": &common.ColumnVal{Value: "c"},
- "created_by": &common.ColumnVal{Value: "c"},
- "updated": &common.ColumnVal{Value: "u"},
- "updated_by": &common.ColumnVal{Value: "u"},
- },
- },
- },
- {
- Name: LISTENER_TABLE_DATA_SCOPE,
- Rows: []common.Row{
- {
- "id": &common.ColumnVal{Value: "j"},
- "apid_cluster_id": &common.ColumnVal{Value: "a"},
- "scope": &common.ColumnVal{Value: "s1"},
- "org": &common.ColumnVal{Value: "o"},
- "env": &common.ColumnVal{Value: "e2"},
- "created": &common.ColumnVal{Value: "c"},
- "created_by": &common.ColumnVal{Value: "c"},
- "updated": &common.ColumnVal{Value: "u"},
- "updated_by": &common.ColumnVal{Value: "u"},
- },
- },
- },
- {
- Name: LISTENER_TABLE_DATA_SCOPE,
- Rows: []common.Row{
- {
- "id": &common.ColumnVal{Value: "k"},
- "apid_cluster_id": &common.ColumnVal{Value: "a"},
- "scope": &common.ColumnVal{Value: "s2"},
- "org": &common.ColumnVal{Value: "o"},
- "env": &common.ColumnVal{Value: "e3"},
- "created": &common.ColumnVal{Value: "c"},
- "created_by": &common.ColumnVal{Value: "c"},
- "updated": &common.ColumnVal{Value: "u"},
- "updated_by": &common.ColumnVal{Value: "u"},
- },
- },
- },
- },
- }
+ event := validSnapshot
handler.Handle(&event)
@@ -170,9 +207,9 @@
Expect(dc.Name).To(Equal("n"))
Expect(dc.Description).To(Equal("d"))
Expect(dc.OrgAppName).To(Equal("o"))
- Expect(dc.Created).To(Equal("c"))
+ Expect(dc.Created).To(Equal(""))
Expect(dc.CreatedBy).To(Equal("c"))
- Expect(dc.Updated).To(Equal("u"))
+ Expect(dc.Updated).To(Equal(""))
Expect(dc.UpdatedBy).To(Equal("u"))
// Data Scope
@@ -201,9 +238,9 @@
Expect(ds.Org).To(Equal("o"))
Expect(ds.Env).To(Equal("e1"))
Expect(ds.Scope).To(Equal("s1"))
- Expect(ds.Created).To(Equal("c"))
+ Expect(ds.Created).To(Equal(""))
Expect(ds.CreatedBy).To(Equal("c"))
- Expect(ds.Updated).To(Equal("u"))
+ Expect(ds.Updated).To(Equal(""))
Expect(ds.UpdatedBy).To(Equal("u"))
ds = dds[1]
@@ -221,6 +258,56 @@
//restore the last snapshot
apidInfo.LastSnapshot = saveLastSnapshot
}, 3)
+
+ It("should convert snapshot timestamp o ISO8601", func() {
+
+ sqlTime := []string{"2017-04-05 04:47:36.462 -0700 MST", "2017-04-05 04:47:36.462 +0000 UTC"}
+ isoTime := []string{"2017-04-05T04:47:36.462-07:00", "2017-04-05T04:47:36.462Z"}
+ event := validSnapshot
+
+ for _, t := range event.Tables {
+ for _, r := range t.Rows {
+ r["created"] = &common.ColumnVal{Value: sqlTime[0]}
+ r["updated"] = &common.ColumnVal{Value: sqlTime[1]}
+ }
+ }
+
+ handler.Handle(&event)
+
+ info, err := getApidInstanceInfo()
+ Expect(err).NotTo(HaveOccurred())
+
+ Expect(info.LastSnapshot).To(Equal(event.SnapshotInfo))
+
+ db := getDB()
+
+ // apid Cluster
+ rows, err := db.Query(` SELECT created, updated FROM APID_CLUSTER`)
+ Expect(err).NotTo(HaveOccurred())
+ defer rows.Close()
+
+ created := ""
+ updated := ""
+ for rows.Next() {
+ rows.Scan(&created, &updated)
+ Expect(created).To(Equal(isoTime[0]))
+ Expect(updated).To(Equal(isoTime[1]))
+ }
+
+ // Data Scope
+ rows, err = db.Query(`SELECT created, updated FROM DATA_SCOPE`)
+ Expect(err).NotTo(HaveOccurred())
+ defer rows.Close()
+
+ for rows.Next() {
+ rows.Scan(&created, &updated)
+ Expect(created).To(Equal(isoTime[0]))
+ Expect(updated).To(Equal(isoTime[1]))
+ }
+
+ //restore the last snapshot
+ apidInfo.LastSnapshot = saveLastSnapshot
+ }, 3)
})
Context("ApigeeSync change event", func() {
@@ -270,41 +357,7 @@
//save the last snapshot, so we can restore it at the end of this context
saveLastSnapshot = apidInfo.LastSnapshot
- event := common.ChangeList{
- LastSequence: "test",
- Changes: []common.Change{
- {
- Operation: common.Insert,
- Table: LISTENER_TABLE_DATA_SCOPE,
- NewRow: common.Row{
- "id": &common.ColumnVal{Value: "i"},
- "apid_cluster_id": &common.ColumnVal{Value: "a"},
- "scope": &common.ColumnVal{Value: "s1"},
- "org": &common.ColumnVal{Value: "o"},
- "env": &common.ColumnVal{Value: "e"},
- "created": &common.ColumnVal{Value: "c"},
- "created_by": &common.ColumnVal{Value: "c"},
- "updated": &common.ColumnVal{Value: "u"},
- "updated_by": &common.ColumnVal{Value: "u"},
- },
- },
- {
- Operation: common.Insert,
- Table: LISTENER_TABLE_DATA_SCOPE,
- NewRow: common.Row{
- "id": &common.ColumnVal{Value: "j"},
- "apid_cluster_id": &common.ColumnVal{Value: "a"},
- "scope": &common.ColumnVal{Value: "s2"},
- "org": &common.ColumnVal{Value: "o"},
- "env": &common.ColumnVal{Value: "e"},
- "created": &common.ColumnVal{Value: "c"},
- "created_by": &common.ColumnVal{Value: "c"},
- "updated": &common.ColumnVal{Value: "u"},
- "updated_by": &common.ColumnVal{Value: "u"},
- },
- },
- },
- }
+ event := insertChangeList
handler.Handle(&event)
@@ -333,9 +386,9 @@
Expect(ds.Org).To(Equal("o"))
Expect(ds.Env).To(Equal("e"))
Expect(ds.Scope).To(Equal("s1"))
- Expect(ds.Created).To(Equal("c"))
+ Expect(ds.Created).To(Equal(""))
Expect(ds.CreatedBy).To(Equal("c"))
- Expect(ds.Updated).To(Equal("u"))
+ Expect(ds.Updated).To(Equal(""))
Expect(ds.UpdatedBy).To(Equal("u"))
ds = dds[1]
@@ -361,9 +414,9 @@
"scope": &common.ColumnVal{Value: "s"},
"org": &common.ColumnVal{Value: "o"},
"env": &common.ColumnVal{Value: "e"},
- "created": &common.ColumnVal{Value: "c"},
+ "created": &common.ColumnVal{Value: ""},
"created_by": &common.ColumnVal{Value: "c"},
- "updated": &common.ColumnVal{Value: "u"},
+ "updated": &common.ColumnVal{Value: ""},
"updated_by": &common.ColumnVal{Value: "u"},
},
},
@@ -405,10 +458,42 @@
}
Expect(func() { handler.Handle(&event) }).To(Panic())
+ }, 3)
+
+ It("should convert change timestamp to ISO8601", func() {
+
+ event := insertChangeList
+
+ sqlTime := []string{"2017-04-05 04:47:36.462 -0700 MST", "2017-04-05 04:47:36.462 +0000 UTC"}
+ isoTime := []string{"2017-04-05T04:47:36.462-07:00", "2017-04-05T04:47:36.462Z"}
+
+
+ for _, c := range event.Changes {
+ c.NewRow["created"] = &common.ColumnVal{Value: sqlTime[0]}
+ c.NewRow["updated"] = &common.ColumnVal{Value: sqlTime[1]}
+
+ }
+
+ handler.Handle(&event)
+
+
+ rows, err := getDB().Query(`SELECT id, created, updated FROM DATA_SCOPE`)
+ Expect(err).NotTo(HaveOccurred())
+ defer rows.Close()
+ id := ""
+ created := ""
+ updated := ""
+ for rows.Next() {
+ rows.Scan(&id, &created, &updated)
+ if id=="i" || id=="j"{
+ Expect(created).To(Equal(isoTime[0]))
+ Expect(updated).To(Equal(isoTime[1]))
+ }
+ }
+
//restore the last snapshot
apidInfo.LastSnapshot = saveLastSnapshot
}, 3)
-
})
})
diff --git a/snapshot.go b/snapshot.go
index 7a3c102..c455e8e 100644
--- a/snapshot.go
+++ b/snapshot.go
@@ -252,7 +252,7 @@
req.Header.Set("Authorization", "Bearer "+tokenManager.getBearerToken())
return nil
},
- Timeout: httpTimeout,
+ Timeout: httpTimeout,
}
//pollWithBackoff only accepts function that accept a single quit channel