[XAPID-854] Delete bucket from bucket map only if successfully closed and moved to staging
diff --git a/buffering_manager.go b/buffering_manager.go
index 6fb156d..b52c89c 100644
--- a/buffering_manager.go
+++ b/buffering_manager.go
@@ -8,8 +8,8 @@
"fmt"
"os"
"path/filepath"
- "time"
"sync"
+ "time"
)
const fileExtension = ".txt.gz"
@@ -30,7 +30,7 @@
var bucketMaplock = sync.RWMutex{}
type bucket struct {
- keyTS int64
+ keyTS int64
DirName string
// We need file handle and writer to close the file
FileWriter fileWriter
@@ -52,7 +52,6 @@
bucketMap = make(map[int64]bucket)
bucketMaplock.Unlock()
-
// Keep polling the internal buffer for new messages
go func() {
for {
@@ -81,14 +80,14 @@
// staging to indicate its ready for upload
err := os.Rename(dirToBeClosed, stagingPath)
if err != nil {
- log.Errorf("Cannot move directory '%s' from" +
+ log.Errorf("Cannot move directory '%s' from"+
" tmp to staging folder due to '%s", bucket.DirName, err)
+ } else {
+ // Remove bucket from bucket map once its closed successfully
+ bucketMaplock.Lock()
+ delete(bucketMap, bucket.keyTS)
+ bucketMaplock.Unlock()
}
-
- // Remove bucket from bucket map once its closed successfully
- bucketMaplock.Lock()
- delete(bucketMap, bucket.keyTS)
- bucketMaplock.Unlock()
}
}()
}
diff --git a/init.go b/init.go
index 386aa1e..c8ac1f0 100644
--- a/init.go
+++ b/init.go
@@ -146,7 +146,7 @@
// Create a listener for shutdown event and register callback
h := func(event apid.Event) {
log.Infof("Received ApidShutdown event. %v", event)
- shutdownPlugin();
+ shutdownPlugin()
return
}
log.Infof("registered listener for shutdown event")
@@ -228,10 +228,10 @@
if err != nil {
log.Errorf("Cannot move directory '%s' from"+
" tmp to staging folder due to '%s", bucket.DirName, err)
+ } else {
+ bucketMaplock.Lock()
+ delete(bucketMap, bucket.keyTS)
+ bucketMaplock.Unlock()
}
-
- bucketMaplock.Lock()
- delete(bucketMap, bucket.keyTS)
- bucketMaplock.Unlock()
}
-}
\ No newline at end of file
+}