Merge pull request #14 from 30x/SQLlite_tohave_schemas

SQLite now has schemas
diff --git a/buffering_manager.go b/buffering_manager.go
index 184b1d6..d581135 100644
--- a/buffering_manager.go
+++ b/buffering_manager.go
@@ -17,12 +17,14 @@
 // Channel where analytics records are buffered before being dumped to a
 // file as write to file should not performed in the Http Thread
 var internalBuffer chan axRecords
+
 // channel to indicate that internalBuffer channel is closed
 var doneInternalBufferChan chan bool
 
 // Channel where close bucket event is published i.e. when a bucket
 // is ready to be closed based on collection interval
 var closeBucketEvent chan bucket
+
 // channel to indicate that closeBucketEvent channel is closed
 var doneClosebucketChan chan bool
 
@@ -87,7 +89,7 @@
 			// staging to indicate its ready for upload
 			err := os.Rename(dirToBeClosed, stagingPath)
 			if err != nil {
-				log.Errorf("Cannot move directory '%s' from" +
+				log.Errorf("Cannot move directory '%s' from"+
 					" tmp to staging folder due to '%s", bucket.DirName, err)
 			} else {
 				// Remove bucket from bucket map once its closed successfully
@@ -104,7 +106,7 @@
 
 // Save records to correct file based on what timestamp data is being collected for
 func save(records axRecords) error {
-	bucket, err := getBucketForTimestamp(time.Now(), records.Tenant)
+	bucket, err := getBucketForTimestamp(time.Now().UTC(), records.Tenant)
 	if err != nil {
 		return err
 	}
@@ -124,11 +126,11 @@
 	if exists {
 		return b, nil
 	} else {
-		timestamp := time.Unix(ts, 0).Format(timestampLayout)
+		timestamp := time.Unix(ts, 0).UTC().Format(timestampLayout)
 
 		// endtimestamp of bucket = starttimestamp + collectionInterval
 		endTime := time.Unix(ts+int64(config.GetInt(analyticsCollectionInterval)), 0)
-		endtimestamp := endTime.Format(timestampLayout)
+		endtimestamp := endTime.UTC().Format(timestampLayout)
 
 		dirName := tenant.Org + "~" + tenant.Env + "~" + timestamp
 		newPath := filepath.Join(localAnalyticsTempDir, dirName)
@@ -159,7 +161,7 @@
 
 		//Send event to close directory after endTime + 5
 		// seconds to make sure all buffers are flushed to file
-		timer := time.After(endTime.Sub(time.Now()) + time.Second*5)
+		timer := time.After(endTime.Sub(time.Now().UTC()) + time.Second*5)
 		go func() {
 			<-timer
 			closeBucketEvent <- newBucket
diff --git a/buffering_manager_test.go b/buffering_manager_test.go
index c3c747e..42cd512 100644
--- a/buffering_manager_test.go
+++ b/buffering_manager_test.go
@@ -12,7 +12,7 @@
 
 var _ = Describe("test getBucketForTimestamp()", func() {
 	It("should return new bucket or existing bucket if created previously", func() {
-		t := time.Date(2017, 1, 20, 10, 24, 5, 0, time.Local)
+		t := time.Date(2017, 1, 20, 10, 24, 5, 0, time.UTC)
 		tenant := tenant{Org: "testorg", Env: "testenv", TenantId: "tenantid"}
 
 		bucket, err := getBucketForTimestamp(t, tenant)
@@ -25,7 +25,7 @@
 		Expect(fw.file.Name()).To(ContainSubstring("20170120102400.20170120102600"))
 
 		// Should return existing bucket if same interval timestamp is passed
-		t2 := time.Date(2017, 1, 20, 10, 25, 5, 0, time.Local)
+		t2 := time.Date(2017, 1, 20, 10, 25, 5, 0, time.UTC)
 		bucket, err = getBucketForTimestamp(t2, tenant)
 		Expect(err).ShouldNot(HaveOccurred())
 		Expect(bucket.DirName).To(Equal("testorg~testenv~20170120102400"))
diff --git a/init.go b/init.go
index f865bc5..a0efde2 100644
--- a/init.go
+++ b/init.go
@@ -222,11 +222,11 @@
 	log.Debugf("sent signal to close closebucketevent channel")
 
 	// block on channel to ensure channel is closed
-	<- doneInternalBufferChan
+	<-doneInternalBufferChan
 	log.Debugf("closed internal buffer channel successfully")
 
 	// block on channel to ensure channel is closed
-	<- doneClosebucketChan
+	<-doneClosebucketChan
 	log.Debugf("closed closebucketevent channel successfully")
 
 	// Close all open files and move directories in tmp to staging.
diff --git a/uploader.go b/uploader.go
index fb8a267..64b1f40 100644
--- a/uploader.go
+++ b/uploader.go
@@ -76,6 +76,7 @@
 	// eg. date=2017-01-30/time=16-32/1069_20170130163200.20170130163400_218e3d99-efaf-4a7b-b3f2-5e4b00c023b7_writer_0.txt.gz
 	q.Add("relative_file_path", relativeFilePath)
 	q.Add("file_content_type", "application/x-gzip")
+	q.Add("encrypt", "true")
 	req.URL.RawQuery = q.Encode()
 
 	// Add Bearer Token to each request
@@ -113,6 +114,7 @@
 
 	req.Header.Set("Expect", "100-continue")
 	req.Header.Set("Content-Type", "application/x-gzip")
+	req.Header.Set("x-amz-server-side-encryption", "AES256")
 
 	fileStats, err := file.Stat()
 	if err != nil {