Apr 6, 2017 - influxdb的tsm引擎(1)

TSM是时序数据库Influxdb使用的存储引擎,可以看做是LSM的一种实现。本文以Influxdb中内嵌的引擎包tsm1为源代码,学习TSM的数据结构与具体实现。

引擎接口

type Engine interface {
	Open() error
	Close() error
	WithLogger(zap.Logger)
	LoadMetadataIndex(shardID uint64, index *DatabaseIndex) error
	Backup(w io.Writer, basePath string, since time.Time) error
	Restore(r io.Reader, basePath string) error
	CreateIterator(measurement string, opt influxql.IteratorOptions) (influxql.Iterator, error)
	WritePoints(points []models.Point) error
	ContainsSeries(keys []string) (map[string]bool, error)
	DeleteSeries(keys []string) error
	DeleteSeriesRange(keys []string, min, max int64) error
	DeleteMeasurement(name string, seriesKeys []string) error
	SeriesCount() (n int, err error)
	MeasurementFields(measurement string) *MeasurementFields
	CreateSnapshot() (string, error)
	SetEnabled(enabled bool)
	Format() EngineFormat
	Statistics(tags map[string]string) []models.Statistic
	LastModified() time.Time
  io.WriterTo
}

tsm1.Engine实现了tsdb.Engine

WritePoints方法

从Point结构生成一条kv记录,key为influxdb组合的measurement+tag的组合,value为拼接结构。

  err := e.Cache.WriteMulti(values)
	if err != nil {
		return err
	}

	_, err = e.WAL.WritePoints(values)
	return err

拼接完成后,首先调用Cache写入,然后调用WAL写入,Cache的核心是一个soter接口

type storer interface {
	entry(key string) (*entry, bool)                // Get an entry by its key.
	write(key string, values Values) error          // Write an entry to the store.
	add(key string, entry *entry)                   // Add a new entry to the store.
	remove(key string)                              // Remove an entry from the store.
	keys(sorted bool) []string                      // Return an optionally sorted slice of entry keys.
	apply(f func(string, *entry) error) error       // Apply f to all entries in the store in parallel.
	applySerial(f func(string, *entry) error) error // Apply f to all entries in serial.
	reset()                                         // Reset the store to an initial unused state.
}

文件tsdb/engine/tsm1/ring.go中结构ring实现了storer接口

type ring struct {
	// The unique set of partitions in the ring.
	// len(partitions) <= len(continuum)
	partitions []*partition

	// A mapping of partition to location on the ring continuum. This is used
	// to lookup a partition.
	continuum []*partition

	// Number of keys within the ring. This is used to provide a hint for
	// allocating the return values in keys(). It will not be perfectly accurate
	// since it doesn't consider adding duplicate keys, or trying to remove non-
	// existent keys.
	keysHint int64
}

ring的本质是一个大小为4096的hash表,由key做hashkey,将值分散到各个slot中。

Apr 1, 2017 - golang1.8releasenote

原文链接

1.8简介

       Go语言1.8版本的Release在时隔1.7六个月后发布了。大多数改变集中在工具链、运行时、库函数的实现。也有两处语言定义的小变化。一如往常,这个版本保持了Go语言1.x版本的兼容了。所有的gopher继续像往常一样编译、运行golang。

       这个版本:

  • 增加了对32位MIPS(一种处理器架构)的支持
  • 更新了编译器后端(也就是1.7发布的基于SSA的新后端)以生成效率更好的代码
  • 通过消除STW栈重扫(stop-the-world stack rescanning)减少GC停顿,
  • 增加http/2 push支持
  • 增加http graceful shutdown
  • 增加更多的context支持
  • 增加对mutex的profiling
  • 简化对slice的排序。

语言的变化

       当隐式地将一个结构体实例转换到另一个结构体类型时,在go1.8中,字段tag会被忽略。因此,两个只有tag有区别的结构体就可以互相转换。

func example() {
	type T1 struct {
		X int `json:"foo"`
	}
	type T2 struct {
		X int `json:"bar"`
	}
	var v1 T1
	var v2 T2
	v1 = T1(v2) // 现在合法
}

       语言定义现在只要求实现支持最多16位(16-bit)指数的浮点常量。这不影响’gc’或’gccgo’编译器,两者都支持32位。

工具

汇编

