Kafka is a distributed messaging system widely used in the industry. Kafka can be deployed on just a small server but it can also scale up to span multiple datacenters. Given the scale and variety of possible Kafka deployments, it is desirable to have flexible, configurable producer applications able to adapt to and robustly feed any Kafka real-world deployment.
nProbe, thanks to its export plugin, can be configured as a Kafka producer for the streaming of monitored/collected flows to categories called known as topics. The latest nProbe 8.3.x has been extended to allow:
- Custom configuration properties.
- A configurable number of Kafka producer threads.
Being able to set configuration properties is key as it allows to decide if high throughput is the name of the game, or if a low latency service is required. Among all the configuration properties available, for the sake of performance tuning, it is worth mentioning:
batch.num.messages
: the minimum number of messages – flows, in the nProbe parlance – to wait for to accumulate in the queue before sending off a message set.queue.buffering.max.ms
: how long to wait forbatch.num.messages
to fill up in the queue. A lower value improves latency at the cost of lower throughput and higher per-message overhead. A higher value improves throughput at the expense of latency.
Due to the non-realtime nature of flowsy, flows are periodic summaries of network connections – improving latency could be of no value. Contrarily, optimizing throughput could be fundamental to stream tens of thousands of flows per second. For this reason, a set of experiments with queue.buffering.max.ms
set to 1000 (i.e., one second) but with a variable number batch.num.messages
have been carried out. Results are presented in the remainder of this article after a brief introduction that explains how to set custom configuration properties and the number of producer threads.
Setting Custom Configuration Properties
Custom configuration properties are specified using option --kafka-conf
followed by a pair <name=value>
. The option can be passed multiple times to set several properties. As properties are either global or specific to the topic, topic-related properties must be prefixed with the string "topic."
.
The following example shows an nProbe instance started in collector mode that streams flows to a Kafka cluster and has three properties set:
simone@devel:~/nProbe$ ./nprobe -i none -n none --collector-port 2055 --kafka "192.168.2.129:9092;test7;none;0" --disable-cache --kafka-conf debug=msg --kafka-conf queue.buffering.max.ms=1000 --kafka-conf=topic.auto.commit.interval.ms=2000
Configuring The Number of Kafka Producer Threads
The number of producer threads is set using option --kafka-num-producers
followed by a number. The default number of threads is 1 but it can be increased up to 8. An nProbe collector instance with two kafka producers can be started as
simone@devel:~/nProbe$ ./nprobe -i none -n none --collector-port 2055 --kafka "192.168.2.129:9092;test7;none;0" --disable-cache --kafka-num-producers 2
Adding TLS/SSL Encryption
To add TLS/SSL encryption, brokers and nProbe Kafka clients need to be configured as described in this guide. Specifically, nProbe Kafka configuration properties, such as ssl.key.location
and ssl.certificate.location
, can be configured using option --kafka-conf
as described above.
TLS/SSL encryption requires librdkafka version 0.9.x+. Some distros are still shipping outdated versions 8.4.x. Make sure to have a 0.9.x version installed or all the TLS/SSL-related properties will be ignored by nProbe as documented in this issue.
Performance Tuning Experiments
Setup
Performance numbers stem from tests using the following setup:
- Producer Machine: Intel(R) Xeon(R) CPU E3-1230 v3 @ 3.30GHz, 16GB RAM
- Kafka Machine: Virtual Machine, Intel(R) Core(TM) i5-4258U CPU @ 2.40GHz, 2GB RAM
- Disk performance shortcut by setting the brokers’ flush configuration properties as so:
log.flush.interval.messages=10000000
log.flush.interval.ms=100000
- Two brokers
- One topic with two partitions and replication factor 2
Scope
Experiment configuration settings in order to:
- Determine the size S of flow messages generated by nProbe and streamed to Kafka
- Asses the maximum number of S-size messages that Kafka can ingest per second
- Vary the number of nProbe producer threads and
batch.num.messages
to see how fast it can go and if it can export a number of flows per second that reaches the maximum number determined in 2.
Determining the Size of a Kafka Flow Message
To determine the size S of a Kafka flow message it suffices to instruct nprobe to actually export with batch.num.messages=1
to make sure every message set will contain one and only one flow:
simone@devel:~/nProbe$ ./nprobe -i none -n none --collector-port 2055 --kafka "192.168.2.129:9092;test7;none;0" --disable-cache --kafka-conf debug=msg --kafka-conf batch.num.messages=1 ... %7|1521802885.914|PRODUCE|rdkafka#producer-0| simone-VirtualBox-1:9092/0: produce messageset with 1 messages (304 bytes)%7|1521802885.915|MSGSET|rdkafka#producer-0| simone-VirtualBox-1:9092/0: MessageSet with 1 message(s) delivered%7|1521802886.915|PRODUCE|rdkafka#producer-0| simone-VirtualBox-1:9092/0: produce messageset with 1 messages (328 bytes)%7|1521802886.915|PRODUCE|rdkafka#producer-0| simone-VirtualBox-1:9092/0: produce messageset with 1 messages (309 bytes)%7|1521802886.915|MSGSET|rdkafka#producer-0| simone-VirtualBox-1:9092/0: MessageSet with 1 message(s) delivered%7|1521802886.915|MSGSET|rdkafka#producer-0| simone-VirtualBox-1:9092/0: MessageSet with 1 message(s) delivered%7|1521802886.918|PRODUCE|rdkafka#producer-1| simone-VirtualBox-1:9093/1: produce messageset with 1 messages (324 bytes)%7|1521802886.918|PRODUCE|rdkafka#producer-1| simone-VirtualBox-1:9093/1: produce messageset with 1 messages (305 bytes) ...
As it can be seen from the debug output above, a good estimation is S=300 Bytes.
Assessing the Maximum Number of Messages Ingested Per Second
To assess the maximum number of messages with size S=300 Kafka can ingest per second, the tool rdkafka_performance has been used. The tool is basically a command-line diagnostic tool useful to do this kind of performance assessments. As anticipated above, property queue.buffering.max.ms
is always set to 1000 (ie., one second) while property batch.num.messages
is varied to 30, 300, and 3000 to produce message sets with approximate size 10k, 100k, and 1M, respectively.
The outcome of the experiments is the following:
simone@devel:~/librdkafka/examples$ ./rdkafka_performance -P -t test7 -s 300 -c 100000 -m "_____________Test1:TwoBrokers:100kmsgs:300bytes" -S 1 -b 192.168.2.129:9092 -v -X queue.buffering.max.ms=1000 -X batch.num.messages=3000 % 100000 messages produced (30000000 bytes), 100000 delivered (offset 0, 0 failed) in 3521ms: 28400 msgs/s and 8.52 MB/s, 0 produce failures, 0 in queue, no compression
simone@devel:~/librdkafka/examples$ ./rdkafka_performance -P -t test7 -s 300 -c 100000 -m "_____________Test1:TwoBrokers:100kmsgs:300bytes" -S 1 -b 192.168.2.129:9092 -v -X queue.buffering.max.ms=1000 -X batch.num.messages=300 % 100000 messages produced (30000000 bytes), 100000 delivered (offset 0, 0 failed) in 3924ms: 25480 msgs/s and 7.64 MB/s, 0 produce failures, 0 in queue, no compression
simone@devel:~/librdkafka/examples$ ./rdkafka_performance -P -t test7 -s 300 -c 100000 -m "_____________Test1:TwoBrokers:100kmsgs:300bytes" -S 1 -b 192.168.2.129:9092 -v -X queue.buffering.max.ms=1000 -X batch.num.messages=30 % 100000 messages produced (30000000 bytes), 100000 delivered (offset 0, 0 failed) in 9141ms: 10939 msgs/s and 3.28 MB/s, 0 produce failures, 0 in queue, no compression
The maximum insertion speed, 28400 msgs/s and 8.52 MB/s, is reached when batching 3000 messages in message sets with size of approximately 1M.
Testing nProbe Performances
To see how fast nProbe can go, and also to compare its speed with the maximum insertion speed measured above, several experiments have been run. Property batch.num.messages
is varied to 30, 300, and 3000 to produce message sets with size 10k, 100k, and 1M, using either 1 or 2 producer threads.
Results for 1 producer thread:
simone@devel:~/nProbe$ ./nprobes -i none -n none --collector-port 2055 --kafka "192.168.2.129:9092;test7;none;0" --disable-cache --kafka-num-producers 1 --kafka-performance-test 10000 --kafka-conf debug=msg --kafka-conf queue.buffering.max.ms=1000 --kafka-conf batch.num.messages=3000 -b 1 23/Mar/2018 15:35:06 [nprobe.c:3217] Kafka producer #0 [msgs produced: 900150][msgs delivered: 900150][bytes delivered: 261254571][msgs failed: 0][msgs/s: 19568][MB/s: 5.68][produce failures: 29][queue len: 99943]
simone@devel:~/nProbe$ ./nprobes -i none -n none --collector-port 2055 --kafka "192.168.2.129:9092;test7;none;0" --disable-cache --kafka-num-producers 1 --kafka-performance-test 10000 --kafka-conf debug=msg -X queue.buffering.max.ms=1000 --kafka-conf batch.num.messages=300 -b 1 23/Mar/2018 15:32:58 [nprobe.c:3217] Kafka producer #0 [msgs produced: 900301][msgs delivered: 900301][bytes delivered: 261259728][msgs failed: 0][msgs/s: 22507][MB/s: 6.53][produce failures: 15][queue len: 99795]
simone@devel:~/nProbe$ ./nprobes -i none -n none --collector-port 2055 --kafka "192.168.2.129:9092;test7;none;0" --disable-cache --kaa-num-producers 1 --kafka-performance-test 10000 --kafka-conf debug=msg --kafka-conf queue.buffering.max.ms=1000 --kafka-conf batch.num.messages=30 -b 1 23/Mar/2018 15:37:11 [nprobe.c:3217] Kafka producer #0 [msgs produced: 900097][msgs delivered: 900097][bytes delivered: 261240981][msgs failed: 0][msgs/s: 16982][MB/s: 4.93][produce failures: 15][queue len: 100000]
Results for 2 producer threads:
simone@devel:~/nProbe$ ./nprobes -i none -n none --collector-port 2055 --kafka "192.168.2.129:9092;test7;none;0" --disable-cache --kaa-num-producers 2 --kafka-performance-test 10000 --kafka-conf debug=msg --kafka-conf queue.buffering.max.ms=1000 --kafka-conf batch.num.messages=3000 -b 1 23/Mar/2018 15:42:24 [nprobe.c:3217] Kafka producer #0 [msgs produced: 400076][msgs delivered: 400076][bytes delivered: 116086398][msgs failed: 0][msgs/s: 9757][MB/s: 2.83][produce failures: 41][queue len: 99968] 23/Mar/2018 15:42:24 [nprobe.c:3217] Kafka producer #1 [msgs produced: 415511][msgs delivered: 415511][bytes delivered: 120603129][msgs failed: 0][msgs/s: 10134][MB/s: 2.94][produce failures: 2][queue len: 84538]
simone@devel:~/nProbe$ ./nprobes -i none -n none --collector-port 2055 --kafka "192.168.2.129:9092;test7;none;0" --disable-cache --kaa-num-producers 2 --kafka-performance-test 10000 --kafka-conf debug=msg --kafka-conf queue.buffering.max.ms=1000 --kafka-conf batch.num.messages=300 -b 1 23/Mar/2018 15:40:47 [nprobe.c:3217] Kafka producer #0 [msgs produced: 421172][msgs delivered: 421172][bytes delivered: 122256967][msgs failed: 0][msgs/s: 11383][MB/s: 3.30][produce failures: 2][queue len: 78877] 23/Mar/2018 15:40:47 [nprobe.c:3217] Kafka producer #1 [msgs produced: 400124][msgs delivered: 400124][bytes delivered: 116140787][msgs failed: 0][msgs/s: 10814][MB/s: 3.14][produce failures: 29][queue len: 99921]
simone@devel:~/nProbe$ ./nprobes -i none -n none --collector-port 2055 --kafka "192.168.2.129:9092;test7;none;0" --disable-cache --kaa-num-producers 2 --kafka-performance-test 10000 --kafka-conf debug=msg --kafka-conf queue.buffering.max.ms=1000 --kafka-conf batch.num.messages=30 -b 1 23/Mar/2018 15:39:09 [nprobe.c:3217] Kafka producer #0 [msgs produced: 400119][msgs delivered: 400119][bytes delivered: 116100328][msgs failed: 0][msgs/s: 8335][MB/s: 2.42][produce failures: 25][queue len: 99925] 23/Mar/2018 15:39:09 [nprobe.c:3217] Kafka producer #1 [msgs produced: 495282][msgs delivered: 495282][bytes delivered: 143787297][msgs failed: 0][msgs/s: 10318][MB/s: 3.00][produce failures: 0][queue len: 4768]
Using 2 producer threads doesn’t seem to provide any gain in the performance. The best performance reached, [msgs/s: 22507][MB/s: 6.53], is obtained for the single producer and batch size equal to 300 even if there’s a 2MB/s reduction with reference to the maximum speed reached with the performance assessment tool. This reduction should be explained by the other tasks performed by nProbe that include NetFlow collection and the JSON-encoding of exported data.