|  | /* | 
|  | Copyright 2016 The Transicator Authors | 
|  |  | 
|  | Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | you may not use this file except in compliance with the License. | 
|  | You may obtain a copy of the License at | 
|  |  | 
|  | http://www.apache.org/licenses/LICENSE-2.0 | 
|  |  | 
|  | Unless required by applicable law or agreed to in writing, software | 
|  | distributed under the License is distributed on an "AS IS" BASIS, | 
|  | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | See the License for the specific language governing permissions and | 
|  | limitations under the License. | 
|  | */ | 
|  | package main | 
|  |  | 
|  | import ( | 
|  | "time" | 
|  |  | 
|  | log "github.com/Sirupsen/logrus" | 
|  | ) | 
|  |  | 
|  | type cleaner struct { | 
|  | s        *server | 
|  | maxAge   time.Duration | 
|  | stopChan chan bool | 
|  | } | 
|  |  | 
|  | func (s *server) startCleanup(maxAge time.Duration) { | 
|  | c := &cleaner{ | 
|  | s:        s, | 
|  | maxAge:   maxAge, | 
|  | stopChan: make(chan bool, 1), | 
|  | } | 
|  | s.cleaner = c | 
|  | go c.run() | 
|  | } | 
|  |  | 
|  | func (c *cleaner) stop() { | 
|  | c.stopChan <- true | 
|  | } | 
|  |  | 
|  | func (c *cleaner) run() { | 
|  | tick := time.NewTicker(cleanupDelay(c.maxAge)) | 
|  | defer tick.Stop() | 
|  |  | 
|  | for { | 
|  | select { | 
|  | case <-tick.C: | 
|  | c.performCleanup() | 
|  | case <-c.stopChan: | 
|  | return | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | func (c *cleaner) performCleanup() { | 
|  | cleanupAge := time.Now().Add(-c.maxAge) | 
|  | log.Debugf("Cleaning up data records since before %v", cleanupAge) | 
|  |  | 
|  | // Get the current last sequence | 
|  | _, _, lastSeq, err := c.s.db.Scan(nil, 0, 0, 0, nil) | 
|  | if err != nil { | 
|  | log.Errorf("Error after preparing for cleanup: %s", err) | 
|  | return | 
|  | } | 
|  |  | 
|  | // Always insert a dummy record before cleaning up. That way when the | 
|  | // database is empty, we'll still have the last change number in there. | 
|  | err = c.s.db.Put(internalScope, lastSeq.LSN, lastSeq.Index, nil) | 
|  | if err != nil { | 
|  | log.Errorf("Error after preparing for cleanup: %s", err) | 
|  | return | 
|  | } | 
|  |  | 
|  | // Now we can do the cleanup knowing that there will still be one record | 
|  | // so we can keep track of the highest sequence that we processed. | 
|  | cleanupCount, err := c.s.db.Purge(cleanupAge) | 
|  |  | 
|  | if err != nil { | 
|  | log.Errorf("Error after cleaning up %d records: %s", cleanupCount, err) | 
|  | } else if cleanupCount > 0 { | 
|  | log.Infof("Purged %d old records from the database", cleanupCount) | 
|  | } | 
|  | } | 
|  |  | 
|  | /* | 
|  | cleanupDelay selects how often to run the cleanup task based | 
|  | on the duration. | 
|  | */ | 
|  | func cleanupDelay(maxAge time.Duration) time.Duration { | 
|  | if maxAge <= time.Minute { | 
|  | return time.Second | 
|  | } | 
|  | if maxAge < 5*time.Minute { | 
|  | return 5 * time.Second | 
|  | } | 
|  | if maxAge < time.Hour { | 
|  | return 5 * time.Minute | 
|  | } | 
|  | return time.Hour | 
|  | } |