blob: 2971a08cb92104536dd7533eed46ce88268ed8a8 [file] [log] [blame] [edit]
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package apiGatewayConfDeploy
import (
"fmt"
"github.com/apid/apid-core"
"github.com/apid/apid-core/util"
"github.com/apigee-labs/transicator/common"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"math/rand"
"time"
)
var _ = Describe("listener", func() {
var dummyDbMan *dummyDbManager
var dummyApiMan *dummyApiManager
var dummyBundleMan *dummyBundleManager
var testHandler *apigeeSyncHandler
var testCount int
var _ = BeforeEach(func() {
testCount += 1
// stop handler created by initPlugin()
eventHandler.stopListener(services)
dummyApiMan = &dummyApiManager{
notifyChan: make(chan bool, 1),
initCalled: make(chan bool),
}
dummyDbMan = &dummyDbManager{}
dummyBundleMan = &dummyBundleManager{
blobChan: make(chan string),
}
testHandler = &apigeeSyncHandler{
dbMan: dummyDbMan,
apiMan: dummyApiMan,
bundleMan: dummyBundleMan,
}
testHandler.initListener(services)
time.Sleep(100 * time.Millisecond)
})
var _ = AfterEach(func() {
testHandler.stopListener(services)
})
Context("Snapshot", func() {
It("Snapshot event should enqueue download requests for all unready blobs", func() {
// init unready blob ids
unreadyBlobIds := make([]string, 0)
blobMap := make(map[string]int)
for i := 0; i < 1+rand.Intn(10); i++ {
id := util.GenerateUUID()
blobMap[id] = 1
unreadyBlobIds = append(unreadyBlobIds, id)
}
dummyDbMan.unreadyBlobIds = unreadyBlobIds
// emit snapshot
snapshot := &common.Snapshot{
SnapshotInfo: fmt.Sprint(rand.Uint32()),
}
<-apid.Events().Emit(APIGEE_SYNC_EVENT, snapshot)
for i := 0; i < len(unreadyBlobIds); i++ {
id := <-dummyBundleMan.blobChan
blobMap[id]++
}
// verify all unready blobids are enqueued
for _, val := range blobMap {
Expect(val).Should(Equal(2))
}
})
It("Snapshot events should set db version", func() {
// emit snapshot
for i := 0; i < 2+rand.Intn(5); i++ {
version := fmt.Sprint(rand.Uint32())
snapshot := &common.Snapshot{
SnapshotInfo: version,
}
<-apid.Events().Emit(APIGEE_SYNC_EVENT, snapshot)
Expect(dummyDbMan.version).Should(Equal(version))
}
})
It("Snapshot event should init API endpoint and notify long-polling", func() {
// emit snapshot
version := fmt.Sprint(rand.Uint32())
snapshot := &common.Snapshot{
SnapshotInfo: version,
}
<-apid.Events().Emit(APIGEE_SYNC_EVENT, snapshot)
Expect(dummyDbMan.version).Should(Equal(version))
Expect(<-dummyApiMan.initCalled).Should(BeTrue())
Expect(<-dummyApiMan.notifyChan).Should(BeTrue())
})
It("Should load LSN when apid starts", func() {
dummyDbMan.dbLSN = fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount)
// emit snapshot
version := fmt.Sprint(rand.Uint32())
snapshot := &common.Snapshot{
SnapshotInfo: version,
}
<-apid.Events().Emit(APIGEE_SYNC_EVENT, snapshot)
Expect(dummyDbMan.getLSN()).Should(Equal(dummyDbMan.dbLSN))
})
It("Should store LSN when receiving snapshot at runtime", func() {
dummyDbMan.lsn = fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount)
// emit snapshot
version := fmt.Sprint(rand.Uint32())
snapshot := &common.Snapshot{
SnapshotInfo: version,
}
<-apid.Events().Emit(APIGEE_SYNC_EVENT, snapshot)
Expect(dummyDbMan.getLSN()).Should(Equal(dummyDbMan.dbLSN))
})
})
Context("Change list", func() {
It("Insert event should enqueue download requests for all inserted deployments", func() {
// emit change event
changes := make([]common.Change, 0)
blobs := make(map[string]int)
for i := 0; i < 1+rand.Intn(10); i++ {
dep := makeTestDeployment()
change := common.Change{
Operation: common.Insert,
Table: CONFIG_METADATA_TABLE,
NewRow: rowFromDeployment(dep),
}
changes = append(changes, change)
blobs[dep.BlobID]++
blobs[dep.BlobResourceID]++
}
changeList := &common.ChangeList{
Changes: changes,
}
<-apid.Events().Emit(APIGEE_SYNC_EVENT, changeList)
// verify
for i := 0; i < 2*len(changes); i++ {
blobId := <-dummyBundleMan.blobChan
blobs[blobId]++
Expect(blobs[blobId]).Should(Equal(2))
}
})
XIt("Delete event should deliver to the bundle manager", func() {
// emit change event
changes := make([]common.Change, 0)
deployments := make(map[string]bool)
for i := 0; i < 1+rand.Intn(10); i++ {
dep := makeTestDeployment()
change := common.Change{
Operation: common.Delete,
Table: CONFIG_METADATA_TABLE,
OldRow: rowFromDeployment(dep),
}
changes = append(changes, change)
deployments[dep.ID] = true
}
changeList := &common.ChangeList{
Changes: changes,
}
<-apid.Events().Emit(APIGEE_SYNC_EVENT, changeList)
// verify
for i := 0; i < len(changes); i++ {
}
Expect(len(deployments)).Should(BeZero())
})
It("Update event should enqueue download requests", func() {
changes := make([]common.Change, 0)
blobsNew := make(map[string]int)
for i := 0; i < 1+rand.Intn(10); i++ {
confNew := makeTestDeployment()
confNew.BlobID = util.GenerateUUID()
confNew.BlobResourceID = util.GenerateUUID()
confOld := makeTestDeployment()
confOld.BlobID = util.GenerateUUID()
confOld.BlobResourceID = util.GenerateUUID()
change := common.Change{
Operation: common.Update,
Table: CONFIG_METADATA_TABLE,
NewRow: rowFromDeployment(confNew),
OldRow: rowFromDeployment(confOld),
}
changes = append(changes, change)
blobsNew[confNew.BlobID]++
blobsNew[confNew.BlobResourceID]++
}
testLSN := "1.1.1"
// emit change event
changeList := &common.ChangeList{
Changes: changes,
LastSequence: testLSN,
}
<-apid.Events().Emit(APIGEE_SYNC_EVENT, changeList)
// verify
for i := 0; i < 2*len(changes); i++ {
blobId := <-dummyBundleMan.blobChan
blobsNew[blobId]++
Expect(blobsNew[blobId]).Should(Equal(2))
}
})
})
Context("LSN", func() {
var _ = BeforeEach(func() {
dummyDbMan.lsn = "0.0.1"
})
It("changelist with CONFIG_METADATA_TABLE should update apidLSN", func() {
// emit change event
changes := make([]common.Change, 0)
deployments := make(map[string]Configuration)
testLSN := fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount)
for i := 0; i < 1+rand.Intn(10); i++ {
dep := makeTestDeployment()
change := common.Change{
Operation: common.Insert,
Table: CONFIG_METADATA_TABLE,
NewRow: rowFromDeployment(dep),
}
changes = append(changes, change)
deployments[dep.ID] = *dep
}
changeList := &common.ChangeList{
Changes: changes,
LastSequence: testLSN,
}
<-apid.Events().Emit(APIGEE_SYNC_EVENT, changeList)
for i := 0; i < 2*len(changes); i++ {
<-dummyBundleMan.blobChan
}
Expect(dummyDbMan.getLSN()).Should(Equal(testLSN))
})
It("changelist without CONFIG_METADATA_TABLE shouldn't update apidLSN", func() {
testLSN := fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount)
dummyDbMan.lsn = testLSN
// emit change event
changes := make([]common.Change, 0)
deployments := make(map[string]Configuration)
for i := 0; i < 1+rand.Intn(10); i++ {
dep := makeTestDeployment()
change := common.Change{
Operation: common.Insert,
Table: "somewhat-table",
NewRow: rowFromDeployment(dep),
}
changes = append(changes, change)
deployments[dep.ID] = *dep
}
changeList := &common.ChangeList{
Changes: changes,
LastSequence: "aaa.aaa.aaa",
}
<-apid.Events().Emit(APIGEE_SYNC_EVENT, changeList)
Expect(dummyDbMan.getLSN()).Should(Equal(testLSN))
})
It("changelist should always update apidLSN if it has init value", func() {
testLSN := fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount)
dummyDbMan.lsn = InitLSN
// emit change event
changeList := &common.ChangeList{
Changes: nil,
LastSequence: testLSN,
}
<-apid.Events().Emit(APIGEE_SYNC_EVENT, changeList)
Expect(dummyDbMan.getLSN()).Should(Equal(testLSN))
})
})
})
func rowFromDeployment(dep *Configuration) common.Row {
row := common.Row{}
row["id"] = &common.ColumnVal{Value: dep.ID}
row["organization_id"] = &common.ColumnVal{Value: dep.OrgID}
row["environment_id"] = &common.ColumnVal{Value: dep.EnvID}
row["bean_blob_id"] = &common.ColumnVal{Value: dep.BlobID}
row["resource_blob_id"] = &common.ColumnVal{Value: dep.BlobResourceID}
row["type"] = &common.ColumnVal{Value: dep.Type}
row["name"] = &common.ColumnVal{Value: dep.Name}
row["revision"] = &common.ColumnVal{Value: dep.Revision}
row["path"] = &common.ColumnVal{Value: dep.Path}
row["created_at"] = &common.ColumnVal{Value: dep.Created}
row["created_by"] = &common.ColumnVal{Value: dep.CreatedBy}
row["updated_at"] = &common.ColumnVal{Value: dep.Updated}
row["updated_by"] = &common.ColumnVal{Value: dep.UpdatedBy}
return row
}