I'm using kubernetes-kafka as a starting point with minikube.
It uses a StatefulSet and a headless service for service discovery within the cluster.
The goal is to expose the individual Kafka brokers externally. Internally they are addressed as:
kafka-0.broker.kafka.svc.cluster.local:9092
kafka-1.broker.kafka.svc.cluster.local:9092
kafka-2.broker.kafka.svc.cluster.local:9092
The constraint is that the external service must be able to address each broker individually.
What's the right (or one possible) way of going about this? Is it possible to expose an external service per kafka-x.broker.kafka.svc.cluster.local:9092?
We have solved this in 1.7 by changing the headless service to Type=NodePort and setting externalTrafficPolicy=Local. This bypasses the internal load balancing of a Service, so traffic sent to a specific node on that node port only works if a Kafka pod is running on that node.
apiVersion: v1
kind: Service
metadata:
  name: broker
spec:
  externalTrafficPolicy: Local
  ports:
  - nodePort: 30000
    port: 30000
    protocol: TCP
    targetPort: 9092
  selector:
    app: broker
  type: NodePort
For example, suppose we have two nodes, nodeA and nodeB, and only nodeB is running a Kafka pod. nodeA:30000 will not connect, but nodeB:30000 will connect to the Kafka pod running on nodeB.
https://kubernetes.io/docs/tutorials/services/source-ip/#source-ip-for-services-with-typenodeport
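Because only nodes that host a Kafka pod accept connections on the node port, clients need a bootstrap list made up of exactly those nodes. A minimal sketch of building that list follows; the helper name `bootstrap_servers` is hypothetical, and nodeC stands in for a second node that happens to run a broker.

```shell
# Sketch: build a client bootstrap list from the nodes that run a broker.
# The node names and port 30000 mirror the example above; with
# externalTrafficPolicy: Local, nodes without a Kafka pod are left out.
bootstrap_servers() {
  node_port="$1"; shift
  list=""
  for node in "$@"; do
    # Prepend a comma only when the list already has an entry.
    list="${list:+${list},}${node}:${node_port}"
  done
  echo "$list"
}

bootstrap_servers 30000 nodeB nodeC
```

The resulting string is in the comma-separated host:port form that Kafka clients accept for bootstrap.servers.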
Note that this was also available in 1.5 and 1.6 as a beta annotation. More on feature availability can be found here: https://kubernetes.io/docs/tasks/access-application-cluster/create-external-load-balancer/#preserving-the-client-source-ip
Note also that while this ties a kafka pod to a specific external network identity, it does not guarantee that your storage volume will be tied to that network identity. If you are using the VolumeClaimTemplates in a StatefulSet then your volumes are tied to the pod while kafka expects the volume to be tied to the network identity.
For example, if the kafka-0 pod restarts and comes up on nodeC instead of nodeA, kafka-0's PVC (if using VolumeClaimTemplates) still holds data written for nodeA, and the broker running in kafka-0 starts rejecting requests because it thinks it is nodeA rather than nodeC.
To fix this, we are looking forward to Local Persistent Volumes, but right now we have a single PVC for our Kafka StatefulSet, and data is stored under $NODENAME on that PVC to tie volume data to a particular node.
https://github.com/kubernetes/features/issues/121
https://kubernetes.io/docs/concepts/storage/volumes/#local
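The per-node directory layout described above could be sketched as follows. The mount path /var/lib/kafka/data, the helper name `data_dir_for_node`, and the way NODENAME reaches the container (for example through the Downward API field spec.nodeName) are all assumptions, not details given in this answer.

```shell
# Sketch: choose a per-node data directory on the shared volume.
# The mount path and the source of NODENAME are assumptions.
data_dir_for_node() {
  mount_path="$1"
  node_name="$2"
  echo "${mount_path}/${node_name}"
}

# Fall back to a placeholder node name when NODENAME is not set.
NODENAME="${NODENAME:-nodeA}"
data_dir_for_node /var/lib/kafka/data "$NODENAME"
```

The broker's log.dirs setting would then point at the printed directory, so that a pod rescheduled to another node starts with that node's data rather than the previous node's.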
The solutions so far weren't quite satisfying for me, so I'm posting an answer of my own. My goals:
- Pods should still be dynamically managed through a StatefulSet as much as possible.
- Create an external service per Pod (i.e. Kafka Broker) for Producer/Consumer clients and avoid load balancing.
- Create an internal headless service so that the Brokers can communicate with each other.
Starting with Yolean/kubernetes-kafka, the only thing missing is exposing the service externally, which poses two challenges:
- Generating unique labels per Broker pod so that we can create an external service for each of the Broker pods.
- Telling the Brokers to communicate with each other using the internal Service while configuring Kafka to tell the producers/consumers to communicate over the external Service.
Per pod labels and external services:
To generate labels per pod, this issue was really helpful. Using it as a guide, we add the following line to the init.sh property in 10broker-config.yml:
kubectl label pods ${HOSTNAME} kafka-set-component=${HOSTNAME}
We keep the existing headless service, but we also generate an external Service per pod using the label (I added them to 20dns.yml):
apiVersion: v1
kind: Service
metadata:
  name: broker-0
  namespace: kafka
spec:
  type: NodePort
  ports:
  - port: 9093
    nodePort: 30093
  selector:
    kafka-set-component: kafka-0
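Writing one such Service by hand per broker gets repetitive, so a small script can generate them. This is only a sketch: the helper name `render_broker_service` is hypothetical, and the replica count of 3 and the base node port 30093 are assumptions that have to match the StatefulSet and the port arithmetic used in init.sh.

```shell
# Sketch: print one NodePort Service manifest per broker ordinal.
# Assumes 3 replicas and that ordinal 0 uses nodePort 30093.
render_broker_service() {
  ordinal="$1"
  cat <<EOF
---
apiVersion: v1
kind: Service
metadata:
  name: broker-${ordinal}
  namespace: kafka
spec:
  type: NodePort
  ports:
  - port: 9093
    nodePort: $((30093 + ordinal))
  selector:
    kafka-set-component: kafka-${ordinal}
EOF
}

for i in 0 1 2; do
  render_broker_service "$i"
done
```

The output can be saved into 20dns.yml or piped to `kubectl apply -f -`.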
Configure Kafka with internal/external listeners
I found this issue incredibly useful in trying to understand how to configure Kafka.
This again requires updating the init.sh and server.properties properties in 10broker-config.yml with the following:
Add the following to server.properties to update the security protocols (currently using PLAINTEXT):
listener.security.protocol.map=INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT
inter.broker.listener.name=INTERNAL_PLAINTEXT
Dynamically determine the external IP and the external port for each Pod in init.sh:
EXTERNAL_LISTENER_IP=<your external addressable cluster ip>
EXTERNAL_LISTENER_PORT=$((30093 + ${HOSTNAME##*-}))
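The port arithmetic above relies on StatefulSet pods being named with a trailing ordinal (kafka-0, kafka-1, and so on). A minimal sketch of that derivation, wrapped in a hypothetical helper `external_port` so it can be tried outside a pod:

```shell
# Hypothetical helper: derive a broker's external NodePort from its pod name.
# Assumes pod names end in "-<ordinal>", as StatefulSet pods do.
external_port() {
  pod_name="$1"
  base_port="${2:-30093}"        # NodePort assigned to ordinal 0
  ordinal="${pod_name##*-}"      # strip everything up to the last "-"
  echo $((base_port + ordinal))
}

external_port kafka-0
external_port kafka-2
```

This prints 30093 and 30095. Each per-pod Service has to declare the matching nodePort, otherwise the address Kafka advertises will not route to the right broker.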
Then configure the listeners and advertised.listeners IPs for EXTERNAL_LISTENER and INTERNAL_LISTENER (also in the init.sh property):
sed -i "s/#listeners=PLAINTEXT:\/\/:9092/listeners=INTERNAL_PLAINTEXT:\/\/0.0.0.0:9092,EXTERNAL_PLAINTEXT:\/\/0.0.0.0:9093/" /etc/kafka/server.properties
sed -i "s/#advertised.listeners=PLAINTEXT:\/\/your.host.name:9092/advertised.listeners=INTERNAL_PLAINTEXT:\/\/$HOSTNAME.broker.kafka.svc.cluster.local:9092,EXTERNAL_PLAINTEXT:\/\/$EXTERNAL_LISTENER_IP:$EXTERNAL_LISTENER_PORT/" /etc/kafka/server.properties
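To check what these substitutions produce without touching a real broker, the same patterns can be run over sample input. This is a sketch under assumptions: the helper name `rewrite_listeners` is hypothetical, it reads from stdin instead of editing /etc/kafka/server.properties in place, it uses `|` as the sed delimiter to avoid escaping slashes, and the pod name, IP, and port passed in are placeholders.

```shell
# Sketch: apply the two listener substitutions to a config read from stdin.
# Arguments: <pod hostname> <external IP> <external port> (placeholders).
rewrite_listeners() {
  sed \
    -e "s|#listeners=PLAINTEXT://:9092|listeners=INTERNAL_PLAINTEXT://0.0.0.0:9092,EXTERNAL_PLAINTEXT://0.0.0.0:9093|" \
    -e "s|#advertised.listeners=PLAINTEXT://your.host.name:9092|advertised.listeners=INTERNAL_PLAINTEXT://$1.broker.kafka.svc.cluster.local:9092,EXTERNAL_PLAINTEXT://$2:$3|"
}

# Feed in the two commented-out default lines and print the rewritten result.
printf '%s\n' \
  '#listeners=PLAINTEXT://:9092' \
  '#advertised.listeners=PLAINTEXT://your.host.name:9092' |
  rewrite_listeners kafka-1 192.0.2.10 30094
```

Brokers reach each other through the INTERNAL_PLAINTEXT address, while external clients are handed the EXTERNAL_PLAINTEXT address after they bootstrap.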
Obviously, this is not a full solution for production (for example, it does not address security for the externally exposed brokers), and I'm still refining my understanding of how to let internal producers/consumers communicate with the brokers as well.
However, given my current understanding of Kubernetes and Kafka, this is the best approach I have found so far.
I'd like to say that I had read this question and answer three times before while trying to wrap my head around what Headless Services are and what the point of them is (and I never fully understood Headless Services, or what this Q&A was about).
On the fourth read (revisiting it after further educating myself) it finally clicked.
So the purpose of this answer is to restate Nadir's question, problem, and answer as if explaining them to a grade schooler, so that others who stumble upon this will get the significance of Nadir's awesome solution on the first read.
Useful Background Knowledge:
There exists a Service of type ExternalName. An ExternalName Service simply points to a DNS address.
There are 2 flavors of ExternalName Service:
- Without a Cluster IP:
A good use case is allowing a testing cluster and a production cluster to share as much code as possible (and, in some cases, simple convenience). Pods in both testing and production point to the same inner cluster DNS name of a service; that is the predictable, reusable part. The difference is that the testing environment has a service that points to a SQL service inside the cluster, while the production cluster uses an ExternalName Service that redirects/points to the DNS address of a cloud provider's managed SQL solution.
- With a Cluster IP:
This is the version of an ExternalName Service that is key to the solution.
A Stateful Set has 3 parts to its identity:
- An Ordinal (Number)
- Persistent Storage
- A Persistent and predictable Inner Cluster DNS Name (it gets this from the requirement that it must be shipped with a Headless service)
There are 3 important things to remember about Kube-Proxy:
- It makes sure everything has a unique IP.
- It's responsible for implementing the Virtual Static Cluster IPs (they are considered virtual because they only exist in every node's iptables in the iptables implementation of Kube-Proxy, or in a kernel hash table in the IPVS next-gen version of Kube-Proxy), and it's also responsible for the logical load balancing effect that occurs with normal Kubernetes Services that have a Cluster IP.
- Kube-Proxy is responsible for mapping traffic that comes in on NodePorts to a corresponding Kubernetes Service with a static Cluster IP. This is very important to the requirement that the stateful services be externally exposed: NodePorts are always supposed to be involved when it comes to externally exposing services.
There are 4 important things to remember about a Headless Service:
- It creates a predictable DNS Address.
- It doesn't act as an internal cluster Load Balancer. You talk directly to the pod identified by the predictable DNS Address (which is very desirable for stateful workloads).
- It doesn't have a Static Cluster IP.
- As a side effect of qualities 2 and 3, it's outside the realm of Kube-Proxy (which is responsible for directing traffic coming in on NodePorts to Services). I'll paraphrase this a few times so the problem sinks in: NodePorts can't usually forward traffic to Headless Services. External traffic entering the cluster can't usually be forwarded to Headless Services. It's not intuitive how to externally expose a Headless Service.
Now that we understand the problem better, let's go back to the question: How can a Headless Service (which points to an individual member of a StatefulSet) be externally exposed?
Solution Part 1:
Any pod in the cluster can talk to the members of the StatefulSet, because the StatefulSet is paired with a headless service, which gives each member a predictable inner cluster DNS address of the form:
statefulsetname-#.associatedheadlessservice.namespace.svc.cluster.local:port
kafka-0.broker.kafka.svc.cluster.local:9092
kafka-1.broker.kafka.svc.cluster.local:9092
kafka-2.broker.kafka.svc.cluster.local:9092
broker.kafka.svc.cluster.local:9092 can also be used to refer to whichever one is available.
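The naming pattern can be expanded mechanically for any replica count. A minimal sketch, with a hypothetical helper `broker_dns_names` whose four arguments (StatefulSet name, headless service name, namespace, replica count) are illustrative and whose port is fixed at 9092 to match the addresses above:

```shell
# Sketch: list the stable per-pod DNS names of a StatefulSet that is
# paired with a headless Service. All four arguments are illustrative.
broker_dns_names() {
  set_name="$1"; service="$2"; namespace="$3"; replicas="$4"
  i=0
  while [ "$i" -lt "$replicas" ]; do
    echo "${set_name}-${i}.${service}.${namespace}.svc.cluster.local:9092"
    i=$((i + 1))
  done
}

broker_dns_names kafka broker kafka 3
```

Called with these arguments, it prints the three kafka-N addresses listed above, one per line.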
Solution Part 2:
You allow external traffic to talk to members of the StatefulSet by introducing a second service that can accept external traffic, and then redirecting traffic from that service to the headless service, which can only accept internal traffic.
For each pod in the StatefulSet, a Service of type ExternalName with a Virtual Static ClusterIP address that's managed by Kube-Proxy is created. Each of these ExternalName Services points to/redirects traffic to one of the predictable static inner cluster DNS addresses identified in Solution Part 1, and because this ExternalName Service has a Virtual Static ClusterIP managed via Kube-Proxy, there can be a mapping from NodePorts to it.
Change the service from a headless ClusterIP into a NodePort, which would forward requests to any of the nodes on a set port (30092 in my example) to port 9092 on the Kafka brokers. You would hit one of the pods at random, but I guess that is fine.
20dns.yml becomes (something like this):
# A no longer headless service to create DNS records
---
apiVersion: v1
kind: Service
metadata:
  name: broker
  namespace: kafka
spec:
  type: NodePort
  ports:
  - port: 9092
    nodePort: 30092
  # [podname].broker.kafka.svc.cluster.local
  selector:
    app: kafka
Disclaimer: You might need two services: one headless for the internal DNS names and one NodePort for the external access. I haven't tried this myself.
From the kubernetes kafka documentation:
Outside access with hostport
An alternative is to use the hostport for the outside access. When
using this only one kafka broker can run on each host, which is a good
idea anyway.
In order to switch to hostport the kafka advertise address needs to be
switched to the ExternalIP or ExternalDNS name of the node running the
broker. In kafka/10broker-config.yml switch to
OUTSIDE_HOST=$(kubectl get node "$NODE_NAME" -o jsonpath='{.status.addresses[?(@.type=="ExternalIP")].address}')
OUTSIDE_PORT=${OutsidePort}
and in kafka/50kafka.yml add the hostport:
- name: outside
  containerPort: 9094
  hostPort: 9094