diff --git a/server/storage/mvcc/kvstore_compaction.go b/server/storage/mvcc/kvstore_compaction.go index 6f8683155cb5..d5365f301d84 100644 --- a/server/storage/mvcc/kvstore_compaction.go +++ b/server/storage/mvcc/kvstore_compaction.go @@ -39,62 +39,103 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal end := make([]byte, 8) binary.BigEndian.PutUint64(end, uint64(compactMainRev+1)) + var hash KeyValueHash batchNum := s.cfg.CompactionBatchLimit h := newKVHasher(prevCompactRev, compactMainRev, keep) last := make([]byte, 8+1+8) - for { - var rev Revision - - start := time.Now() - - tx := s.b.BatchTx() - tx.LockOutsideApply() - // gofail: var compactAfterAcquiredBatchTxLock struct{} - keys, values := tx.UnsafeRange(schema.Key, last, end, int64(batchNum)) - for i := range keys { - rev = BytesToRev(keys[i]) - if _, ok := keep[rev]; !ok { - tx.UnsafeDelete(schema.Key, keys[i]) - keyCompactions++ + toDelRevsChan := make(chan Revision, batchNum) + + go func() { + for { + var rev Revision + rtx := s.b.ReadTx() + rtx.RLock() + keys, values := rtx.UnsafeRange(schema.Key, last, end, int64(batchNum)) + rtx.RUnlock() + + for i := range keys { + rev = BytesToRev(keys[i]) + if _, ok := keep[rev]; !ok { + toDelRevsChan <- rev + } + h.WriteKeyValue(keys[i], values[i]) } - h.WriteKeyValue(keys[i], values[i]) - } - if len(keys) < batchNum { - // gofail: var compactBeforeSetFinishedCompact struct{} - UnsafeSetFinishedCompact(tx, compactMainRev) - tx.Unlock() - dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond)) - // gofail: var compactAfterSetFinishedCompact struct{} - hash := h.Hash() - size, sizeInUse := s.b.Size(), s.b.SizeInUse() - s.lg.Info( - "finished scheduled compaction", - zap.Int64("compact-revision", compactMainRev), - zap.Duration("took", time.Since(totalStart)), - zap.Int("number-of-keys-compacted", keyCompactions), - zap.Uint32("hash", hash.Hash), - zap.Int64("current-db-size-bytes", size), - zap.String("current-db-size", humanize.Bytes(uint64(size))), - zap.Int64("current-db-size-in-use-bytes", sizeInUse), - zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse))), - ) - return hash, nil - } + if len(keys) < batchNum { + hash = h.Hash() + s.lg.Info( + "finished hash computation", + zap.Int64("compact-revision", compactMainRev), + zap.Duration("took", time.Since(totalStart)), + zap.Uint32("hash", hash.Hash), + ) + close(toDelRevsChan) + break + } - tx.Unlock() - // update last - last = RevToBytes(Revision{Main: rev.Main, Sub: rev.Sub + 1}, last) - // Immediately commit the compaction deletes instead of letting them accumulate in the write buffer - // gofail: var compactBeforeCommitBatch struct{} - s.b.ForceCommit() - // gofail: var compactAfterCommitBatch struct{} - dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond)) + // update last + last = RevToBytes(Revision{Main: rev.Main, Sub: rev.Sub + 1}, last) + } + }() + tx := s.b.BatchTx() + tx.LockOutsideApply() + start := time.Now() + completed := false + for !completed { select { - case <-time.After(s.cfg.CompactionSleepInterval): - case <-s.stopc: - return KeyValueHash{}, fmt.Errorf("interrupted due to stop signal") + case rev, ok := <-toDelRevsChan: + // toDelRevsChan is closed + if !ok { + completed = true + break + } + + tx.UnsafeDelete(schema.Key, RevToBytes(rev, make([]byte, 8+1+8))) + keyCompactions++ + + if keyCompactions%batchNum == 0 { + tx.Unlock() + + // Immediately commit the compaction deletes instead of letting them accumulate in the write buffer + // gofail: var compactBeforeCommitBatch struct{} + s.b.ForceCommit() + // gofail: var compactAfterCommitBatch struct{} + dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond)) + + select { + case <-time.After(s.cfg.CompactionSleepInterval): + case <-s.stopc: + return KeyValueHash{}, fmt.Errorf("interrupted due to stop signal") + } + + // reset tx + tx = s.b.BatchTx() + tx.LockOutsideApply() + // gofail: var compactAfterAcquiredBatchTxLock struct{} + start = time.Now() + } } } + + // gofail: var compactBeforeSetFinishedCompact struct{} + UnsafeSetFinishedCompact(tx, compactMainRev) + tx.Unlock() + dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond)) + // gofail: var compactAfterSetFinishedCompact struct{} + + size, sizeInUse := s.b.Size(), s.b.SizeInUse() + s.lg.Info( + "finished scheduled compaction", + zap.Int64("compact-revision", compactMainRev), + zap.Duration("took", time.Since(totalStart)), + zap.Int("number-of-keys-compacted", keyCompactions), + zap.Uint32("hash", hash.Hash), + zap.Int64("current-db-size-bytes", size), + zap.String("current-db-size", humanize.Bytes(uint64(size))), + zap.Int64("current-db-size-in-use-bytes", sizeInUse), + zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse))), + ) + + return hash, nil }