blob: 2b2fbd64cb346d5a500e1b828bd3a253c2dec0c2 [file] [log] [blame]
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package apidAnalytics
import (
"compress/gzip"
"encoding/json"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"os"
"path/filepath"
"time"
)
var _ = Describe("test getBucketForTimestamp()", func() {
It("should return new bucket or existing bucket if created previously", func() {
t := time.Date(2017, 1, 20, 10, 24, 5, 0, time.UTC)
tenant := tenant{Org: "testorg", Env: "testenv", TenantId: "tenantid"}
bucket, err := getBucketForTimestamp(t, tenant)
Expect(err).ShouldNot(HaveOccurred())
Expect(bucket.DirName).To(Equal("testorg~testenv~20170120102400"))
Expect(bucket.FileWriter).ToNot(BeNil())
fw := bucket.FileWriter
Expect(fw.file.Name()).To(ContainSubstring("20170120102400.20170120102600"))
// Should return existing bucket if same interval timestamp is passed
t2 := time.Date(2017, 1, 20, 10, 25, 5, 0, time.UTC)
bucket, err = getBucketForTimestamp(t2, tenant)
Expect(err).ShouldNot(HaveOccurred())
Expect(bucket.DirName).To(Equal("testorg~testenv~20170120102400"))
})
})
var _ = Describe("test getRandomHex()", func() {
It("should return a 4 digit hex", func() {
r1 := getRandomHex()
Expect(len(r1)).To(Equal(4))
})
It("should return differe 4 digit hex each time", func() {
r1 := getRandomHex()
r2 := getRandomHex()
Expect(r1).NotTo(Equal(r2))
})
})
var _ = Describe("test createWriteAndCloseFile()", func() {
Context("Cannot create file", func() {
It("should return error", func() {
fileName := "testFile" + fileExtension
completeFilePath := filepath.Join(localAnalyticsTempDir, "fakedir", fileName)
_, err := createGzipFile(completeFilePath)
Expect(err).To((HaveOccurred()))
})
})
Context("Create file, write to it and close file", func() {
It("should save content to file and read correctly", func() {
fileName := "testFile" + fileExtension
completeFilePath := filepath.Join(localAnalyticsTempDir, fileName)
fw, err := createGzipFile(completeFilePath)
Expect(err).ToNot((HaveOccurred()))
var records = []byte(`{
"records":[{
"response_status_code": 200,
"client_id":"testapikey",
"client_received_start_timestamp": 1486406248277,
"client_received_end_timestamp": 1486406248290
}]
}`)
raw := getRaw(records)
writeGzipFile(fw, raw["records"].([]interface{}))
closeGzipFile(fw)
// Verify file was written to properly
f, err := os.Open(completeFilePath)
defer f.Close()
gzReader, err := gzip.NewReader(f)
defer gzReader.Close()
Expect(err).ToNot((HaveOccurred()))
var record map[string]interface{}
decoder := json.NewDecoder(gzReader) // Decode payload to JSON data
decoder.UseNumber()
err = decoder.Decode(&record)
Expect(err).ToNot((HaveOccurred()))
Expect(record["client_id"]).To(Equal("testapikey"))
Expect(record["response_status_code"]).To(Equal(json.Number("200")))
Expect(record["client_received_start_timestamp"]).To(Equal(json.Number("1486406248277")))
Expect(record["client_received_end_timestamp"]).To(Equal(json.Number("1486406248290")))
err = os.Remove(completeFilePath)
Expect(err).ToNot((HaveOccurred()))
})
})
})
var _ = Describe("test closeBucketChannel()", func() {
Context("send close bucket event on channel", func() {
It("close file and move to staging dir", func() {
dirName := "testorg~testenv~20160101230000"
dirPath := filepath.Join(localAnalyticsTempDir, dirName)
err := os.Mkdir(dirPath, os.ModePerm)
Expect(err).ShouldNot(HaveOccurred())
fileName := "testFile" + fileExtension
completeFilePath := filepath.Join(dirPath, fileName)
fw, e := createGzipFile(completeFilePath)
Expect(e).ShouldNot(HaveOccurred())
bucket := bucket{keyTS: 112312, DirName: dirName, FileWriter: fw}
closeBucketEvent <- bucket
// wait for it to close dir and move to staging
time.Sleep(time.Second * 2)
expectedDirPath := filepath.Join(localAnalyticsStagingDir, dirName)
Expect(expectedDirPath).To(BeADirectory())
expectedfilePath := filepath.Join(localAnalyticsStagingDir, dirName, fileName)
Expect(expectedfilePath).To(BeAnExistingFile())
})
})
})