package apidAnalytics

import (
	"compress/gzip"
	"encoding/json"
	"os"
	"path/filepath"
	"time"

	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
)
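// Ginkgo tests for the analytics bucket helpers: timestamp bucketing,
// random hex suffix generation, gzip file create/write/close, and the
// close-bucket event channel that moves finished buckets to the staging dir.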
var _ = Describe("test getBucketForTimestamp()", func() {
	It("should return a new bucket, or the existing bucket if one was created previously", func() {
		// 10:24:05 falls in the 10:24:00-10:26:00 bucket
		t := time.Date(2017, 1, 20, 10, 24, 5, 0, time.Local)
		tenant := tenant{Org: "testorg", Env: "testenv", TenantId: "tenantid"}

		bucket, err := getBucketForTimestamp(t, tenant)
		Expect(err).ShouldNot(HaveOccurred())
		Expect(bucket.DirName).To(Equal("testorg~testenv~20170120102400"))
		Expect(bucket.FileWriter).ToNot(BeNil())

		fw := bucket.FileWriter
		Expect(fw.file.Name()).To(ContainSubstring("20170120102400.20170120102600"))

		// A timestamp in the same interval should return the existing bucket
		t2 := time.Date(2017, 1, 20, 10, 25, 5, 0, time.Local)
		bucket, err = getBucketForTimestamp(t2, tenant)
		Expect(err).ShouldNot(HaveOccurred())
		Expect(bucket.DirName).To(Equal("testorg~testenv~20170120102400"))
	})
})
var _ = Describe("test getRandomHex()", func() {
	It("should return a 4-character hex string", func() {
		r1 := getRandomHex()
		Expect(len(r1)).To(Equal(4))
	})
	It("should return a different 4-character hex string each time", func() {
		r1 := getRandomHex()
		r2 := getRandomHex()
		Expect(r1).NotTo(Equal(r2))
	})
})
var _ = Describe("test createWriteAndCloseFile()", func() {
	Context("Cannot create file", func() {
		It("should return an error", func() {
			fileName := "testFile" + fileExtension
			// The parent "fakedir" does not exist, so file creation should fail
			completeFilePath := filepath.Join(localAnalyticsTempDir, "fakedir", fileName)
			_, err := createGzipFile(completeFilePath)
			Expect(err).To(HaveOccurred())
		})
	})
Context("Create file, write to it and close file", func() {
It("should save content to file and read correctly", func() {
fileName := "testFile" + fileExtension
completeFilePath := filepath.Join(localAnalyticsTempDir, fileName)
fw, err := createGzipFile(completeFilePath)
Expect(err).ToNot((HaveOccurred()))
var records = []byte(`{
"records":[{
"response_status_code": 200,
"client_id":"testapikey",
"client_received_start_timestamp": 1486406248277,
"client_received_end_timestamp": 1486406248290
}]
}`)
raw := getRaw(records)
writeGzipFile(fw, raw["records"].([]interface{}))
closeGzipFile(fw)
// Verify file was written to properly
f, err := os.Open(completeFilePath)
defer f.Close()
gzReader, err := gzip.NewReader(f)
defer gzReader.Close()
Expect(err).ToNot((HaveOccurred()))
var record map[string]interface{}
decoder := json.NewDecoder(gzReader) // Decode payload to JSON data
decoder.UseNumber()
err = decoder.Decode(&record)
Expect(err).ToNot((HaveOccurred()))
Expect(record["client_id"]).To(Equal("testapikey"))
Expect(record["response_status_code"]).To(Equal(json.Number("200")))
Expect(record["client_received_start_timestamp"]).To(Equal(json.Number("1486406248277")))
Expect(record["client_received_end_timestamp"]).To(Equal(json.Number("1486406248290")))
})
})
})
var _ = Describe("test closeBucketChannel()", func() {
	Context("send close bucket event on channel", func() {
		It("should close the file and move it to the staging dir", func() {
			dirName := "testorg~testenv~20160101230000"
			dirPath := filepath.Join(localAnalyticsTempDir, dirName)
			err := os.Mkdir(dirPath, os.ModePerm)
			Expect(err).ShouldNot(HaveOccurred())

			fileName := "testFile" + fileExtension
			completeFilePath := filepath.Join(dirPath, fileName)
			fw, e := createGzipFile(completeFilePath)
			Expect(e).ShouldNot(HaveOccurred())

			bucket := bucket{DirName: dirName, FileWriter: fw}
			closeBucketEvent <- bucket

			// Wait for the bucket to be closed and moved to the staging dir
			time.Sleep(time.Second * 2)

			expectedDirPath := filepath.Join(localAnalyticsStagingDir, dirName)
			status, _ := exists(expectedDirPath)
			Expect(status).To(BeTrue())

			expectedFilePath := filepath.Join(localAnalyticsStagingDir, dirName, fileName)
			status, _ = exists(expectedFilePath)
			Expect(status).To(BeTrue())
		})
	})
})
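// exists reports whether the given path exists; os.Stat errors other than
// "not exist" are returned to the caller.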
func exists(path string) (bool, error) {
	_, err := os.Stat(path)
	if err == nil {
		return true, nil
	}
	if os.IsNotExist(err) {
		return false, nil
	}
	return true, err
}