针对64位x86系统,增加了一下指令VBROADCASTSD, BROADCASTSS, MOVDDUP, MOVSHDUP, MOVSLDUP, VMOVDDUP, VMOVSHDUP, and VMOVSLDUP.

yacc

yacc工具(go tool yacc)被移除了。在1.7版本,编译器不再使用。他被移出了工具库,现在位于地址=》https://godoc.org/golang.org/x/tools/cmd/goyacc

fix

fix工具有了一个新的修复,可以将“golang.org/x/net/context”改到“context”。

Pprof

pprof工具现在可以profile TLS服务器,并且通过使用”https+insecure”URL范式来跳过证书验证。 callgrind输出现在有了指令级别的粒度。

Trace

trace工具现在有了一个新的-pprof选项来产生pprof兼容的数据。 GC事件在执行追踪视图中更加清楚地展示了出来。GC活动在自己的行上展示,GC协程被他们自己的角色标注了。

Vet

Vet现在检查对锁数组的拷贝、重复的json、xml结构体字段tag,非空格分割的结构体tag,http请求体的关闭,Printf中使用的参数。

编译器工具链

Go1.7介绍了一个x86-64系统的新的编译器后端。在1.8中,这个后端更加精进,现在已经用于所有平台。 这个新的后端,基于SSA,生成更加压缩、高效的代码,并且提供了更好的平台优化,比如边界检查消除。这个新的后端在32位系统中可以减少性能测试时20%~30%的CPU需求。对于已经使用了1.7版本中SSA的64位x86系统,收益是0%~10%。其他平台的数据更接近于32位ARM。 在1.7中的-ssa=0临时选项在1.8中被移除了。 除了全平台支持新的编译器后端之外,Go1.8介绍了一个新的编译器前端。这个新的编译器前端对用户无感知,但是是未来性能优化工作的基础。 编译器和连接器优化过比Go1.7跑得更快,虽然仍然低于我们的预期,在未来会继续优化。相比于前一个版本,Go1.8大约快了15%。

Cgo

这个Go工具现在在make.bash时记住环境变量CGO_ENABLED,并将它应用到以后的编译中,以修复issue#12808。在做原生编译时,隐式设置CGO_ENABLED环境变量十分必要,因为make.bash会自动检查正确的设置。隐式设置CGO_ENABLED的主要原因是当你的环境支持时,但你不希望隐式使用cgo,可以在make.bash或all.bash时,设置CGO_ENABLED=0。 环境变量PKG_CONFIG现在可以用来直接配置cgo的包依赖(pkg-config)。pkg-config默认使用最早的releases。这是为了更方便的交叉编译cgo代码。 cgo工具现在支持一个-srcdir选项,这是一个过去用在go命令上的选项。 如果cgo代码调用了C.malloc,并且malloc返回了NULL,程序会返回一个oom错误并崩溃。C.malloc永远不会返回nil,不想其他C函数,C.malloc不应该使用多值返回来处理错误。 如果cgo传一个union指针来调用一个C函数,并且如果这个C union结构能包含任何指针值,并且如果允许cgo指针检查,这个union值现在当做Go指针来检查

Gccgo

因为Go半年发布一次,而GCC一年发布一次,所以GCC6包含了1.6.1版本的gccgo。我们希望GCC7能包含Go1.8版本的gccgo。

默认GOPATH

现在GOPATH有了一个默认值。默认指向 unix系统上为$HOME/go,windows系统上为%USERPROFILE%/go

Go get

HTTP代理环境变量现在能够一直对go get 命令生效,无论-insecure选项是否使用了。在之前的版本中,-insecure选项在没有设置代理时有副作用。

Go bug

新的go bug命令在github上生成一个bug报告,报告中预写了当前系统的信息。

Go doc

go doc命令现在通过类型将常量和变量分组,与godoc的行为一致 为了提升文档的可读性,所有第一层项目(the first-level items)的总结都保证独占一行。 interface定义中的指定方法,可以生成文档,比如”go doc net.Conn.SetDeadline”.

插件/补丁

Go现在支持“plugin”模式下的用Go写的插件,和一个新的plugin包用来在运行时加载。插件支持现在只在Linux平台上可用。请随时提交议题。

运行时

参数声明周期

