Merge src/leveldb changes for LevelDB 1.15

This commit is contained in:
Pieter Wuille
2013-12-12 22:08:18 +01:00
31 changed files with 449 additions and 366 deletions

View File

@@ -133,8 +133,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
seed_(0),
tmp_batch_(new WriteBatch),
bg_compaction_scheduled_(false),
manual_compaction_(NULL),
consecutive_compaction_errors_(0) {
manual_compaction_(NULL) {
mem_->Ref();
has_imm_.Release_Store(NULL);
@@ -217,6 +216,12 @@ void DBImpl::MaybeIgnoreError(Status* s) const {
}
void DBImpl::DeleteObsoleteFiles() {
if (!bg_error_.ok()) {
// After a background error, we don't know whether a new version may
// or may not have been committed, so we cannot safely garbage collect.
return;
}
// Make a set of all of the live files
std::set<uint64_t> live = pending_outputs_;
versions_->AddLiveFiles(&live);
@@ -495,7 +500,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
return s;
}
Status DBImpl::CompactMemTable() {
void DBImpl::CompactMemTable() {
mutex_.AssertHeld();
assert(imm_ != NULL);
@@ -523,9 +528,9 @@ Status DBImpl::CompactMemTable() {
imm_ = NULL;
has_imm_.Release_Store(NULL);
DeleteObsoleteFiles();
} else {
RecordBackgroundError(s);
}
return s;
}
void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
@@ -568,16 +573,18 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
}
MutexLock l(&mutex_);
while (!manual.done) {
while (manual_compaction_ != NULL) {
bg_cv_.Wait();
}
manual_compaction_ = &manual;
MaybeScheduleCompaction();
while (manual_compaction_ == &manual) {
while (!manual.done && !shutting_down_.Acquire_Load() && bg_error_.ok()) {
if (manual_compaction_ == NULL) { // Idle
manual_compaction_ = &manual;
MaybeScheduleCompaction();
} else { // Running either my compaction or another compaction.
bg_cv_.Wait();
}
}
if (manual_compaction_ == &manual) {
// Cancel my manual compaction since we aborted early for some reason.
manual_compaction_ = NULL;
}
}
Status DBImpl::TEST_CompactMemTable() {
@@ -596,12 +603,22 @@ Status DBImpl::TEST_CompactMemTable() {
return s;
}
void DBImpl::RecordBackgroundError(const Status& s) {
mutex_.AssertHeld();
if (bg_error_.ok()) {
bg_error_ = s;
bg_cv_.SignalAll();
}
}
void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld();
if (bg_compaction_scheduled_) {
// Already scheduled
} else if (shutting_down_.Acquire_Load()) {
// DB is being deleted; no more background compactions
} else if (!bg_error_.ok()) {
// Already got an error; no more changes
} else if (imm_ == NULL &&
manual_compaction_ == NULL &&
!versions_->NeedsCompaction()) {
@@ -619,30 +636,12 @@ void DBImpl::BGWork(void* db) {
void DBImpl::BackgroundCall() {
MutexLock l(&mutex_);
assert(bg_compaction_scheduled_);
if (!shutting_down_.Acquire_Load()) {
Status s = BackgroundCompaction();
if (s.ok()) {
// Success
consecutive_compaction_errors_ = 0;
} else if (shutting_down_.Acquire_Load()) {
// Error most likely due to shutdown; do not wait
} else {
// Wait a little bit before retrying background compaction in
// case this is an environmental problem and we do not want to
// chew up resources for failed compactions for the duration of
// the problem.
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
Log(options_.info_log, "Waiting after background compaction error: %s",
s.ToString().c_str());
mutex_.Unlock();
++consecutive_compaction_errors_;
int seconds_to_sleep = 1;
for (int i = 0; i < 3 && i < consecutive_compaction_errors_ - 1; ++i) {
seconds_to_sleep *= 2;
}
env_->SleepForMicroseconds(seconds_to_sleep * 1000000);
mutex_.Lock();
}
if (shutting_down_.Acquire_Load()) {
// No more background work when shutting down.
} else if (!bg_error_.ok()) {
// No more background work after a background error.
} else {
BackgroundCompaction();
}
bg_compaction_scheduled_ = false;
@@ -653,11 +652,12 @@ void DBImpl::BackgroundCall() {
bg_cv_.SignalAll();
}
Status DBImpl::BackgroundCompaction() {
void DBImpl::BackgroundCompaction() {
mutex_.AssertHeld();
if (imm_ != NULL) {
return CompactMemTable();
CompactMemTable();
return;
}
Compaction* c;
@@ -691,6 +691,9 @@ Status DBImpl::BackgroundCompaction() {
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_);
if (!status.ok()) {
RecordBackgroundError(status);
}
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number),
@@ -701,6 +704,9 @@ Status DBImpl::BackgroundCompaction() {
} else {
CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact);
if (!status.ok()) {
RecordBackgroundError(status);
}
CleanupCompaction(compact);
c->ReleaseInputs();
DeleteObsoleteFiles();
@@ -714,9 +720,6 @@ Status DBImpl::BackgroundCompaction() {
} else {
Log(options_.info_log,
"Compaction error: %s", status.ToString().c_str());
if (options_.paranoid_checks && bg_error_.ok()) {
bg_error_ = status;
}
}
if (is_manual) {
@@ -732,7 +735,6 @@ Status DBImpl::BackgroundCompaction() {
}
manual_compaction_ = NULL;
}
return status;
}
void DBImpl::CleanupCompaction(CompactionState* compact) {
@@ -1002,6 +1004,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
if (status.ok()) {
status = InstallCompactionResults(compact);
}
if (!status.ok()) {
RecordBackgroundError(status);
}
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log,
"compacted to: %s", versions_->LevelSummary(&tmp));
@@ -1185,13 +1190,23 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
{
mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(updates));
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
if (status.ok()) {
status = WriteBatchInternal::InsertInto(updates, mem_);
}
mutex_.Lock();
if (sync_error) {
// The state of the log file is indeterminate: the log record we
// just added may or may not show up when the DB is re-opened.
// So we force the DB into a mode where all future writes fail.
RecordBackgroundError(status);
}
}
if (updates == tmp_batch_) tmp_batch_->Clear();