[XAPID-854] Delete bucket from bucket map only if successfully closed and moved to staging
diff --git a/buffering_manager.go b/buffering_manager.go index 6fb156d..b52c89c 100644 --- a/buffering_manager.go +++ b/buffering_manager.go
@@ -8,8 +8,8 @@ "fmt" "os" "path/filepath" - "time" "sync" + "time" ) const fileExtension = ".txt.gz" @@ -30,7 +30,7 @@ var bucketMaplock = sync.RWMutex{} type bucket struct { - keyTS int64 + keyTS int64 DirName string // We need file handle and writer to close the file FileWriter fileWriter @@ -52,7 +52,6 @@ bucketMap = make(map[int64]bucket) bucketMaplock.Unlock() - // Keep polling the internal buffer for new messages go func() { for { @@ -81,14 +80,14 @@ // staging to indicate its ready for upload err := os.Rename(dirToBeClosed, stagingPath) if err != nil { - log.Errorf("Cannot move directory '%s' from" + + log.Errorf("Cannot move directory '%s' from"+ " tmp to staging folder due to '%s", bucket.DirName, err) + } else { + // Remove bucket from bucket map once its closed successfully + bucketMaplock.Lock() + delete(bucketMap, bucket.keyTS) + bucketMaplock.Unlock() } - - // Remove bucket from bucket map once its closed successfully - bucketMaplock.Lock() - delete(bucketMap, bucket.keyTS) - bucketMaplock.Unlock() } }() }
diff --git a/init.go b/init.go index 386aa1e..c8ac1f0 100644 --- a/init.go +++ b/init.go
@@ -146,7 +146,7 @@ // Create a listener for shutdown event and register callback h := func(event apid.Event) { log.Infof("Received ApidShutdown event. %v", event) - shutdownPlugin(); + shutdownPlugin() return } log.Infof("registered listener for shutdown event") @@ -228,10 +228,10 @@ if err != nil { log.Errorf("Cannot move directory '%s' from"+ " tmp to staging folder due to '%s", bucket.DirName, err) + } else { + bucketMaplock.Lock() + delete(bucketMap, bucket.keyTS) + bucketMaplock.Unlock() } - - bucketMaplock.Lock() - delete(bucketMap, bucket.keyTS) - bucketMaplock.Unlock() } -} \ No newline at end of file +}