经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » Go语言 » 查看文章
boltdb一瞥
来源:cnblogs  作者:by_mzy  时间:2024/7/12 12:44:16  对本文有异议

boltdb

网上关于boltdb的文章有很多,特别是微信公众号上,例如:
boltdb源码分析系列-事务-腾讯云开发者社区-腾讯云 (tencent.com)
这些文章都写的挺好,但不一定覆盖了我所关注的几个点,下面我把我关注的几个点就来下来。

node page bucket tx db的关系

  • 磁盘数据mmap到page内存区域,也可以理解为就是磁盘数据
    • page需要一段连续的内存
  • node封装的B+树节点数据结构
  • bucket一个B+树数据结构。可以理解成一个表
  • tx 读事务或读写事务
    • bucket是内存结构每个tx中都会生成一个
    • 会将tx中涉及到(读取过、修改过)的nodes都记录在bucket中
    • 读写事务最终写入磁盘时是需要重新申请新的page的,即不会修改原有的page
  • db整个数据库文件
    • db中的freelist记录了db文件中空闲的页(即已经可以释放掉的页)

tx.commit

在boltdb的 commit中才会执行b+树的rebalance操作,执行完后再进行写入磁盘的操作。也就是说在一个事务中涉及到的多次写操作,会最终在commit的时候同意执行写入磁盘spill操作。

  1. func (tx *Tx) Commit() error {
  2. _assert(!tx.managed, "managed tx commit not allowed")
  3. if tx.db == nil {
  4. return ErrTxClosed
  5. } else if !tx.writable {
  6. return ErrTxNotWritable
  7. }
  8. // TODO(benbjohnson): Use vectorized I/O to write out dirty pages.
  9. // Rebalance nodes which have had deletions.
  10. var startTime = time.Now()
  11. tx.root.rebalance()
  12. if tx.stats.Rebalance > 0 {
  13. tx.stats.RebalanceTime += time.Since(startTime)
  14. }
  15. // spill data onto dirty pages.
  16. startTime = time.Now()
  17. if err := tx.root.spill(); err != nil {
  18. tx.rollback()
  19. return err
  20. }

也正因为txn中可能有多个key插入,所以split就可能会进行多次

  1. func (n *node) split(pageSize int) []*node {
  2. var nodes []*node
  3. node := n
  4. for {
  5. // Split node into two.
  6. a, b := node.splitTwo(pageSize)
  7. nodes = append(nodes, a)
  8. // If we can't split then exit the loop.
  9. if b == nil {
  10. break
  11. }
  12. // Set node to b so it gets split on the next iteration.
  13. node = b
  14. }
  15. return nodes
  16. }
  17. node.go

数据写入到磁盘的时候,是从下层节点往上层节点写的

  1. // spill writes the nodes to dirty pages and splits nodes as it goes.
  2. // Returns an error if dirty pages cannot be allocated.
  3. func (n *node) spill() error {
  4. var tx = n.bucket.tx
  5. if n.spilled {
  6. return nil
  7. }
  8. // Spill child nodes first. Child nodes can materialize sibling nodes in
  9. // the case of split-merge so we cannot use a range loop. We have to check
  10. // the children size on every loop iteration.
  11. sort.Sort(n.children)
  12. for i := 0; i < len(n.children); i++ {
  13. if err := n.children[i].spill(); err != nil {
  14. return err
  15. }
  16. }
  17. // We no longer need the child list because it's only used for spill tracking.
  18. n.children = nil
  19. // Split nodes into appropriate sizes. The first node will always be n.
  20. var nodes = n.split(tx.db.pageSize)
  21. node.go

数据较大如何处理?直接将构造一个大的page将数据存储进去。与此同时,原先node关联的page可以释放掉了。因为整个是一个append only模式,原先的page在新事务生成,且没有其他读事务访问后就可以释放掉了。

  1. for _, node := range nodes {
  2. // Add node's page to the freelist if it's not new.
  3. if node.pgid > 0 {
  4. tx.db.freelist.free(tx.meta.txid, tx.page(node.pgid))
  5. node.pgid = 0
  6. }
  7. // Allocate contiguous space for the node.
  8. p, err := tx.allocate((node.size() / tx.db.pageSize) + 1)
  9. if err != nil {
  10. return err
  11. }
  12. node.go

哪些node需要rebalance呢,size < 25% page_size或者中间节点小于2个key,叶子节点小于1个key。

  1. func (n *node) rebalance() {
  2. if !n.unbalanced {
  3. return
  4. }
  5. n.unbalanced = false
  6. // Update statistics.
  7. n.bucket.tx.stats.Rebalance++
  8. // Ignore if node is above threshold (25%) and has enough keys.
  9. var threshold = n.bucket.tx.db.pageSize / 4
  10. if n.size() > threshold && len(n.inodes) > n.minKeys() {
  11. return
  12. }
  13. node.go

bucket中读到了node,就将node加入到bucket中,读到了就意味着这些node可能就会发生改变。它是在cursor移动的时候加入到bucket中的。

  1. func (c *Cursor) node() *node {
  2. _assert(len(c.stack) > 0, "accessing a node with a zero-length cursor stack")
  3. // If the top of the stack is a leaf node then just return it.
  4. if ref := &c.stack[len(c.stack)-1]; ref.node != nil && ref.isLeaf() {
  5. return ref.node
  6. }
  7. // Start from root and traverse down the hierarchy.
  8. var n = c.stack[0].node
  9. if n == nil {
  10. n = c.bucket.node(c.stack[0].page.id, nil)
  11. }
  12. for _, ref := range c.stack[:len(c.stack)-1] {
  13. _assert(!n.isLeaf, "expected branch node")
  14. n = n.childAt(int(ref.index))
  15. }
  16. _assert(n.isLeaf, "expected leaf node")
  17. return n
  18. }
  1. // node creates a node from a page and associates it with a given parent.
  2. func (b *Bucket) node(pgid pgid, parent *node) *node {
  3. _assert(b.nodes != nil, "nodes map expected")
  4. // Retrieve node if it's already been created.
  5. if n := b.nodes[pgid]; n != nil {
  6. return n
  7. }
  8. // Otherwise create a node and cache it.
  9. n := &node{bucket: b, parent: parent}
  10. if parent == nil {
  11. b.rootNode = n
  12. } else {
  13. parent.children = append(parent.children, n)
  14. }
  15. // Use the inline page if this is an inline bucket.
  16. var p = b.page
  17. if p == nil {
  18. p = b.tx.page(pgid)
  19. }
  20. // Read the page into the node and cache it.
  21. n.read(p)
  22. b.nodes[pgid] = n
  23. // Update statistics.
  24. b.tx.stats.NodeCount++

freelist

它表示的是磁盘中已经释放的页

结构

  • ids 所有空闲页
  • pending {txid, pageids[]}即将释放的txid以及其关联的pageid
  • cache map索引

->pending 释放实际

  • tx.commit时会将事务中涉及到的老的node对应的page都放到pending中
    • node.spill中将关联的旧node(node与page对应)放到freelist的pending中

pending->release释放时机

tx的commit阶段会将事务涉及的原先老page放到freelist的pending中。

  1. func (f *freelist) free(txid txid, p *page) {
  2. if p.id <= 1 {
  3. panic(fmt.Sprintf("cannot free page 0 or 1: %d", p.id))
  4. }
  5. // Free page and all its overflow pages.
  6. var ids = f.pending[txid]
  7. for id := p.id; id <= p.id+pgid(p.overflow); id++ {
  8. // Verify that page is not already free.
  9. if f.cache[id] {
  10. panic(fmt.Sprintf("page %d already freed", id))
  11. }
  12. // Add to the freelist and cache.
  13. ids = append(ids, id)
  14. f.cache[id] = true
  15. }
  16. f.pending[txid] = ids
  17. }

db.beginRWTx 开启读写事务的时候会尝试将过期的page释放掉

  1. func (f *freelist) release(txid txid) {
  2. m := make(pgids, 0)
  3. for tid, ids := range f.pending {
  4. if tid <= txid {
  5. // Move transaction's pending pages to the available freelist.
  6. // Don't remove from the cache since the page is still free.
  7. m = append(m, ids...)
  8. delete(f.pending, tid)
  9. }
  10. }
  11. sort.Sort(m)
  12. f.ids = pgids(f.ids).merge(m)
  13. }

原文链接:https://www.cnblogs.com/bymzy/p/18298137

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号