LevelDB Code Reading, Part 2

1 The Put operation

1.1 Interface function

  • Interface:

    // Convenience methods
    Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
      return DB::Put(o, key, val);
    }
    
  • The WriteOptions struct

    // Options that control write operations
    struct LEVELDB_EXPORT WriteOptions {
      WriteOptions() = default;
    
      // If true, the write will be flushed from the operating system
      // buffer cache (by calling WritableFile::Sync()) before the write
      // is considered complete.  If this flag is true, writes will be
      // slower.
      //
      // If this flag is false, and the machine crashes, some recent
      // writes may be lost.  Note that if it is just the process that
      // crashes (i.e., the machine does not reboot), no writes will be
      // lost even if sync==false.
      //
      // In other words, a DB write with sync==false has similar
      // crash semantics as the "write()" system call.  A DB write
      // with sync==true has similar crash semantics to a "write()"
      // system call followed by "fsync()".
      bool sync = false;
    };
    
    

    WriteOptions holds a single bool, sync, which says whether the write must be flushed to disk before it counts as complete.
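
    The comment in WriteOptions compares sync==false to a bare write() call and sync==true to write() followed by fsync(). A minimal sketch of that difference, assuming a POSIX system, might look like the following. AppendRecord is a hypothetical helper made up for illustration; it is not part of LevelDB.

```cpp
#include <fcntl.h>
#include <unistd.h>

#include <cerrno>
#include <string>

// Hypothetical helper (not a LevelDB function): append `data` to `fd`.
// With sync == false the bytes may still sit in the OS buffer cache when
// this returns; with sync == true we also call fsync() before returning.
// Returns true on success.
bool AppendRecord(int fd, const std::string& data, bool sync) {
  const char* p = data.data();
  size_t left = data.size();
  while (left > 0) {
    ssize_t n = ::write(fd, p, left);  // copies into the OS buffer cache
    if (n < 0) {
      if (errno == EINTR) continue;  // interrupted by a signal: retry
      return false;
    }
    p += n;
    left -= static_cast<size_t>(n);
  }
  // sync == true: behaves like write() followed by fsync().
  if (sync && ::fsync(fd) != 0) return false;
  return true;
}
```

    If only the process crashes, data passed to write() is already in the kernel and survives, so both variants keep it. If the machine itself crashes, only the fsync() variant has asked the kernel to push the data to stable storage.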

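  • Implementation of Put:
    DBImpl derives from DB (class DBImpl : public DB), so DBImpl::Put simply
    forwards to the default implementation in the base class: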

    // Default implementations of convenience methods that subclasses of DB
    // can call if they wish
    Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
      // LevelDB routes every write, single-key or multi-key, through a WriteBatch.
      WriteBatch batch;
      batch.Put(key, value);
      return Write(opt, &batch);
    }
    

    As shown, even a Put of a single key-value pair is wrapped in a WriteBatch by LevelDB's default implementation.
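
    The same "convenience method wraps a batch of one" pattern can be sketched with toy types. ToyBatch and ToyDB below are illustrative stand-ins and are not LevelDB's classes. The point is only that Put has no write logic of its own and that every mutation goes through the single Write path.

```cpp
#include <map>
#include <string>
#include <utility>
#include <vector>

// Toy stand-in for a write batch: an ordered list of key-value puts.
struct ToyBatch {
  std::vector<std::pair<std::string, std::string>> puts;
  void Put(const std::string& k, const std::string& v) {
    puts.emplace_back(k, v);
  }
};

// Toy stand-in for a database with exactly one write path.
class ToyDB {
 public:
  // Every mutation arrives here as a batch, whatever its size.
  void Write(const ToyBatch& batch) {
    for (const auto& kv : batch.puts) table_[kv.first] = kv.second;
    ++write_calls_;
  }

  // Convenience method: wrap one key-value pair in a batch of size one.
  void Put(const std::string& k, const std::string& v) {
    ToyBatch batch;
    batch.Put(k, v);
    Write(batch);
  }

  // Returns the stored value, or an empty string if the key is absent.
  std::string Get(const std::string& k) const {
    auto it = table_.find(k);
    return it == table_.end() ? std::string() : it->second;
  }

  int write_calls() const { return write_calls_; }

 private:
  std::map<std::string, std::string> table_;
  int write_calls_ = 0;
};
```

    Keeping a single write path means logging, sequence numbering, and memtable insertion only need to be implemented once, for batches.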

1.2 The Write function

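  • Implementation:
    // Processing steps
    // 1. Queue the request.
    //     After MutexLock l acquires mutex_, a thread that reaches w.cv.Wait()
    //     releases the lock while it waits and reacquires it once signaled.
    //     Concurrent threads therefore push_back into the queue one after another,
    //     but only the thread at the head of the queue may continue. The goal is
    //     to merge several write requests into one large batch for efficiency.
    // 2. Run the pre-write checks and make room for the write.
    // 3. Serialize the data into its binary format.
    // 4. Append to the log file and insert into the memtable.
    // 5. Wake the other writers in the queue, then return.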
    Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
      Writer w(&mutex_);
      w.batch = updates;
      w.sync = options.sync;
      w.done = false;
    
      // Serialize writers: if another writer is running, queue up and wait to be woken.
      MutexLock l(&mutex_);
      writers_.push_back(&w);
      while (!w.done && &w != writers_.front()) {
        w.cv.Wait();
      }
    
      // If another writer already executed this writer's batch, return its status.
      // (BuildBatchGroup merges queued writes into the leader's batch.)
      if (w.done) {
        return w.status;
      }
    
      // May temporarily unlock and wait.
      // Pre-write checks: whether to stall writes, switch memtables, or schedule a compaction.
      Status status = MakeRoomForWrite(updates == nullptr);
    
      // Fetch the last sequence number used so far (a plain uint64_t);
      // this write starts at last_sequence + 1.
      uint64_t last_sequence = versions_->LastSequence();
      Writer* last_writer = &w;
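      // At this point w is the head of the queue. BuildBatchGroup may merge the
      // writers queued behind it, in which case it advances last_writer to the
      // last writer that was merged.
      if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
        // Merge other suitable writers from the queue into one batch. Note that
        // this local `updates` shadows the function parameter of the same name.
        WriteBatch* updates = BuildBatchGroup(&last_writer);
        // Store the starting sequence number in the batch.
        WriteBatchInternal::SetSequence(updates, last_sequence + 1);
        // A batch holding n operations advances the sequence number by n.
        last_sequence += WriteBatchInternal::Count(updates);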
    
        // Add to log and apply to memtable.  We can release the lock
        // during this phase since &w is currently responsible for logging
        // and protects against concurrent loggers and concurrent writes
        // into mem_.
        {
          mutex_.Unlock();
          // Step 1: append to the log so the data can be recovered after a crash.
          status = log_->AddRecord(WriteBatchInternal::Contents(updates));
          bool sync_error = false;
          if (status.ok() && options.sync) {
            status = logfile_->Sync();
            if (!status.ok()) {
              sync_error = true;
            }
          }
          if (status.ok()) {
            // Step 2: insert into the memtable.
            status = WriteBatchInternal::InsertInto(updates, mem_);
          }
          mutex_.Lock();
          if (sync_error) {
            // The state of the log file is indeterminate: the log record we
            // just added may or may not show up when the DB is re-opened.
            // So we force the DB into a mode where all future writes fail.
            RecordBackgroundError(status);
          }
        }
        if (updates == tmp_batch_) tmp_batch_->Clear();
    
        versions_->SetLastSequence(last_sequence);
      }
    
      // Pop the completed writers off the queue, mark each one done, and
      // signal its CondVar so the waiting thread wakes up.
      while (true) {
        Writer* ready = writers_.front();
        writers_.pop_front();
        if (ready != &w) {
          ready->status = status;
          ready->done = true;
          ready->cv.Signal();
        }
        if (ready == last_writer) break;  // stop once last_writer has been notified
      }
    
      // Notify new head of write queue so that it can start its own write.
      if (!writers_.empty()) {
        writers_.front()->cv.Signal();
      }
    
      return status;
    }
    
    

Original article: https://www.cnblogs.com/lihaihui1991/p/14604330.html