gc不再认为参数的声明周期是贯穿整个函数的。更多信息,或学习如何使得变量存活,可以在1.7新增的runtime.KeepAlive方法中看到。 更新:对变量设置finalizer方法的对象现在可能需要增加runtime.KeepAlive方法的调用。阅读KeepAlive的文档获取更多细节。

Concurrent Map 误用

在Go1.6,运行时增加了轻量级、尽力尝试的对maps并发误用的检测。这个版本增加了检测器,支持对并发写和遍历的检测。 一如往常,如果一个协程正在对map写,其他协程都不应该读(包括遍历)或者并发写。如果运行时检查到了这个情况,就会打印出诊断信息并且引起崩溃。最好的发现这个问题的方法是在竞态检测下运行程序,可以更加可靠地证明竞争,以及给出更多细节。

MemStats文档

runtime.MemStats类型有了更加完善的文档。

性能

像之前一样,变化非常广泛和多元化,很难做出对性能的精确描述。大多数程序应该能够跑得更快一些,因为gc的提速和标准库的优化。 多个包的实现上都有优化,包括以下:

bytes, crypto/aes, crypto/cipher, crypto/elliptic, crypto/sha256, crypto/sha512, encoding/asn1, encoding/csv, encoding/hex, encoding/json, hash/crc32, image/color, image/draw, math, math/big, reflect, regexp, runtime, strconv, strings, syscall, text/template, unicode/utf8.

GC

GC停顿现在相比Go1.7应该显著缩短了,通常低于100微秒,并且经常低于10微妙。更多详情见关于淘汰stw栈重扫的文档。Go1.9会完成更多工作。

Defer

defer函数的调用开销减半了。

Cgo

从Go调用进C的开销减半。

标准库

例子

更多包的例子加进了文档。

排序

sort包现在包含了一个通用的函数Slice以排序一个slice以给定的函数。这意味着不再需要实现一个新的sorter类型。此外还有新的函数SliceStableSliceIsSorted

HTTP/2 Push

net/http包现在包含了发送HTTP/2从handler推送的机制。跟已存的Flusher和Hijacker接口类似,一个Http/2的ResponseWriter现在实现了新的Pusher接口。

HTTP Server Graceful Shutdown

HTTP Server现在支持用新的Server.Shutdown方法来支持graceful shutdown和新的Server.Close来突然关闭。

更多的Context支持

紧接着Go1.7将context.Context采纳进标准库,Go1.8增加了更多的支持到现存的包: 新的Server.Shutdown接受一个context参数。 database/sql包增加了很多对context的支持。 全部九个新的net.Resolver的lookup方法现在持有一个context.

Mutex Contention Profiling

现在运行时支持分析竞争互斥锁。 大多数用户会希望在使用go test时指定-mutexprofile选项,,并且在相关文件上使用pprof。 通过新的MutexProfileSetMutexProfileFraction可以实现更低层次的支持。 一个已知的Go1.8的局限是,这个profile只支持sync.Mutex而不支持sync.RWMutex

库的微小改动

像往常一样,在保证Go1.x兼容的情况下。对标准库做了诸多微小改动和更新。以下的章节罗列了用户可见的改动和增加。性能优化和bug修复没有再次列出。(译者注:详情不再列出了)

Mar 30, 2017 - mgo的session与连接池

简介

       mgo是由Golang编写的开源mongodb驱动。由于mongodb官方并没有开发Golang驱动,因此这款驱动被广泛使用。mongodb官网也推荐了这款开源驱动,并且作者在github也表示受到了mongodb官方的赞助。但由于作者的个人安排原因,该驱动的更新、bug修复、issue维护略微受到诟病。

       mgo在功能方面还是比较完善的,api使用也方便。由于mongodb丰富的玩法,mgo代码庞大,其中大部分是与mongodb的协议代码。核心的处理连接和请求的结构,逻辑上还是比较清晰的。

简单的使用

func dial() {
	session ,_ := mgo.Dial("mongodb://127.0.0.1")
}

mgo面向调用者的核心数据结构是mgo.Session,dial函数演示了如何获取一个session

func foo1() {
	session.DB("test").C("coll").Insert(bson.M{"name":"zhangsan"})
}

foo1函数通过生成的session,向test数据库的coll集合写入了一条数据。

但mgo的正确使用方法并非如此,而是应该在每次使用时从源session拷贝

