blob: 32814536cb5d35ca8216b00728f344ea11ca8921 [file] [log] [blame]
/*
Copyright 2016 The Transicator Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"fmt"
"math/rand"
"sync/atomic"
"time"
"github.com/apigee-labs/transicator/common"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
const (
startIndex = 2
maxWait = 10 * time.Microsecond
trackerTestTimeout = 10 * time.Second
trackerCount = 1000
)
var tracker *changeTracker
var _ = Describe("Change tracker", func() {
BeforeEach(func() {
tracker = createTracker()
tracker.update(common.MakeSequence(uint64(startIndex), 0), "")
})
AfterEach(func() {
tracker.close()
})
It("Behind", func() {
behind := tracker.wait(common.MakeSequence(1, 0), []string{""})
Expect(behind.LSN).Should(BeEquivalentTo(2))
})
It("Behind with selectors", func() {
tracker.update(common.MakeSequence(3, 0), "foo")
tracker.update(common.MakeSequence(4, 0), "bar")
behind := tracker.wait(common.MakeSequence(1, 0), []string{"foo"})
Expect(behind.LSN).Should(BeEquivalentTo(3))
behind = tracker.wait(common.MakeSequence(1, 0), []string{"bar"})
Expect(behind.LSN).Should(BeEquivalentTo(4))
behind = tracker.wait(common.MakeSequence(1, 0), []string{"foo", "bar"})
Expect(behind.LSN).Should(BeEquivalentTo(4))
behind = tracker.wait(common.MakeSequence(1, 0), []string{"bar", "foo"})
Expect(behind.LSN).Should(BeEquivalentTo(4))
})
It("Caught up", func() {
behind := tracker.wait(common.MakeSequence(2, 0), []string{""})
Expect(behind.LSN).Should(BeEquivalentTo(2))
})
It("Caught up with selectors", func() {
tracker.update(common.MakeSequence(3, 0), "foo")
tracker.update(common.MakeSequence(4, 0), "bar")
time.Sleep(time.Millisecond * 250)
behind := tracker.wait(common.MakeSequence(2, 0), []string{"foo"})
Expect(behind.LSN).Should(BeEquivalentTo(3))
behind = tracker.wait(common.MakeSequence(2, 0), []string{"bar"})
Expect(behind.LSN).Should(BeEquivalentTo(4))
behind = tracker.wait(common.MakeSequence(2, 0), []string{"foo", "bar"})
Expect(behind.LSN).Should(BeEquivalentTo(4))
})
It("Timeout", func() {
blocked := tracker.timedWait(common.MakeSequence(3, 0), 250*time.Millisecond, []string{""})
Expect(blocked.LSN).Should(BeEquivalentTo(0))
})
It("Up to date", func() {
doneChan := make(chan uint64, 1)
go func() {
new := tracker.wait(common.MakeSequence(3, 0), []string{""})
doneChan <- new.LSN
}()
tracker.update(common.MakeSequence(3, 0), "")
Eventually(doneChan).Should(Receive(BeEquivalentTo(3)))
})
It("Up to date with timeout", func() {
doneChan := make(chan uint64, 1)
go func() {
new := tracker.timedWait(common.MakeSequence(3, 0), 2*time.Second, []string{""})
doneChan <- new.LSN
}()
tracker.update(common.MakeSequence(3, 0), "")
Eventually(doneChan).Should(Receive(BeEquivalentTo(3)))
})
It("Behind with tags", func() {
doneChanFoo := make(chan uint64, 1)
doneChanBar := make(chan uint64, 1)
go func() {
new := tracker.wait(common.MakeSequence(4, 0), []string{"foo"})
doneChanFoo <- new.LSN
}()
go func() {
new := tracker.timedWait(common.MakeSequence(4, 0), 2*time.Second, []string{"bar"})
doneChanBar <- new.LSN
}()
time.Sleep(250 * time.Millisecond)
tracker.update(common.MakeSequence(4, 0), "foo")
Eventually(doneChanFoo).Should(Receive(BeEquivalentTo(4)))
Eventually(doneChanBar, 5*time.Second).Should(Receive(BeZero()))
})
It("Tag never updated", func() {
blocked := tracker.timedWait(common.MakeSequence(3, 0), 250*time.Millisecond, []string{"baz"})
Expect(blocked.LSN).Should(BeEquivalentTo(0))
})
It("Less up to date with timeout", func() {
doneChan := make(chan uint64, 1)
go func() {
new := tracker.timedWait(common.MakeSequence(4, 0), 2*time.Second, []string{""})
doneChan <- new.LSN
}()
tracker.update(common.MakeSequence(3, 0), "")
tracker.update(common.MakeSequence(4, 0), "")
Eventually(doneChan).Should(Receive(BeEquivalentTo(4)))
})
It("Up to date timeout", func() {
doneChan := make(chan uint64, 1)
go func() {
new := tracker.timedWait(common.MakeSequence(3, 0), 250*time.Millisecond, []string{""})
doneChan <- new.LSN
}()
time.Sleep(1 * time.Second)
tracker.update(common.MakeSequence(3, 0), "")
Eventually(doneChan).Should(Receive(BeEquivalentTo(0)))
})
It("Update", func() {
doneChan := make(chan uint64, 1)
go func() {
new := tracker.wait(common.MakeSequence(4, 0), []string{""})
doneChan <- new.LSN
}()
time.Sleep(250 * time.Millisecond)
tracker.update(common.MakeSequence(3, 0), "")
time.Sleep(250 * time.Millisecond)
tracker.update(common.MakeSequence(4, 0), "")
Eventually(doneChan).Should(Receive(BeEquivalentTo(4)))
})
It("Update with selectors", func() {
doneChan := make(chan uint64, 1)
go func() {
new := tracker.wait(common.MakeSequence(4, 0), []string{"baz"})
doneChan <- new.LSN
}()
time.Sleep(250 * time.Millisecond)
tracker.update(common.MakeSequence(3, 0), "baz")
time.Sleep(250 * time.Millisecond)
tracker.update(common.MakeSequence(4, 0), "baz")
Eventually(doneChan).Should(Receive(BeEquivalentTo(4)))
})
It("Update with two selectors", func() {
doneChan := make(chan uint64, 1)
go func() {
new := tracker.wait(common.MakeSequence(4, 0), []string{"bar", "baz"})
doneChan <- new.LSN
}()
time.Sleep(250 * time.Millisecond)
tracker.update(common.MakeSequence(3, 0), "foo")
time.Sleep(250 * time.Millisecond)
tracker.update(common.MakeSequence(4, 0), "baz")
Eventually(doneChan).Should(Receive(BeEquivalentTo(4)))
})
It("Update twice", func() {
doneChan := make(chan uint64, 1)
doneChan2 := make(chan uint64, 1)
go func() {
new := tracker.wait(common.MakeSequence(4, 0), []string{""})
doneChan <- new.LSN
}()
go func() {
new2 := tracker.wait(common.MakeSequence(4, 0), []string{""})
doneChan2 <- new2.LSN
}()
time.Sleep(250 * time.Millisecond)
tracker.update(common.MakeSequence(3, 0), "")
time.Sleep(250 * time.Millisecond)
tracker.update(common.MakeSequence(4, 0), "")
got := <-doneChan
Expect(got).Should(BeEquivalentTo(4))
got = <-doneChan2
Expect(got).Should(BeEquivalentTo(4))
})
It("Multi Update", func() {
prematureDoneChan := make(chan uint64, 1)
doneChan := make(chan uint64, 1)
go func() {
oldNew := tracker.wait(common.MakeSequence(10, 0), []string{""})
prematureDoneChan <- oldNew.LSN
}()
go func() {
new := tracker.wait(common.MakeSequence(4, 0), []string{""})
doneChan <- new.LSN
}()
time.Sleep(250 * time.Millisecond)
tracker.update(common.MakeSequence(3, 0), "")
time.Sleep(250 * time.Millisecond)
tracker.update(common.MakeSequence(4, 0), "")
// No loop -- we expect that the first case arrive before the second
select {
case gotVal := <-doneChan:
Expect(gotVal).Should(BeEquivalentTo(4))
case <-prematureDoneChan:
Expect(true).Should(BeFalse())
}
})
It("Close", func() {
tracker = createTracker()
tracker.update(common.MakeSequence(2, 0), "")
done := make(chan uint64, 1)
go func() {
new := tracker.wait(common.MakeSequence(3, 0), []string{""})
done <- new.LSN
}()
time.Sleep(250 * time.Millisecond)
tracker.close()
Eventually(done).Should(Receive(BeEquivalentTo(2)))
})
It("Stress 1, 1", func() {
trackerStress(1, 1, trackerCount)
})
It("Stress 100, 1", func() {
trackerStress(100, 1, trackerCount)
})
It("Stress 1, 100", func() {
trackerStress(1, 100, trackerCount)
})
It("Stress 100, 100", func() {
trackerStress(100, 100, trackerCount)
})
It("Stress 100, 1000", func() {
trackerStress(100, 1000, trackerCount)
})
})
func trackerStress(producers, consumers int, max int64) {
// Make a separate tracker here. The global pointer may get shared
// by multiple tests otherwise as one shuts down while the next starts.
stressTracker := createTracker()
defer stressTracker.close()
var start int64 = startIndex
prodDone := make(chan bool, producers)
consDone := make(chan bool, consumers)
allDone := make(chan bool, 1)
go func() {
ticker := time.NewTicker(time.Second)
count := 0
for {
select {
case <-ticker.C:
fmt.Fprintf(GinkgoWriter, "%d: %d\n", count, atomic.LoadInt64(&start))
count++
case <-allDone:
return
}
}
}()
for i := 0; i <= producers; i++ {
go func() {
for atomic.LoadInt64(&start) < max {
waitTime := rand.Int63n(int64(maxWait))
time.Sleep(time.Duration(waitTime))
val := atomic.AddInt64(&start, 1)
seq := common.MakeSequence(uint64(val), 0)
stressTracker.update(seq, "")
}
prodDone <- true
}()
}
for i := 0; i <= consumers; i++ {
go func() {
var last int64 = startIndex
for last < max {
waitTime := rand.Int63n(int64(maxWait))
seq := common.MakeSequence(uint64(last+1), 0)
lastSeq := stressTracker.timedWait(seq, time.Duration(waitTime), []string{""})
last = int64(lastSeq.LSN)
}
consDone <- true
}()
}
prodCount := 0
consCount := 0
timeout := time.NewTimer(trackerTestTimeout)
for prodCount < producers || consCount < consumers {
select {
case <-prodDone:
prodCount++
case <-consDone:
consCount++
case <-timeout.C:
fmt.Fprintf(GinkgoWriter,
"Test timed out after %d producers and %d consumers. last = %d\n",
prodCount, consCount, atomic.LoadInt64(&start))
//stacks := make([]byte, 1000000)
//stackLen := runtime.Stack(stacks, true)
//fmt.Fprintf(GinkgoWriter, string(stacks[:stackLen]))
Expect(false).Should(BeTrue())
allDone <- true
return
}
}
allDone <- true
}