Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 89 additions & 48 deletions server/storage/mvcc/kvstore_compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,62 +39,103 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal
end := make([]byte, 8)
binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))

var hash KeyValueHash
batchNum := s.cfg.CompactionBatchLimit
h := newKVHasher(prevCompactRev, compactMainRev, keep)
last := make([]byte, 8+1+8)
for {
var rev Revision

start := time.Now()

tx := s.b.BatchTx()
tx.LockOutsideApply()
// gofail: var compactAfterAcquiredBatchTxLock struct{}
keys, values := tx.UnsafeRange(schema.Key, last, end, int64(batchNum))
for i := range keys {
rev = BytesToRev(keys[i])
if _, ok := keep[rev]; !ok {
tx.UnsafeDelete(schema.Key, keys[i])
keyCompactions++
toDelRevsChan := make(chan Revision, batchNum)

go func() {
for {
var rev Revision
rtx := s.b.ReadTx()
rtx.RLock()
keys, values := rtx.UnsafeRange(schema.Key, last, end, int64(batchNum))
rtx.RUnlock()

for i := range keys {
rev = BytesToRev(keys[i])
if _, ok := keep[rev]; !ok {
toDelRevsChan <- rev
}
h.WriteKeyValue(keys[i], values[i])
}
h.WriteKeyValue(keys[i], values[i])
}

if len(keys) < batchNum {
// gofail: var compactBeforeSetFinishedCompact struct{}
UnsafeSetFinishedCompact(tx, compactMainRev)
tx.Unlock()
dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
// gofail: var compactAfterSetFinishedCompact struct{}
hash := h.Hash()
size, sizeInUse := s.b.Size(), s.b.SizeInUse()
s.lg.Info(
"finished scheduled compaction",
zap.Int64("compact-revision", compactMainRev),
zap.Duration("took", time.Since(totalStart)),
zap.Int("number-of-keys-compacted", keyCompactions),
zap.Uint32("hash", hash.Hash),
zap.Int64("current-db-size-bytes", size),
zap.String("current-db-size", humanize.Bytes(uint64(size))),
zap.Int64("current-db-size-in-use-bytes", sizeInUse),
zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse))),
)
return hash, nil
}
if len(keys) < batchNum {
hash = h.Hash()
s.lg.Info(
"finished hash computation",
zap.Int64("compact-revision", compactMainRev),
zap.Duration("took", time.Since(totalStart)),
zap.Uint32("hash", hash.Hash),
)
close(toDelRevsChan)
break
}

tx.Unlock()
// update last
last = RevToBytes(Revision{Main: rev.Main, Sub: rev.Sub + 1}, last)
// Immediately commit the compaction deletes instead of letting them accumulate in the write buffer
// gofail: var compactBeforeCommitBatch struct{}
s.b.ForceCommit()
// gofail: var compactAfterCommitBatch struct{}
dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
// update last
last = RevToBytes(Revision{Main: rev.Main, Sub: rev.Sub + 1}, last)
}
}()

tx := s.b.BatchTx()
tx.LockOutsideApply()
start := time.Now()
completed := false
for !completed {
select {
case <-time.After(s.cfg.CompactionSleepInterval):
case <-s.stopc:
return KeyValueHash{}, fmt.Errorf("interrupted due to stop signal")
case rev, ok := <-toDelRevsChan:
// toDelRevsChan is closed
if !ok {
completed = true
break
}

tx.UnsafeDelete(schema.Key, RevToBytes(rev, make([]byte, 8+1+8)))
keyCompactions++

if keyCompactions%batchNum == 0 {
tx.Unlock()

// Immediately commit the compaction deletes instead of letting them accumulate in the write buffer
// gofail: var compactBeforeCommitBatch struct{}
s.b.ForceCommit()
// gofail: var compactAfterCommitBatch struct{}
dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))

select {
case <-time.After(s.cfg.CompactionSleepInterval):
case <-s.stopc:
return KeyValueHash{}, fmt.Errorf("interrupted due to stop signal")
}

// reset tx
tx = s.b.BatchTx()
tx.LockOutsideApply()
// gofail: var compactAfterAcquiredBatchTxLock struct{}
start = time.Now()
}
}
}

// gofail: var compactBeforeSetFinishedCompact struct{}
UnsafeSetFinishedCompact(tx, compactMainRev)
tx.Unlock()
dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
// gofail: var compactAfterSetFinishedCompact struct{}

size, sizeInUse := s.b.Size(), s.b.SizeInUse()
s.lg.Info(
"finished scheduled compaction",
zap.Int64("compact-revision", compactMainRev),
zap.Duration("took", time.Since(totalStart)),
zap.Int("number-of-keys-compacted", keyCompactions),
zap.Uint32("hash", hash.Hash),
zap.Int64("current-db-size-bytes", size),
zap.String("current-db-size", humanize.Bytes(uint64(size))),
zap.Int64("current-db-size-in-use-bytes", sizeInUse),
zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse))),
)

return hash, nil
}