func foo2() {
	s := session.Copy()
	defer s.Close()
	s.DB("test").C("coll").Insert(bson.M{"name":"zhangsan"})
}

foo2函数从源session拷贝出了一个临时的session,使用临时session写入一条数据,在函数退出时关闭这个临时的session。

session的拷贝与并发

       为什么要在每次使用时都Copy,而不是直接使用Dial生成的session实例呢?个人认为,这与mgo.Session的Socket缓存机制有关。来看Session的核心数据结构。

type Session struct {
	m                sync.RWMutex
	...
	slaveSocket      *mongoSocket
	masterSocket     *mongoSocket
	...
	consistency      Mode
	...
	poolLimit        int
	...
}

这里列出了mgo.Session的五个私有成员变量,与Copy机制有关的是,m,slaveSocket,masterSocket。

mmgo.Session的并发锁,因此所有的Session实例都是线程安全的。

slaveSocket,masterSocket代表了该Session到mongodb主节点和从节点的一个物理连接的缓存。而Session的策略总是优先使用缓存的连接。是否缓存连接,由consistency也就是该Session的模式决定。假设在并发程序中,使用同一个Session实例,不使用Copy,而该Session实例的模式又恰好会缓存连接,那么,所有的通过该Session实例的操作,都会通过同一条连接到达mongodb。虽然mongodb本身的网络模型是非阻塞通信,请求可以通过一条链路,非阻塞地处理;但经过比较简陋的性能测试,在mongodb3.0中,10条连接并发写比单条连接的效率高一倍(在mongodb3.4中基本没有差别)。所以,使用Session Copy的一个重要原因是,可以将请求并发地分散到多个连接中。

以上只是效率问题,但第二个问题是致命的。mgo.Session缓存的一主一从连接,实例本身不负责维护。也就是说,当slaveSocket,masterSocket任意其一,连接断开,Session自己不会重置缓存,该Session的使用者如果不主动重置缓存,调用者得到的将永远是EOF。这种情况在主从切换时就会发生,在网络抖动时也会发生。在业务代码中主动维护数据库Session的可用性,显然是不招人喜欢的。

func (s *Session) Copy() *Session {
	s.m.Lock()
	scopy := copySession(s, true)
	s.m.Unlock()
	scopy.Refresh()
	return scopy
}

以上是Copy函数的实现,解决了使用全局Session的两个问题。其中,copySession将源Session浅拷贝到临时Session中,这样源Session的配置就拷贝到了临时Session中。关键的Refresh,将源Session浅拷贝到临时Session的连接缓存指针,也就是slaveSocket,masterSocket置为空,这样临时Session就不存在缓存连接,而转为去尝试获取一个空闲的连接。

Session的连接从哪里来?连接池

明确了使用Session Copy机制的必要性,那么问题来了,Copy出来的临时Session是怎么获取一个到mongodb的物理连接的。答案就是连接池。mgo自身维护了一套到mongodb集群的连接池。这套连接池机制以mongodb数据库服务器为最小单位,每个mongodb都会在mgo内部,对应一个mongoServer结构体的实例,一个实例代表着mgo持有的到该数据库的连接。来看该连接池的定义。

type mongoServer struct {
	sync.RWMutex
	...
	unusedSockets []*mongoSocket
	liveSockets   []*mongoSocket
	...
	info          *mongoServerInfo
}

其中,info代表了该实例对应的数据库服务器在集群中的信息——是否master,ReplicaSetName等。而两个Slice,就是传说中的连接池。unusedSockets存储当前空闲的连接,liveSockets存储当前活跃中的连接,Session缓存的连接就同时存放在liveSockets切片中,而临时Session获取到的连接就位于unusedSockets切片中。

每个mongoServer都会隶属于一个mongoCluster结构,相当于mgo在内部,模拟出了mongo数据库集群的模型。

type mongoCluster struct {
	sync.RWMutex
	...
	servers      mongoServers
	masters      mongoServers
	...
	setName      string
	...
}

如定义所示,mongoCluster持有一系列mongoServer的实例,以主从结构分散到两个数组中。

每个Session都会存储自己对应的,要操作的mongoCluster的引用。

长途跋涉的Session

       以下描述一个Copy出来的临时Session是如何获取到一个mongodb物理连接的。

