• Updated on 11 Jun 2019
  • 11 minutes to read
  • Contributors
  • Print
  • Share
  • Dark


  • Print
  • Share
  • Dark


C8 is a coordination free geo-distributed fast data platform supporting multiple data models, and can thus be scaled horizontally, that is, by using many servers, typically based on commodity hardware. This approach not only delivers performance as well as capacity improvements, but also achieves resilience by means of replication and automatic fail-over.

Typically when you choose a database or streaming system today, you’re not choosing one piece of technology, you’re choosing three: storage technology, data model, and API/query language.

For example, if you choose Postgres, you are choosing the Postgres storage engine, a relational data model, and the SQL query language. If you choose MongoDB you are choosing the MongoDB distributed storage engine, a document data model, and the MongoDB API. In systems like these, features are interwoven between all of the layers. For example, both of those systems provide indexes, and the notion of an index exists in all three layers.

Document databases, Graph databases, Key-Value, Pub-Sub Streams, Queues etc. all make sense in the right context, and often different parts of an application call for different choices. This creates a tough decision: Use a whole new database or new streaming system to support a new data model, or try to shoehorn data into your existing database or messaging system.

C8 uses layered concepts and decouples its data storage technology from its data model. C8’s core ordered key-value and log storage technology can be efficiently adapted and remapped to a broad array of rich data models and streams.

C8 is a co-ordination free geo-distributed multi-master, multi-model and streaming data fabric platform. Within a single datacenter, C8 is a CP master/master model with no single point of failure. With CP we mean that in the presence of a network partition, C8 prefers internal consistency over availability.

With master/master we mean that clients can send their requests to an arbitrary node within a data center, and experience the same view on the geofabric regardless. No single point of failure means that the cluster can continue to serve requests, even if one machine fails completely.

With geo-distributed we mean that clients can send their requests to any region and experience the same view on the C8 data platform outside of bounded latencies window.

In this way, C8 has been designed as a natively decentralized, distributed multi-master multi-model data and streaming platform. This section gives a short outline on the architecture of a C8 within a single data center and how the above features and capabilities are achieved.


Replication within DC

Replication in C8 is synchronous and works on a per-shard basis. The system configures for each collection, how many copies of each shard are kept in the cluster. The default is 2 replicas per datacenter. At any given time, one of the copies is declared to be the leader and all other replicas are followers.

Write operations for this shard are always sent to a instance that holds the leader copy, which in turn replicates the changes to all followers before the operation is considered to be done and reported back to the user.

Read operations are all served by the server holding the leader copy, this allows to provide snapshot semantics for complex transactions.

If an instance that holds a follower copy of a shard fails, then the leader can no longer synchronize its changes to that follower. After a short timeout (3 seconds), the leader gives up on the follower, declares it to be out of sync, and continues service without the follower. When the server with the follower copy comes back, it automatically resynchronizes its data with the leader and synchronous replication is restored.

If an instance that holds a leader copy of a shard fails, then the leader can no longer serve any requests. It will no longer send heartbeats. A supervision process takes the necessary action (after 15 seconds of missing heartbeats), namely to promote one of the servers that hold in-sync replicas of the shard to leader for that shard.

The other surviving replicas automatically resynchronize their data with the new leader. When the instance with the original leader copy comes back, it notices that it now holds a follower replica, resynchronizes its data with the new leader and order is restored.

All shard data synchronizations are done in an incremental way, such that resynchronizations are quick. This technology allows to move shards (follower and leader ones) between instances without service interruptions.

This allows to scale down a C8 cluster without service interruption, loss of fault tolerance or data loss. Furthermore, one can re-balance the distribution of the shards, either manually or automatically.

All these operations can be triggered via a REST/JSON API or via the graphical web UI. All fail-over operations are completely handled within the C8 cluster.

Similarly when messages are produced on a C8 stream, they are first persisted in the local cluster and then forwarded asynchronously to the remote clusters.

In normal cases, when there are no connectivity issues, messages are replicated immediately, at the same time as they are dispatched to local consumers. Typically, end-to-end delivery latency is defined by the network round-trip time (RTT) between the remote regions.

Applications can create producers and consumers in any of the clusters, even when the remote clusters are not reachable (like during a network partition).

Subscriptions are local to a C8 cluster.

While producers and consumers can publish to and consume from any cluster in a C8 instance, subscriptions are local to the clusters in which they are created and cannot be transferred between clusters. If you do need to transfer a subscription, you’ll need to create a new subscription in the desired cluster.

Say stream S1 is being replicated between 3 clusters, Cluster-A, Cluster-B, and Cluster-C. Also let's say each cluster has 1 producer i.e., P1, P2 and P3. Similarly assume C1 & C2 are consumers in Cluster-A and Cluster-B respectively.

Now all messages produced in any cluster will be delivered to all subscriptions in all the other clusters. So consumers C1 and C2 will receive all messages published by producers P1, P2, and P3. Ordering is still guaranteed on a per-producer basis.

