Table of Contents:
Status: Accepted
Authors:
References:
SlateDB maintains a live memtable, a set of immutable memtables, and metadata about SSTs in the WAL. put() writes the key-value into the memtable. At some interval, the flushing thread closes the memtable and flushes the contents to a new WAL SST. Immediately before the flush, the memtable is converted to an immutable memtable.
get() operations search the current memtable, immutable memtables, and WAL SSTs in reverse-write order (most recent SSTs first). The search is terminated when the key specified in get is found.
There are a few problems with the current implementation that we address with this design:
get() becomes very expensive because it has to search an ever-growing list of SSTs.The following are out-of-scope for this design. That said, the design should not preclude them. For some items, we will describe how the design can be adapted to achieve these goals in follow-on work.
The design for compaction depends on the database and workload sensitivity to write, read, and space amplification. We will likely ultimately need to support a variety of compaction strategies to accommodate different types of workloads. However there are some common considerations given SlateDB's object-store based architecture:
write amplification: write amplification refers to the added work done by compaction for every write. This is usually measured as some multiplier. For example, if a write is written to the database 5 times (once for the initial write and compacted 4 times) - write amplification is 5x. Write amplification is not much of a concern from a cost perspective. The major cloud providers don't charge for data transfers to/from the "standard" tier of object storage (provided you set up networking correctly). There are charges to/from zonal tiers like s3express, but I expect that most of the data will reside in the regional tier. Write amplification is a concern from a bandwidth usage perspective. If write amplification is too high, writes will become bottlenecked on the compactor node's network. This is usually a higher limit than local or attached disk, but not an order of magnitude higher (~1.5-2x nw-out baseline:local disk write). So we will need to take care to avoid too much write amplification to support write-heavy workloads.
read amplification: read amplification refers to the added work done by the database for reads. This includes added cpu from searching through multiple possible locations for a read, and added i/o from reading multiple locations. We certainly don't want to be doing multiple, if any, GET operations per SlateDB read, as GETs are expensive and slow. Read amplification can be mitigated by effective caching. Ideally, for optimal performance and cost, reads can be served entirely from cache, which spans memory and local disk. Still, we probably don't want to be doing multiple disk reads per read or we may saturate the local/attached disks. Therefore its desirable that the filter and index blocks used for most reads fit in memory. All of this is to say that it is pretty important for SlateDB to reduce read amplification. We need the set of SSTs to search for a read to be small enough so that indexes/filters can fit in memory, and data being read fits in total cache most of the time.
space amplification: SlateDB is not as sensitive to space amplification as disk-based databases are because storage is practically unbound, and is cheap (about 1/5th-1/20th the cost of an on-disk db on s3 depending on replication factor, instance stores vs ebs, reserved vs on-demand, etc). One problem that we can totally sidestep is transient space amplification from compaction. Most dbs suffer from this problem and have to either over-allocate disk space or implement incremental compaction. This won’t be an issue for us. Our main concern with space amplification is that it grows the search space for reads as described above.
In the rest of this document, we propose mechanisms for compacting together SSTs to purge duplicates and keep the total count of SSTs contained. Compaction will be executed both by the main writer and by a separate compactor process.
The main writer compacts the WAL to the first level of the database. This has a number of benefits: As we describe below, the main writer compacts the WAL directly from memory without reading the data back from S3 first. This saves cost for low-latency writers and more importantly, reduces load on the network. This offloads some of the compaction i/o from the compactor onto the writer. Assuming the main bottleneck for compaction will be network, this should help us achieve more throughput. It should be easier to coordinate between the compactor and writer for lower-frequency compactions to lower levels than WAL->L0 compactions.
The compactor is responsible for compacting L0 and the lower levels of the database. Compacted SSTs are maintained in a series of Sorted Runs (SRs). Each SR spans the full keyspace of the database. A SR is made up of an ordered series of SSTs, each of which contains a distinct subset of the total keyspace. We use Sorted Runs instead of large SSTs because it is a simple way to keep the size of the metadata blocks small enough so that they can be easily paged in and out of cache without polluting the cache. Larger SSTs have larger metadata blocks. It’s expensive to page them in and out of cache, and they are likely to displace other data that should be retained by the cache to achieve a high hit rate.
The SRs are themselves ordered by age. When executing point lookups, SlateDB looks up the value for a key in age-order, and terminates the search at the first SR that contains the key. Range scans read every SR and sort-merge the result.
table SortedRun {
id: uint32;
ssts:[SstId];
}
table SstId {
high: uint64;
low: uint64;
}
table Compacted {
runs: [SortedRun];
}
table Manifest {
…
l0_last_compacted: CompactedSstId // The last compacted l0
l0: [SstId];
compacted: Compacted;
…
}
We propose to augment the manifest by adding the following fields:
L0: Contains a list of SST IDs in L0
compacted: Contains a single instance of Compacted. Compacted contains a list of SortedRun instances. A SortedRun instance defines a single sorted run. Each Sorted Run contains a list of SST IDs and has a unique ID. The list of SST IDs defines the SSTs that comprise the sorted run. A given SST belongs to at most 1 SR. The ID describes the SR’s position in the list of sorted runs in compacted. That is, an SR S with an S.id must occur after SR S’ with ID S’.id if S.id < S’.id (so the sorted run with ID 0 must be last in the list). The last SR in the list must have ID 0. The semantics of the ID will be important when we describe how to define compactions.
l0_last_compacted: Contains the ID of the last l0 SST that was compacted to compacted. The compactor uses this value to determine which l0 SSTs it needs to consider for compaction when refreshing its view of state with the manifest stored durably.
We will use ULIDs to name compacted SSTs. The ULID is stored in the manifest in the SstId table, with the high and low fields containing the high and low bits of the ULID, respectively.
In the Object Store, compacted SSTs are stored under the compacted directory. Each SST object is named using its ULID and the suffix .sst, e.g:
/compacted/01ARZ3NDEKTSV4RRFFQ69G5FAV.sst
/compacted/01BX5ZZKBKACTAV9WEVGEMMVRZ.sst
…
Compaction behavior is governed by the following DB options:
l0_sst_size_bytes: defines the target L0 SST size in bytes.
l0_manifest_commit_interval_ms: defines the minimum interval between memtable flush-related manifest updates in milliseconds.
l0_compaction_threshold_ssts: defines the threshold number of SSTs for compacting L0. (default 8)
l0_max_ssts: defines the maximum number of uncompacted L0 SSTs. (default 16)
max_compactions: defines the max number of concurrent compactions. (default 4)
compaction_scheduler: defines the compaction scheduler (currently only supports the value tiered)
compaction_scheduler_tiered: An options struct that defines scheduler options for tiered compaction.
compaction_scheduler_tiered.level_compaction_threshold_runs: defines the threshold number of sorted runs for compacting a lower level (specific to the tiered compaction scheduler). (default 8)
compaction_scheduler_tiered.level_max_runs: defines the maximum number of sorted runs at alower level (specific to the tiered compaction scheduler). (default 16)
The writer is responsible for compacting the WAL to the first level of the database. It does this directly from the memtable rather than reading back the WAL first. Instead of freezing the memtable when writing the WAL, it instead retains it until enough data has accumulated to fill an L0 SST. Only then does it freeze the memtable and write out an L0 SST. When it’s been longer than l0_manifest_commit_interval_ms since the last manifest update, the writer updates the manifest with the new SSTs. The L0 writes are fenced by virtue of committing L0 SSTs in a manifest update. The newly written L0 SSTs are only considered part of the database when the manifest has been updated.
Let’s look at the write and recovery protocols in more detail:
write:
put adds the key-value to the current memtable and updates the WAL using the protocol described in the manifest design.l0_sst_size, freeze the memtable by converting it to an immutable memtable, and write the memtable to a new ULID-named SST S in compacted.l0_manifest_commit_interval_ms:
last_compacted to the last WAL SST included in Sl0 by prepending S, S’, S’’, … to the listwrite-recovery:
last_compacted, reapply the writes as described above in the write protocol.TODO: should we have some way to bound the number of immutable memtables in memory? I left it out since we are free to purge them once their SSTs have been written (even before we update the manifest). But this doesn’t help us if we can’t write the SSTs to S3 for some reason - though in that case we likely can’t commit writes anyway.
The Compactor compacts L0 and the lower levels. It contains two logical processes: a Compaction Executor and a Compaction Scheduler. The Compaction Scheduler observes the current state of the database and schedules Compactions. The Compaction Executor bootstraps the Compactor, executes the Compactions scheduled by the Compaction Scheduler, and notifies the Compaction Scheduler about status. The Compaction Executor is fixed, while The Compaction Scheduler is modular to allow SlateDB to support different compaction styles. The specific scheduler is specified in the compaction.scheduler db option. Initially, we will implement a single scheduler that performs tiered compaction.
Lets start by defining the interface between the CompactionExecutor and CompactionScheduler. I’ll define them as Rust structs/traits. Then, we discuss how each component implements the interfaces.
The Compaction Scheduler tells the Compaction Executor what compactions to execute. A Compaction is defined using the following parameters:
Let’s look at some examples of valid/invalid compactions. I’ll use string IDs for SSTs here instead of ULIDs. Suppose our manifest looks like:
l0: [SST-1, SST-2, SST-3, SST-4]
compacted: [100, 50, 3, 1, 0]
Here are examples of valid/invalid compactions (I’m using the notation Sources->Destination)
[SST-2, SST-1]->101: This describes compacting the oldest 2 L0 SSTs to a new SR
[SST-4, SST-3]->101: This is invalid because it skips SST-2 and SST-1
[SST-1, 100]->100: This describes compacting the oldest L0 SST (SST-1) and SR 100 and saving the result as SR 100
[100, 50]->2: This is invalid because it writes the result to an SR that is not consecutive to 50 (3 is consecutive to 50)
[SST-4, SST-3, SST-2, SST-1, 100, 50, 3, 1, 0]->0: This describes a major compaction that compacts everything and saves it as SR 0
Observe that we can use this basic definition to describe compactions done by different compaction algorithms (this isn’t strictly true in the above proposal - e.g. it doesn’t currently support some-to-all compactions like compacting a single SST from one SR into another SR, but that’s a fairly straightforward extension to the definition of a source) - it’s up to the Compaction Scheduler to decide what compactions to execute. The scheduler can choose to implement leveled compaction by viewing each SR as a level and scheduling Compactions that always merge one SR into the next SR. Or it can implement tiered compaction by grouping SRs into levels and define compactions that merge all the SRs in a level into a new SR at the next level. The levels themselves are a logical construct maintained by the scheduler.
In Rust, this looks like:
union SourceId {
sorted_run: u32,
sst: Ulid,
}
struct Compaction {
id: u32 // a unique identifier for the compaction (it must be unique to the process’s lifetime)
sources: Vec<SourceId>
destination: u32
}
The Compaction Executor provides the following interface to the CompactionScheduler:
trait CompactionExecutor {
/*
* Notifies the compaction executor about a new compaction to execute. The result
* is Ok if the compaction was accepted by the executor. This does not mean that the
* compaction was completed. The executor validates the compaction and returns an
* error if the compaction is invalid.
*/
fn submit(&self, compaction: Compaction) -> Result<(), Error>
}
The Compaction Scheduler provides the following interface to the Compaction Executor:
enum CompactorUpdateKind { DBState, CompactionFinished }
struct DBStateUpdate {
kind: CompactorUpdateKind // always DBState
state: DBState, // we probably don’t want to use DBState here, but rather a subset that contains compactor-relevant state like l0 and compacted. But, you get the idea.
}
struct CompactionFinished {
kind: CompactorUpdateKind // always CompactionFinished
compaction_id: u32,
state: DBState,
}
union CompactorUpdate {
db_state: DBStateUpdate,
compaction_finished: CompactionFinished,
}
trait CompactionScheduler {
/*
* Notifies the scheduler that it should start evaluating the db for compaction. This method
* receives a channel over which the Executor sends the Scheduler updates about changes
* to the database. This includes updates about changes to the database (e.g. arrival of
* new L0 files), and completion of compactions.
*/
fn start(&self, executor: Box<dyn CompactionExecutor>, chan: Receiver<CompactorUpdate>);
}
The Compaction Executor initializes as follows:
compactor_epoch in the manifest using CAScompaction.scheduler, creating, a channel, and calling startThen, the compactor periodically polls the manifest. On every poll, the compactor:
compactor_epoch is different than the compactor’s epoch. If it is, exit.DBStateUpdate message over the scheduler channel.The Compaction Executor implements compact by:
compact returns.The Compaction Executor executes the compaction by reading the SSTs and SRs in sources and sort-merging them into a new SR.
The Compaction Executor needs to coalesce updates. If the same key appears in multiple sources, then it takes the value from the logically latest (i.e. most recent) source. The Compaction Executor handles destination SR 0 specially. If the destination SR is 0, and the value for a key is resolved to a tombstone, then the Compaction Executor will not include include the key in the resulting SR.
The new SR is made up of ULID-named SSTs in the compacted directory (just like L0).
We should implement the sort-merge so that we can make good use of the available network. One good option here is to use async Object Store APIs to concurrently read the various sources, and then to write the resulting SSTs while we move on to the next key ranges. I think the details are something we can work out in the implementation, and it doesn’t have to be optimal in this iteration of work.
Note that compacting whole sorted runs can create a lot of temporary space amplification, especially for compactions that read the last level. This is not a major concern for SlateDB as ObjectStore capacity is practically infinite, and the usage is temporary so it should not contribute meaningfully to cost.
When the new SR has been fully written out, the Compaction Executor finishes the compaction by:
compactor_epoch matches the compactor’s epoch. If it does not, exit.compacted and the SR and SSTs from sources removed.The Compaction Scheduler is responsible for selecting the next Compaction. Our goal here is to implement something simple that works, and then iterate/optimize on it in future cycles. Initially we propose to implement basic tiered compaction, which tries to maintain sorted runs in size-based levels, and constrains the number of sorted runs in a given level by merging the runs together when there are too many of them, usually moving the resulting run to the next level.
We choose tiered compaction because it works well for workloads with a moderate to high volume of writes because it has lower write amplification than leveled compaction (usually by a factor of T where T is the fanout for each level). It still guarantees that the total number of runs is proportional to O(log(N)) where N is the size of the db, but allows multiple runs to accumulate at a level to reduce the amount of merging (at the cost of a multiplier T on the number of runs, where T is the number of runs that can accumulate at a level - so it’s really O(Tlog(N))). So the drawback is that there can be significant space and read amplification. However, I think there are reasonable ways to work around most of these drawbacks for now, and we can trade off some write amplification for better read/space amplification down the line by implementing “lazy-leveling” as described in dostoevsky:
level_compaction_threshold_ssts sorted runs even at the lowest level, which means the entire keyspace may be copied level_compaction_threshold_runs times. This is somewhat concerning from a cost pov, however as explained at the beginning of the design Object Store costs are much lower than block device costs (by a factor of 5-20x), so this is less of a concern for SlateDB than traditional stores.All of this is to say, Tiered compaction feels like a great starting point for a general-purpose store, and we have reasonable short-term workarounds and long term solutions to most of the problems that we anticipate with tiered compaction.
SlateDB’s tiered Compaction Scheduler will work as follows:
l0_sst_size X l0_compaction_threshold X compaction.scheduler.tiered.level_compaction_threshold^Ncompaction.scheduler.tiered.level_compaction_threshold_runscompaction.scheduler.tiered.level_max_runsmax_compactionsl0_compaction_threshold_sstscompaction.scheduler.tiered.level_max_runsmax_compactionsThe design described above applies back-pressure so that we don’t wind up writing faster than we can compact and get unbounded read/space amplification. SlateDB blocks new writes if the number of SSTs in L0 exceeds l0_max_ssts. This shouldn’t happen if compaction is keeping up and is able to merge SSTs from L0 to L1. Compactions to L1 are blocked if the number of runs in L1 is greater than compaction.scheduler.tiered.level_max_runs. Similarly, Compactions to L2, L3, … LN are blocked if the number of runs in those levels exceeds the threshold.
Point-Lookups
The reader looks up keys in the following order, and terminates the search at the first item that contains a given key: memtable immutable memtable L0 SSTs, in order SRs, in order
Range Scans
To serve Range Scans, the reader needs to look through every memtable, immutable memtable, SST, and SR and sort-merge the key-values that fall within the range.
By default, the compactor will run alongside the main writer-reader in the same process. We will also support running the compactor in a separate process by initializing and running just the compactor.
In this section I want to briefly sketch out how we can iterate on the design described above to add some additional features.
Compactions targeting lower levels can take a long time. If the compactor restarts, we don't want to lose compaction progress. We can record what compactions were ongoing in the manifest by defining a flatbuffer table schema that describes the Compaction struct defined above. Then, when the compactor restarts it knows what compactions were already scheduled.
Its not enough to know what compactions were ongoing - a new compactor also needs to be able to reconstruct compaction progress. We can leave breadcrumbs to allow it to piece this information together:
To do this we can allow including uncompleted SRs in the list of SRs in the manifest. We can do this by including a completed flag in the SR definition. For a given SR ID there should only be one with completed set to true, which is what the reader would use to serve reads. Then, the new compactor can inspect the SSTs in the new SR, and see what key ranges have already completed compaction, and finish compacting the uncompacted key ranges.
The tiered compaction scheduler proposed in this document will have very high space amplification, as it maintains multiple SRs at the lowest level. Its likely that each SR at the lowest level contains most of the database, assuming a stable db size. As described above, to fix this (at the cost of added write amplification) we can always maintain a single run at the lowest level. In particular, our tiered compaction policy can always maintain SR 0 as its own level. When the earlier level is full, it can compact (at least) all SRs from that level into SR 0.
Some databases (e.g. ScyllaDB) handle time-series data with a fixed lifetime specially. They maintain data in fixed time buckets that are themselves compacted according to some schedule (e.g. tiered or leveled), where each bucket corresponds to a distinct non-overlapping key range. Then, when a given bucket is no longer accessible (as defined by the fixed lifetime), the entire bucket is removed from the database.
We can implement such a Scheduler fairly easily. The main challenge is that we need to either:
Compacted. Then, the scheduler can maintain each bucket in its own Compacted instance that's associated with some key range. Reads would need to be aware of this mapping. When deleting a bucket the whole Compacted instance is dropped from the mapping.One of the main advantages of running in cloud is that applications can dynamically provision resources for a short time to burst capacity, since compute is easily provisioned and billed only for the time its provisioned. This means that for really expensive compactions, we should be able to quickly spin up a short-lived but beefy compactor node to execute the compaction. There's nothing in the design above that precludes us doing this. The compactor could in the future with some (probably pluggable depending on whether a compactor runs on k8s, or directly on a compute service like ec2) API for provisioning, provision a short-lived node, notify it about compaction work, and then commit the manifest when the compaction work is complete.
Its worth calling out that we already get some of this benefit as most instances have burstable network (e.g. an instance with a 5Gbps baseline network can burst up to 25Gbps for a short duration).
| Instance Type | Network Bandwidth MBps (full-duplex) (Baseline/Burst) | Instance Disk Bandwidth MBps (assumes 4K max io size - experimentally verified for xlarges) (Read/Write) | EBS Bandwidth MBps (full-duplex) (Baseline/Burst) |
|---|---|---|---|
| m5d.xlarge | 160/1280 | 230/113 | 143.75/593.75 |
| m5d.metal | 3200/3200 | 5468/2656 | 2375/2375 |
| i3en.xlarge | 537/3200 | 332/253 | 144.2/593.75 |
| i3en.metal | 12800/12800 | 7812/6250 | 2375/2375 |
| c5d.xlarge | 160/1280 | 156/70 | 143.75/593.75 |
| c5d.metal | 12800/12800 | 5468/2656 | 2375/2375 |
Can you improve this documentation? These fine people already did:
Chris Riccomini, Rohan Desai & RohanEdit on GitHub
cljdoc builds & hosts documentation for Clojure/Script libraries
| Ctrl+k | Jump to recent docs |
| ← | Move to previous article |
| → | Move to next article |
| Ctrl+/ | Jump to the search field |