add support of "update" in changelist
diff --git a/api_test.go b/api_test.go
index bbdd9c4..ccd3b0c 100644
--- a/api_test.go
+++ b/api_test.go
@@ -321,7 +321,7 @@
Name: dep.Name,
Type: dep.Type,
Revision: dep.Revision,
- BeanBlobUrl: getBlobUrl(testBlobId),
+ BeanBlobUrl: getBlobUrl(dep.BlobID),
Org: dep.OrgID,
Env: dep.EnvID,
ResourceBlobUrl: "",
diff --git a/bundle.go b/bundle.go
index a53f31c..43f243e 100644
--- a/bundle.go
+++ b/bundle.go
@@ -36,7 +36,8 @@
queueDownloadRequest(*DataDeployment)
enqueueRequest(*DownloadRequest)
makeDownloadRequest(string) *DownloadRequest
- deleteBundles([]DataDeployment)
+ deleteBundlesFromDeployments([]DataDeployment)
+ deleteBundleById(string)
Close()
}
@@ -117,9 +118,12 @@
close(bm.downloadQueue)
}
-// TODO add delete support
+func (bm *bundleManager) deleteBundlesFromDeployments(deletedDeployments []DataDeployment) {
+ for _, dep := range deletedDeployments {
+ go bm.deleteBundleById(dep.BlobID)
+ go bm.deleteBundleById(dep.BlobResourceID)
+ }
-func (bm *bundleManager) deleteBundles(deletedDeployments []DataDeployment) {
/*
log.Debugf("will delete %d old bundles", len(deletedDeployments))
go func() {
@@ -135,6 +139,11 @@
*/
}
+// TODO add delete support
+func (bm *bundleManager) deleteBundleById(blobId string) {
+
+}
+
type DownloadRequest struct {
bm *bundleManager
blobId string
diff --git a/listener.go b/listener.go
index 8961bae..0da9192 100644
--- a/listener.go
+++ b/listener.go
@@ -100,6 +100,7 @@
log.Debugf("Processing changes")
// changes have been applied to DB
var insertedDeployments, deletedDeployments []DataDeployment
+ var updatedNewBlobs, updatedOldBlobs []string
for _, change := range changes.Changes {
switch change.Table {
case CONFIG_METADATA_TABLE:
@@ -108,13 +109,21 @@
dep := dataDeploymentFromRow(change.NewRow)
insertedDeployments = append(insertedDeployments, dep)
case common.Delete:
- var id string
- change.OldRow.Get("id", &id)
- // only need these two fields to delete and determine bundle file
- dep := DataDeployment{
- ID: id,
- }
+ dep := dataDeploymentFromRow(change.OldRow)
deletedDeployments = append(deletedDeployments, dep)
+ case common.Update:
+ depNew := dataDeploymentFromRow(change.NewRow)
+ depOld := dataDeploymentFromRow(change.OldRow)
+
+ if depOld.BlobID != depNew.BlobID {
+ updatedNewBlobs = append(updatedNewBlobs, depNew.BlobID)
+ updatedOldBlobs = append(updatedOldBlobs, depOld.BlobID)
+ }
+
+ if depOld.BlobResourceID != depNew.BlobResourceID {
+ updatedNewBlobs = append(updatedNewBlobs, depNew.BlobResourceID)
+ updatedOldBlobs = append(updatedOldBlobs, depOld.BlobResourceID)
+ }
default:
log.Errorf("unexpected operation: %s", change.Operation)
}
@@ -127,15 +136,25 @@
}
*/
+ // insert
for i := range insertedDeployments {
go h.bundleMan.queueDownloadRequest(&insertedDeployments[i])
}
- // clean up old bundles
+ // update
+ for i := range updatedNewBlobs {
+ go h.bundleMan.enqueueRequest(h.bundleMan.makeDownloadRequest(updatedNewBlobs[i]))
+ }
+
+ for i := range updatedOldBlobs {
+ go h.bundleMan.deleteBundleById(updatedOldBlobs[i])
+ }
+
+ // delete
if len(deletedDeployments) > 0 {
log.Debugf("will delete %d old bundles", len(deletedDeployments))
//TODO delete bundles for deleted deployments
- h.bundleMan.deleteBundles(deletedDeployments)
+ h.bundleMan.deleteBundlesFromDeployments(deletedDeployments)
}
}
diff --git a/listener_test.go b/listener_test.go
index d82ce1c..7549efb 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -40,6 +40,7 @@
requestChan: make(chan *DownloadRequest),
depChan: make(chan *DataDeployment),
delChan: make(chan *DataDeployment),
+ delBlobChan: make(chan string),
}
testHandler = &apigeeSyncHandler{
dbMan: dummyDbMan,
@@ -55,7 +56,7 @@
})
Context("Snapshot", func() {
- It("Snapshot event shoud enqueue download requests for all unready blobs", func() {
+ It("Snapshot event should enqueue download requests for all unready blobs", func() {
// init unready blob ids
unreadyBlobIds := make([]string, 0)
blobMap := make(map[string]int)
@@ -84,7 +85,7 @@
}
})
- It("Snapshot events shoud set db version, and should only init API endpoint once", func() {
+ It("Snapshot events should set db version, and should only init API endpoint once", func() {
// emit snapshot
for i := 0; i < 2+rand.Intn(5); i++ {
@@ -104,7 +105,7 @@
Context("Change list", func() {
- It("Insert event shoud enqueue download requests for all inserted deployments", func() {
+ It("Insert event should enqueue download requests for all inserted deployments", func() {
// emit change event
changes := make([]common.Change, 0)
deployments := make(map[string]DataDeployment)
@@ -134,7 +135,7 @@
Expect(len(deployments)).Should(BeZero())
})
- It("Delete event shoud deliver to the bundle manager", func() {
+ It("Delete event should deliver to the bundle manager", func() {
// emit change event
changes := make([]common.Change, 0)
deployments := make(map[string]bool)
@@ -163,6 +164,116 @@
}
Expect(len(deployments)).Should(BeZero())
})
+
+ It("Update event should enqueue download requests and delete old blobs", func() {
+
+ changes := make([]common.Change, 0)
+ blobIdNew := make(map[string]int)
+ blobIdOld := make(map[string]int)
+ for i := 0; i < 1+rand.Intn(10); i++ {
+ depNew := makeTestDeployment()
+ depNew.BlobID = GenerateUUID()
+ depNew.BlobResourceID = GenerateUUID()
+
+ depOld := makeTestDeployment()
+ depOld.BlobID = GenerateUUID()
+ depOld.BlobResourceID = GenerateUUID()
+
+ change := common.Change{
+ Operation: common.Update,
+ Table: CONFIG_METADATA_TABLE,
+ NewRow: rowFromDeployment(depNew),
+ OldRow: rowFromDeployment(depOld),
+ }
+ changes = append(changes, change)
+
+ blobIdNew[depNew.BlobID]++
+ blobIdNew[depNew.BlobResourceID]++
+ blobIdOld[depOld.BlobID]++
+ blobIdOld[depOld.BlobResourceID]++
+ }
+
+ // emit change event
+ changeList := &common.ChangeList{
+ Changes: changes,
+ }
+
+ apid.Events().Emit(APIGEE_SYNC_EVENT, changeList)
+
+ // verify
+ for i := 0; i < len(blobIdNew); i++ {
+ req := <-dummyBundleMan.requestChan
+ blobIdNew[req.blobId]++
+ Expect(blobIdNew[req.blobId]).Should(Equal(2))
+ }
+ for i := 0; i < len(blobIdOld); i++ {
+ blobId := <-dummyBundleMan.delBlobChan
+ blobIdOld[blobId]++
+ Expect(blobIdOld[blobId]).Should(Equal(2))
+ }
+
+ })
+
+ It("Update event should only download/delete changed blobs", func() {
+ changes := make([]common.Change, 0)
+ blobIdChangedNew := make(map[string]int)
+ blobIdChangedOld := make(map[string]int)
+
+ for i := 0; i < 1+rand.Intn(10); i++ {
+ depNew := makeTestDeployment()
+ depNew.BlobID = GenerateUUID()
+ depNew.BlobResourceID = GenerateUUID()
+
+ depOld := makeTestDeployment()
+
+ if rand.Intn(2) == 0 {
+ // blob id changed
+ depOld.BlobID = GenerateUUID()
+ blobIdChangedNew[depNew.BlobID]++
+ blobIdChangedOld[depOld.BlobID]++
+ } else {
+ // blob id unchanged
+ depOld.BlobID = depNew.BlobID
+ }
+
+ if rand.Intn(2) == 0 {
+ // blob id changed
+ depOld.BlobResourceID = GenerateUUID()
+ blobIdChangedNew[depNew.BlobResourceID]++
+ blobIdChangedOld[depOld.BlobResourceID]++
+ } else {
+ // blob id unchanged
+ depOld.BlobResourceID = depNew.BlobResourceID
+ }
+
+ change := common.Change{
+ Operation: common.Update,
+ Table: CONFIG_METADATA_TABLE,
+ NewRow: rowFromDeployment(depNew),
+ OldRow: rowFromDeployment(depOld),
+ }
+ changes = append(changes, change)
+ }
+
+ // emit change event
+ changeList := &common.ChangeList{
+ Changes: changes,
+ }
+
+ apid.Events().Emit(APIGEE_SYNC_EVENT, changeList)
+
+ // verify
+ for i := 0; i < len(blobIdChangedNew); i++ {
+ req := <-dummyBundleMan.requestChan
+ blobIdChangedNew[req.blobId]++
+ Expect(blobIdChangedNew[req.blobId]).Should(Equal(2))
+ }
+ for i := 0; i < len(blobIdChangedOld); i++ {
+ blobId := <-dummyBundleMan.delBlobChan
+ blobIdChangedOld[blobId]++
+ Expect(blobIdChangedOld[blobId]).Should(Equal(2))
+ }
+ })
})
})
@@ -170,6 +281,7 @@
requestChan chan *DownloadRequest
depChan chan *DataDeployment
delChan chan *DataDeployment
+ delBlobChan chan string
}
func (bm *dummyBundleManager) initializeBundleDownloading() {
@@ -190,12 +302,16 @@
}
}
-func (bm *dummyBundleManager) deleteBundles(deployments []DataDeployment) {
+func (bm *dummyBundleManager) deleteBundlesFromDeployments(deployments []DataDeployment) {
for i := range deployments {
bm.delChan <- &deployments[i]
}
}
+func (bm *dummyBundleManager) deleteBundleById(blobId string) {
+ bm.delBlobChan <- blobId
+}
+
func (bm *dummyBundleManager) Close() {
}