当临时Session被Copy出来,并且通过调用一系列api,将一次数据库操作设置到了Session内部后,此时万事俱备,只差连接。新生的Session首先会检查自己的缓存里是否有连接可用,初来乍到的他当然不知道自己是一个一无所有的光杆司令。由于mgo的实现,可怜的他还要去检查两次,一次使用读锁,一次使用写锁。作者的意图应该是期望在对同一个session并发操作时,能在第二次排他锁检查之前,恰巧缓存到一条连接,那么就可以减少一次对连接池的操作。但这次,这种好事没有发生在这个Session身上,“摸”了两次“口袋”反复确认以后,他终于还是发现自己身无分文。没有连接的他向组织求救,也就是这个session所要操作的mongodb集群,也就是所提到的mongoCluster结构。

“组织”问了这个Session一系列问题,其中最主要的是两个问题,一是”你要主库连接还是从库连接”,二是“你期望的连接池最大大小是多少”。第一个问题,Session很好回答,他首先看了看自己的模式,是必须到主库还是必须到从库,还是两者皆可看情况而定。再看了看自己手里的操作是读还是写,写操作当然不可能到从库去完成。第二个问题就有点强人所难,但是他不用自己思考,因为这是从源Session那里拿过来的配置,也算是一点祖产吧。

这个Cluster此时表现得像一个掌柜,他先根据主从,从自己手下的mongoServer里挑出了一个,然后问他,你现在手里有没有空闲的连接。如果有,那幸运的Session就可以顺利地获取到这个空闲的连接,高高兴兴的揣到兜里回家干活。但如果不巧,正好unusedSockets为空,那么掌柜会问另一个问题,你有没有超过这个家伙的期望的最大连接数。如果没有超过,那还好,作为伙计的mongoServer就干活了,他会跑到他负责的数据库服务器那里去申请一条全新的连接,亲手交到Session的手里。但如果这个伙计算了算,还去申请新连接的话,恐怕就超限了,那就Session同学对不起了您,您等吧。每100ms,伙计自旋一次,等着unusedSockets里出现可用的连接。

当然有人会问,那这么自旋下去,如果连接一直被其他Session占用,会不会就死循环了呢,答案是不会。这个伙计作为一个数据库服务器的管理员吧可以说,他自己也要常常去确认他负责的这个服务器是不是还活着。因此,伙计同学每15s会给服务器发一个ping命令。作为管理员,伙计可就不管什么连接池大小超不超的问题了,那是他们那些普通session要考虑的琐事。伙计同学要ping的时候,也去unusedSockets里看,如果有最好,就拿一个来用;没有的话,直接去问服务器要新的。ping完之后,新的连接就会被放入unusedSockets中。这样的话,自旋中的获取连接请求,就可以拿到连接了。

经过摸口袋,找组织,问伙计,伙计再干点小活,临时Session终于拿到了梦寐以求的数据库物理连接,把他放到了自己的口袋里(当然有些模式的Session不会这么干)。心满意足地将自己手里的操作通过这条连接写了出去,等到数据库给了他想要的应答,他的生命也就结束了。通过Close方法,我们剥夺了他口袋里的得之不易的连接,放回到了对应mongoServerunusedSockets中。不久之后,GC又杀死了这个Session。

为什么我司的代码没有使用Copy也没有出问题?

看过我司Go项目代码的同学可能知道,我司的服务端代码中并没有使用Copy,而是类似如下的使用

func Dial(){
	localSession ,_ := mgo.Dial(mongoUrl)
	localSession.SetMode(mgo.Eventual)
	globalDatabase = localSession.DB("db")
}

func Insert() {
	globalDatabase.C("coll").Insert(...)
}

使用了一个全局的mgo.Database实例,所有的对该db的操作,都通过这个实例完成。 原因就在于,我们使用的是模式是mgo.Eventual,该模式最大的特点就是不会缓存连接,拒绝持有mongodb的一针一线。通过该mgo.Database实例的操作,每次都会发现自己的口袋里一无所有,都会经过一次上一节所述的长途跋涉获取连接,因此也规避了不使用Copy带来的两个副作用。一并发效率问题,Eventual的Session每次操作都从连接池取连接,相当于分散在连接池中完成了操作,二连接可用性问题,连接池机制确保了,从mongoServer取得的连接,都是活的连接。

