1 Put 操作
1.1 接口函数:
-
接口:
// Convenience methods Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { return DB::Put(o, key, val); }
-
WriteOption 结构
// Options that control write operations struct LEVELDB_EXPORT WriteOptions { WriteOptions() = default; // If true, the write will be flushed from the operating system // buffer cache (by calling WritableFile::Sync()) before the write // is considered complete. If this flag is true, writes will be // slower. // // If this flag is false, and the machine crashes, some recent // writes may be lost. Note that if it is just the process that // crashes (i.e., the machine does not reboot), no writes will be // lost even if sync==false. // // In other words, a DB write with sync==false has similar // crash semantics as the "write()" system call. A DB write // with sync==true has similar crash semantics to a "write()" // system call followed by "fsync()". bool sync = false; };
WriteOptions 就是保存了一个是否 sync 的 bool 变量
-
put函数实现:
class DBImpl : public DB
// Default implementations of convenience methods that subclasses of DB // can call if they wish Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { WriteBatch batch; //leveldb中不管单个插入还是多个插入都是以WriteBatch的方式进行的 batch.Put(key, value); return Write(opt, &batch); }
可以看到对于 Put 单个 kv 的情况,leveldb 默认实现也将它封装成一个 WriteBatch
1.2 write 函数
- 函数实现:
// 处理过程 // 1. 队列化请求 // mutex l上锁之后, 到了"w.cv.Wait()"的时候, 会先释放锁等待, 然后收到signal时再次上锁. // 这段代码的作用就是多线程在提交任务的时候,一个接一个push_back进队列. // 但只有位于队首的线程有资格继续运行下去. 目的是把多个写请求合并成一个大batch提升效率. // 2. 写入的前期检查和保证 // 3. 按格式组装数据为二进制 // 4. 写入log文件和memtable // 5. 唤醒队列的其他人去干活,自己返回 Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { Writer w(&mutex_); w.batch = updates; w.sync = options.sync; w.done = false; //串行化writer。如果有其他writer在执行则进入队列等待被唤醒执行 MutexLock l(&mutex_); writers_.push_back(&w); while (!w.done && &w != writers_.front()) { w.cv.Wait(); } //writer的任务被其他writer帮忙执行了,则返回。BuildBatchGroup会有合并写的操作。 if (w.done) { return w.status; } // May temporarily unlock and wait. // 写入前的各种检查。是否该停写,是否该切memtable,是否该compact Status status = MakeRoomForWrite(updates == nullptr); // 获取本次写入的版本号,其实就是个uint64 uint64_t last_sequence = versions_->LastSequence(); Writer* last_writer = &w; //这里writer还是队列中第一个,由于下面会队列前面的writers也可能合并起来,所以last_writer指针会指向被合并的最后一个writer if (status.ok() && updates != nullptr) { // nullptr batch is for compactions WriteBatch* updates = BuildBatchGroup(&last_writer); //这里会把writers队列中的其他适合的写操作一起执行 WriteBatchInternal::SetSequence(updates, last_sequence + 1); //把版本号写入batch中 last_sequence += WriteBatchInternal::Count(updates); //updates如果合并了n条操作,版本号也会跳跃n // Add to log and apply to memtable. We can release the lock // during this phase since &w is currently responsible for logging // and protects against concurrent loggers and concurrent writes // into mem_. { mutex_.Unlock(); status = log_->AddRecord(WriteBatchInternal::Contents(updates)); //第一步写入log,用于故障恢复,防止数据丢失。 bool sync_error = false; if (status.ok() && options.sync) { status = logfile_->Sync(); if (!status.ok()) { sync_error = true; } } if (status.ok()) { status = WriteBatchInternal::InsertInto(updates, mem_); //插入memtable了 } mutex_.Lock(); if (sync_error) { // The state of the log file is indeterminate: the log record we // just added may or may not show up when the DB is re-opened. // So we force the DB into a mode where all future writes fail. RecordBackgroundError(status); } } if (updates == tmp_batch_) tmp_batch_->Clear(); versions_->SetLastSequence(last_sequence); } // 将处理完的任务从队列里取出,并置状态为done,然后通知对应的CondVar启动。 while (true) { Writer* ready = writers_.front(); writers_.pop_front(); if (ready != &w) { ready->status = status; ready->done = true; ready->cv.Signal(); } if (ready == last_writer) break; //直到last_writer通知为止。 } // Notify new head of write queue。通知队列中第一个writer干活。 if (!writers_.empty()) { writers_.front()->cv.Signal(); } return status; }
原文链接: https://www.cnblogs.com/lihaihui1991/p/14604330.html
欢迎关注
微信关注下方公众号,第一时间获取干货硬货;公众号内回复【pdf】免费获取数百本计算机经典书籍;
也有高质量的技术群,里面有嵌入式、搜广推等BAT大佬
原创文章受到原创版权保护。转载请注明出处:https://www.ccppcoding.com/archives/400327
非原创文章文中已经注明原地址,如有侵权,联系删除
关注公众号【高性能架构探索】,第一时间获取最新文章
转载文章受原作者版权保护。转载请注明原作者出处!