add support of "update" in changelist
diff --git a/api_test.go b/api_test.go index bbdd9c4..ccd3b0c 100644 --- a/api_test.go +++ b/api_test.go
@@ -321,7 +321,7 @@ Name: dep.Name, Type: dep.Type, Revision: dep.Revision, - BeanBlobUrl: getBlobUrl(testBlobId), + BeanBlobUrl: getBlobUrl(dep.BlobID), Org: dep.OrgID, Env: dep.EnvID, ResourceBlobUrl: "",
diff --git a/bundle.go b/bundle.go index a53f31c..43f243e 100644 --- a/bundle.go +++ b/bundle.go
@@ -36,7 +36,8 @@ queueDownloadRequest(*DataDeployment) enqueueRequest(*DownloadRequest) makeDownloadRequest(string) *DownloadRequest - deleteBundles([]DataDeployment) + deleteBundlesFromDeployments([]DataDeployment) + deleteBundleById(string) Close() } @@ -117,9 +118,12 @@ close(bm.downloadQueue) } -// TODO add delete support +func (bm *bundleManager) deleteBundlesFromDeployments(deletedDeployments []DataDeployment) { + for _, dep := range deletedDeployments { + go bm.deleteBundleById(dep.BlobID) + go bm.deleteBundleById(dep.BlobResourceID) + } -func (bm *bundleManager) deleteBundles(deletedDeployments []DataDeployment) { /* log.Debugf("will delete %d old bundles", len(deletedDeployments)) go func() { @@ -135,6 +139,11 @@ */ } +// TODO add delete support +func (bm *bundleManager) deleteBundleById(blobId string) { + +} + type DownloadRequest struct { bm *bundleManager blobId string
diff --git a/listener.go b/listener.go index 8961bae..0da9192 100644 --- a/listener.go +++ b/listener.go
@@ -100,6 +100,7 @@ log.Debugf("Processing changes") // changes have been applied to DB var insertedDeployments, deletedDeployments []DataDeployment + var updatedNewBlobs, updatedOldBlobs []string for _, change := range changes.Changes { switch change.Table { case CONFIG_METADATA_TABLE: @@ -108,13 +109,21 @@ dep := dataDeploymentFromRow(change.NewRow) insertedDeployments = append(insertedDeployments, dep) case common.Delete: - var id string - change.OldRow.Get("id", &id) - // only need these two fields to delete and determine bundle file - dep := DataDeployment{ - ID: id, - } + dep := dataDeploymentFromRow(change.OldRow) deletedDeployments = append(deletedDeployments, dep) + case common.Update: + depNew := dataDeploymentFromRow(change.NewRow) + depOld := dataDeploymentFromRow(change.OldRow) + + if depOld.BlobID != depNew.BlobID { + updatedNewBlobs = append(updatedNewBlobs, depNew.BlobID) + updatedOldBlobs = append(updatedOldBlobs, depOld.BlobID) + } + + if depOld.BlobResourceID != depNew.BlobResourceID { + updatedNewBlobs = append(updatedNewBlobs, depNew.BlobResourceID) + updatedOldBlobs = append(updatedOldBlobs, depOld.BlobResourceID) + } default: log.Errorf("unexpected operation: %s", change.Operation) } @@ -127,15 +136,25 @@ } */ + // insert for i := range insertedDeployments { go h.bundleMan.queueDownloadRequest(&insertedDeployments[i]) } - // clean up old bundles + // update + for i := range updatedNewBlobs { + go h.bundleMan.enqueueRequest(h.bundleMan.makeDownloadRequest(updatedNewBlobs[i])) + } + + for i := range updatedOldBlobs { + go h.bundleMan.deleteBundleById(updatedOldBlobs[i]) + } + + // delete if len(deletedDeployments) > 0 { log.Debugf("will delete %d old bundles", len(deletedDeployments)) //TODO delete bundles for deleted deployments - h.bundleMan.deleteBundles(deletedDeployments) + h.bundleMan.deleteBundlesFromDeployments(deletedDeployments) } }
diff --git a/listener_test.go b/listener_test.go index d82ce1c..7549efb 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -40,6 +40,7 @@ requestChan: make(chan *DownloadRequest), depChan: make(chan *DataDeployment), delChan: make(chan *DataDeployment), + delBlobChan: make(chan string), } testHandler = &apigeeSyncHandler{ dbMan: dummyDbMan, @@ -55,7 +56,7 @@ }) Context("Snapshot", func() { - It("Snapshot event shoud enqueue download requests for all unready blobs", func() { + It("Snapshot event should enqueue download requests for all unready blobs", func() { // init unready blob ids unreadyBlobIds := make([]string, 0) blobMap := make(map[string]int) @@ -84,7 +85,7 @@ } }) - It("Snapshot events shoud set db version, and should only init API endpoint once", func() { + It("Snapshot events should set db version, and should only init API endpoint once", func() { // emit snapshot for i := 0; i < 2+rand.Intn(5); i++ { @@ -104,7 +105,7 @@ Context("Change list", func() { - It("Insert event shoud enqueue download requests for all inserted deployments", func() { + It("Insert event should enqueue download requests for all inserted deployments", func() { // emit change event changes := make([]common.Change, 0) deployments := make(map[string]DataDeployment) @@ -134,7 +135,7 @@ Expect(len(deployments)).Should(BeZero()) }) - It("Delete event shoud deliver to the bundle manager", func() { + It("Delete event should deliver to the bundle manager", func() { // emit change event changes := make([]common.Change, 0) deployments := make(map[string]bool) @@ -163,6 +164,116 @@ } Expect(len(deployments)).Should(BeZero()) }) + + It("Update event should enqueue download requests and delete old blobs", func() { + + changes := make([]common.Change, 0) + blobIdNew := make(map[string]int) + blobIdOld := make(map[string]int) + for i := 0; i < 1+rand.Intn(10); i++ { + depNew := makeTestDeployment() + depNew.BlobID = GenerateUUID() + depNew.BlobResourceID = GenerateUUID() + + depOld := makeTestDeployment() + depOld.BlobID = GenerateUUID() + depOld.BlobResourceID = GenerateUUID() + + change := common.Change{ + Operation: common.Update, + Table: CONFIG_METADATA_TABLE, + NewRow: rowFromDeployment(depNew), + OldRow: rowFromDeployment(depOld), + } + changes = append(changes, change) + + blobIdNew[depNew.BlobID]++ + blobIdNew[depNew.BlobResourceID]++ + blobIdOld[depOld.BlobID]++ + blobIdOld[depOld.BlobResourceID]++ + } + + // emit change event + changeList := &common.ChangeList{ + Changes: changes, + } + + apid.Events().Emit(APIGEE_SYNC_EVENT, changeList) + + // verify + for i := 0; i < len(blobIdNew); i++ { + req := <-dummyBundleMan.requestChan + blobIdNew[req.blobId]++ + Expect(blobIdNew[req.blobId]).Should(Equal(2)) + } + for i := 0; i < len(blobIdOld); i++ { + blobId := <-dummyBundleMan.delBlobChan + blobIdOld[blobId]++ + Expect(blobIdOld[blobId]).Should(Equal(2)) + } + + }) + + It("Update event should only download/delete changed blobs", func() { + changes := make([]common.Change, 0) + blobIdChangedNew := make(map[string]int) + blobIdChangedOld := make(map[string]int) + + for i := 0; i < 1+rand.Intn(10); i++ { + depNew := makeTestDeployment() + depNew.BlobID = GenerateUUID() + depNew.BlobResourceID = GenerateUUID() + + depOld := makeTestDeployment() + + if rand.Intn(2) == 0 { + // blob id changed + depOld.BlobID = GenerateUUID() + blobIdChangedNew[depNew.BlobID]++ + blobIdChangedOld[depOld.BlobID]++ + } else { + // blob id unchanged + depOld.BlobID = depNew.BlobID + } + + if rand.Intn(2) == 0 { + // blob id changed + depOld.BlobResourceID = GenerateUUID() + blobIdChangedNew[depNew.BlobResourceID]++ + blobIdChangedOld[depOld.BlobResourceID]++ + } else { + // blob id unchanged + depOld.BlobResourceID = depNew.BlobResourceID + } + + change := common.Change{ + Operation: common.Update, + Table: CONFIG_METADATA_TABLE, + NewRow: rowFromDeployment(depNew), + OldRow: rowFromDeployment(depOld), + } + changes = append(changes, change) + } + + // emit change event + changeList := &common.ChangeList{ + Changes: changes, + } + + apid.Events().Emit(APIGEE_SYNC_EVENT, changeList) + + // verify + for i := 0; i < len(blobIdChangedNew); i++ { + req := <-dummyBundleMan.requestChan + blobIdChangedNew[req.blobId]++ + Expect(blobIdChangedNew[req.blobId]).Should(Equal(2)) + } + for i := 0; i < len(blobIdChangedOld); i++ { + blobId := <-dummyBundleMan.delBlobChan + blobIdChangedOld[blobId]++ + Expect(blobIdChangedOld[blobId]).Should(Equal(2)) + } + }) }) }) @@ -170,6 +281,7 @@ requestChan chan *DownloadRequest depChan chan *DataDeployment delChan chan *DataDeployment + delBlobChan chan string } func (bm *dummyBundleManager) initializeBundleDownloading() { @@ -190,12 +302,16 @@ } } -func (bm *dummyBundleManager) deleteBundles(deployments []DataDeployment) { +func (bm *dummyBundleManager) deleteBundlesFromDeployments(deployments []DataDeployment) { for i := range deployments { bm.delChan <- &deployments[i] } } +func (bm *dummyBundleManager) deleteBundleById(blobId string) { + bm.delBlobChan <- blobId +} + func (bm *dummyBundleManager) Close() { }