##Fabric - A real-time stream processing framework
###What?
A scalable, practical and safe real-time computation framework designed for easy operability and extension.
Fabric is proven to work very well for:
###Why?
Computation: A computation is a directed acyclic graph (DAG) that describes the flow of data and the computations performed on it. The nodes of the graph are components written by users and are of two types, SOURCE and PROCESSOR. A single instance of a computation (also called a topology in this document) runs in a single JVM. A computation consists of user-written sources and processors linked together according to a JSON specification. Every source must implement PipelineSource. Every processor must extend StreamingProcessor or ScheduledProcessor.
EventSet: An event set is a collection of events. An event set is the basic transmission unit within the computation.
Source: A source is a component that ingests event sets into the computation. A source is responsible for managing the state of the events ingested into the computation. Events can be in acknowledged or unacknowledged state.
Processor: A processor is a component that performs some computation on an incoming event set and emits an outgoing event set. A processor can be of two types, Streaming Processor and Scheduled Processor.
Streaming Processor: A Streaming Processor is a processor that is triggered whenever an event set is sent to it.
Scheduled Processor: A Scheduled Processor is a processor that is triggered periodically, each time a fixed interval elapses.
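
The difference between the two trigger models can be sketched in plain Java. This is a minimal illustration that does not use the Fabric API: `TriggerSketch`, `onEventSet`, `consume` and `onTimer` are hypothetical names, and the timer is simulated by calling `onTimer` directly rather than by a real scheduler.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for illustration only; this is not a Fabric class.
class TriggerSketch {
    // Event-driven model: invoked once per incoming event set,
    // and the result is emitted immediately.
    static List<String> onEventSet(List<String> eventSet) {
        List<String> out = new ArrayList<>();
        for (String event : eventSet) {
            out.add(event.toUpperCase());
        }
        return out;
    }

    // Timer-driven model: incoming event sets are only buffered ...
    private final List<String> buffer = new ArrayList<>();

    void consume(List<String> eventSet) {
        buffer.addAll(eventSet);
    }

    // ... and the work happens when the period elapses.
    List<String> onTimer() {
        List<String> flushed = new ArrayList<>(buffer);
        buffer.clear();
        return flushed;
    }

    public static void main(String[] args) {
        System.out.println(onEventSet(Arrays.asList("a", "b")));

        TriggerSketch scheduled = new TriggerSketch();
        scheduled.consume(Arrays.asList("x"));
        scheduled.consume(Arrays.asList("y", "z"));
        System.out.println(scheduled.onTimer()); // everything buffered so far
        System.out.println(scheduled.onTimer()); // nothing arrived in between
    }
}
```

The word count walkthrough follows the same split: the splitter reacts to each event set, while the counter accumulates state and reports it on a timer.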
###Walkthrough
Let’s write a word count computation that processes a list of sentences and outputs word frequency counts.
We need three components for this computation:
####RandomSentenceSource.java
```java
// Add this annotation for registering the source with the metadata server
@Source(
    namespace = "global",
    name = "random-sentence-source",
    version = "0.1",
    description = "A source that generates random sentences from a pool of sentences",
    cpu = 0.1,
    memory = 64,
    requiredProperties = {},
    optionalProperties = {"randomGeneratorSeed"}
)
public class RandomSentenceSource implements PipelineSource {

    Random random;

    String[] sentences = {
        "A quick brown fox jumped over the lazy dog",
        "Life is what happens to you when you are busy making other plans",
        "Mama always said that life is a box of chocolates",
        "I am going to make you an offer you cannot refuse",
        "I am speaking to a dead man on the other side of the phone",
        "The path of the righteous man is beset on all sides by the inequities of the selfish and the tyranny of evil men"
    };

    @Override
    public void initialize(final String instanceName,
                           final Properties global,
                           final Properties local,
                           final ProcessingContext processingContext,
                           final ComponentMetadata componentMetadata) throws Exception {
        // this method is called to initialize the source
        // use this utility method to read properties passed
        int seed = ComponentPropertyReader
            .readInteger(local, global, "randomGeneratorSeed", instanceName, componentMetadata, 42);
        random = new Random(seed);
    }

    @Override
    public RawEventBundle getNewEvents() {
        // this method is called to get new events
        return RawEventBundle.builder()
            .events(getSentences(5).stream()
                .map(sentence -> Event.builder().id(random.nextInt()).data(sentence.toLowerCase()).build())
                .collect(Collectors.toCollection(ArrayList::new)))
            .meta(Collections.emptyMap())
            .partitionId(Integer.MAX_VALUE)
            .transactionId(Integer.MAX_VALUE)
            .build();
    }

    // picks n sentences at random (with repetition) from the pool
    private List<String> getSentences(int n) {
        List<String> listOfSentences = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            listOfSentences.add(sentences[random.nextInt(sentences.length)]);
        }
        return listOfSentences;
    }
}
```
####SplitterProcessor.java
```java
@Processor(
    namespace = "global",
    name = "splitter-processor",
    version = "0.1",
    cpu = 0.1,
    memory = 32,
    description = "A processor that splits sentences by a given delimiter",
    processorType = ProcessorType.EVENT_DRIVEN,
    requiredProperties = {},
    optionalProperties = {"delimiter"}
)
public class SplitterProcessor extends StreamingProcessor {

    String delimiter;

    @Override
    protected EventSet consume(final ProcessingContext processingContext, final EventSet eventSet)
            throws ProcessingException {
        // each incoming sentence becomes one outgoing event carrying a String[] of words
        List<Event> events = new ArrayList<>();
        eventSet.getEvents().stream()
            .forEach(event -> {
                String sentence = (String) event.getData();
                String[] words = sentence.split(delimiter);
                events.add(Event.builder()
                    .data(words)
                    .id(Integer.MAX_VALUE)
                    .properties(Collections.emptyMap())
                    .build());
            });
        return EventSet.eventFromEventBuilder()
            .isAggregate(false)
            .partitionId(eventSet.getPartitionId())
            .events(events)
            .build();
    }

    @Override
    public void initialize(final String instanceName,
                           final Properties global,
                           final Properties local,
                           final ComponentMetadata componentMetadata) throws InitializationException {
        // falls back to "," when no delimiter property is supplied
        delimiter = ComponentPropertyReader.readString(local, global, "delimiter", instanceName, componentMetadata, ",");
    }

    @Override
    public void destroy() {
        // do some cleanup if necessary
    }
}
```
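
How `ComponentPropertyReader` resolves a property is not shown in this document. The sketch below is one plausible reading, stated as an assumption: the key shape `processor.<instance>.<property>` is inferred from the `processor.splitter-processor.delimiter` entry in the JSON specification, and the lookup order (local, then global, then default) is a guess based on the argument order. `PropertyLookupSketch` is a hypothetical helper, not a Fabric class.

```java
import java.util.Properties;

// Hypothetical helper for illustration; the real ComponentPropertyReader may differ.
class PropertyLookupSketch {
    // Assumed order: instance-scoped key in local, then in global, then the default.
    static String readString(Properties local, Properties global,
                             String name, String instanceName, String defaultValue) {
        // Key shape assumed from the JSON specification in this walkthrough.
        String key = "processor." + instanceName + "." + name;
        String value = local.getProperty(key);
        if (value == null) {
            value = global.getProperty(key);
        }
        return value != null ? value : defaultValue;
    }

    public static void main(String[] args) {
        Properties local = new Properties();
        local.setProperty("processor.splitter-processor.delimiter", " ");
        Properties global = new Properties();

        String delimiter = readString(local, global, "delimiter", "splitter-processor", ",");
        System.out.println("[" + delimiter + "]");
    }
}
```

Under these assumptions, the space configured in the specification overrides the `","` default passed by `SplitterProcessor`.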
####WordCountProcessor.java
```java
@Processor(
    namespace = "global",
    name = "word-count-processor",
    version = "0.2",
    description = "A processor that prints word frequency counts within a tumbling window",
    cpu = 0.1,
    memory = 128,
    processorType = ProcessorType.TIMER_DRIVEN,
    requiredProperties = {"triggering_frequency"},
    optionalProperties = {}
)
public class WordCountProcessor extends ScheduledProcessor {

    Map<String, Integer> wordCounts = new HashMap<>();

    @Override
    protected void consume(final ProcessingContext processingContext, final EventSet eventSet)
            throws ProcessingException {
        // accumulate counts until the next timer trigger
        eventSet.getEvents().stream()
            .forEach(event -> {
                String[] words = (String[]) event.getData();
                for (String word : words) {
                    if (wordCounts.containsKey(word)) {
                        wordCounts.put(word, wordCounts.get(word) + 1);
                    } else {
                        wordCounts.put(word, 1);
                    }
                }
            });
    }

    @Override
    public void initialize(final String instanceName,
                           final Properties global,
                           final Properties local,
                           final ComponentMetadata componentMetadata) throws InitializationException {
        // nothing to initialize here
    }

    @Override
    public List<Event> timeTriggerHandler(ProcessingContext processingContext) throws ProcessingException {
        // this method will be called after a fixed interval of time, say 5 seconds
        System.out.println(Joiner.on(",").withKeyValueSeparator("=").join(wordCounts));
        wordCounts.clear();
        // nothing to send to downstream processors
        return Collections.emptyList();
    }

    @Override
    public void destroy() {
        wordCounts.clear();
    }
}
```
RandomSentenceSource -> SplitterProcessor -> WordCountProcessor
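
To make the data flow through the three stages concrete, here is a minimal plain-Java sketch of the same logic with no Fabric classes involved. `WordCountFlowSketch` and its method names are hypothetical, and a `TreeMap` is used only so that the printed order is deterministic.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of the data flow; not part of Fabric.
class WordCountFlowSketch {
    // Source stage: lower-case each sentence, as RandomSentenceSource does.
    static String normalize(String sentence) {
        return sentence.toLowerCase();
    }

    // Splitter stage: one sentence in, an array of words out.
    static String[] split(String sentence, String delimiter) {
        return sentence.split(delimiter);
    }

    // Counter stage: accumulate a frequency per word.
    static Map<String, Integer> count(List<String> sentences, String delimiter) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String sentence : sentences) {
            for (String word : split(normalize(sentence), delimiter)) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> sentences = Arrays.asList("A quick brown fox", "a lazy dog");
        // prints {a=2, brown=1, dog=1, fox=1, lazy=1, quick=1}
        System.out.println(count(sentences, " "));
    }
}
```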
Let’s suppose all of these classes reside in a single Maven project, sample-topology, and that the resulting jar, sample-topology.jar, is hosted at http://localhost:8080.
To register these sources and processors, build the project, release it to Artifactory, and then register the components through the fabric-server API.
The JSON specification for this computation looks like this:
```json
{
  "name": "word-count-topology",
  "sources": [{
    "id": "random-sentence-source",
    "meta": {
      "id": "1cfda5cb-99d4-34e3-83f0-5b1364a92cce",
      "type": "SOURCE",
      "namespace": "global",
      "name": "random-sentence-source",
      "version": "0.1",
      "description": "A source that generates random sentences from a pool of sentences",
      "processorType": null,
      "requiredProperties": [],
      "optionalProperties": [
        "randomGeneratorSeed"
      ],
      "cpu": 0.1,
      "memory": 64,
      "source": {
        "type": "jar",
        "url": "file:///fabric-sample-processors/target/fabric-sample-processors-1.0-SNAPSHOT.jar"
      }
    },
    "properties": {}
  }],
  "processors": [{
    "id": "splitter-processor",
    "meta": {
      "id": "0b749006-d2dd-3684-a521-f76d6ab0dec8",
      "type": "PROCESSOR",
      "namespace": "global",
      "name": "splitter-processor",
      "version": "0.1",
      "description": "A processor that splits sentences by a given delimiter",
      "processorType": "EVENT_DRIVEN",
      "requiredProperties": [],
      "optionalProperties": [
        "delimiter"
      ],
      "cpu": 0.1,
      "memory": 32,
      "source": {
        "type": "jar",
        "url": "file:///fabric-sample-processors/target/fabric-sample-processors-1.0-SNAPSHOT.jar"
      }
    },
    "properties": {
      "processor.splitter-processor.delimiter": " "
    }
  }, {
    "id": "word-count-processor",
    "meta": {
      "id": "59f4fe28-b09b-3447-8bb2-26d3c23dd885",
      "type": "PROCESSOR",
      "namespace": "global",
      "name": "word-count-processor",
      "version": "0.2",
      "description": "A processor that prints word frequency counts within a tumbling window",
      "processorType": "TIMER_DRIVEN",
      "requiredProperties": [
        "triggering_frequency"
      ],
      "optionalProperties": [],
      "cpu": 0.1,
      "memory": 128,
      "source": {
        "type": "jar",
        "url": "file:///fabric-sample-processors/target/fabric-sample-processors-1.0-SNAPSHOT.jar"
      }
    },
    "properties": {
      "processor.word-count-processor.triggering_frequency": "5000"
    }
  }],
  "connections": [{
    "fromType": "SOURCE",
    "from": "random-sentence-source",
    "to": "splitter-processor"
  }, {
    "fromType": "PROCESSOR",
    "from": "splitter-processor",
    "to": "word-count-processor"
  }],
  "properties": {
    "computation.name": "word-count-topology",
    "computation.eventset.is_serialized": "false"
  }
}
```
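
Since a computation must be a directed acyclic graph, the `connections` array can be sanity-checked before submission. The sketch below is one way to do that with Kahn's topological sort. It is not Fabric's own validation, which this document does not describe, and `ConnectionGraphSketch` is a hypothetical name. Each connection is reduced to a `{from, to}` pair of component ids.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical checker for illustration; not part of Fabric.
class ConnectionGraphSketch {
    // Returns the component ids in a valid processing order,
    // or throws if the connections contain a cycle.
    static List<String> topologicalOrder(List<String[]> connections) {
        Map<String, List<String>> successors = new LinkedHashMap<>();
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (String[] c : connections) {
            successors.computeIfAbsent(c[0], k -> new ArrayList<>()).add(c[1]);
            inDegree.putIfAbsent(c[0], 0);
            inDegree.merge(c[1], 1, Integer::sum);
        }

        // Start from components with no incoming connection (the sources).
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((id, degree) -> {
            if (degree == 0) {
                ready.add(id);
            }
        });

        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String id = ready.remove();
            order.add(id);
            for (String next : successors.getOrDefault(id, new ArrayList<>())) {
                if (inDegree.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }
        if (order.size() != inDegree.size()) {
            throw new IllegalArgumentException("connections contain a cycle");
        }
        return order;
    }

    public static void main(String[] args) {
        List<String[]> connections = new ArrayList<>();
        connections.add(new String[] {"random-sentence-source", "splitter-processor"});
        connections.add(new String[] {"splitter-processor", "word-count-processor"});
        System.out.println(topologicalOrder(connections));
    }
}
```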
###Benchmarks
####Performance Test Configuration
Number of messages: 1 million
Payload size: 258 bytes
####Topology
Kafka Source -> Event Counter (prints the total number of events consumed to the console every second)
Number of partitions: 1
Topic name: end-to-end-latency-perf
Number of topology instances: 1
Kafka source buffer size: 3 MB
Docker CPU (number of CPU shares): 1.0
Docker memory: 2 GB
JVM heap size: 2 GB
####Kafka Broker configuration
2 cores, Intel(R) Xeon(R) CPU E5-2666 v3 @ 2.90GHz, 8 GB RAM
####Mesos Host configuration
8 cores, Intel(R) Xeon(R) CPU E5-2666 v3 @ 2.90GHz, 32 GB RAM
End-to-end latency to process all messages, in seconds (rounded up) and averaged over multiple runs:

Configuration | Without gzip (s) | With gzip (s) |
---|---|---|
Local ZooKeeper, local Kafka | 8 | 5 |
Remote ZooKeeper, remote Kafka | 36 | 6 |

Throughput with the best configuration: T ≈ 166,666 events/second
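
As a quick check on this figure, assume that throughput is simply the message count divided by the end-to-end latency. Under that assumption, roughly 166,666 events per second corresponds to 1 million messages in 6 seconds, while 5 seconds would give 200,000. The source does not say which table cell it treats as the best configuration, so the mapping is an inference. `ThroughputSketch` is a hypothetical name.

```java
// Hypothetical helper for illustration; not part of the benchmark code.
class ThroughputSketch {
    // Whole events per second, assuming throughput = messages / latency.
    static long eventsPerSecond(long messages, long latencySeconds) {
        return messages / latencySeconds;
    }

    public static void main(String[] args) {
        long messages = 1_000_000L;
        System.out.println(eventsPerSecond(messages, 6)); // 166666
        System.out.println(eventsPerSecond(messages, 5)); // 200000
    }
}
```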
NOTE: Using the Disruptor with YieldingWaitStrategy instead of a LinkedBlockingQueue (LBQ) for channel communication actually reduced the throughput.
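
For readers unfamiliar with the pattern, a queue-based channel between two components can be sketched with `java.util.concurrent.LinkedBlockingQueue`. This is a minimal illustration, not Fabric's channel implementation, which this document does not show. `LbqChannelSketch`, the capacity of 1024 and the end-of-stream marker are all assumptions made for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical channel sketch for illustration; not part of Fabric.
class LbqChannelSketch {
    private static final String END_OF_STREAM = "__end__";

    // Sends eventCount events from a producer thread to the calling thread
    // and returns how many were received.
    static int transfer(int eventCount) {
        // Bounded queue: put() blocks when the consumer falls behind.
        BlockingQueue<String> channel = new LinkedBlockingQueue<>(1024);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < eventCount; i++) {
                    channel.put("event-" + i);
                }
                channel.put(END_OF_STREAM);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int received = 0;
        try {
            // take() blocks until an element is available.
            while (!END_OF_STREAM.equals(channel.take())) {
                received++;
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(10_000));
    }
}
```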
###Using Processor Maven Archetype
To create a processor project, run the following command:

```shell
mvn archetype:generate -DarchetypeGroupId=com.olacabs.fabric -DarchetypeArtifactId=fabric-processor-archetype -DarchetypeVersion=0.0.1-SNAPSHOT -DartifactId=<artifact_id_of_your_project> -DgroupId=<group_id_of_your_project> -DinteractiveMode=true
```

Example:

```shell
mvn archetype:generate -DarchetypeGroupId=com.olacabs.fabric -DarchetypeArtifactId=fabric-processor-archetype -DarchetypeVersion=0.0.1-SNAPSHOT -DartifactId=fabric-my-processor -DgroupId=com.olacabs.fabric -DinteractiveMode=true
```
What it does -
Contributors: shashank.g and shaikidris.ali