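TSM is the storage engine used by the time-series database InfluxDB, and it can be regarded as one implementation of the LSM (log-structured merge) approach. This article reads the source of tsm1, the engine package embedded in InfluxDB, to study TSM's data structures and how they are implemented.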

The Engine interface

type Engine interface {
	Open() error
	Close() error
	WithLogger(zap.Logger)
	LoadMetadataIndex(shardID uint64, index *DatabaseIndex) error
	Backup(w io.Writer, basePath string, since time.Time) error
	Restore(r io.Reader, basePath string) error
	CreateIterator(measurement string, opt influxql.IteratorOptions) (influxql.Iterator, error)
	WritePoints(points []models.Point) error
	ContainsSeries(keys []string) (map[string]bool, error)
	DeleteSeries(keys []string) error
	DeleteSeriesRange(keys []string, min, max int64) error
	DeleteMeasurement(name string, seriesKeys []string) error
	SeriesCount() (n int, err error)
	MeasurementFields(measurement string) *MeasurementFields
	CreateSnapshot() (string, error)
	SetEnabled(enabled bool)
	Format() EngineFormat
	Statistics(tags map[string]string) []models.Statistic
	LastModified() time.Time
	io.WriterTo
}

tsm1.Engine implements tsdb.Engine.

The WritePoints method

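WritePoints turns each Point into a key-value record: the key is the string InfluxDB builds by combining the measurement and the tags, and the value is an assembled structure holding the data to be written.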
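To make the key layout concrete, here is a minimal sketch of how such a key could be assembled. The helper names seriesKey and cacheKey are hypothetical, and appending the field name with a "#!~#" separator is an assumption about the tsm1 version being read, not something shown in the excerpt above. Escaping of special characters is ignored.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// fieldSeparator is an ASSUMED separator between the series key and the
// field name; check the tsm1 source you are reading for the real value.
const fieldSeparator = "#!~#"

// seriesKey (hypothetical helper) joins the measurement and its tags,
// sorted by tag name, into a single string.
func seriesKey(measurement string, tags map[string]string) string {
	names := make([]string, 0, len(tags))
	for name := range tags {
		names = append(names, name)
	}
	sort.Strings(names)

	var b strings.Builder
	b.WriteString(measurement)
	for _, name := range names {
		b.WriteString(",")
		b.WriteString(name)
		b.WriteString("=")
		b.WriteString(tags[name])
	}
	return b.String()
}

// cacheKey (hypothetical helper) appends the field name to the series key.
func cacheKey(series, field string) string {
	return series + fieldSeparator + field
}

func main() {
	series := seriesKey("cpu", map[string]string{"region": "us-west", "host": "server01"})
	fmt.Println(cacheKey(series, "usage_idle"))
	// prints: cpu,host=server01,region=us-west#!~#usage_idle
}
```

Sorting the tag names keeps the key stable regardless of the order in which the tags were supplied, so the same series always maps to the same record.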

	err := e.Cache.WriteMulti(values)
	if err != nil {
		return err
	}

	_, err = e.WAL.WritePoints(values)
	return err

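Once the records are assembled, the engine first writes them to the Cache and then writes them to the WAL. At the core of the Cache is the storer interface: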

type storer interface {
	entry(key string) (*entry, bool)                // Get an entry by its key.
	write(key string, values Values) error          // Write an entry to the store.
	add(key string, entry *entry)                   // Add a new entry to the store.
	remove(key string)                              // Remove an entry from the store.
	keys(sorted bool) []string                      // Return an optionally sorted slice of entry keys.
	apply(f func(string, *entry) error) error       // Apply f to all entries in the store in parallel.
	applySerial(f func(string, *entry) error) error // Apply f to all entries in serial.
	reset()                                         // Reset the store to an initial unused state.
}
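To get a feel for what an implementation of this interface has to do, the following is a rough sketch of a single-map store covering a subset of the methods. It is not the tsm1 code: mapStore is a hypothetical type, plain []float64 slices stand in for Values and *entry, and a single read-write lock guards the whole map.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// mapStore is a hypothetical, simplified stand-in for a storer:
// one map guarded by one lock, with []float64 in place of Values.
type mapStore struct {
	mu      sync.RWMutex
	entries map[string][]float64
}

func newMapStore() *mapStore {
	return &mapStore{entries: make(map[string][]float64)}
}

// write appends values to the entry for key, creating it if needed.
func (s *mapStore) write(key string, values []float64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.entries[key] = append(s.entries[key], values...)
}

// entry returns the values stored under key and whether the key exists.
func (s *mapStore) entry(key string) ([]float64, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.entries[key]
	return v, ok
}

// remove deletes the entry for key; a missing key is a no-op.
func (s *mapStore) remove(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.entries, key)
}

// keys returns all entry keys, sorted if requested.
func (s *mapStore) keys(sorted bool) []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make([]string, 0, len(s.entries))
	for k := range s.entries {
		out = append(out, k)
	}
	if sorted {
		sort.Strings(out)
	}
	return out
}

func main() {
	s := newMapStore()
	s.write("mem", []float64{0.5})
	s.write("cpu", []float64{1, 2})
	s.write("cpu", []float64{3})
	fmt.Println(s.keys(true)) // prints: [cpu mem]
	vals, ok := s.entry("cpu")
	fmt.Println(vals, ok) // prints: [1 2 3] true
}
```

With one lock for the whole map, every writer contends with every other writer, which is the cost a partitioned design tries to reduce.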

The ring struct in tsdb/engine/tsm1/ring.go implements the storer interface:

type ring struct {
	// The unique set of partitions in the ring.
	// len(partitions) <= len(continuum)
	partitions []*partition

	// A mapping of partition to location on the ring continuum. This is used
	// to lookup a partition.
	continuum []*partition

	// Number of keys within the ring. This is used to provide a hint for
	// allocating the return values in keys(). It will not be perfectly accurate
	// since it doesn't consider adding duplicate keys, or trying to remove non-
	// existent keys.
	keysHint int64
}

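In essence, ring is a hash table with 4096 slots: the key is hashed, and the hash decides which slot the values land in, so entries are spread across the slots.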
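The slot selection described above can be sketched as follows. This is an illustration under stated assumptions rather than the code in ring.go: toyRing, partition, and slotFor are hypothetical names, FNV-1a from the standard library is swapped in as the hash function, and each slot is a small map with its own lock.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// numPartitions is the slot count quoted in the text.
const numPartitions = 4096

// partition is one slot: a map protected by its own lock, so writes to
// different slots do not block each other.
type partition struct {
	mu    sync.RWMutex
	store map[string][]float64
}

// toyRing is a hypothetical, simplified ring: a fixed array of partitions.
type toyRing struct {
	partitions []*partition
}

func newToyRing() *toyRing {
	r := &toyRing{partitions: make([]*partition, numPartitions)}
	for i := range r.partitions {
		r.partitions[i] = &partition{store: make(map[string][]float64)}
	}
	return r
}

// slotFor hashes the key and maps the hash onto a slot index.
// FNV-1a is an assumption made for this sketch.
func slotFor(key string) int {
	h := fnv.New64a()
	h.Write([]byte(key))
	return int(h.Sum64() % numPartitions)
}

// write appends a value to the key's entry in the slot chosen by the hash.
func (r *toyRing) write(key string, v float64) {
	p := r.partitions[slotFor(key)]
	p.mu.Lock()
	defer p.mu.Unlock()
	p.store[key] = append(p.store[key], v)
}

// values returns a copy of the values stored under key.
func (r *toyRing) values(key string) []float64 {
	p := r.partitions[slotFor(key)]
	p.mu.RLock()
	defer p.mu.RUnlock()
	return append([]float64(nil), p.store[key]...)
}

func main() {
	r := newToyRing()
	r.write("cpu,host=server01", 1.5)
	r.write("cpu,host=server01", 2.5)
	fmt.Println("slot:", slotFor("cpu,host=server01"))
	fmt.Println("values:", r.values("cpu,host=server01"))
}
```

Because the same key always hashes to the same slot, a lookup only needs to lock one partition, and writers to different keys usually touch different locks.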