
Apache Kafka Streaming 2026: Real-Time Data Processing
Apache Kafka is a widely adopted distributed event streaming platform for real-time data processing and event-driven architectures. This guide covers Kafka fundamentals, stream processing with Kafka Streams, and best practices for building scalable, fault-tolerant streaming applications.
Kafka Architecture and Concepts
Core Components
A Kafka cluster is a set of brokers that store topics; each topic is split into partitions, which are replicated across brokers for fault tolerance. Producers append records to partitions, and consumers read them back by offset. A representative broker configuration:
# Kafka cluster setup
# server.properties
broker.id=1
listeners=PLAINTEXT://localhost:9092
log.dirs=/var/kafka-logs
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# ZooKeeper settings (legacy mode only; see the KRaft note below)
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
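Note that zookeeper.connect applies only to legacy clusters: KRaft mode has been production-ready since Kafka 3.3, and Kafka 4.0 removed ZooKeeper support entirely. A minimal single-node KRaft sketch, with illustrative values:
# server.properties (KRaft mode, no ZooKeeper)
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093
listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
controller.listener.names=CONTROLLER
inter.broker.listener.name=PLAINTEXT
log.dirs=/var/kafka-logs
# Format the log directory once before first start:
# kafka-storage.sh format -t <cluster-uuid> -c server.properties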
Topic Management
# Create topic
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic user-events \
--partitions 6 \
--replication-factor 3
# List topics
kafka-topics.sh --list --bootstrap-server localhost:9092
# Describe topic
kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic user-events
# Add partitions (the count can only grow, and key-to-partition mapping changes)
kafka-topics.sh --alter \
--bootstrap-server localhost:9092 \
--topic user-events \
--partitions 12
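Per-topic settings such as retention and cleanup policy are adjusted with kafka-configs.sh rather than kafka-topics.sh. For example, to shorten retention for a single topic to 24 hours:
# Override retention for one topic (86400000 ms = 24 hours)
kafka-configs.sh --alter \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name user-events \
--add-config retention.ms=86400000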
Producer Implementation
Java Producer
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class UserEventProducer {
private final Producer<String, String> producer;
public UserEventProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// Reliability settings: idempotent, retried, acknowledged by all in-sync replicas
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Batching for throughput
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
this.producer = new KafkaProducer<>(props);
}
public void sendUserEvent(String userId, String eventData) {
ProducerRecord<String, String> record = new ProducerRecord<>(
"user-events",
userId,
eventData
);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("Error sending message: " + exception.getMessage());
} else {
System.out.println("Message sent to partition " + metadata.partition() +
" with offset " + metadata.offset());
}
}
});
}
public void close() {
producer.close();
}
}
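A hypothetical caller of this class looks like the following; close() flushes any buffered records, so it should be invoked on shutdown:
// Hypothetical usage
UserEventProducer eventProducer = new UserEventProducer();
eventProducer.sendUserEvent("user123", "{\"eventType\":\"user_login\"}");
eventProducer.close(); // flushes batched records before exiting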
Node.js Producer
const { Kafka } = require('kafkajs');
const client = new Kafka({
clientId: 'user-service',
brokers: ['localhost:9092'],
retry: {
initialRetryTime: 100,
retries: 8
}
});
const producer = client.producer({
maxInFlightRequests: 1,
idempotent: true,
transactionTimeout: 30000
});
class UserEventProducer {
async connect() {
await producer.connect();
}
async sendEvent(userId, eventType, eventData) {
try {
const message = {
key: userId,
value: JSON.stringify({
userId,
eventType,
eventData,
timestamp: Date.now()
}),
headers: {
'event-type': eventType,
'source': 'user-service'
}
};
const result = await producer.send({
topic: 'user-events',
messages: [message]
});
console.log('Message sent:', result);
} catch (error) {
console.error('Error sending message:', error);
}
}
async sendBatch(events) {
const messages = events.map(event => ({
key: event.userId,
value: JSON.stringify(event),
timestamp: Date.now().toString()
}));
await producer.sendBatch({
topicMessages: [{
topic: 'user-events',
messages
}]
});
}
async disconnect() {
await producer.disconnect();
}
}
module.exports = UserEventProducer;
Consumer Implementation
Java Consumer
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class UserEventConsumer {
private final Consumer<String, String> consumer;
public UserEventConsumer(String groupId) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// Consumer settings
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
this.consumer = new KafkaConsumer<>(props);
}
public void consume() {
consumer.subscribe(Arrays.asList("user-events"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
processEvent(record.key(), record.value());
}
// Manual commit after processing
consumer.commitSync();
}
} catch (Exception e) {
System.err.println("Error in consumer: " + e.getMessage());
} finally {
consumer.close();
}
}
private void processEvent(String userId, String eventData) {
// Process the event
System.out.println("Processing event for user: " + userId);
System.out.println("Event data: " + eventData);
// Simulate processing time
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
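The consume() loop above never exits on its own. A common shutdown pattern (a sketch, not the only option) is to call consumer.wakeup() from a shutdown hook, which makes the blocked poll() throw a WakeupException that the loop can treat as a signal to stop and close:
// In the thread that starts the consume loop:
final Thread consumerThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();         // forces poll() to throw WakeupException
    try {
        consumerThread.join(); // wait until the loop has closed the consumer
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));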
Node.js Consumer
const { Kafka } = require('kafkajs');
const client = new Kafka({
clientId: 'analytics-service',
brokers: ['localhost:9092']
});
const consumer = client.consumer({
groupId: 'analytics-group',
minBytes: 1,
maxBytes: 1e6,
maxWaitTimeInMs: 3000
});
class UserEventConsumer {
async connect() {
await consumer.connect();
await consumer.subscribe({ topic: 'user-events', fromBeginning: true });
}
// kafkajs expects either eachMessage or eachBatch, not both;
// register exactly one handler per consumer.
async startConsuming() {
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
try {
const event = JSON.parse(message.value.toString());
await this.processEvent(event);
} catch (error) {
console.error('Error processing message:', error);
}
}
});
}
// Alternative: batch handler with manual offset resolution and heartbeats
async startBatchConsuming() {
await consumer.run({
eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
for (const message of batch.messages) {
try {
const event = JSON.parse(message.value.toString());
await this.processEvent(event);
resolveOffset(message.offset);
await heartbeat();
} catch (error) {
console.error('Error processing batch message:', error);
}
}
}
});
}
async processEvent(event) {
console.log('Processing event:', event);
// Route to different processors based on event type
switch (event.eventType) {
case 'user_login':
await this.handleUserLogin(event);
break;
case 'user_purchase':
await this.handleUserPurchase(event);
break;
case 'user_logout':
await this.handleUserLogout(event);
break;
default:
console.log('Unknown event type:', event.eventType);
}
}
async handleUserLogin(event) {
// Update user analytics
console.log(`User ${event.userId} logged in`);
}
async handleUserPurchase(event) {
// Process purchase analytics
console.log(`User ${event.userId} made a purchase`);
}
async handleUserLogout(event) {
// Update session analytics
console.log(`User ${event.userId} logged out`);
}
async disconnect() {
await consumer.disconnect();
}
}
module.exports = UserEventConsumer;
Kafka Streams Processing
Stream Processing Application
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
import java.util.Properties;
public class UserEventStreamProcessor {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-event-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
// Input stream
KStream<String, String> userEvents = builder.stream("user-events");
// Filter and transform events
KStream<String, String> loginEvents = userEvents
.filter((key, value) -> value.contains("\"eventType\":\"user_login\""))
.mapValues(value -> {
// Transform the event data
return processLoginEvent(value);
});
// Aggregate user activity
KTable<String, Long> userActivityCount = userEvents
.groupByKey()
.count(Materialized.as("user-activity-count"));
// Window-based aggregation
KTable<Windowed<String>, Long> windowedCounts = userEvents
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count();
// Join streams
KStream<String, String> enrichedEvents = userEvents
.leftJoin(userActivityCount,
(eventValue, count) -> enrichEventWithCount(eventValue, count));
// Output streams
loginEvents.to("user-login-events");
enrichedEvents.to("enriched-user-events");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
private static String processLoginEvent(String eventData) {
// Mark the event as processed (naive string edit; assumes a flat JSON object)
return eventData.replace("}", ",\"processed\":true}");
}
private static String enrichEventWithCount(String eventData, Long count) {
// Enrich with the user's activity count; leftJoin passes null when no match exists
long activityCount = (count != null) ? count : 0L;
return eventData.replace("}", ",\"userActivityCount\":" + activityCount + "}");
}
}
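Because the count is materialized as "user-activity-count", it can also be read directly through Kafka Streams interactive queries once the application is in the RUNNING state. A minimal sketch:
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Query the local state store backing the "user-activity-count" KTable
ReadOnlyKeyValueStore<String, Long> store = streams.store(
    StoreQueryParameters.fromNameAndType(
        "user-activity-count",
        QueryableStoreTypes.keyValueStore()));
Long count = store.get("user123"); // null if this key has never been seen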
Complex Stream Processing
// Sketch: UserEvent, UserProfile, UserActivity, FraudAlert, Recommendation,
// their *Serde() factories, and the helper methods are application-specific.
public class AdvancedStreamProcessor {
public Topology buildTopology() {
StreamsBuilder builder = new StreamsBuilder();
// User events stream
KStream<String, UserEvent> userEvents = builder
.stream("user-events", Consumed.with(Serdes.String(), userEventSerde()));
// User profile table
KTable<String, UserProfile> userProfiles = builder
.table("user-profiles", Consumed.with(Serdes.String(), userProfileSerde()));
// Detect fraud patterns
KStream<String, FraudAlert> fraudAlerts = userEvents
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10)))
.aggregate(
() -> new UserActivity(),
(key, event, activity) -> activity.addEvent(event),
Materialized.with(Serdes.String(), userActivitySerde())
)
.toStream()
.filter((windowedKey, activity) -> activity.isSuspicious())
.map((windowedKey, activity) ->
KeyValue.pair(windowedKey.key(), new FraudAlert(windowedKey.key(), activity)));
// Real-time recommendations
KStream<String, Recommendation> recommendations = userEvents
.filter((key, event) -> event.getEventType().equals("page_view"))
.join(userProfiles,
(event, profile) -> generateRecommendation(event, profile),
Joined.with(Serdes.String(), userEventSerde(), userProfileSerde()));
// Output streams
fraudAlerts.to("fraud-alerts", Produced.with(Serdes.String(), fraudAlertSerde()));
recommendations.to("recommendations", Produced.with(Serdes.String(), recommendationSerde()));
return builder.build();
}
private Recommendation generateRecommendation(UserEvent event, UserProfile profile) {
// Recommendation logic based on user behavior and profile
return new Recommendation(profile.getUserId(),
generateProductRecommendations(event, profile));
}
}
Schema Registry and Avro
Avro Schema Definition
{
"type": "record",
"name": "UserEvent",
"namespace": "com.example.events",
"fields": [
{"name": "userId", "type": "string"},
{"name": "eventType", "type": "string"},
{"name": "timestamp", "type": "long"},
{"name": "properties", "type": {"type": "map", "values": "string"}},
{"name": "sessionId", "type": ["null", "string"], "default": null}
]
}
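Schema Registry's default BACKWARD compatibility mode requires that consumers using the new schema can still read data written with the old one, which in practice means added fields need defaults. A hypothetical compatible addition to the fields array above:
{"name": "deviceType", "type": ["null", "string"], "default": null}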
Producer with Schema Registry
import io.confluent.kafka.serializers.KafkaAvroSerializer;
// UserEvent below is the class generated from the Avro schema above
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
Producer<String, UserEvent> producer = new KafkaProducer<>(props);
UserEvent event = UserEvent.newBuilder()
.setUserId("user123")
.setEventType("page_view")
.setTimestamp(System.currentTimeMillis())
.setProperties(Map.of("page", "/products", "category", "electronics"))
.build();
producer.send(new ProducerRecord<>("user-events", event.getUserId(), event));
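On the consuming side, KafkaAvroDeserializer plays the mirror role. A sketch assuming the same generated UserEvent class:
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-consumers");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put("schema.registry.url", "http://localhost:8081");
// Deserialize into the generated UserEvent class rather than GenericRecord
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

Consumer<String, UserEvent> consumer = new KafkaConsumer<>(props);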
Kafka Connect
JDBC Source Connector
{
"name": "jdbc-source-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://localhost:5432/mydb",
"connection.user": "postgres",
"connection.password": "password",
"table.whitelist": "users,orders",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "db-",
"poll.interval.ms": 1000
}
}
Elasticsearch Sink Connector
{
"name": "elasticsearch-sink-connector",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "user-events",
"connection.url": "http://localhost:9200",
"type.name": "_doc",
"key.ignore": "true",
"schema.ignore": "true"
}
}
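Connector configurations like these are registered through the Kafka Connect REST API (port 8083 by default). The file name below is assumed:
# Register the connector (config saved as elasticsearch-sink-connector.json)
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @elasticsearch-sink-connector.json
# Inspect its status
curl http://localhost:8083/connectors/elasticsearch-sink-connector/status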
Monitoring and Operations
JMX Metrics
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import java.time.Duration;
// Custom application metrics built on Micrometer
public class KafkaMetrics {
private final MeterRegistry meterRegistry;
public KafkaMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
public void recordMessageProcessed(String topic) {
Counter.builder("kafka.messages.processed")
.tag("topic", topic)
.register(meterRegistry)
.increment();
}
public void recordProcessingTime(String topic, Duration duration) {
Timer.builder("kafka.processing.time")
.tag("topic", topic)
.register(meterRegistry)
.record(duration);
}
}
Health Checks
# Check cluster health
kafka-broker-api-versions.sh --bootstrap-server localhost:9092
# Check consumer lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group analytics-group --describe
# Topic metrics via JMX (kafka.tools.JmxTool moved to org.apache.kafka.tools.JmxTool in Kafka 3.5+)
kafka-run-class.sh kafka.tools.JmxTool \
--object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec \
--jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi
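When a lag check shows a group needs to replay data, its offsets can be rewound with the same tool; the group must have no active members while resetting:
# Rewind a stopped consumer group to the beginning of a topic
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group analytics-group --topic user-events \
--reset-offsets --to-earliest --execute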
Security Configuration
SSL/TLS Configuration
# server.properties
listeners=SSL://localhost:9093
security.inter.broker.protocol=SSL
ssl.keystore.location=/var/ssl/kafka.server.keystore.jks
ssl.keystore.password=password
ssl.key.password=password
ssl.truststore.location=/var/ssl/kafka.server.truststore.jks
ssl.truststore.password=password
ssl.client.auth=required
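Clients need matching settings. A sketch of the client-side properties, with placeholder paths and passwords:
# Client SSL settings (producer/consumer properties)
security.protocol=SSL
ssl.truststore.location=/var/ssl/kafka.client.truststore.jks
ssl.truststore.password=password
# Keystore entries are required because the broker sets ssl.client.auth=required
ssl.keystore.location=/var/ssl/kafka.client.keystore.jks
ssl.keystore.password=password
ssl.key.password=password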
SASL Authentication
# SASL configuration
listeners=SASL_SSL://localhost:9093
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# JAAS configuration
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_alice="alice-secret";
};
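The client side supplies its credentials through sasl.jaas.config; a sketch for the alice user defined above:
# Client SASL_SSL settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="alice" \
  password="alice-secret";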
Performance Tuning
Producer Optimization
// High throughput producer settings
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
Consumer Optimization
// High throughput consumer settings
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 50000);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 262144);
props.put(ConsumerConfig.SEND_BUFFER_CONFIG, 131072);
Career Opportunities
High-Demand Roles
- Kafka Engineer: $105,000 - $145,000
- Stream Processing Engineer: $110,000 - $150,000
- Data Platform Engineer: $115,000 - $155,000
- Real-Time Systems Architect: $130,000 - $170,000
Essential Skills
- Kafka architecture and administration
- Stream processing with Kafka Streams
- Schema management and evolution
- Performance tuning and optimization
- Security implementation
- Monitoring and troubleshooting
- Integration patterns
- Event-driven architecture design
Conclusion
Apache Kafka is essential for building real-time, event-driven applications. Master these streaming concepts, processing patterns, and operational practices to create scalable, fault-tolerant data pipelines. The growing demand for real-time data processing creates excellent opportunities for Kafka specialists in modern data architectures.