Copy机制或Eventual模式的并发模型的问题

并发锁效率

Copy机制或Eventual模式的共同点是,每次的数据库操作都要经过一次代码路径略深的获取连接的过程。而这个路径中,会操作多个线程安全的结构体,包括mongoServermongoCluster等,线程安全的代价就是并发锁冲突带来的性能下降。举个例子,假设有十个写操作并发,无论使用Copy还是Eventual,最终都会走到cluster的masterServer,请求一个主库连接;而完成这个请求,需要写两个slice,将连接从unusedSockets删除,并加入liveSockets,对切片的更新势必要加排他锁,因此这十个请求很有可能会产生锁冲突。

连接池上限与冲击数据库

mgo对Session有一个poolLimit配置,也就是上文中所说的cluster问session的第二个问题——代表了对连接池连接数的上限限制。默认配置的连接数上限是4096,显然对生产环境来说太过大了。但这个配置我以为非常的鸡肋,属于设置也不好,不设置也不好;个人认为这是被mgo的并发模型所拖累了。

假设高并发场景,若设置的连接池上限为4096,并发为10000,那么理论上,一瞬间,mgo可能会产生4096个到mongodb的物理连接,而剩下的六千的请求会自旋等待。4096个连接对mongodb来说,首先意味着4096 * 10M的内存消耗,如此高的连接会导致各种各样的问题。那么加入设置连接池上限为100,并发为10000,9900个等待的请求每次100个排队完成,对应用的效率又是不小的消耗。况且实际测试中,poolLimit的设置也无法严格地限制住连接数。

连接池只伸不缩

mgo另一个问题是连接池连接不释放,一旦由于并发原因,连接池的数量被撑大,之后再也不会变小,除非客户机或服务器重启。

M:1? 1:1 ? M:N!

排除连接可用性问题,全局缓存连接的Session的问题是M个数据库操作通过1个连接完成。通过Copy、Eventual完成数据库操作的问题是,取到一个连接后,只做一件事情就归还了连接。这两种并发模型都存在问题。因此,最好的模型是M:N,有M个数据库操作需要完成,一次性取N个连接,分散到N个连接中完成,此后无论有多少批请求,都可以在N个连接中分散完成。第一可以规避连接池锁冲突,第二不会大规模产生真实连接,充分利用已建立的连接。

SessionPool

M:N的模型无法通过mgo原生支持完成,api也无法支持用户获取到物理连接。

可以利用Session会缓存连接的特性,通过一些小技巧实现一个SessionPool。例如,有M个写操作,则可以一次性生成N个StrongSession,每个StrongSession自己会缓存一条masterSocket;于是,之后的写操作,可以以某种方式负载均衡到这N个由Strong模式的Session缓存的连接中。

mgop简单实现了上述的StrongSessionPool,以轮询的方式负载。对从库连接的缓存以及动态负载还有待实现。

实现SessionPool要特别注意的问题是刷新问题,缓存Session中的连接随时可能会失效,mgop的方式是遍历发送isMaster命令,第一确认连接存活,第二确认连接确实是到主库的。若发现问题,则马上重置缓存。

空闲连接释放

mgo的连接池释放问题,在我的mgo fork中做了一个简单的实现解决这个问题。github.com/JodeZer/mgo

在mongodb url标准中,有两个option:minPoolSize,maxIdleTimeMS。mgo没有支持这两个选项,通过实现这两个选项可以达到释放连接的目的。官网描述:

minPoolSize
The minimum number of connections in the connection pool. The default value is 0.

maxIdleTimeMS
The maximum number of milliseconds that a connection can remain idle in the pool before being removed and closed.

实现方式是,将mongoServer中的unusedSockets类型改造为timedMongoSocket(fork中自定义的类型)

type timedMongoSocket struct {
	soc          *mongoSocket
	lastTimeUsed *time.Time
}

每次有连接被重置到空闲池时,打一个时间戳。在轮询goroutine中每隔一段时间review空闲连接的空闲时长,当时长大于maxIdleTimeMS时,就释放连接,将空闲池的大小控制在minPoolSize

实现中,没有特地写设置函数,可以通过在mongo url中写入选项设置,如:mongodb://127.0.0.1?minPoolSize=0&maxIdleTimeMS=3000,若maxIdleTimeMS不设置或为0,则默认为不进行释放