
The Schema Proliferation Problem in Kafka and Flink Pipelines: How to Solve It


Key Takeaways

  • One-to-one event-to-schema mapping is easy to start with but creates compounding downstream complexity as systems scale: fragmented queries, rising maintenance overhead, and schema drift.
  • Event schemas with eighty to ninety-five percent structural overlap can be consolidated using discriminator enum fields, cutting table count (from over ten tables to two, for example) and enabling single-table consumer queries.
  • Nullable attribute blocks enable backward-compatible schema evolution, allowing new event variants to be added without breaking existing consumers.
  • A layered adapter design separates transformation logic from framework integration, making schema consolidation easier to implement and test within existing Apache Flink pipelines.
  • Designing schemas around consumer access patterns makes queries simpler and reduces long-term maintenance overhead in event-driven systems.

Introduction

Most teams building Apache Kafka and Apache Flink pipelines hit the same wall somewhere around the time their event catalog reaches a few dozen types. What starts as a clean system, in which each event has its own schema, gradually becomes a maintenance burden. Queries become complicated, schema changes turn into coordination exercises, and the data lake starts to look more like a data swamp.

This article focuses on that specific problem: schema proliferation from one-to-one event-to-schema mapping. It is a structural issue that builds slowly and becomes expensive to fix. The solution, discriminator-based schema consolidation, is a pattern that reduces schema count, simplifies downstream consumption, and makes schema evolution manageable.

The examples use a ride-sharing domain because it makes the problem tangible. The same pattern can apply to any system where high-cardinality event types share significant structural overlap, including contact center platforms, financial transaction systems, and healthcare event pipelines.

What One-to-One Mapping Looks Like at Scale

Let’s start with something simple. A ride-sharing platform tracks driver activity across a handful of event types:

  • Ride accepted
  • Ride started
  • Ride completed
  • Ride cancelled

These four event types represent the full lifecycle of a driver's engagement with a ride request, from the moment a driver accepts an incoming request through to the final completion or cancellation. Each event carries a timestamp, a driver identifier, a ride identifier, and a city context that appears in every variant.

Each event also varies by ride type:

  • Standard
  • Shared
  • Scheduled

Standard rides are single-passenger trips that track vehicle class and surge pricing. Shared rides pool multiple passengers and track passenger count alongside a pooling efficiency score. Scheduled rides are booked in advance and carry a scheduled departure time and the number of minutes the booking was made ahead of the trip.

The natural instinct is to create a schema for each combination. This approach gives you:

DriverRideAcceptedStandardEvent
DriverRideAcceptedSharedEvent
DriverRideAcceptedScheduledEvent
DriverRideStartedStandardEvent
DriverRideStartedSharedEvent
...

Twelve schemas for four event types and three ride types. Real platforms have more event types and more subtypes, and the list keeps growing as new features ship. Each schema maps to a separate downstream table in the data lake, so twelve schemas mean twelve tables. If you are using Apache Iceberg on S3, that is twelve tables to manage, twelve Schema Registry entries to version, and twelve adapter classes in the Flink pipeline. The system works until it stops being manageable. Figure 1 shows the pattern: N event variants fan out to N separate schemas and N separate Iceberg tables, and querying across variants requires UNION operations across all of them.

Figure 1. Schema proliferation with one schema per event variant (before). (Image by author.)
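The fan-out is multiplicative. Each new ride type adds one schema per event type, and each new event type adds one schema per ride type. The minimal sketch below makes that arithmetic explicit. Its enums mirror the lists above, and the naming helper is purely illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: lists the schema names that one-to-one mapping produces,
// following the DriverRide<EventType><RideType>Event naming used above.
class SchemaFanOut {
    enum EventType { ACCEPTED, STARTED, COMPLETED, CANCELLED }
    enum RideType { STANDARD, SHARED, SCHEDULED }

    // "ACCEPTED" -> "Accepted"
    static String pascal(String upper) {
        return upper.charAt(0) + upper.substring(1).toLowerCase();
    }

    // One schema (and therefore one table and one adapter) per pair.
    static List<String> oneToOneSchemaNames() {
        List<String> names = new ArrayList<>();
        for (EventType e : EventType.values()) {
            for (RideType r : RideType.values()) {
                names.add("DriverRide" + pascal(e.name()) + pascal(r.name()) + "Event");
            }
        }
        return names;
    }

    public static void main(String[] args) {
        List<String> names = oneToOneSchemaNames();
        // 4 event types x 3 ride types = 12 schemas
        System.out.println(names.size() + " schemas, starting with " + names.get(0));
    }
}
```

Adding a PREMIUM constant to RideType grows the list from twelve to sixteen. That single change means four new schemas, four new tables, and four new adapters.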

The Problem: Schema Proliferation

Schema proliferation occurs when a system accumulates schemas faster than the team can justify the complexity. It has four main symptoms.

Query Complexity

When a data consumer wants to answer "What did Driver 4821 do in the last hour?", that consumer needs to union across all relevant tables. If the question spans event types and ride types, the query might touch eight or ten tables. That is not a query; it is a project.

Maintenance Overhead

Most schemas in a proliferated system share eighty to ninety-five percent of their fields. When a shared field changes, that change has to be replicated across every schema that contains it. In a catalog of twenty such schemas, a single field rename means twenty schema updates, twenty adapter updates, and twenty rounds of testing.

Schema Drift

Schemas maintained independently diverge over time. One team adds a field with a slightly different name than another team used for the same concept. Nullability decisions become inconsistent. Types diverge. Nobody intended this inconsistency; it just happens when there is no forcing function to keep schemas aligned.

Producer-Consumer Mismatch

One-to-one schema design optimizes for the producer. From the consumer side, all of these events are variations of the same concept: Something happened in the lifecycle of a ride. Consumers want to query them together without thinking about which of twelve tables holds the record they need. The mismatch between producer-centric schemas and consumer-centric querying is the root cause of most schema proliferation pain.

The Solution: Consolidated Schema Design

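Most event variants in a high-cardinality system share a structural core. The differences between a DriverRideAcceptedStandardEvent and a DriverRideAcceptedSharedEvent are small. The standard variant carries a vehicle class and surge multiplier, while the shared variant carries a passenger count and pooling score. Everything else is identical.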

Instead of one schema per variant, use one consolidated schema per logical domain, as shown in Figure 2, with discriminator fields to identify the variant and nullable attribute blocks to carry variant-specific data. The consolidated approach collapses twelve event type variants into a single DriverRideActivityRecord schema. Consumers query one table and filter by discriminator fields, with no UNION required.

Figure 2. The consolidated schema: discriminator-based routing (after). (Image by author.)

Discriminator Fields

Every record in the consolidated schema carries explicit discriminator fields that identify exactly what kind of event it represents:

eventType: enum (ACCEPTED, STARTED, COMPLETED, CANCELLED)
rideType:  enum (STANDARD, SHARED, SCHEDULED)

These fields are always populated. Instead of querying twelve tables to find ride completions, a consumer queries one table and filters on eventType = 'COMPLETED'.

Using enums rather than free-form strings matters. Enums give you compile-time safety in adapter code, efficient predicate filtering in downstream query engines, and explicit documentation of what values are valid. With a plain string field, nothing prevents a producer from writing "complete" instead of "COMPLETED" and silently breaking downstream filters.
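On the consumer side, filtering becomes a single predicate over one dataset. The sketch below models this in plain Java. A hypothetical ActivityRow record stands in for a row of the consolidated table. The sketch also shows how enum parsing catches the "complete" typo that a string field would accept.

```java
import java.util.List;
import java.util.stream.Collectors;

// ActivityRow is a hypothetical, trimmed-down stand-in for a consolidated-table row.
class DiscriminatorFilter {
    enum EventType { ACCEPTED, STARTED, COMPLETED, CANCELLED }
    enum RideType { STANDARD, SHARED, SCHEDULED }

    record ActivityRow(String driverId, String rideId, EventType eventType, RideType rideType) {}

    // Same intent as: SELECT * FROM driver_ride_activity WHERE eventType = 'COMPLETED'
    static List<ActivityRow> completions(List<ActivityRow> rows) {
        return rows.stream()
                .filter(row -> row.eventType() == EventType.COMPLETED)
                .collect(Collectors.toList());
    }

    // Enum.valueOf rejects unknown names with IllegalArgumentException, so a
    // producer writing "complete" fails where the value is parsed instead of
    // producing rows that no downstream filter will ever match.
    static EventType parseEventType(String raw) {
        return EventType.valueOf(raw);
    }

    public static void main(String[] args) {
        List<ActivityRow> rows = List.of(
                new ActivityRow("4821", "r-1", EventType.ACCEPTED, RideType.SHARED),
                new ActivityRow("4821", "r-1", EventType.COMPLETED, RideType.SHARED),
                new ActivityRow("7310", "r-2", EventType.COMPLETED, RideType.STANDARD));
        System.out.println(completions(rows).size() + " completions across all ride types");
        try {
            parseEventType("complete");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```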

Shared Fields

Fields that appear in every record regardless of variant go at the top level of the consolidated schema:

eventTime:    timestamp
driverId:     string
rideId:       string
cityId:       string
fareAmount:   double (null for pre-completion events)
durationMins: int    (null for pre-completion events)

Nullable Attribute Blocks

Fields that apply only to certain ride types are grouped into nullable nested structures. Exactly one block is populated per record. All others are null.

{
  "type": "record",
  "name": "DriverRideActivityRecord",
  "fields": [
    {"name": "eventTime",    "type": "long"},
    {"name": "driverId",     "type": "string"},
    {"name": "rideId",       "type": "string"},
    {"name": "cityId",       "type": "string"},
    {"name": "fareAmount",   "type": ["null", "double"], "default": null},
    {"name": "durationMins", "type": ["null", "int"],    "default": null},
    {"name": "eventType",    "type": {"type": "enum", "name": "EventType", "symbols": ["ACCEPTED","STARTED","COMPLETED","CANCELLED"]}},
    {"name": "rideType",     "type": {"type": "enum", "name": "RideType", "symbols": ["STANDARD","SHARED","SCHEDULED"]}},
    {"name": "standardRideAttributes", "type": ["null", {
      "type": "record", "name": "StandardRideAttributes",
      "fields": [
        {"name": "vehicleClass",    "type": "string"},
        {"name": "surgeMultiplier", "type": "double"}
      ]
    }], "default": null},
    {"name": "sharedRideAttributes", "type": ["null", {
      "type": "record", "name": "SharedRideAttributes",
      "fields": [
        {"name": "passengerCount", "type": "int"},
        {"name": "poolingScore",   "type": "double"}
      ]
    }], "default": null},
    {"name": "scheduledRideAttributes", "type": ["null", {
      "type": "record", "name": "ScheduledRideAttributes",
      "fields": [
        {"name": "scheduledTime",         "type": "long"},
        {"name": "advanceBookingMinutes", "type": "int"}
      ]
    }], "default": null}
  ]
}

As shown in Figure 3, a record for a shared ride populates sharedRideAttributes and leaves all other attribute blocks null. The discriminator field rideType = SHARED tells consumers which block to inspect.

Figure 3. Nullable attribute blocks: exactly one block populated per record. (Image by author.)
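The schema itself cannot enforce the rule that exactly one block is populated and that it matches rideType. As far as Avro is concerned, each block is independently nullable. The sketch below shows one way to check the rule before a record is written. The record types are hypothetical simplifications of the generated Avro classes.

```java
// Hypothetical, simplified types; real code would use the Avro-generated classes.
class AttributeBlockValidator {
    enum RideType { STANDARD, SHARED, SCHEDULED }

    record StandardRideAttributes(String vehicleClass, double surgeMultiplier) {}
    record SharedRideAttributes(int passengerCount, double poolingScore) {}
    record ScheduledRideAttributes(long scheduledTime, int advanceBookingMinutes) {}

    record ActivityRecord(RideType rideType,
                          StandardRideAttributes standardRideAttributes,
                          SharedRideAttributes sharedRideAttributes,
                          ScheduledRideAttributes scheduledRideAttributes) {}

    // True only when exactly one block is non-null and it is the block
    // that the rideType discriminator points to.
    static boolean isValid(ActivityRecord r) {
        if (r.rideType() == null) {
            return false; // discriminators must always be populated
        }
        int populated = (r.standardRideAttributes() != null ? 1 : 0)
                + (r.sharedRideAttributes() != null ? 1 : 0)
                + (r.scheduledRideAttributes() != null ? 1 : 0);
        if (populated != 1) {
            return false;
        }
        return switch (r.rideType()) {
            case STANDARD -> r.standardRideAttributes() != null;
            case SHARED -> r.sharedRideAttributes() != null;
            case SCHEDULED -> r.scheduledRideAttributes() != null;
        };
    }
}
```

You could run a check like this in the adapter layer or in adapter unit tests. It stops a mis-wired adapter from writing a SHARED record that carries a standardRideAttributes block.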

Implementing This Pattern in a Flink Pipeline

In a Kafka-Flink pipeline, the consolidation happens in the processing layer between Kafka ingestion and downstream serialization. The architecture uses two layers.

Layer 1: Transformation Logic

Each source event type has a dedicated adapter class responsible for mapping that event to the consolidated schema:

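// Assumed shape of the adapter interface: one source event in, one consolidated record out.
public interface RecordAdapter<I, O> {
    O adapt(I event);
}

public class SharedRideAcceptedAdapter
    implements RecordAdapter<DriverRideAcceptedSharedEvent, DriverRideActivityRecord> {

    @Override
    public DriverRideActivityRecord adapt(DriverRideAcceptedSharedEvent event) {
        DriverRideActivityRecord record = new DriverRideActivityRecord();
        // Shared fields: copied unchanged from the source event.
        record.setEventTime(event.getTimestamp());
        record.setDriverId(event.getDriverId());
        record.setRideId(event.getRideId());
        record.setCityId(event.getCityId());
        // Discriminators: fixed here, because each adapter handles exactly one variant.
        record.setEventType(EventType.ACCEPTED);
        record.setRideType(RideType.SHARED);
        // Variant-specific block: only sharedRideAttributes is set. The standard and
        // scheduled blocks, fareAmount, and durationMins stay null (pre-completion event).
        SharedRideAttributes attrs = new SharedRideAttributes();
        attrs.setPassengerCount(event.getPassengerCount());
        attrs.setPoolingScore(event.getPoolingScore());
        record.setSharedRideAttributes(attrs);
        return record;
    }
}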

These adapter classes have no dependency on the Flink framework. They are pure transformation logic, straightforward to unit test without any framework setup.
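Because the adapter is plain Java, its test can be plain Java too. The sketch below makes two assumptions: it replaces the Avro-generated classes with hypothetical, trimmed-down records, and it assumes a shape for the RecordAdapter interface. The point is that nothing in it starts a Flink cluster, a Kafka broker, or a Schema Registry.

```java
// Hypothetical, trimmed-down types; real code would use the Avro-generated classes.
class AdapterUnitTestSketch {
    enum EventType { ACCEPTED, STARTED, COMPLETED, CANCELLED }
    enum RideType { STANDARD, SHARED, SCHEDULED }

    // Assumed shape of the adapter interface: one source event in, one record out.
    interface RecordAdapter<I, O> {
        O adapt(I event);
    }

    record SharedAcceptedEvent(long timestamp, String driverId, String rideId, String cityId,
                               int passengerCount, double poolingScore) {}
    record SharedRideAttributes(int passengerCount, double poolingScore) {}
    record ActivityRecord(long eventTime, String driverId, String rideId, String cityId,
                          EventType eventType, RideType rideType,
                          SharedRideAttributes sharedRideAttributes) {}

    // Pure mapping: no Flink types, no I/O, no shared state.
    static final class SharedRideAcceptedAdapter
            implements RecordAdapter<SharedAcceptedEvent, ActivityRecord> {
        @Override
        public ActivityRecord adapt(SharedAcceptedEvent e) {
            return new ActivityRecord(e.timestamp(), e.driverId(), e.rideId(), e.cityId(),
                    EventType.ACCEPTED, RideType.SHARED,
                    new SharedRideAttributes(e.passengerCount(), e.poolingScore()));
        }
    }

    // A framework-free unit test: build an event, adapt it, check the fields.
    public static void main(String[] args) {
        SharedAcceptedEvent event =
                new SharedAcceptedEvent(1_700_000_000_000L, "4821", "ride-1", "city-7", 2, 0.8);
        ActivityRecord out = new SharedRideAcceptedAdapter().adapt(event);
        check(out.eventType() == EventType.ACCEPTED, "eventType discriminator");
        check(out.rideType() == RideType.SHARED, "rideType discriminator");
        check(out.driverId().equals("4821") && out.cityId().equals("city-7"), "shared fields");
        check(out.sharedRideAttributes().passengerCount() == 2, "shared-ride block");
        System.out.println("SharedRideAcceptedAdapter test passed");
    }

    static void check(boolean condition, String what) {
        if (!condition) {
            throw new AssertionError("Failed: " + what);
        }
    }
}
```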

Layer 2: Framework Integration

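import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RideActivityConsolidationJob {
    public static void main(String[] args) throws Exception {
        // Not shown: config (job settings), adapterRegistry (event type -> adapter),
        // and icebergSink (writer for the consolidated Iceberg table) are built elsewhere.
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60 s with exactly-once guarantees, and leave at least
        // 30 s between the end of one checkpoint and the start of the next.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);

        // One Kafka topic carries every raw ride event type.
        KafkaSource<RawRideEvent> source = KafkaSource.<RawRideEvent>builder()
            .setBootstrapServers(config.getKafkaBrokers())
            .setTopics("ride-events")
            .setGroupId("ride-consolidation-consumer")
            .setValueOnlyDeserializer(new RawRideEventDeserializer())
            .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka ride events")
            // Stateless, per-record mapping to DriverRideActivityRecord.
            .map(new ConsolidationAdapter(adapterRegistry))
            .sinkTo(icebergSink);

        env.execute("Ride Activity Schema Consolidation");
    }
}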

The job reads raw events from a single Kafka topic and routes each one through ConsolidationAdapter, which looks up the correct adapter by event type and calls adapt(). It then writes the consolidated records to an Iceberg table on S3.

The adapterRegistry is a map of event types to their corresponding adapter instances. It is initialized once at job startup and passed into the ConsolidationAdapter class. The call to WatermarkStrategy.noWatermarks() is intentional: schema consolidation is a stateless transformation with no event-time windowing, so no operator depends on watermarks.

Exactly-once checkpointing ties the Kafka offsets recorded in each checkpoint to Iceberg's transactional commits, which the sink performs when a checkpoint completes. As a result, a job restart after failure replays from the last consistent checkpoint without producing duplicate records. The thirty-second minimum pause guarantees at least thirty seconds of processing between the end of one checkpoint and the start of the next, so a slow checkpoint cannot immediately trigger another. The checkpointing interval controls how much data replays on a restart. Because Iceberg commits happen at checkpoints, it also controls how quickly new records become visible to queries. Tune it to match your throughput, acceptable recovery time, and freshness requirements.
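The lookup-and-call step inside ConsolidationAdapter can be sketched without Flink. In the sketch below, the type strings, RawRideEvent, and ActivityRecord are hypothetical simplifications, and plain java.util.function.Function values stand in for the adapter classes. The sketch leaves out one Flink-specific detail. Flink serializes user functions when the job is submitted, so a real registry and the adapters it holds must be Serializable, or the registry must be built in the function's open() method.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical simplifications: type strings, RawRideEvent, and ActivityRecord
// stand in for the real source events and the Avro-generated record.
class RegistryDispatchSketch {
    enum EventType { ACCEPTED, STARTED, COMPLETED, CANCELLED }
    enum RideType { STANDARD, SHARED, SCHEDULED }

    // Raw event as read from Kafka; `type` names the source event type.
    record RawRideEvent(String type, String driverId, String rideId) {}
    record ActivityRecord(String driverId, String rideId, EventType eventType, RideType rideType) {}

    // Built once at startup and immutable afterward; one entry per source type.
    static final Map<String, Function<RawRideEvent, ActivityRecord>> REGISTRY = Map.of(
            "DriverRideAcceptedSharedEvent",
            e -> new ActivityRecord(e.driverId(), e.rideId(), EventType.ACCEPTED, RideType.SHARED),
            "DriverRideCompletedStandardEvent",
            e -> new ActivityRecord(e.driverId(), e.rideId(), EventType.COMPLETED, RideType.STANDARD));

    // The dispatch step: find the adapter for this event's type and apply it.
    // A missing or unregistered type fails loudly instead of being dropped.
    static ActivityRecord consolidate(RawRideEvent event) {
        // Map.of maps reject null keys, so guard before the lookup.
        Function<RawRideEvent, ActivityRecord> adapter =
                event.type() == null ? null : REGISTRY.get(event.type());
        if (adapter == null) {
            throw new IllegalArgumentException("No adapter registered for " + event.type());
        }
        return adapter.apply(event);
    }

    public static void main(String[] args) {
        ActivityRecord out = consolidate(
                new RawRideEvent("DriverRideAcceptedSharedEvent", "4821", "ride-1"));
        System.out.println(out.eventType() + " / " + out.rideType());
    }
}
```

Failing loudly on an unregistered type is one design choice. A pipeline that must keep running could instead route unknown events to a separate dead-letter destination for later inspection.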

Schema Evolution with Apache Avro

Apache Avro is a schema-based binary serialization format widely used in Kafka pipelines. It pairs with the Schema Registry to manage schema versions and enforce compatibility rules, both of which matter more as a system grows. One of the main concerns teams raise with consolidated schemas is evolution. If everything goes into one schema, does adding a new event type require touching every consumer?

No. The rule is that new attribute blocks must always be nullable, with a default of null. When a new ride type ships, the schema update adds a new nullable premiumRideAttributes block with a default of null and adds PREMIUM to the rideType enum.

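Existing consumers compiled against the old schema do not break, and they do not need to be redeployed. When they read new records, Avro schema resolution simply skips premiumRideAttributes because the field is not in their reader schema. Consumers on the new schema that read older records get the default null.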

There is one edge case worth noting: adding a new symbol to an Avro enum is not safe under every Schema Registry compatibility mode. Backward compatibility only checks that the new schema can read data written with the old one. An enum addition passes that check, so the registry accepts it. A consumer still compiled against the old schema that later receives a PREMIUM record then encounters an unknown symbol and throws a deserialization error. Forward compatibility checks the other direction: consumers on the old schema must be able to read data written with the new one. That is exactly the check an enum addition fails. For consolidated schemas that will evolve over time, Full or Full_Transitive compatibility is the safest setting, because it validates both directions and catches enum additions before they reach production.
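A few lines of code can model the rules behind these cases. The sketch below is a deliberately simplified model of Avro's reader-side schema resolution, not Avro itself, and follows four rules:

- fields are matched by name;
- writer-only fields are skipped;
- reader-only fields take the reader's default;
- an enum symbol the reader does not know is an error.

Records are plain maps, and the model checks only the rideType enum.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified model of Avro reader-side resolution (not the Avro library).
class ResolutionModel {
    // Reader schema: field order, defaults for fields the writer may lack,
    // and the rideType enum symbols this reader knows.
    record ReaderSchema(List<String> fields, Map<String, Object> defaults, Set<String> rideTypeSymbols) {}

    static Map<String, Object> nullDefaults(String... fields) {
        Map<String, Object> defaults = new HashMap<>(); // HashMap allows null values; Map.of does not
        for (String f : fields) {
            defaults.put(f, null);
        }
        return defaults;
    }

    static final ReaderSchema OLD_READER = new ReaderSchema(
            List.of("rideId", "rideType", "sharedRideAttributes"),
            nullDefaults("sharedRideAttributes"),
            Set.of("STANDARD", "SHARED", "SCHEDULED"));

    static final ReaderSchema NEW_READER = new ReaderSchema(
            List.of("rideId", "rideType", "sharedRideAttributes", "premiumRideAttributes"),
            nullDefaults("sharedRideAttributes", "premiumRideAttributes"),
            Set.of("STANDARD", "SHARED", "SCHEDULED", "PREMIUM"));

    static Map<String, Object> read(Map<String, Object> written, ReaderSchema reader) {
        Map<String, Object> view = new LinkedHashMap<>();
        for (String field : reader.fields()) {
            if (written.containsKey(field)) {
                view.put(field, written.get(field));           // in both schemas: copied
            } else if (reader.defaults().containsKey(field)) {
                view.put(field, reader.defaults().get(field)); // reader-only: default used
            } else {
                throw new IllegalStateException("No default for reader-only field " + field);
            }
        }
        // Writer-only fields never appear in reader.fields(), so they are skipped.
        Object rideType = view.get("rideType");
        if (rideType == null || !reader.rideTypeSymbols().contains(rideType)) {
            throw new IllegalStateException("Unknown rideType symbol: " + rideType);
        }
        return view;
    }
}
```

The third case, an old reader meeting PREMIUM, is the failure that the Full modes catch at registration time rather than at read time.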

Contrast this with the one-to-one approach. Adding a premium ride type means a new schema, a new table, new adapter code, a new Schema Registry entry, and consumers who want to query across all ride types now need to add another table to their unions.

Trade-offs

Wider Record

Consolidated schemas carry nullable blocks that are empty for most records. With Avro's null handling, the serialization cost of an empty block is minimal, but it is not zero. At very high throughput, on the order of billions of events per day, the additional serialization overhead is worth benchmarking before committing.
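To put a number on "minimal", consider how Avro's binary encoding writes a union value. It first writes the zero-based branch index as a zig-zag, variable-length integer, then the value itself, and null encodes to zero bytes. An empty ["null", record] block therefore costs exactly one byte. The sketch below reproduces only that integer encoding from the Avro specification; it is not the Avro library.

```java
import java.io.ByteArrayOutputStream;

// Reproduces only Avro's int encoding (zig-zag, then base-128 varint) from the
// Avro specification, to measure union branch indexes. Not the Avro library.
class NullUnionCost {
    static byte[] encodeInt(int n) {
        int z = (n << 1) ^ (n >> 31); // zig-zag: 0 -> 0, -1 -> 1, 1 -> 2, ...
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((z & ~0x7F) != 0) {        // more than 7 significant bits remain
            out.write((z & 0x7F) | 0x80); // low 7 bits with the continuation flag set
            z >>>= 7;
        }
        out.write(z);                     // final byte, continuation flag clear
        return out.toByteArray();
    }

    // A null value in a ["null", Block] union is written as branch index 0 and
    // nothing else, because the null type encodes to zero bytes.
    static int nullBlockBytes() {
        return encodeInt(0).length;
    }

    static int unusedBlockOverhead(int unpopulatedBlocks) {
        return unpopulatedBlocks * nullBlockBytes();
    }

    public static void main(String[] args) {
        System.out.println("one empty block: " + nullBlockBytes() + " byte(s)");
        System.out.println("two empty blocks per record: " + unusedBlockOverhead(2) + " byte(s)");
    }
}
```

In the example schema, the two unpopulated ride-type blocks add two bytes to each record. The CPU cost of serializing wider records is a separate question, which is why benchmarking at high volume still matters.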

Schema Governance

A consolidated schema owned by multiple teams needs clear ownership. A Schema Registry with enforced compatibility rules handles the mechanical side automatically, but someone still needs to own decisions about what goes into the schema and what stays out. Without that, you trade schema proliferation for a different kind of drift.

Debugging

When something goes wrong with a specific event type, the relevant records sit in one table alongside many other event types. A query like SELECT * FROM driver_ride_activity LIMIT 100 without a WHERE eventType = 'COMPLETED' filter returns a mix of all event types. Adding the filter is cheap and easy. Still, it changes the debugging workflow and is a new habit the team needs to build, so it is worth flagging early.

When Not to Use It

This pattern makes sense when event types share structural overlap and are frequently queried together. If two event types have completely different fields and are never queried in the same context, there is no real benefit to consolidating them.

What This Approach Looks Like in Practice

After deploying the consolidated design, the most immediate change shows up in the data layer. An event group that previously required querying across ten separate tables now queries two. For analysts, that is the difference between a query they can write in five minutes and one that requires knowing which tables hold which variants and how to union them correctly.

Schema change propagation becomes considerably simpler. When schemas share ninety to ninety-five percent of their fields, any change to a shared field previously required updating every schema containing it, coordinating adapter changes, and managing downstream table updates across teams. With the consolidated schema, the same change is one schema update.

Adding a new event variant follows the same pattern. It is additive. It does not touch existing consumers, does not require a new table, and does not require a new adapter written from scratch.

Beyond the Adapter: Native Multi-Event Support

The two-layer adapter pattern described here is the right solution when working within an existing pipeline framework that processes one event type per mapping. It delivers all the consolidation benefits without requiring changes to the underlying framework. For teams building or extending a processing framework, there is a cleaner option: native support for multiple event types per mapping at the framework level.

In the adapter approach, the framework still thinks it is handling one event type. The consolidation happens in the layer added on top. When a framework natively supports multiple input event types mapping to a single output schema, the custom routing layer disappears. The transformation logic itself, the adapter classes that map each source event to the consolidated schema, still exists. What goes away is the dispatcher that manually routes events to the right adapter. The framework takes over that responsibility natively.

A practical path is to implement the adapter pattern first to validate that the consolidated schema design works. Once the value is clear, the case for native framework support makes itself. The consolidated schema design stays the same either way; the routing layer is the part that eventually goes away.

Conclusion

Schema proliferation is the kind of problem that feels manageable until it isn't. By the time it becomes painful, analysts are writing twelve-table union queries and engineers are updating twenty schemas for one field rename. The schemas have become load-bearing walls. Fixing them means coordinating across teams, updating downstream consumers, and migrating historical data. None of that is impossible, but all of it is expensive.

The consolidation pattern here is not a perfect fit for every system. It works well when event types share structure and are queried together. It adds governance overhead that has to be deliberately managed. For some teams those trade-offs will not be worth it.

But if you are looking at a proliferating schema catalog and most of those schemas share eighty to ninety percent of their fields, the question worth asking is not whether to consolidate. It is how long to wait.

A reference implementation is available at this GitHub repository.
