This post was written by Joe Stein of Stealth.ly and an Apache Kafka committer, with collaboration from Neha Narkhede from Confluent and Derrick Harris from Mesosphere. It also ran on the Confluent blog.
Stream data is everywhere and is on the rise - streams of application metrics for monitoring a company's IT infrastructure, streams of orders and shipment for retail companies, streams of activity data from devices for the IoT, streams of stock ticker data for finance companies, and so on. Increasingly, there is a need for infrastructure to harness all this stream data and make it available to various applications in a company's datacenter that need to process it, and do so in real-time and at scale.
Kafka's unique ability to move large amounts of data in real-time makes it a perfect fit for managing stream data, and many organizations
use Kafka to power real-time data monitoring, analysis, security, fraud detection and stream processing. Kafka also plays a key role in integrating various systems in a company's datacenter. Apache Mesos abstracts away datacenter resources to make it easy to deploy and manage distributed applications and systems. This post explains how to run Kafka clusters on Mesos to simplify the task of managing stream data at scale.
A quick overview of Apache Kafka
Apache Kafka is a distributed, high-throughput, low-latency publish-subscribe messaging system. Since it was open-sourced in 2011, Apache Kafka has been adopted widely across the industry at web companies like LinkedIn, Netflix, and Uber, as well as traditional enterprises like Cisco and Goldman Sachs. At these companies, it forms the backbone for critical data pipelines moving hundreds of billions of messages per day in real-time.
Apache Mesos: 0 to 60 in a minute
Apache Mesos is a cluster manager that provides efficient resource isolation and sharing across distributed applications or frameworks. It sits between the application layer and the operating system, making it easy and efficient to deploy and manage applications in large-scale clustered environments.
Here is a quick overview of the Apache Mesos architecture. Mesos is made up of Masters, Slaves and Frameworks.
The Mesos master
The Mesos master is responsible for the communication of resources between slaves and the frameworks. There is only one master running as the leader at any one time. There is typically at least one stand-by process for failover in the case the master crashes (in proxy mode stand-bys pass the data to the Master). The Mesos master is responsible for allocating the resources (between schedulers and slaves) for tasks, managing state, high availability, etc.
The Mesos slave
The Mesos slave launches local processes on the servers where it's running. These processes are launched within Linux containers by the Executor, which is the parent container for any process that it might launch, in addition to itself being a process.
The Mesos framework
Frameworks receive resource offers from the Mesos Master about the Mesos slaves' resources (e.g., CPU and RAM). Frameworks are made up of two components:
- The Scheduler: The scheduler provides the foundation for managing what the framework's tasks are doing. The scheduler is responsible for managing the state between slave success and failures, tasks failures, internal application configuration and failures, communication to the outside world and a lot more.
- The Executor: The executor is the running application's code on the server. Within this container, other processes can be launched as well, depending on how the application is configured to manage itself. Most often, the Executor is just the business logic code running on the server with a thin layer for interacting with the Mesos Master.
You can read more about the Apache Mesos architecture here
is a Mesos framework that makes it easy to launch any long-running application on Mesos without requiring a custom, application-specific framework. It automatically provides a lot of features that many applications need in a clustered environment, such as high-availability, node constraints, application health checks, an API for scriptability and service discovery, and an easy to use web-based user interface. The simplicity of Marathon comes with loss of flexibility and customization though. The application has no say in how the resources should be allocated to respect certain constraints, for example, to preserve processing or data affinity.
Why not run Kafka on Marathon?
At first, we started by running Kafka on Marathon but, in practice, we ran into a number of issues.
First, Marathon is not designed for managing stateful services. In the event of a failure or even a simple service restart, Marathon blindly restarts the service on any other resource that matches the constraints defined by the service. This is not ideal for stateful services since that requires the service to move all its local state to the new server which can turn out to be a very expensive operation. Kafka, like several other storage systems, maintains its data on local disks. Running Kafka on Marathon would mean a simple restart operation of a broker process can move each broker to a different server, making the broker replicate all its data from the rest of the brokers. Since typically Kafka stores large amounts of data, this could mean unnecessarily replicating terabytes of data over the wire. You want brokers to be able to wait if a restart was caused by a broker and if something becomes critically wrong after that still move the broker.
Second, Marathon does not allow you to selectively load balance the application's state amongst a subset of processes that belong to the application. In Kafka, examples of this operation are cluster expansion where you want to selectively move some partitions from the rest of the cluster to the newly restarted brokers. Currently, the cluster expansion operation has to be performed manually through an admin interface in Kafka. Merely starting new brokers in a cluster does not allocate any data to it and the admin has to selectively move some partitions from the rest of the cluster to the newly started broker. Also, Kafka does not have support for quotas yet, so this operation of moving data to the new brokers has to be done carefully in stages to avoid saturating the network and rest of the replication traffic in a Kafka cluster. Marathon does not have hooks that allow application specific business logic to drive the failure detection and handling of processes started using Marathon.
Due to these drawbacks, we decided to pursue the framework approach for integrating Kafka with Mesos.
Running Kafka on Mesos: The framework approach
Here is how the various components of the Kafka Mesos framework work:
Kafka Mesos scheduler
The scheduler provides the operational automation for a Kafka cluster and any version of Kafka can run on Mesos through the scheduler. It is the central point where the decisions for task failures, administration and scaling are all made. The scheduler state is maintained in Zookeeper, while the configuration and other administrative tasks are available through a REST API
The scheduler should be running on Marathon so that if the scheduler process dies, Marathon can launch that on another Mesos Slave.
Kafka Mesos executor
The executor interacts with the Kafka broker as an intermediary to the scheduler. The executor looks for the Kafka binary release tgz and runs that. This allows users to not only run different versions, but also allows users to patch Kafka and run simulated tests through configured and automated deployments.
Getting started: Installing Kafka on Apache Mesos
If you want to get your hands dirty, here is a quickstart on the Kafka Mesos framework.
Open up two terminal windows. Check the kafka-mesos.properties after you cd into the directory that you cloned to make sure the scheduler is configured for your cluster.
In the first window.
git clone https://github.com/mesos/kafka mesosKafka cd mesosKafka ./gradlew jar ./kafka-mesos.sh scheduler
In the second window.
./kafka-mesos.sh add 1000..1002 --cpus 0.01 --heap 128 --mem 256 ./kafka-mesos.sh start 1000..1002 ./kafka-mesos.sh status
At this point you will have 3 Kafka brokers running. For more CLI commands:
You can also get help for each command
./kafka-mesos.sh help <cmd>
Administering the Kafka Mesos framework
The Kafka Mesos Scheduler provides another option than just having a CLI. All commands available in the CLI are also available in the REST API.
To discover where the Mesos Kafka Scheduler is running you need to query the Marathon api.
curl -X GET -H "Content-type: application/json" -H "Accept: application/json" http://localhost:8080/v2/tasks
The REST API provides every feature that is available in the CLI. This takes the format of:
Adding a broker
Starting a broker
Status of running brokers
# curl "http://localhost:7000/api/brokers/status"
Existing Kafka tools, producers and consumers
Existing Kafka tools, producers and consumers all work with Kafka on Mesos just the same way they do while running Kafka outside Mesos. You can discover other brokers by interacting with either the CLI or the REST APIs.
When running the scheduler on Marathon for higher availability, first look up from the Marathon API
the host and port of the scheduler, then call that scheduler to find the brokers. Mesos DNS
can also be used so that brokers can have static DNS names assigned to them. Once you have your broker to connect to you are good to go.
Big things to come
The future is bright for Kafka on Mesos and the DCOS. We have a lot of feedback and ideas circulating for what should come next and how it should continue to grow. Here are some of what has been discussed, in no particular order, to help improve this integration, most of which are features we are working to add in Apache Kafka itself:
- Continued support for new Kafka and Mesos features and bug fixes.
- Integrating Kafka commands (e.g. kafka-topics, etc) into the scheduler so it can be used through the CLI and REST API.
- Auto-scaling clusters (including auto reassignment of partitions) so that the resources (CPU, RAM, etc.) that brokers are using can be used elsewhere in known valleys of traffic.
- Rack-aware partition assignment for fault tolerance.
- Hooks so that producers and consumers can also be launched from the scheduler and managed with the cluster.
- Automated partition reassignment based on load and traffic.
In the time to come, companies will expect to do even more with their growing data. Gone are the days of single monolithic databases. Now companies are adding new specialized distributed systems to process data -- but they will need to minimize the complexity of deploying and managing hardware resources, lest they risk becoming slaves to their infrastructure. Not only will Kafka be central to a company's data pipeline infrastructure to enable data flow to all these diverse systems, but a cluster manager like Mesos will become increasingly important as big data technologies like Kafka continue to proliferate.