Merge pull request #14 from 30x/SQLlite_tohave_schemas
SQLite now has schemas
diff --git a/buffering_manager.go b/buffering_manager.go
index 184b1d6..d581135 100644
--- a/buffering_manager.go
+++ b/buffering_manager.go
@@ -17,12 +17,14 @@
// Channel where analytics records are buffered before being dumped to a
// file, as writing to the file should not be performed in the HTTP thread
var internalBuffer chan axRecords
+
// channel to indicate that internalBuffer channel is closed
var doneInternalBufferChan chan bool
// Channel where the close-bucket event is published, i.e. when a bucket
// is ready to be closed based on the collection interval
var closeBucketEvent chan bucket
+
// channel to indicate that closeBucketEvent channel is closed
var doneClosebucketChan chan bool
@@ -87,7 +89,7 @@
// staging to indicate it's ready for upload
err := os.Rename(dirToBeClosed, stagingPath)
if err != nil {
- log.Errorf("Cannot move directory '%s' from" +
+ log.Errorf("Cannot move directory '%s' from"+
" tmp to staging folder due to '%s", bucket.DirName, err)
} else {
// Remove bucket from bucket map once it's closed successfully
@@ -104,7 +106,7 @@
// Save records to the correct file based on the timestamp the data is being collected for
func save(records axRecords) error {
- bucket, err := getBucketForTimestamp(time.Now(), records.Tenant)
+ bucket, err := getBucketForTimestamp(time.Now().UTC(), records.Tenant)
if err != nil {
return err
}
@@ -124,11 +126,11 @@
if exists {
return b, nil
} else {
- timestamp := time.Unix(ts, 0).Format(timestampLayout)
+ timestamp := time.Unix(ts, 0).UTC().Format(timestampLayout)
// endtimestamp of bucket = starttimestamp + collectionInterval
endTime := time.Unix(ts+int64(config.GetInt(analyticsCollectionInterval)), 0)
- endtimestamp := endTime.Format(timestampLayout)
+ endtimestamp := endTime.UTC().Format(timestampLayout)
dirName := tenant.Org + "~" + tenant.Env + "~" + timestamp
newPath := filepath.Join(localAnalyticsTempDir, dirName)
@@ -159,7 +161,7 @@
// Send event to close the directory after endTime + 5
// seconds to make sure all buffers are flushed to the file
- timer := time.After(endTime.Sub(time.Now()) + time.Second*5)
+ timer := time.After(endTime.Sub(time.Now().UTC()) + time.Second*5)
go func() {
<-timer
closeBucketEvent <- newBucket
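
The UTC conversions are the substance of this file's change: time.Unix(ts, 0) returns a time in the host's local zone, so formatting it bakes the zone offset into bucket directory names. A minimal standalone sketch of the bucket-window formatting, assuming timestampLayout is "20060102150405" (inferred from the test expectations below):

package main

import (
	"fmt"
	"time"
)

const timestampLayout = "20060102150405" // assumed layout, inferred from the tests

// bucketWindow formats the start and end timestamps of a collection
// bucket in UTC, so directory names come out identical on every host.
func bucketWindow(ts, intervalSec int64) (start, end string) {
	start = time.Unix(ts, 0).UTC().Format(timestampLayout)
	end = time.Unix(ts+intervalSec, 0).UTC().Format(timestampLayout)
	return start, end
}

func main() {
	// 2017-01-20 10:24:00 UTC with a 120-second collection interval
	// reproduces the file-name fragment asserted in the tests:
	// 20170120102400.20170120102600
	s, e := bucketWindow(1484907840, 120)
	fmt.Println(s + "." + e)
}
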
diff --git a/buffering_manager_test.go b/buffering_manager_test.go
index c3c747e..42cd512 100644
--- a/buffering_manager_test.go
+++ b/buffering_manager_test.go
@@ -12,7 +12,7 @@
var _ = Describe("test getBucketForTimestamp()", func() {
It("should return new bucket or existing bucket if created previously", func() {
- t := time.Date(2017, 1, 20, 10, 24, 5, 0, time.Local)
+ t := time.Date(2017, 1, 20, 10, 24, 5, 0, time.UTC)
tenant := tenant{Org: "testorg", Env: "testenv", TenantId: "tenantid"}
bucket, err := getBucketForTimestamp(t, tenant)
@@ -25,7 +25,7 @@
Expect(fw.file.Name()).To(ContainSubstring("20170120102400.20170120102600"))
// Should return existing bucket if same interval timestamp is passed
- t2 := time.Date(2017, 1, 20, 10, 25, 5, 0, time.Local)
+ t2 := time.Date(2017, 1, 20, 10, 25, 5, 0, time.UTC)
bucket, err = getBucketForTimestamp(t2, tenant)
Expect(err).ShouldNot(HaveOccurred())
Expect(bucket.DirName).To(Equal("testorg~testenv~20170120102400"))
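
The switch from time.Local to time.UTC keeps the asserted directory names machine-independent. A minimal sketch (a plain main, not the actual Ginkgo spec) of the flakiness the old construction had, again assuming timestampLayout is "20060102150405":

package main

import (
	"fmt"
	"time"
)

func main() {
	const layout = "20060102150405" // assumed timestampLayout
	// The same wall-clock fields denote different instants in different
	// zones; only the UTC construction yields stable digits.
	local := time.Date(2017, 1, 20, 10, 24, 5, 0, time.Local)
	utc := time.Date(2017, 1, 20, 10, 24, 5, 0, time.UTC)
	fmt.Println(local.UTC().Format(layout)) // shifts with the host's zone offset
	fmt.Println(utc.Format(layout))         // always 20170120102405
}
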
diff --git a/init.go b/init.go
index f865bc5..a0efde2 100644
--- a/init.go
+++ b/init.go
@@ -222,11 +222,11 @@
log.Debugf("sent signal to close closebucketevent channel")
// block on channel to ensure channel is closed
- <- doneInternalBufferChan
+ <-doneInternalBufferChan
log.Debugf("closed internal buffer channel successfully")
// block on channel to ensure channel is closed
- <- doneClosebucketChan
+ <-doneClosebucketChan
log.Debugf("closed closebucketevent channel successfully")
// Close all open files and move directories in tmp to staging.
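
The gofmt fix above touches the shutdown handshake: the consumer goroutine signals on a done channel once it has drained its buffer, and shutdown blocks on that signal so no records are dropped. A minimal sketch of the pattern, with axRecords reduced to a stub:

package main

import "fmt"

type axRecords struct{ Tenant string } // stub; the real struct carries analytics records

var (
	internalBuffer         = make(chan axRecords, 100)
	doneInternalBufferChan = make(chan bool)
)

func consume() {
	// range exits only after internalBuffer is closed and fully drained
	for records := range internalBuffer {
		fmt.Println("saving records for", records.Tenant)
	}
	doneInternalBufferChan <- true // signal that every buffered record was handled
}

func main() {
	go consume()
	internalBuffer <- axRecords{Tenant: "testorg"}
	close(internalBuffer)
	// block on channel to ensure channel is closed
	<-doneInternalBufferChan
	fmt.Println("closed internal buffer channel successfully")
}
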
diff --git a/uploader.go b/uploader.go
index fb8a267..64b1f40 100644
--- a/uploader.go
+++ b/uploader.go
@@ -76,6 +76,7 @@
// e.g. date=2017-01-30/time=16-32/1069_20170130163200.20170130163400_218e3d99-efaf-4a7b-b3f2-5e4b00c023b7_writer_0.txt.gz
q.Add("relative_file_path", relativeFilePath)
q.Add("file_content_type", "application/x-gzip")
+ q.Add("encrypt", "true")
req.URL.RawQuery = q.Encode()
// Add Bearer Token to each request
@@ -113,6 +114,7 @@
req.Header.Set("Expect", "100-continue")
req.Header.Set("Content-Type", "application/x-gzip")
+ req.Header.Set("x-amz-server-side-encryption", "AES256")
fileStats, err := file.Stat()
if err != nil {
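
The two uploader additions work as a pair: "encrypt=true" on the signing request asks the service for a URL that permits an encrypted upload, and the x-amz-server-side-encryption header on the PUT itself requests SSE-S3 (AES256). A minimal sketch of the PUT side, with the signed URL and file path as hypothetical inputs:

package main

import (
	"net/http"
	"os"
)

// putEncrypted uploads a gzipped analytics file to a pre-signed URL,
// requesting S3 server-side encryption with Amazon-managed keys.
func putEncrypted(signedURL, path string) error {
	file, err := os.Open(path)
	if err != nil {
		return err
	}
	defer file.Close()

	req, err := http.NewRequest("PUT", signedURL, file)
	if err != nil {
		return err
	}
	req.Header.Set("Expect", "100-continue")
	req.Header.Set("Content-Type", "application/x-gzip")
	// AES256 selects SSE-S3; if the header was included when the URL was
	// signed, omitting it here makes S3 reject the request.
	req.Header.Set("x-amz-server-side-encryption", "AES256")

	fileStats, err := file.Stat()
	if err != nil {
		return err
	}
	req.ContentLength = fileStats.Size() // S3 PUTs require an explicit length

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	return resp.Body.Close()
}
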