Key Takeaways
- Many batch pipelines are limited more by scheduling and orchestration delays than by processing cost. Running micro-batches continuously can remove most of that latency without requiring record-level streaming.
- Record-level streaming is often proposed as the "correct" solution, but in batch-oriented systems it introduces unnecessary operational risk without delivering meaningful benefits.
- For object store-based ingestion, especially in systems with eventual consistency (such as Simple Storage Service (S3) storage), relying on success files or completion markers breaks down in practice, and deterministic, rate-based progress is often more reliable for micro-batch streaming.
- Lag and restart behavior must be designed explicitly; in freshness-driven pipelines with overlapping window semantics, skipping to the latest available partition can be more valuable than exhaustively replaying historical data.
- Long-running streaming jobs should be built to restart cleanly and regularly, treating restarts as a normal operational mechanism rather than a failure condition.
Introduction
Many data pipelines that are described as batch systems already operate in a near-continuous mode. They process incremental data frequently, often with overlapping windows, and exist primarily to reduce freshness gaps between larger, less frequent full recomputations.
This article describes the migration of such a system, a set of scheduled batch jobs responsible for generating a delta index used in a search and ads retrieval pipeline.
These jobs were moved to a continuously running micro-batch model using Spark Structured Streaming in micro-batch mode, not to achieve per-record real-time processing, but to eliminate scheduling delays and improve operational predictability.
While Spark Structured Streaming was used as the execution engine, progress tracking did not rely on its native checkpointing or event-time watermark semantics, because the pipeline advanced based on partition-level progress rather than on continuous event streams. Instead, a logical watermark was maintained externally, representing the latest processed partition based on partition timestamps.
The system operated on time-partitioned data stored in object storage (S3-style), without relying on event streams such as Kafka. A log-based ingestion model was considered, but the pipeline operated on batch-oriented, snapshot-style data where completeness at the partition level was more important than per-record ordering, making object storage-based ingestion a better fit.
As a result, progress depended on listing and interpreting partitioned data rather than consuming ordered events.
The key challenge was not computational efficiency, but making reliable progress in an environment where:
- Data arrived as time-partitioned files.
- Completion signals were unreliable.
- Freshness mattered more than strict replay of intermediate states.
Rather than adopting record-level streaming, the system converged on a simpler model based on:
- Time-driven execution.
- Watermark-based progress tracking.
- Consistently processing only the latest available partition.
This article focuses on the practical lessons from that transition: what failed, what worked, and under what constraints these design choices are applicable.
System Scope and Use Case
This work applied to production batch jobs responsible for ingesting new ads and campaign data, along with product, item, and customer signals such as conversion, performance, and co-purchase behavior. These inputs were processed to generate an inverted index used by online retrieval services. The system operated at the scale of several millions of documents, with a full index size on the order of hundreds of gigabytes and delta size typically in the tens of gigabytes range.
These jobs were not general-purpose streaming infrastructure; they operated as fixed-function pipelines over time-partitioned data in object storage, without event streams or per-record processing semantics. They sat directly on the freshness-critical path, where delays translated into a delayed go live for updated ads and related metadata resulting in stale retrieval results and lost opportunity if ads ran out of budget before the latest version made it to production.
In steady state:
- New incremental data arrived approximately every five to seven minutes depending on volume of new ads and updates to existing campaigns.
- Each delta run covered roughly last five hours
- Multiple delta runs per hour were useful and expected
Although the number of jobs was small, their operational behavior directly influenced freshness. The primary bottleneck was not computation, but scheduling delay and orchestration overhead, especially during bursts and failures.
Background: Full Index and Delta Index Pipelines
Our indexing system consisted of two pipelines with very different responsibilities.
The full index pipeline rebuilt the entire Solr index from scratch to include all ads and metadata along with product, items, customer, conversion, impression, and behavioral signals ( such as "bought together") used by downstream relevance and quality models. It was heavy, expensive, and time-consuming, which made frequent execution impractical. A full rebuild took approximately two to three hours, followed by validation and deployment taking total time up to approximately five hours. Deployment was performed using full index swaps, ensuring that updates became visible atomically rather than through partial segment updates.
To get changes into production between full index runs, we relied on a delta index pipeline. This pipeline processed only incremental ads, ads groups and ads campaign updates and was kept intentionally small. A typical delta index size was approximately one tenth of the full index, allowing it to be frequently regenerated.
In theory, this approach should have allowed updates to propagate quickly. In practice, the delta pipeline was externally scheduled and executed as a discrete job invocation each time. Over time, several issues became visible. Incremental data that arrived shortly after a scheduled run often waited nearly a full scheduling interval before being processed. Progress was tracked at the job level, so failures required re-executing the entire scheduled window, even if only part of it was incomplete. During bursts of updates, batch duration increased. In the externally scheduled model, this increase reduced or eliminated the idle gap between runs and could even cause scheduled invocations to be skipped or delayed. As a result, freshness lag compounded because progress was gated not only by processing time but also by fixed scheduling boundaries.
The core issue was not batching itself; it was the combination of coarse scheduling boundaries and job-level progress semantics. Eventually, it became clear that batch scheduling delay and orchestration overhead, rather than processing cost, were the primary contributors to freshness lag.
Why Streaming Was Controversial Internally
When streaming was first proposed, the pushback was not about correctness or performance; it was about operations.
Senior engineers raised concerns that were entirely reasonable:
- Long-running streaming jobs are harder to reason about operationally.
- Recovery behavior can be unpredictable.
- Failures tend to linger instead of terminating cleanly.
- On-call burden often increases rather than decreases.
In practice, some of these concerns did show up, especially around long-running job behavior and memory, while others became less of an issue once we simplified how the system made progress and handled restarts.
These concerns mattered. The goal was not to replace a predictable batch job with an opaque system that failed in ways that were new and harder to debug.
Any move to streaming had to address these operational risks directly.
False Start: Beginning with Record-Level Streaming
Like many teams, we initially started with record-level streaming. On paper, this approach looked like the most "correct" solution. In practice, it quickly became clear that it was unnecessary and risky for this pipeline.
The indexing logic assumed batch completeness and operated at a product or item grouping level rather than at the individual ad record level. This grouping was primarily an architectural choice in the intermediate processing pipeline, where data was aggregated at a higher-level entity before being expanded back into individual records in the final index. A change to a single ad often required recomputing the grouped representation for that product or item within the delta index. Moving to per-record streaming would have introduced partial-update states where some ads were updated but the grouped index representation was not yet fully consistent. In practice, this approach would either force regeneration of the entire affected delta partition or risk temporarily inconsistent search results.
Refactoring around this problem would have required significant changes with unclear payoff. More importantly, the business did not need per-record immediacy. What it needed was to stop waiting for batch schedules. This was the first major realization: Record-level streaming was solving a problem we did not have while introducing problems we did not want.
Converging on Micro-Batch Streaming
We moved to micro-batch streaming, deliberately. The goal was not continuous record processing, but continuous availability. Micro-batching allowed us to remove scheduling gaps while preserving batch-oriented semantics where they mattered.
Operationally, the job was configured with a fixed-trigger interval of approximately thirty seconds. This configuration was implemented using Spark Structured Streaming in micro-batch mode with a fixed processing time trigger. The trigger interval was intentionally much smaller than the partition arrival cadence of five to seven minutes, ensuring that newly available data was picked up quickly without external scheduling gaps.
Each trigger execution followed a bounded and deterministic sequence:
- Determine the current wall-clock time.
- Compute the latest partition that should be considered eligible based on time and partitioning rules.
- Compare that partition with the current watermark (the last acknowledged partition).
- If multiple partitions were pending, advance directly to the latest eligible partition rather than processing intermediate ones.
- Process one bounded partition per trigger cycle at most.
Eligibility was determined purely by partition ordering and watermark comparison, without introducing delay thresholds or buffering for storage visibility. Progress was therefore time-driven and freshness-oriented rather than strictly sequential. This approach differs from traditional streaming models that rely on sequential replay with watermarks, where all intermediate data is processed in order to maintain completeness. In contrast, this pipeline prioritized freshness by advancing directly to the latest available partition, accepting that intermediate states would be implicitly covered by overlapping window computations.
On restart, the same logic applied. The job would recompute the latest visible partition and compare it with the persisted watermark. If it were newer, it would process only that latest partition, maintaining the same freshness-first rule used during steady-state execution.
Because delta windows overlapped by design, skipping intermediate partitions did not cause permanent data loss. Subsequent runs operating over the sliding window naturally recovered any necessary state. The sliding window duration was significantly larger than the partition interval, ensuring that any skipped partitions were always covered by subsequent recomputation.
It’s worth clarifying that this approach was not an attempt to approximate batch processing by running it more frequently, nor a shortcut taken because "full streaming felt hard". We initially implemented record-level streaming and backed away from it only after seeing the operational cost and semantic mismatch it introduced for a batch-oriented indexing system.
The final design represents a convergence: using micro-batch streaming to remove scheduling latency, deterministic progress to avoid fragile completion signals, and explicit restart semantics to keep behavior predictable.
Source and Sink: Object Storage Was Not Optional
Both the source and sink of the pipeline were object storage. Incremental data arrived as partitions or files, was processed, and written back before being consumed downstream. The system relied on time-partitioned paths such as /year/month/day/hour/minute partitions, and listings were used to determine eligibility. Object storage lacks native per-record progress semantics. Completion is inferred, not guaranteed. Listings may be incomplete due to eventual consistency.
That distinction mattered. The system operated on object storage (S3-style semantics). While modern object stores provide strong read-after-write consistency, the challenge here was not consistency guarantees but the lack of a reliable way to infer partition completeness from listings in a continuously running pipeline.
False Start: Success Files and Completion Markers
Our first attempt at streaming ingestion reused the existing success-file and completion-marker logic from the batch pipeline. In the scheduled batch model, this approach worked reasonably well because listings were infrequent and evaluation happened at coarse time boundaries. In a continuously running streaming job, however, this model broke down.
We encountered issues such as:
- Completion markers appearing late or inconsistently due to non-atomic visibility between data files and completion markers during repeated listings.
- Partition listings that were temporarily incomplete, even when data had already been written.
- The streaming job repeatedly polling for markers that already existed but were not yet visible.
- Duplicate or premature processing when listings and markers became visible at slightly different times.
For example, at time T a partition begins writing data files and at T+dt a completion marker is written. A streaming job polling in this interval may observe partial data without the marker, or the marker without a complete view of all files. Since listings are not tied to a transactional boundary, data and marker visibility can diverge across polls.
Without a time-driven trigger, processing depends on detecting when a partition is complete, typically via markers or heuristics. In a streaming read based on repeated listings, this notion of completion is not reliable. A time-driven trigger avoids this dependency by advancing processing based on time rather than completion.
In practice, these issues showed up regularly enough to impact reliability. More importantly, the core problem was not just storage consistency, but the lack of a reliable way to interpret partition completion signals within a continuously evaluating streaming read.
Additionally, there was no reliable way to make marker detection both correct and efficient. Aggressive polling increased load and still did not eliminate race conditions. Conservative polling increased freshness lag and defeated the purpose of streaming.
These issues were not one-off bugs. They were structural consequences of combining marker-based completion semantics with a continuously evaluating streaming engine. At that point, it became clear that file-based completion signaling was too fragile for a system that needed deterministic and time-driven progress.
Pattern: Deterministic Progress with Rate-Based Triggers
We replaced success-file detection with a rate-based trigger that executed every thirty seconds. While this approach still involved periodic evaluation of storage state, it differed fundamentally from marker-based polling: The system did not wait for a specific completion signal, but advanced deterministically based on time and watermark comparison.
There were no filesystem notifications, no completion markers, and no "wait until partition is old enough" logic. Each trigger cycle did the same simple thing:
- List the currently visible partitions in object storage.
- Identify the latest visible partition based on timestamp ordering.
- Compare it to the watermark (the last partition the pipeline has acknowledged).
- Process the latest partition and advance the watermark if the latest partition is newer.
- Otherwise, exit immediately.
Partition ordering was based on the timestamp encoded in the partition path, representing the completion time of upstream snapshot generation rather than event-time or ingestion-time semantics. Since this timestamp was assigned by a single upstream job, ordering remained consistent and did not depend on clock synchronization across producers. Each partition corresponded to a bounded time window of data, making progress effectively snapshot-driven rather than event-time driven, with correctness achieved through overlapping reprocessing instead of strict event-time completeness.
Only the latest partition was processed. This fact made progress deterministic: The pipeline moved forward whenever a newer partition became visible, and it never blocked waiting for a specific marker file to appear.
This approach accomplished several things. Each micro-batch processed a bounded unit of work (one partition). Progress advanced via watermark comparison, not file-based completion signals.
The pipeline avoided stalling on object-store visibility glitches tied to marker files. New data was picked up quickly without external scheduling gaps. The worst-case freshness delay was reduced from approximately ten minutes to thirty seconds under normal operating conditions.
Pattern: Handling Lag by Choosing Freshness
Lag introduced a non-obvious question: What should happen if multiple new partitions became visible between trigger cycles? In a strictly sequential streaming model, the system would process each unprocessed partition in order. That approach was intentionally not adopted here. Instead, the rule was simple and consistent:
- List the latest visible partition.
- Compare it against the watermark.
- If it is newer, process only that latest partition.
- Ignore any intermediate partitions.
There was no lag threshold and no replay window. The pipeline always advanced directly to the most recent visible partition. Therefore, the system was freshness-driven by design. If partitions P1, P2, and P3 became visible while the watermark was at P0, the next execution would process P3, not P1 and P2.
This behavior was safe because the delta index operated over an overlapping sliding window, which worked because of a few practical assumptions. Each delta run recomputed a recent time window instead of applying small incremental changes. These windows overlapped, so most skipped partitions were naturally covered in later runs. The system didn’t require processing every intermediate state, only reaching the latest correct snapshot. In rare cases where too much was skipped and overlap was not enough, any missed updates were picked up by the next full index rebuild, which runs every few hours as a bounded recovery mechanism.
In systems that require ordered replay of every event, this approach would be incorrect. In this use case, replaying intermediate deltas added latency without increasing correctness. The same rule applied during steady-state execution and during restarts.
Pattern: Restarting by Jumping to the Latest
Because progress was defined strictly in terms of "latest visible partition versus watermark," restart behavior required no special replay logic. On restart, the job would:
- Recompute the latest visible partition in object storage.
- Compare it with the persisted watermark.
- If newer, process only that latest partition.
- Otherwise, exit and wait for the next trigger cycle.
In practice, two variations of this logic were used: Some jobs compared against a persisted watermark stored as lightweight metadata in object storage (a single file updated via atomic overwrite), while others simply fast-forwarded on restart by treating the latest visible partition as already processed and continuing from there. This approach avoided reliance on the streaming framework’s checkpointing, which is designed for sequential replay and did not align well with the freshness-first, non-sequential processing model.
There was no attempt to replay intermediate partitions or reconstruct a backlog. The same freshness-first rule applied consistently in both steady-state execution and recovery, which made restarts predictable and fast. A restart did not introduce a catch-up phase; it simply resumed the same deterministic watermark comparison used during normal operation.
Continuous Execution and Memory Pressure
Running continuously in a JVM-based, long-lived streaming job exposed problems that scheduled batch jobs had never encountered. Over time there were consequences to this approach:
- Heap usage gradually increased despite stable input rates.
- Garbage collection pauses became more frequent.
- Micro-batch completion times grew less predictable.
These effects were likely influenced by a combination of long-lived execution characteristics and application-level state accumulation (primarily within index-construction logic rather than Spark-managed state), rather than any single platform-specific issue. While not immediately visible, the effects accumulated over extended runtimes and made the system harder to reason about operationally.
Pattern: Planned Restarts as an Operational Tool
Instead of fighting this behavior, we embraced it. The job was designed to restart automatically, even when healthy, on a cadence of twenty-four hours. This cadence was chosen as a practical operational baseline, configurable per deployment, and validated through operational signals such as GC overhead, long-lived memory growth, and the natural cadence of code updates, rather than derived from a strict analytical threshold. Planned restarts accomplished the following:
- Released accumulated memory.
- Reset internal execution state.
- Allowed new code to be picked up without manual intervention.
Trying to keep the job running indefinitely proved harder than restarting it cleanly. This design choice addressed many of the original operational concerns raised by senior engineers.
Pattern: Watchdog-Managed Streaming Jobs
To make the system predictable, we introduced a lightweight watchdog. The watchdog was implemented externally to the streaming runtime, interacting with the streaming platform through its control APIs (implemented as a scheduler-level controller in our deployment environment) to monitor and manage job execution. The watchdog monitored job liveness, restarted jobs on unexpected termination, enforced periodic restarts, and standardized behavior across environments.
Failures became routine and recoverable instead of urgent and disruptive.
Impact on End-to-End Latency
After the migration, end-to-end latency dropped by approximately fifty percent. Here, latency refers to freshness lag, measured as the time from data availability in object storage to index availability for serving, typically observed as worst-case delay under normal operating conditions rather than percentile-based metrics.
The worst-case delay, for example, was reduced from roughly ten minutes to thirty seconds, primarily due to the elimination of scheduler and orchestration delays rather than changes in per-batch processing cost. Incremental updates no longer missed expected freshness windows under normal load. This improvement did not come from faster processing. It came from eliminating batch scheduling delays, orchestration overhead, and idle wait time between executions.
Results in Production
In production, in a system where delta freshness directly affected ad visibility and advertiser expectations, we observed:
- Faster propagation of incremental updates.
- Predictable recovery behavior.
- Fewer memory-related incidents.
- Easier rollout of code changes.
- The worst-case freshness delay reduced from roughly ten minutes to thirty seconds, significantly reducing missed update windows that were previously driven by scheduling gaps.
These results were achieved without committing to record-level streaming. This approach is best suited for batch-oriented systems where incremental freshness is important, but per-record immediacy is not. It works particularly well when ingestion relies on object storage, where completion signals are inferred and eventual consistency makes file-based coordination fragile.
In contrast, this pattern is not a good fit for systems that require strict per-record processing guarantees, ordered replay of all historical events, or exact catch-up semantics after downtime. In those cases, record-level streaming and full replay are necessary, even if they come with higher operational cost.
Conclusion
This migration didn’t succeed because we adopted streaming as a concept. It worked because we were willing to move away from designs that looked correct on paper but didn’t hold up in production. By stepping back from record-level streaming, removing fragile completion signals, making progress time-driven, and treating restarts as a normal part of operation, we ended up with a system that was simpler, more predictable, and easier to run.
This pattern works well when data naturally fits into bounded snapshots and the goal is to stay close to the latest state rather than process every intermediate step. It is not a good fit for systems that need strict ordering, a full replay of all events, or strong guarantees on processing every update. The main takeaway is that the best streaming design is the one that works reliably in production, not the one that looks the most elegant in theory.