Miraclin Technologies

Apache Kafka Streaming 2026: Real-Time Data Processing
Apache Kafka is the leading distributed streaming platform, enabling real-time data processing and event-driven architectures. This guide covers Kafka fundamentals, stream processing, and best practices for building scalable, fault-tolerant streaming applications.

Kafka Architecture and Concepts

Core Components

A Kafka cluster consists of brokers that store topics as partitioned, replicated commit logs; producers append records to topics, and consumer groups read them in parallel. A typical broker configuration looks like this:

# Kafka cluster setup
# server.properties
broker.id=1
listeners=PLAINTEXT://localhost:9092
log.dirs=/var/kafka-logs
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
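
The configuration above assumes a ZooKeeper-based deployment. Kafka 3.3+ can instead run in KRaft mode (the default in 4.x), which removes the ZooKeeper dependency entirely. A minimal single-node sketch of the equivalent KRaft settings, for illustration only:

# server.properties (KRaft mode, combined broker + controller)
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093
listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
inter.broker.listener.name=PLAINTEXT
controller.listener.names=CONTROLLER
log.dirs=/var/kafka-logs

# Format the storage directory once before first start:
# kafka-storage.sh format -t <cluster-uuid> -c server.properties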

Topic Management

# Create topic
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic user-events \
  --partitions 6 \
  --replication-factor 3

# List topics
kafka-topics.sh --list --bootstrap-server localhost:9092

# Describe topic
kafka-topics.sh --describe \
  --bootstrap-server localhost:9092 \
  --topic user-events

# Increase partition count (partitions can only be increased, never decreased)
kafka-topics.sh --alter \
  --bootstrap-server localhost:9092 \
  --topic user-events \
  --partitions 12
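
Topics can also be managed programmatically. Below is a minimal sketch using the Java AdminClient that mirrors the CLI example above (the class name TopicAdmin is illustrative):

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicAdmin {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Create user-events with 6 partitions and replication factor 3
            NewTopic topic = new NewTopic("user-events", 6, (short) 3);
            admin.createTopics(List.of(topic)).all().get();

            // List existing topics
            admin.listTopics().names().get().forEach(System.out::println);
        }
    }
}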

Producer Implementation

Java Producer

import java.util.Properties;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

public class UserEventProducer {
    private final Producer<String, String> producer;
    
    public UserEventProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        
        // Reliability settings: idempotence preserves ordering with up to 5 in-flight requests
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        
        // Batching for throughput
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        
        this.producer = new KafkaProducer<>(props);
    }
    
    public void sendUserEvent(String userId, String eventData) {
        ProducerRecord<String, String> record = new ProducerRecord<>(
            "user-events", 
            userId, 
            eventData
        );
        
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    System.err.println("Error sending message: " + exception.getMessage());
                } else {
                    System.out.println("Message sent to partition " + metadata.partition() + 
                                     " with offset " + metadata.offset());
                }
            }
        });
    }
    
    public void close() {
        producer.close();
    }
}

Node.js Producer

const { Kafka } = require('kafkajs');

const client = new Kafka({
  clientId: 'user-service',
  brokers: ['localhost:9092'],
  retry: {
    initialRetryTime: 100,
    retries: 8
  }
});

const producer = client.producer({
  maxInFlightRequests: 1,
  idempotent: true,
  transactionTimeout: 30000
});

class UserEventProducer {
  async connect() {
    await producer.connect();
  }

  async sendEvent(userId, eventType, eventData) {
    try {
      const message = {
        key: userId,
        value: JSON.stringify({
          userId,
          eventType,
          eventData,
          timestamp: Date.now()
        }),
        headers: {
          'event-type': eventType,
          'source': 'user-service'
        }
      };

      const result = await producer.send({
        topic: 'user-events',
        messages: [message]
      });

      console.log('Message sent:', result);
    } catch (error) {
      console.error('Error sending message:', error);
    }
  }

  async sendBatch(events) {
    const messages = events.map(event => ({
      key: event.userId,
      value: JSON.stringify(event),
      timestamp: Date.now().toString()
    }));

    await producer.sendBatch({
      topicMessages: [{
        topic: 'user-events',
        messages
      }]
    });
  }

  async disconnect() {
    await producer.disconnect();
  }
}

module.exports = UserEventProducer;
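
For reference, a brief usage sketch of the class above, assuming it is saved as user-event-producer.js (the file name and event payload are illustrative):

const UserEventProducer = require('./user-event-producer');

async function main() {
  const producer = new UserEventProducer();
  await producer.connect();

  // Send a single event keyed by user ID
  await producer.sendEvent('user123', 'user_login', { device: 'mobile' });

  await producer.disconnect();
}

main().catch(console.error);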

Consumer Implementation

Java Consumer

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

public class UserEventConsumer {
    private final Consumer<String, String> consumer;
    
    public UserEventConsumer(String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        
        // Consumer settings
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
        
        this.consumer = new KafkaConsumer<>(props);
    }
    
    public void consume() {
        consumer.subscribe(Arrays.asList("user-events"));
        
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                
                for (ConsumerRecord<String, String> record : records) {
                    processEvent(record.key(), record.value());
                }
                
                // Manual commit after processing
                consumer.commitSync();
            }
        } catch (Exception e) {
            System.err.println("Error in consumer: " + e.getMessage());
        } finally {
            consumer.close();
        }
    }
    
    private void processEvent(String userId, String eventData) {
        // Process the event
        System.out.println("Processing event for user: " + userId);
        System.out.println("Event data: " + eventData);
        
        // Simulate processing time
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Node.js Consumer

const { Kafka } = require('kafkajs');

const client = new Kafka({
  clientId: 'analytics-service',
  brokers: ['localhost:9092']
});

const consumer = client.consumer({
  groupId: 'analytics-group',
  minBytes: 1,
  maxBytes: 1e6,
  maxWaitTimeInMs: 3000
});

class UserEventConsumer {
  async connect() {
    await consumer.connect();
    await consumer.subscribe({ topic: 'user-events', fromBeginning: true });
  }

  async startConsuming() {
    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        try {
          const event = JSON.parse(message.value.toString());
          await this.processEvent(event);
        } catch (error) {
          console.error('Error processing message:', error);
        }
      }
    });
  }

  // Alternative to startConsuming(): eachBatch gives batch-level control over
  // offsets and heartbeats. Pass either eachMessage or eachBatch to run(), not both.
  async startBatchConsuming() {
    await consumer.run({
      eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
        for (const message of batch.messages) {
          try {
            const event = JSON.parse(message.value.toString());
            await this.processEvent(event);
            resolveOffset(message.offset);
            await heartbeat();
          } catch (error) {
            console.error('Error processing batch message:', error);
          }
        }
      }
    });
  }

  async processEvent(event) {
    console.log('Processing event:', event);
    
    // Route to different processors based on event type
    switch (event.eventType) {
      case 'user_login':
        await this.handleUserLogin(event);
        break;
      case 'user_purchase':
        await this.handleUserPurchase(event);
        break;
      case 'user_logout':
        await this.handleUserLogout(event);
        break;
      default:
        console.log('Unknown event type:', event.eventType);
    }
  }

  async handleUserLogin(event) {
    // Update user analytics
    console.log(`User ${event.userId} logged in`);
  }

  async handleUserPurchase(event) {
    // Process purchase analytics
    console.log(`User ${event.userId} made a purchase`);
  }

  async handleUserLogout(event) {
    // Update session analytics
    console.log(`User ${event.userId} logged out`);
  }

  async disconnect() {
    await consumer.disconnect();
  }
}

module.exports = UserEventConsumer;

Kafka Streams Processing

Stream Processing Application

import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

public class UserEventStreamProcessor {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-event-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Input stream
        KStream<String, String> userEvents = builder.stream("user-events");
        
        // Filter and transform events
        KStream<String, String> loginEvents = userEvents
            .filter((key, value) -> value.contains("\"eventType\":\"user_login\""))
            .mapValues(value -> {
                // Transform the event data
                return processLoginEvent(value);
            });
        
        // Aggregate user activity
        KTable<String, Long> userActivityCount = userEvents
            .groupByKey()
            .count(Materialized.as("user-activity-count"));
        
        // Window-based aggregation
        KTable<Windowed<String>, Long> windowedCounts = userEvents
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .count();
        
        // Stream-table join: enrich each event with the user's running activity count
        KStream<String, String> enrichedEvents = userEvents
            .leftJoin(userActivityCount, 
                (eventValue, count) -> enrichEventWithCount(eventValue, count));
        
        // Output streams
        loginEvents.to("user-login-events");
        enrichedEvents.to("enriched-user-events");
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        
        // Shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
    
    private static String processLoginEvent(String eventData) {
        // Naive enrichment: assumes a flat JSON object (use a JSON library in real code)
        return eventData.replace("}", ",\"processed\":true}");
    }
    
    private static String enrichEventWithCount(String eventData, Long count) {
        // Enrich event with user activity count (same flat-JSON assumption as above)
        return eventData.replace("}", ",\"userActivityCount\":" + count + "}");
    }
}
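
Streams topologies can be verified without a broker using TopologyTestDriver from the kafka-streams-test-utils artifact. The sketch below rebuilds just the login-filter step so it stays self-contained; the class name LoginFilterTest is illustrative:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;

public class LoginFilterTest {
    public static void main(String[] args) {
        // Rebuild the login-filter step of the topology above
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> events = builder.stream("user-events");
        events.filter((k, v) -> v.contains("\"eventType\":\"user_login\""))
              .to("user-login-events");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "login-filter-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                "user-events", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                "user-login-events", new StringDeserializer(), new StringDeserializer());

            // Pipe one matching record through and read the filtered output
            in.pipeInput("user123", "{\"eventType\":\"user_login\"}");
            System.out.println(out.readKeyValue());
        }
    }
}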

Complex Stream Processing

// Assumes domain classes (UserEvent, UserProfile, UserActivity, FraudAlert, Recommendation)
// and their serde factory methods (userEventSerde(), etc.) are defined elsewhere.
public class AdvancedStreamProcessor {
    
    public Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        
        // User events stream
        KStream<String, UserEvent> userEvents = builder
            .stream("user-events", Consumed.with(Serdes.String(), userEventSerde()));
        
        // User profile table
        KTable<String, UserProfile> userProfiles = builder
            .table("user-profiles", Consumed.with(Serdes.String(), userProfileSerde()));
        
        // Detect fraud patterns
        KStream<String, FraudAlert> fraudAlerts = userEvents
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10)))
            .aggregate(
                () -> new UserActivity(),
                (key, event, activity) -> activity.addEvent(event),
                Materialized.with(Serdes.String(), userActivitySerde())
            )
            .toStream()
            .filter((windowedKey, activity) -> activity.isSuspicious())
            .map((windowedKey, activity) -> 
                KeyValue.pair(windowedKey.key(), new FraudAlert(windowedKey.key(), activity)));
        
        // Real-time recommendations
        KStream<String, Recommendation> recommendations = userEvents
            .filter((key, event) -> event.getEventType().equals("page_view"))
            .join(userProfiles, 
                (event, profile) -> generateRecommendation(event, profile),
                Joined.with(Serdes.String(), userEventSerde(), userProfileSerde()));
        
        // Output streams
        fraudAlerts.to("fraud-alerts", Produced.with(Serdes.String(), fraudAlertSerde()));
        recommendations.to("recommendations", Produced.with(Serdes.String(), recommendationSerde()));
        
        return builder.build();
    }
    
    private Recommendation generateRecommendation(UserEvent event, UserProfile profile) {
        // Recommendation logic based on user behavior and profile
        return new Recommendation(profile.getUserId(), 
                                generateProductRecommendations(event, profile));
    }
}

Schema Registry and Avro

Avro Schema Definition

{
  "type": "record",
  "name": "UserEvent",
  "namespace": "com.example.events",
  "fields": [
    {"name": "userId", "type": "string"},
    {"name": "eventType", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "properties", "type": {"type": "map", "values": "string"}},
    {"name": "sessionId", "type": ["null", "string"], "default": null}
  ]
}

Producer with Schema Registry

import java.util.Map;
import java.util.Properties;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");

Producer<String, UserEvent> producer = new KafkaProducer<>(props);

UserEvent event = UserEvent.newBuilder()
    .setUserId("user123")
    .setEventType("page_view")
    .setTimestamp(System.currentTimeMillis())
    .setProperties(Map.of("page", "/products", "category", "electronics"))
    .build();

producer.send(new ProducerRecord<>("user-events", event.getUserId(), event));
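
On the consuming side, KafkaAvroDeserializer resolves schemas against the same registry. A minimal configuration sketch, assuming the generated UserEvent class is on the classpath:

import java.util.Properties;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put("schema.registry.url", "http://localhost:8081");
// Return the generated UserEvent class instead of GenericRecord
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

Consumer<String, UserEvent> consumer = new KafkaConsumer<>(props);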

Kafka Connect

JDBC Source Connector

{
  "name": "jdbc-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "postgres",
    "connection.password": "password",
    "table.whitelist": "users,orders",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "db-",
    "poll.interval.ms": 1000
  }
}

Elasticsearch Sink Connector

{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "user-events",
    "connection.url": "http://localhost:9200",
    "type.name": "_doc",
    "key.ignore": "true",
    "schema.ignore": "true"
  }
}
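
Connector configurations like these are submitted to the Kafka Connect REST API, which listens on port 8083 by default. For example, assuming the sink configuration above is saved as elasticsearch-sink.json:

# Register the connector
curl -X POST -H "Content-Type: application/json" \
  --data @elasticsearch-sink.json \
  http://localhost:8083/connectors

# Check connector status
curl http://localhost:8083/connectors/elasticsearch-sink-connector/status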

Monitoring and Operations

Application Metrics

import java.time.Duration;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;

// Custom application metrics recorded with Micrometer
public class KafkaMetrics {
    private final MeterRegistry meterRegistry;
    
    public KafkaMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }
    
    public void recordMessageProcessed(String topic) {
        Counter.builder("kafka.messages.processed")
            .tag("topic", topic)
            .register(meterRegistry)
            .increment();
    }
    
    public void recordProcessingTime(String topic, Duration duration) {
        Timer.builder("kafka.processing.time")
            .tag("topic", topic)
            .register(meterRegistry)
            .record(duration);
    }
}
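
A brief usage sketch of this helper; SimpleMeterRegistry stands in for whatever registry backend the application actually uses:

import java.time.Duration;
import java.time.Instant;

import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

KafkaMetrics metrics = new KafkaMetrics(new SimpleMeterRegistry());

Instant start = Instant.now();
// ... process a record from user-events ...
metrics.recordMessageProcessed("user-events");
metrics.recordProcessingTime("user-events", Duration.between(start, Instant.now()));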

Health Checks

# Check cluster health
kafka-broker-api-versions.sh --bootstrap-server localhost:9092

# Check consumer lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group analytics-group --describe

# Topic metrics
kafka-run-class.sh org.apache.kafka.tools.JmxTool \
  --object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec \
  --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi

Security Configuration

SSL/TLS Configuration

# server.properties
listeners=SSL://localhost:9093
security.inter.broker.protocol=SSL
ssl.keystore.location=/var/ssl/kafka.server.keystore.jks
ssl.keystore.password=password
ssl.key.password=password
ssl.truststore.location=/var/ssl/kafka.server.truststore.jks
ssl.truststore.password=password
ssl.client.auth=required
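
Clients connecting over the SSL listener need a matching configuration. A minimal client-side sketch (paths and passwords are placeholders):

# client-ssl.properties
security.protocol=SSL
ssl.truststore.location=/var/ssl/kafka.client.truststore.jks
ssl.truststore.password=password
# Keystore is required because the broker sets ssl.client.auth=required
ssl.keystore.location=/var/ssl/kafka.client.keystore.jks
ssl.keystore.password=password
ssl.key.password=password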

SASL Authentication

# SASL configuration
listeners=SASL_SSL://localhost:9093
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

# JAAS configuration (a separate file, e.g. kafka_server_jaas.conf, passed to the
# broker JVM via -Djava.security.auth.login.config)
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice-secret";
};
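
On the client side, SASL/PLAIN credentials are usually supplied via the sasl.jaas.config property rather than a separate JAAS file. A sketch for the alice user defined above:

# client-sasl.properties
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret";
ssl.truststore.location=/var/ssl/kafka.client.truststore.jks
ssl.truststore.password=password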

Performance Tuning

Producer Optimization

// High throughput producer settings
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

Consumer Optimization

// High throughput consumer settings
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 50000);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 262144);
props.put(ConsumerConfig.SEND_BUFFER_CONFIG, 131072);

Career Opportunities

High-Demand Roles

  • Kafka Engineer: $105,000 - $145,000
  • Stream Processing Engineer: $110,000 - $150,000
  • Data Platform Engineer: $115,000 - $155,000
  • Real-Time Systems Architect: $130,000 - $170,000

Essential Skills

  1. Kafka architecture and administration
  2. Stream processing with Kafka Streams
  3. Schema management and evolution
  4. Performance tuning and optimization
  5. Security implementation
  6. Monitoring and troubleshooting
  7. Integration patterns
  8. Event-driven architecture design

Conclusion

Apache Kafka is essential for building real-time, event-driven applications. Master these streaming concepts, processing patterns, and operational practices to create scalable, fault-tolerant data pipelines. The growing demand for real-time data processing creates excellent opportunities for Kafka specialists in modern data architectures.