This blog post explains how to transport logs from multiple servers to a central location for storage or analysis. Apache Kafka is chosen for this purpose because it scales easily and because of its versatility and community.
Good and meaningful logs are the first step to be able to quickly identify and debug issues as well as predict future problems. As long as the whole application stack fits onto one or two servers, manual reading or parsing of the logs will suffice most of the time. The more servers are added to the stack and the more services are running on these servers, the more complex it becomes to analyze and correlate the data. As soon as there are two application servers that are behind a round robin load balancer, the logs on a single server may no longer be enough to track down how a user triggered a bug or why a certain request took a very long time.
The first part of the series “Scalable and Robust Logging for Web Applications” described how to improve the default Ruby on Rails logger with Log4r to get more structured logs and data that matters. This is the second post in the series, whose goal is to develop a robust system for logging, monitoring and collection of metrics. The system should scale easily in terms of throughput (e.g. by adding more application servers), be easy to expand to new types of log data (e.g. by adding a new database or importing external data sources) and make it easy to modify and analyze the data exactly as you need.
Scalable and Robust Logging for Web Applications
- Log4r for Ruby on Rails
- Log Transport and Distribution with Kafka
- How to Pre-Process Logs with Logstash
- Real Time Analytics with Storm
- Server Monitoring with Sensu
- Metrics with Graphite and RabbitMQ Using AMQP
- Storing Metrics and Logs
- Scalable and Robust Logging for Web Applications
- What to Transport? Logs vs. Metrics
- Apache Kafka
- Handling Logs
- Transporting Logs
What to Transport? Logs vs. Metrics
The first consideration should be whether it is possible and/or necessary to transport all logs to a central location. If there are many servers or a lot of log data, this might be very resource intensive and aggregating or filtering the data might be necessary. The two extremes are transporting every single log line and transporting only aggregated metrics. The following paragraphs try to help you decide on the right balance for your use case.
Advantages of transporting all logs:
- Metrics can be added, modified and deleted in one central location
- Historical data on new metrics can be computed from the stored logs
- Possibility to peek into live data-stream
- Allows building complex debugging and monitoring tools
- Central location for all logs. Invaluable for debugging, root-cause analysis and correlation of incidents
Advantages of transporting only metrics:
- Transporting (aggregated) metrics requires far less bandwidth
- Smaller storage requirements
- Scales far better
- Better than nothing
Overall, transporting all logs has many advantages and should be preferred over aggregated metrics if possible. Managing metric definitions in one place and being able to compute historic data for new metrics are especially valuable. Transporting all logs also allows for thorough (computationally expensive) analysis of historic data, e.g. to train machine learning models, predict behavior or gain better insight into who your users are and what they do.
There is no strict rule to follow and it is perfectly ok to mix and match. An example would be to just transport logs that contain examinable data and aggregate performance metrics, like average response time or jobs processed per minute, on the server.
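As a rough illustration of aggregating a performance metric on the server, the following sketch computes the average response time per minute from log lines. The log line format (`<ISO timestamp> <path> <duration in ms>`) and the function name are assumptions made for this example, not something prescribed by the rest of this series.

```python
from collections import defaultdict
from datetime import datetime


def average_response_time_per_minute(lines):
    """Return {minute: average response time in ms} for the given log lines.

    Assumes the hypothetical line format "<ISO timestamp> <path> <duration_ms>".
    """
    totals = defaultdict(lambda: [0.0, 0])  # minute -> [sum of durations, count]
    for line in lines:
        timestamp, _path, duration_ms = line.split()
        minute = datetime.fromisoformat(timestamp).strftime("%Y-%m-%d %H:%M")
        totals[minute][0] += float(duration_ms)
        totals[minute][1] += 1
    return {minute: total / count for minute, (total, count) in totals.items()}


sample = [
    "2014-03-01T12:00:05 /users 120",
    "2014-03-01T12:00:40 /users 80",
    "2014-03-01T12:01:10 /orders 300",
]
averages = average_response_time_per_minute(sample)
print(averages)
```

Only the per-minute averages would then leave the server, which is where the bandwidth savings come from. The price is that the individual requests can no longer be inspected centrally.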
This post will focus on the transport of raw log data. The posts “Server Monitoring with Sensu” and “Metrics with Graphite” will introduce better suited technologies to work with pure metrics.
Apache Kafka is a publish-subscribe messaging system that can handle high throughput and is easy to scale horizontally, which makes it a perfect fit for transporting huge amounts of log data. It is written in Scala and was initially developed at LinkedIn.
Fast
A single Kafka broker can handle hundreds of megabytes of reads and writes per second from thousands of clients.
Scalable
Kafka is designed to allow a single cluster to serve as the central data backbone for a large organization. It can be elastically and transparently expanded without downtime. Data streams are partitioned and spread over a cluster of machines to allow data streams larger than the capability of any single machine and to allow clusters of co-ordinated consumers.
Durable
Messages are persisted on disk and replicated within the cluster to prevent data loss. Each broker can handle terabytes of messages without performance impact.
Distributed by Design
Kafka has a modern cluster-centric design that offers strong durability and fault-tolerance guarantees.
- from the official Apache Kafka website
Following is an excerpt from the paper the team at LinkedIn published about Kafka. It describes the basic principles, but you are encouraged to read the complete paper to understand how Kafka works and why certain design decisions were made.
[Here is an introduction to the] basic concepts of Kafka. A stream of messages of a particular type is defined by a topic. A producer can publish messages to a topic. The published messages are then stored at a set of servers called brokers. A consumer can subscribe to one or more topics from the brokers, and consume the subscribed messages by pulling data from the brokers. (…) To subscribe to a topic, a consumer first creates one or more message streams for the topic. The messages published to that topic will be evenly distributed into these sub-streams. (…) Unlike traditional iterators, the message stream iterator never terminates. If there are currently no more messages to consume, the iterator blocks until new messages are published to the topic.
- Kreps et al, “Kafka: a Distributed Messaging System for Log Processing”
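To make the concepts from the excerpt more tangible, here is a toy, in-memory model of a topic with a message stream iterator that never terminates. This is not the Kafka API and not how Kafka is implemented; the class `ToyTopic` and its methods are made up for this sketch, which only mimics the behavior described in the paper: messages are appended to a log, each consumer pulls from its own offset, and the iterator blocks when there is nothing new.

```python
import threading


class ToyTopic:
    """In-memory stand-in for a topic (hypothetical, NOT the Kafka API)."""

    def __init__(self):
        self._log = []                      # messages are kept, not removed
        self._cond = threading.Condition()

    def publish(self, message):
        with self._cond:
            self._log.append(message)
            self._cond.notify_all()         # wake up blocked consumers

    def stream(self, offset=0):
        """Iterator that never terminates; blocks until a message is available."""
        while True:
            with self._cond:
                while offset >= len(self._log):
                    self._cond.wait()
                message = self._log[offset]
            offset += 1
            yield message


topic = ToyTopic()
received = []


def consume(count):
    stream = topic.stream()
    for _ in range(count):
        received.append(next(stream))       # blocks if nothing is published yet


consumer = threading.Thread(target=consume, args=(2,), daemon=True)
consumer.start()
topic.publish("first log line")
topic.publish("second log line")
consumer.join(timeout=5)
print(received)
```

Because the toy keeps all messages and every consumer tracks its own offset, a second consumer can start at offset 0 later and read the same messages again, which is similar in spirit to the `--from-beginning` option of the console consumer.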
Paul Ingles also wrote a nice introduction post on how uSwitch uses Kafka: “Kafka for uSwitch’s Event Pipeline” and Eric Onnen spoke at Surge2012 about “Data Models and Consumer Idioms using Apache Kafka”.
Kafka is not the only system that can be used for this kind of work. “Competitors” include RabbitMQ, Flume, Kestrel or Scribe. Comparing all these alternatives would be too much for this post and has already been done in an excellent way by the Wikimedia Foundation in their Logging Solutions Recommendation and Logging Solution Overview pages. Two discussions on Quora are also very good: they compare RabbitMQ with Kafka and take a look at the performance properties of Kafka.
Getting Started With Apache Kafka
This part explains how to set up a local Apache Kafka instance for testing purposes and how to configure a production ready three-node Kafka cluster using Zookeeper. This post uses the latest version of Apache Kafka (0.8.1), which is still considered beta but seems to be reasonably stable and is used in production by companies like LinkedIn.
Running Kafka Locally
Kafka comes with simple local scripts to start preconfigured Zookeeper and Kafka instances as well as simple producers and consumers. This lets you get started with development in less than a minute. Similar instructions for the latest stable release (0.7.2) can be found on the Apache Kafka 0.7 quickstart page.
The following excerpt from the Apache Kafka 0.8 documentation lists the commands necessary to get a single local Kafka server up and running. The guide describes the steps in more detail should something be unclear.
#Download the 0.8 release.
> tar xzf kafka-<VERSION>.tgz
> cd kafka-<VERSION>
> ./sbt update
> ./sbt package
> ./sbt assembly-package-dependency
#Start Zookeeper server
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
#Start Kafka server
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
#Create a topic
> bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test
#See that topic exists
> bin/kafka-list-topic.sh --zookeeper localhost:2181
#Start a producer
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
#Start a consumer in a different terminal
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message
This setup lets you test and code against a Kafka 0.8 instance on your local machine. The next part describes how a production cluster can be set up.
Setting Up a Production Ready Kafka Cluster With Zookeeper
To run a cluster with reasonable reliability, three machines with a replication factor of two should be the minimum in a production environment. With three machines, one node can fail, be replaced or be updated without leaving the cluster with a single point of failure.
A replication factor of two means that the data is stored on two separate servers at any time, with some synchronization delay. Should one server go down, all the data that was already synchronized to another node will be available there and Kafka will make this node the new “leader”, the primary node to write to and read from. As soon as a node fails, another node will become the “follower” (on a topic by topic basis) to satisfy the minimum replication factor.
Because replication after a failover also puts stress on the servers, it is a bad idea to have only as many servers as are immediately necessary to handle the load. Always make sure the cluster can handle the peak load even if one broker goes down, so that you do not run into problems when an instance actually breaks or needs replacement. If this is not the case, increase the number of Kafka servers. A replication factor of two can be maintained with a larger number of nodes unless greater reliability is desired.
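The sizing rule can be written down as simple arithmetic. The sketch below is a back-of-the-envelope helper with made-up numbers. The function name and the per-broker capacity are assumptions, and it ignores the extra replication traffic after a failover, so treat the result as a lower bound.

```python
import math


def brokers_needed(peak_mb_per_s, broker_capacity_mb_per_s, spare_brokers=1):
    """Brokers required to carry the peak load with `spare_brokers` nodes down."""
    return math.ceil(peak_mb_per_s / broker_capacity_mb_per_s) + spare_brokers


# Hypothetical numbers: 250 MB/s peak load, 100 MB/s sustainable per broker.
required = brokers_needed(250, 100)
print(required)
```

With these example numbers, three brokers are needed just to carry the peak, so a fourth one is required to survive the loss of a single broker.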
Zookeeper is a little different as it needs a majority of nodes to operate. This means that in a three node Zookeeper cluster, only one node can fail. Because in this example the three Zookeeper nodes are co-located with the Kafka servers, it is a good idea to run additional Zookeeper instances outside of the Kafka servers to ensure that consumers can still process messages if two of the Kafka servers go down. Note that a single extra instance is not enough for this: a four node cluster needs three nodes for a majority, so it takes five nodes in total to survive the loss of two. This is a must if any other services in your stack use Zookeeper as well. (For more information about using Zookeeper, read Scott Leberknight’s excellent introductory series on using Zookeeper.)
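The majority rule is easy to check with a few lines of arithmetic. The helper names below are made up for this sketch.

```python
def quorum_size(ensemble_size):
    """Smallest number of nodes that forms a strict majority."""
    return ensemble_size // 2 + 1


def tolerated_failures(ensemble_size):
    """How many nodes may fail while a majority is still running."""
    return ensemble_size - quorum_size(ensemble_size)


for nodes in (3, 4, 5):
    print(nodes, "nodes: quorum", quorum_size(nodes),
          "- tolerates", tolerated_failures(nodes), "failure(s)")
```

An even number of nodes does not buy additional fault tolerance: four nodes tolerate one failure, just like three. This is why Zookeeper ensembles are usually sized with an odd number of nodes.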
It is a good idea to use tools like Chef or Puppet to configure and set up services like Zookeeper and Kafka. They not only let you maintain all configuration files in a single place and adapt them to the server they are deployed to, they also make setting up a new node easy to repeat. Instead of installing and configuring a new instance by hand, a single command is enough to provision a new Kafka server and integrate it into the cluster.
This part covers the most important parts of configuring Zookeeper; to get the best performance, however, you have to read the docs yourself and tune the configuration to match your use case. Most importantly, don’t forget to tune your JVM settings as well to make sure there are no memory issues slowing down your application. Zookeeper and Apache Kafka can be monitored with tools like JConsole or jmxtrans.
Each Zookeeper server has to have a unique ID between 1 and 255. The ID is stored in the myid file, which contains nothing but the ID of the current Zookeeper instance. The zoo.cfg configuration file contains a list of all the Zookeeper instances that are part of the cluster. Each instance should know of all the other instances in the cluster:
# specify all zookeeper servers
# The first port is used by followers to connect to the leader
# The second one is used for leader election
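As an illustration, the following sketch renders a complete zoo.cfg for a three-node ensemble, much like a Chef or Puppet template would. The hostnames, the data directory and the timing values are placeholders chosen for this example. The `server.N=host:port:port` format with ports 2888 and 3888 follows the conventions used in the Zookeeper documentation.

```python
def render_zoo_cfg(hosts, data_dir="/var/lib/zookeeper"):
    """Render a zoo.cfg listing every server of the ensemble.

    `hosts` and `data_dir` are placeholders; adapt them to your environment.
    """
    lines = [
        "tickTime=2000",
        f"dataDir={data_dir}",
        "clientPort=2181",
        "initLimit=5",
        "syncLimit=2",
        "# specify all zookeeper servers",
        "# The first port is used by followers to connect to the leader",
        "# The second one is used for leader election",
    ]
    # The number after "server." must match the content of that node's myid file.
    for server_id, host in enumerate(hosts, start=1):
        lines.append(f"server.{server_id}={host}:2888:3888")
    return "\n".join(lines) + "\n"


cfg = render_zoo_cfg(["zk1.example.com", "zk2.example.com", "zk3.example.com"])
print(cfg)
```

The same file can be deployed to every node. Only the myid file inside the data directory differs per machine and contains just the number 1, 2 or 3.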
Configuring Apache Kafka
The Kafka brokers need more configuration to make sure they work as efficiently as possible on your machines, especially with regard to the number of disks and cores and the amount of memory your servers have. The most important configuration options for replication are the following:
# default replication factor. Can be changed on topic creation
# on a topic by topic basis
# Default number of partitions per topic
# Enable controlled shutdown of the broker.
# If enabled, the broker will move all leaders on it to some other
# brokers before shutting itself down. This reduces the unavailability
# window during shutdown.
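A sketch of what the replication-related part of server.properties could look like, again rendered by a small helper. The property names are taken from the Kafka 0.8 broker configuration; the values (replication factor 2, three partitions) and the hostnames are assumptions for this example and should be tuned to your setup.

```python
def render_broker_properties(broker_id, zookeeper_hosts):
    """Render the replication-related broker settings (example values only)."""
    settings = {
        # Every broker in the cluster needs its own unique id.
        "broker.id": broker_id,
        "zookeeper.connect": ",".join(f"{host}:2181" for host in zookeeper_hosts),
        # Default replication factor; can be overridden per topic on creation.
        "default.replication.factor": 2,
        # Default number of partitions per topic.
        "num.partitions": 3,
        # Move partition leaders to other brokers before shutting down.
        "controlled.shutdown.enable": "true",
    }
    return "".join(f"{key}={value}\n" for key, value in settings.items())


properties = render_broker_properties(
    1, ["zk1.example.com", "zk2.example.com", "zk3.example.com"]
)
print(properties)
```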
Take a look at the documentation and tweak the other parameters, most importantly concurrency and memory, before starting up the cluster.
After the Kafka cluster is set up, the next step is to take a look at the different logs and the storage methods used.
Log to disk
Each program should log to disk, even if the logger supports logging directly to Kafka. One reason is that if something goes wrong during the transport, the full logs are still on the server and can be reviewed manually. The other is that Kafka uses TCP, so the log transport might slow down or block the program itself, which has to be avoided at all costs.
Another benefit of this approach is that it keeps the log pipeline the same for each program.
When saving logs to disk it is important to rotate the logs so the files don’t become too large, have a good structure to allow searching the logs manually and, most importantly, allow easy (automated) deletion of old logs so these don’t fill up the hard drive.
Even though many logging frameworks support log rotation on their own, using the logrotate command seems slightly better (imho) because all settings can be viewed in one file or script and changes to the log rotation don’t affect the application.
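When rotation is left to logrotate, the application has to notice that its log file was moved away and reopen it. For a Python service this could be sketched with `logging.handlers.WatchedFileHandler` from the standard library, which checks the file before every write and reopens it if it changed (this works on Unix-like systems only). The temporary directory and the `os.rename` call below merely simulate what logrotate does when it is not configured with `copytruncate`.

```python
import logging
import logging.handlers
import os
import tempfile

log_dir = tempfile.mkdtemp()                 # stand-in for e.g. /var/log/myapp
log_path = os.path.join(log_dir, "app.log")

logger = logging.getLogger("app")
logger.setLevel(logging.INFO)
logger.propagate = False                     # keep the example output on disk only
handler = logging.handlers.WatchedFileHandler(log_path)
handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
logger.addHandler(handler)

logger.info("before rotation")
os.rename(log_path, log_path + ".1")         # simulate logrotate moving the file
logger.info("after rotation")                # handler reopens a fresh app.log
handler.close()

print(sorted(os.listdir(log_dir)))
```

The application code never deals with rotation itself, so the rotation schedule and retention can be changed in the logrotate configuration without touching or redeploying the application.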
There is an excellent program for Kafka 0.7 brokers that tails your log files and sends the data to Kafka, called tail2kafka. However, it does not support the Kafka 0.8 protocol and therefore cannot use features like topic partitioning / load balancing through the broker.
Clone the code from my mmlac GitHub repository. You can use it as follows:
# install dependency "kafka-python"
$ git clone https://github.com/mumrah/kafka-python
$ pip install ./kafka-python
# clone tail2kafka and start it
$ git clone git@github.com:mmlac/tail2kafka.git
tail2kafka -l <log-file> -t <kafka-topic> -s <kafka-server> -p <kafka-port> [other-options]
Tails a log file continously, sending log lines to a kafka topic as messages and supporting log rotation. Optionally,
prepend a "metadata" string to each log line (kafka message will contain the string <metadata>:<log-line>).
set -l to the log file to be tailed. The log tailing supports log rotation.
set -s and -p to set the kafka server (Multiple brokers to bootstrap from are not supported atm)
set -t <topic> to set the kafka topic to send
set -b to flush every n messages(default: 20)
set -w to flush every t seconds (default: 5)
Whichever condition is met first will trigger the flush
Advanced: If needed, use -d <delay> in order to control the tail delay - Unneeded in almost all cases.
-h, --help show this help message and exit
-s HOST, --host=HOST kafka host
-p PORT, --port=PORT kafka port
-t TOPIC, --topic=TOPIC
REQUIRED: Topic to send to
-l LOGFILE, --log-file=LOGFILE
REQUIRED: Log file to tail
-m METADATA, --metadata=METADATA
metadata tag to send along with the data
-b BATCH_SIZE, --batch-size=BATCH_SIZE
Size of message batches
-w BATCH_TIME, --batch-time=BATCH_TIME
Time based batching of messages
-d DELAY, --delay=DELAY
tail delay between iterations
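The batching behavior behind -b and -w (flush after n messages or after t seconds, whichever comes first) can be sketched as follows. This is not the actual tail2kafka code; the `Batcher` class, its parameters and the injectable clock are made up for illustration, and `send` stands in for whatever function hands a batch to Kafka.

```python
import time


class Batcher:
    """Collects lines and flushes on batch size or age, whichever comes first."""

    def __init__(self, send, batch_size=20, batch_time=5.0, clock=time.monotonic):
        self._send = send              # callable that receives a list of lines
        self._batch_size = batch_size
        self._batch_time = batch_time
        self._clock = clock            # injectable so the demo needs no sleeping
        self._pending = []
        self._first_added = None       # time the oldest pending line arrived

    def add(self, line):
        if not self._pending:
            self._first_added = self._clock()
        self._pending.append(line)
        self.maybe_flush()

    def maybe_flush(self):
        """Call this regularly (e.g. once per tail iteration), even when idle."""
        if not self._pending:
            return
        too_many = len(self._pending) >= self._batch_size
        too_old = self._clock() - self._first_added >= self._batch_time
        if too_many or too_old:
            self._send(list(self._pending))
            self._pending.clear()


sent = []
now = [0.0]                            # fake clock, advanced by hand
batcher = Batcher(sent.append, batch_size=3, batch_time=5.0, clock=lambda: now[0])

for line in ("a", "b", "c"):
    batcher.add(line)                  # third line triggers the size-based flush
batcher.add("d")                       # stays pending, batch not full yet
now[0] = 6.0
batcher.maybe_flush()                  # time-based flush of the remaining line
print(sent)
```

The time-based condition matters for quiet log files: without it, the last few lines before a lull would sit in the buffer until enough new lines arrive.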
There is still an issue with the shutdown, but it should not affect the running operation of the tool. I am no Python wizard, so if you know how to fix it, please let me know or submit a pull request.
I hope this article gave you a good introduction to using Kafka to transport your logs off the server for processing. The next posts in this series will take a look at how to process the data once it’s sent to Kafka.