Geo Replication across DCs

C8 uses asynchronous causal ordered replication across DCs (regions). C8 enables data to be written or messages to be produced and consumed in different geo-locations. For instance, your application may be write/publish data in one region and you would like to process it for consumption in other regions or markets. Geo-replication in C8 enables you to do that for both geofabric collection and streams.

Geo-replication is enabled at the geofabric level. Any message published on any global stream in that geofabric will then be replicated to all clusters in the specified set. Similarly any document added to any collection in that geofabric will then be replicated to all clusters associated with that geofabric.


C8 organizes its collection data within a datacenter in shards. Sharding allows to use multiple machines to run a cluster of C8 instances that together constitute a single database. Shards are configured per collection so multiple shards of data form the collection as a whole. To determine in which shard the data is to be stored C8 performs a hash across the values. By default this hash is being created from _key.

The number of shards is fixed at 16. There is no option for user to configure the number of shards. Hashing can be done for another attribute:

This would be useful to keep data of every country in one shard which would result in better performance for queries working on a per country base. You can also specify multiple shardKeys.

Note however that if you change the shard keys from their default ["_key"], then finding a document in the collection by its primary key involves a request to every single shard. Furthermore, in this case one can no longer prescribe the primary key value of a new document but must use the automatically generated one. This latter restriction comes from the fact that ensuring uniqueness of the primary key would be very inefficient if the user could specify the primary key.

On which node in a cluster a particular shard is kept is decided by the system. There is no option to users to configure an affinity based on certain shard keys.

Unique indexes (hash, skiplist, persistent) on sharded collections are only allowed if the fields used to determine the shard key are also included in the list of attribute paths for the index:

shardKeys indexKeys
a a ok
a b not ok
a a, b ok
a, b a not ok
a, b b not ok
a, b a, b ok
a, b a, b, c ok
a, b, c a, b not ok
a, b, c a, b, c ok

Geo-Fabric Streams

C8 GeoFabric streams are built on the publish-subscribe pattern, aka pub-sub. In this pattern, producers publish messages to streams. Consumers can then subscribe to those streams, process incoming messages, and send an acknowledgement when processing is complete.

Once a subscription has been created, all messages will be retained by C8 streams, even if the consumer gets disconnected. Retained messages will be discarded only when a consumer acknowledges that they've been successfully processed.


Messages are the basic "unit" of C8 streams. They're what producers publish to streams and what consumers then consume from streams (and acknowledge when the message has been processed). Messages are the analogue of letters in a postal service system.

Component Purpose
Value / data payload The data carried by the message. All c8 stream messages carry raw bytes, although message data can also conform to data schemas in future
Key Messages can optionally be tagged with keys, which can be useful for things like stream compaction
Properties An optional key/value map of user-defined properties
Producer name The name of the producer that produced the message (producers are automatically given default names, but you can apply your own explicitly as well)
Sequence ID Each c8 stream message belongs to an ordered sequence on its stream. A message's sequence ID is its ordering in that sequence.
Publish time The timestamp of when the message was published (automatically applied by the producer)
Event time An optional timestamp that applications can attach to the message representing when something happened, e.g. when the message was processed. The event time of a message is 0 if none is explicitly set.


A producer is a process that attaches to a stream and publishes messages to a C8 for processing.

Send modes

Producers can send messages to c8 either synchronously (sync) or asynchronously (async).

Mode Description
Sync send The producer will wait for acknowledgement from the broker after sending each message. If acknowledgment isn't received then the producer will consider the send operation a failure.
Async send The producer will put the message in a blocking queue and return immediately. The client library will then send the message to the broker in the background. If the queue is full, the producer could be blocked or fail immediately when calling the API, depending on arguments passed to the producer.


Messages published by producers can be compressed during transportation in order to save bandwidth. C8 streams currently supports two types of compression:


If batching is enabled, the producer will accumulate and send a batch of messages in a single request. Batching size is defined by the maximum number of messages and maximum publish latency.


A consumer is a process that attaches to a stream via a subscription and then receives messages.

Receive modes

Messages can be received from C8 either synchronously (sync) or asynchronously (async).

Mode Description
Sync receive A sync receive will be blocked until a message is available.
Async receive An async receive will return immediately with a future value


When a consumer has successfully processed a message, it needs to send an acknowledgement to the c8 so that c8 can discard the message (otherwise it stores the message).

Messages can be acknowledged either one by one or cumulatively. With cumulative acknowledgement, the consumer only needs to acknowledge the last message it received. All messages in the stream up to (and including) the provided message will not be re-delivered to that consumer.

Cumulative acknowledgement cannot be used with shared subscription mode, because shared mode involves multiple consumers having access to the same subscription.


Client libraries can provide their own listener implementations for consumers. In this interface, the received method is called whenever a new message is received.

You can find more details in [Streams] section(


This architecture is very flexible and thus allows many configurations, which are suitable for different usage scenarios: The important information here is that each layer in C8 can be scaled independently of the other layers.

Was this article helpful?