Async Streams in C# 8

Key Takeaways

  • Asynchronous programming techniques provide a way to improve a program's responsiveness.
  • The async/await pattern debuted in C# 5 but is limited to returning a single scalar value.
  • C# 8 adds Async Streams, which allow an async method to return multiple values, broadening its usability.
  • Async streams provide an excellent way to represent asynchronous data sources that can be controlled by the data's consumer.
  • Async streams are an alternative to the reactive programming model used in Java and JavaScript.

Async/await was introduced with C# 5 to improve user interface responsiveness and web access to resources. In other words, async methods help developers execute asynchronous operations that do not block threads and that return one scalar result.

After many attempts by Microsoft to simplify asynchronous operations, the async/await pattern has gained broad acceptance among developers thanks to its easy-to-understand approach.

One of the significant limitations of the existing async methods is the requirement that they return a scalar result (one value). Consider the method async Task<int> DoAnythingAsync(): the result of DoAnythingAsync is an integer (one value).

Because of this limitation, you cannot use this feature with the yield keyword, and you cannot use it with an async IEnumerable<int> (which would return an async enumeration).

If it were possible to combine the async/await feature with the yield operator, we could use a very powerful programming model known as asynchronous data pull or pull-based enumeration; in F# it is called an async sequence.

The newly proposed Async Streams feature in C# 8 removes the scalar result limitation and allows an async method to return multiple results.

These changes make the async pattern more flexible: you can retrieve data from a database as a lazy asynchronous sequence, or download data as an asynchronous sequence that returns chunks as they become available.

Example:

await foreach (var streamChunk in asyncStreams)
{
  Console.WriteLine($"Received data count = {streamChunk.Count}");
}

Reactive Extensions (Rx) is another approach to solving asynchronous programming problems. Rx is gaining broader acceptance among developers. Many other programming languages, such as Java and JavaScript, have implementations of this technique (RxJava, RxJS). Rx is based on the Push Programming Model (a Tell, Don't Ask system), also known as reactive programming. Reactive programming is a special type of event-driven programming that deals with data rather than notifications.

Usually, in the Push Programming Model, you do not have to control the Publisher. The data is pushed asynchronously into a queue, and the Consumers consume the data when it arrives. Unlike Rx, Async Streams can be called on demand to generate multiple values until the end of the enumeration is reached.

In this article, I will compare the pull-based model with the push-based model and show which technique is the better fit in which scenario. I will use several examples and code demos to explain the concept and its benefits, and finally I will discuss the Async Streams feature and show some demo code.

Pull Programming Model vs. Push Programming Model

Figure -1- Pull Programming Model vs. Push Programming Model

My example is based on the well-known Producer/Consumer food example, but in our scenario the Producer generates data instead of food, and the Consumer consumes the generated data, as shown in Figure -1-. The Pull Model is easy to understand: the Consumer asks for the data and pulls it from the Producer. The other approach is the Push Model: the Producer publishes the data in a queue, and the Consumer must subscribe to receive the desired data.

In the "faster Producer and slower Consumer" use case, the Pull Model is suitable because the Consumer pulls only the data it needs from the Producer, which avoids overflow issues. In the "slower Producer and faster Consumer" use case, the Push Model is suitable because the Producer pushes data to the Consumer as soon as it is available, which spares the Consumer unnecessary waiting time.

Rx and Akka Streams (Stream Programming Model) use backpressure, a flow control mechanism that uses the Pull or Push Model to solve the Producer/Consumer problems mentioned above.

In my example below, a slow Consumer pulls an asynchronous sequence of data from a faster Producer. Once the Consumer has processed an element, it asks the Producer for the next element, and so on until the end of the sequence is reached.
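The difference between the two models can be sketched without any library. In the minimal, synchronous example below, all names (PullVsPushSketch, Produce, Publish, RunPull, RunPush) are hypothetical and not taken from the article's source code. The pull producer is an iterator that only advances when the consumer asks for the next value, while the push producer drives the loop itself and hands every value to a callback.

```csharp
using System;
using System.Collections.Generic;

public static class PullVsPushSketch
{
    // Pull: the producer is an iterator. It computes the next value
    // only when the consumer asks for it (here via foreach).
    public static IEnumerable<int> Produce(int count, List<string> log)
    {
        for (var i = 1; i <= count; i++)
        {
            log.Add($"produced {i}");
            yield return i;
        }
    }

    // Push: the producer drives the loop and hands each value to
    // the callback the consumer subscribed with.
    public static void Publish(int count, Action<int> onNext, List<string> log)
    {
        for (var i = 1; i <= count; i++)
        {
            log.Add($"produced {i}");
            onNext(i);
        }
    }

    public static List<string> RunPull()
    {
        var log = new List<string>();
        foreach (var value in Produce(3, log))
        {
            log.Add($"consumed {value}");
            if (value == 2) break; // the consumer decides when to stop
        }
        return log;
    }

    public static List<string> RunPush()
    {
        var log = new List<string>();
        Publish(3, value => log.Add($"consumed {value}"), log);
        return log;
    }

    public static void Main()
    {
        // Pull: value 3 is never produced because the consumer stopped asking.
        Console.WriteLine(string.Join(", ", RunPull()));
        // Push: all three values are produced, whether the consumer wants them or not.
        Console.WriteLine(string.Join(", ", RunPush()));
    }
}
```

In the pull run the log ends after "consumed 2", which is the flow control the Consumer gets for free in the Pull Model.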

Motivation and Background

To understand why we need Async Streams, let’s consider the following code.

// Sums the integers from 0 to count (inclusive)
static int SumFromOneToCount(int count)
{
  ConsoleExt.WriteLine("SumFromOneToCount called!");

  var sum = 0;
  for (var i = 0; i <= count; i++)
  {
    sum = sum + i;
  }
  return sum;
}

Method call:

const int count = 5;
ConsoleExt.WriteLine($"Starting the application with count: {count}!");
ConsoleExt.WriteLine("Classic sum starting.");
ConsoleExt.WriteLine($"Classic sum result: {SumFromOneToCount(count)}");
ConsoleExt.WriteLine("Classic sum completed.");
ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

Output: [image of the console output]

We can make this method lazy by using the yield operator as shown below.

static IEnumerable<int> SumFromOneToCountYield(int count)
{
  ConsoleExt.WriteLine("SumFromOneToCountYield called!");

  var sum = 0;
  for (var i = 0; i <= count; i++)
  {
    sum = sum + i;

    yield return sum;
  }
}

Calling the method:

const int count = 5;
ConsoleExt.WriteLine("Sum with yield starting.");
foreach (var i in SumFromOneToCountYield(count))
{
  ConsoleExt.WriteLine($"Yield sum: {i}");
}
ConsoleExt.WriteLine("Sum with yield completed.");

ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

Output: [image of the console output]

As the output window shows, the result is returned in parts rather than as one value. Accumulating results this way is known as lazy enumeration. However, we still have a problem: the sum methods block the code. If you look at the threads, you can see that everything runs on the main thread.
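To make the lazy behavior visible, here is a small sketch that records which partial sums were really computed. The names LazySumSketch, RunningSum and TakeFirst are hypothetical; RunningSum mirrors SumFromOneToCountYield but writes to a list instead of the console.

```csharp
using System;
using System.Collections.Generic;

public static class LazySumSketch
{
    // Same loop as SumFromOneToCountYield, but every computed partial sum
    // is recorded so we can see how far the iterator actually ran.
    public static IEnumerable<int> RunningSum(int count, List<int> computed)
    {
        var sum = 0;
        for (var i = 0; i <= count; i++)
        {
            sum += i;
            computed.Add(sum);
            yield return sum;
        }
    }

    // Pulls only the first 'wanted' partial sums and then stops.
    public static List<int> TakeFirst(int count, int wanted, List<int> computed)
    {
        var taken = new List<int>();
        foreach (var sum in RunningSum(count, computed))
        {
            taken.Add(sum);
            if (taken.Count == wanted) break;
        }
        return taken;
    }

    public static void Main()
    {
        var computed = new List<int>();

        var sequence = RunningSum(5, computed);
        Console.WriteLine(computed.Count); // 0: calling the method runs nothing yet

        var firstThree = TakeFirst(5, 3, computed);
        Console.WriteLine(string.Join(", ", firstThree)); // 0, 1, 3
        Console.WriteLine(computed.Count); // 3: the remaining sums were never computed
    }
}
```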

Now let’s use the magic word async and apply it to the first method SumFromOneToCount (without yield).

static async Task<int> SumFromOneToCountAsync(int count)
{
  ConsoleExt.WriteLine("SumFromOneToCountAsync called!");

  var result = await Task.Run(() =>
  {
    var sum = 0;

    for (var i = 0; i <= count; i++)
    {
      sum = sum + i;
    }
    return sum;
  });

  return result;
}

Calling the method:

const int count = 5;
ConsoleExt.WriteLine("async example starting.");
// Sum runs asynchronously! Not enough. We need sum to be async with lazy behavior.
var result = await SumFromOneToCountAsync(count);
ConsoleExt.WriteLine("async Result: " + result);
ConsoleExt.WriteLine("async completed.");

ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

Output: [image of the console output]

Nice, we can see that the computation is running on another thread, but we still have a problem with the result: it is returned as one value!

Imagine that we could combine lazy enumerations (yield return) with async methods in an imperative style. This combination is known as Async Streams, the newly proposed feature in C# 8. It gives us an excellent technique for solving Pull Programming Model problems, such as downloading data from a website or reading records from a file or a database, in a modern way.

Let’s try to do that with the current C# version. I will add the async keyword to the method SumFromOneToCountYield, as follows:

Figure -2- The error occurs when yield with the async keyword is combined

When we try to add async to SumFromOneToCountYield, we immediately get an error, as shown above!

Let’s try something else. We can put IEnumerable in a task and remove the yield keyword as shown below:

static async Task<IEnumerable<int>> SumFromOneToCountTaskIEnumerable(int count)
{
  ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable called!");
  var collection = new Collection<int>();

  var result = await Task.Run(() =>
  {
    var sum = 0;

    for (var i = 0; i <= count; i++)
    {
      sum = sum + i;
      collection.Add(sum);
    }
    return collection;
  });

  return result;
}

Calling the method:

const int count = 5;
ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable started!");
var scs = await SumFromOneToCountTaskIEnumerable(count);
ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable done!");

foreach (var sc in scs)
{
  // This is not what we need: we get the whole result in one block!
  ConsoleExt.WriteLine($"AsyncIEnumerable Result: {sc}");
}

ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

Output: [image of the console output]

As you can see in the threads, we calculate everything asynchronously, but we still have a problem. The results (all of them accumulated in the collection) are returned as one block. That is not the lazy behavior we want. Remember, our goal was to combine lazy behavior with the asynchronous computing style.

To achieve the desired behavior you need to use an external library like Ix (part of Rx), or you have to use the new proposed C# feature Async Streams.
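For comparison, here is a minimal sketch of how the running sum could look with Async Streams. It is written against C# 8 as it eventually shipped (.NET Core 3.0 or later), where the consuming loop is spelled await foreach, so it may differ from the preview syntax discussed in this article. The names AsyncSumSketch, SumFromOneToCountAsyncStream and CollectAsync are hypothetical, and the Task.Delay call only simulates asynchronous work.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncSumSketch
{
    // Async iterator: 'async' and 'yield return' combined in one method.
    public static async IAsyncEnumerable<int> SumFromOneToCountAsyncStream(int count)
    {
        var sum = 0;
        for (var i = 0; i <= count; i++)
        {
            sum += i;
            await Task.Delay(10); // simulated asynchronous work per element
            yield return sum;
        }
    }

    // Consumes the stream one partial sum at a time.
    public static async Task<List<int>> CollectAsync(int count)
    {
        var results = new List<int>();
        await foreach (var partial in SumFromOneToCountAsyncStream(count))
        {
            results.Add(partial); // each partial sum arrives as soon as it is ready
        }
        return results;
    }

    public static async Task Main()
    {
        var results = await CollectAsync(5);
        Console.WriteLine(string.Join(", ", results)); // 0, 1, 3, 6, 10, 15
    }
}
```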

Back to our code example. I have used an external library to show the async behavior.

static async Task ConsumeAsyncSumSeqeunc(IAsyncEnumerable<int> sequence)
{
  ConsoleExt.WriteLineAsync("ConsumeAsyncSumSeqeunc Called");

  await sequence.ForEachAsync(value =>
  {
    ConsoleExt.WriteLineAsync($"Consuming the value: {value}");

    // simulate some delay!
    Task.Delay(TimeSpan.FromSeconds(1)).Wait();
  });
}

static IEnumerable<int> ProduceAsyncSumSeqeunc(int count)
{
  ConsoleExt.WriteLineAsync("ProduceAsyncSumSeqeunc Called");
  var sum = 0;

  for (var i = 0; i <= count; i++)
  {
    sum = sum + i;

    // simulate some delay!
    Task.Delay(TimeSpan.FromSeconds(0.5)).Wait();

    yield return sum;
  }
}

Calling the method:

const int count = 5;
ConsoleExt.WriteLine("Starting Async Streams Demo!");

// Start a new task. Used to produce async sequence of data!
IAsyncEnumerable<int> pullBasedAsyncSequence = ProduceAsyncSumSeqeunc(count).ToAsyncEnumerable();

ConsoleExt.WriteLineAsync("X#X#X#X#X#X#X#X#X#X# Doing some other work X#X#X#X#X#X#X#X#X#X#");

// Start another task; Used to consume the async data sequence!
var consumingTask = Task.Run(() => ConsumeAsyncSumSeqeunc(pullBasedAsyncSequence));

// Just for demo! Wait until the task is finished!
consumingTask.Wait();
ConsoleExt.WriteLineAsync("Async Streams Demo Done!");

Output: [image of the console output]

Finally, we have achieved our desired behavior! We can iterate asynchronously over the enumeration.

Source Code is here.

Asynchronous pull with Client/Server

I will show the concept in a more realistic example. Client/Server Architecture is an excellent scenario to demonstrate the benefits of this feature.

Client/Server synchronous call

The Client sends a request to the Server and must wait (the Client is blocked) until the response arrives from the Server, as shown in Figure -3-.

Figure -3- Synchronous Data Pull: the Client waits until the request is finished

Asynchronous data pull

In this case, the Client asks for the data and continues with something else. Once the data has arrived, the Client continues with its work.

Figure -4- Asynchronous Data Pull, the client can be doing something else while the data is requested
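A minimal sketch of this single asynchronous pull, assuming a hypothetical RequestDataAsync method that stands in for the Server call (the delay only simulates latency):

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncPullSketch
{
    // Hypothetical stand-in for a server request.
    public static async Task<string> RequestDataAsync()
    {
        await Task.Delay(100); // simulated network latency
        return "payload";
    }

    public static async Task<string> RunClientAsync()
    {
        // The request is started, but the Client is not blocked.
        Task<string> pending = RequestDataAsync();

        // The Client does something else while the request is in flight.
        var otherWork = "other work done";

        // The Client resumes here once the response has arrived.
        string data = await pending;
        return $"{otherWork}; received {data}";
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunClientAsync()); // other work done; received payload
    }
}
```

The Client still receives exactly one result here, which is the limitation that the asynchronous sequence pull removes.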

Asynchronous sequence data pull

In this case, the Client asks for a chunk of data and continues with something else. Once the data chunk arrives, the Client processes the received data and asks for the next chunk, and so on until the last chunk has been received. This scenario is exactly where the idea of Async Streams comes from. Figure -5- shows that the Client can do something else or process a data chunk whenever data is received.

Figure -5- Asynchronous Sequence Data Pull (Async Streams). The Client is not blocked!

Async Streams

Similar to IEnumerable<T> and IEnumerator<T>, there are two new interfaces, IAsyncEnumerable<T> and IAsyncEnumerator<T>, which are defined as follows:

public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetAsyncEnumerator();
}

public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
    Task<bool> MoveNextAsync();
    T Current { get; }
}

// Async Streams can also be disposed asynchronously
public interface IAsyncDisposable
{
    Task DisposeAsync();
}

Jonathan Allen has covered this topic on InfoQ. I do not want to repeat everything here, so I recommend reading his article, too.

The trick is in the return value of MoveNextAsync(): it changed from bool (in bool IEnumerator.MoveNext()) to Task<bool>. This keeps both the computation and its iteration asynchronous. The consumer decides when it is ready to await the next value, so it is still a pull model even though it is asynchronous. The IAsyncDisposable interface can be used to enable asynchronous cleanup. Further information about async can be found here.
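To illustrate how a consumer drives these interfaces by hand, here is a sketch written against the interfaces as they shipped in .NET Core 3.0. Note that the shipped versions differ slightly from the definitions above: MoveNextAsync returns ValueTask<bool>, DisposeAsync returns ValueTask, and GetAsyncEnumerator accepts an optional CancellationToken. The names ManualAsyncEnumerationSketch, CountToAsync and SumManuallyAsync are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ManualAsyncEnumerationSketch
{
    // A small async stream to enumerate: yields 1..count.
    public static async IAsyncEnumerable<int> CountToAsync(int count)
    {
        for (var i = 1; i <= count; i++)
        {
            await Task.Yield(); // force an asynchronous step per element
            yield return i;
        }
    }

    // Roughly what an asynchronous foreach does: the consumer awaits
    // MoveNextAsync whenever it is ready for the next value.
    public static async Task<int> SumManuallyAsync(IAsyncEnumerable<int> source)
    {
        var total = 0;
        IAsyncEnumerator<int> enumerator = source.GetAsyncEnumerator();
        try
        {
            while (await enumerator.MoveNextAsync())
            {
                total += enumerator.Current;
            }
        }
        finally
        {
            await enumerator.DisposeAsync(); // asynchronous cleanup
        }
        return total;
    }

    public static async Task Main()
    {
        Console.WriteLine(await SumManuallyAsync(CountToAsync(4))); // 10
    }
}
```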

Syntax

The final syntax should be something like below:

await foreach (var dataChunk in asyncStreams)
{
  // Process the yielded dataChunk, or do something else!
}

As the example shows, instead of computing a single value we can now potentially compute many values sequentially, while still being able to await other asynchronous operations.

Microsoft Example reworked

I have rewritten the Microsoft Code demo; you can download the whole code from my GitHub.

The idea behind this example is to create a big MemoryStream (a 20,000-byte array) and iterate asynchronously, in sequence, over the elements of the collection, or the memory stream in our scenario. Each iteration pulls 8 KB from the array.

At Point (1) we create a big byte array filled with dummy values, and at Point (2) we define a variable called checksum. We will use the checksum to ensure that the sum of the calculation is correct. The array and the checksum are created in memory, and both are returned in a tuple, as shown at Point (3).

At Point (4), AsEnumerable (a better name would be AsAsyncEnumerable) is an extension method that helps simulate the async stream in 8 KB blocks (BufferSize = 8000, Point (6)).

Usually, you do not have to implement IAsyncEnumerable yourself, but in the example above Microsoft did so to simplify the demo, as shown at Point (5).

Point (7) is the foreach, which pulls the data in 8 KB chunks from the async memory stream. The pulling is sequential: when the Consumer (the foreach code block) is ready to receive more data, it pulls more data from the Producer (the memory stream array). Finally, when the iteration is finished, the application compares 'c' with the checksum and writes "Checksums match!" if they match, as shown at Point (8).
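The walkthrough above can be approximated with the following sketch. It is not the Microsoft demo code: it uses an async iterator from C# 8 as shipped instead of a hand-written IAsyncEnumerable implementation, and the names ChunkedChecksumSketch, CreatePayload, ReadChunksAsync and ConsumeAsync are hypothetical. Only the sizes (a 20,000-byte array and BufferSize = 8000) are taken from the description.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ChunkedChecksumSketch
{
    private const int BufferSize = 8000;

    // Builds the payload and its expected checksum in memory
    // and returns both in a tuple.
    public static (byte[] Data, int Checksum) CreatePayload(int length)
    {
        var data = new byte[length];
        var checksum = 0;
        for (var i = 0; i < length; i++)
        {
            data[i] = (byte)(i % 256); // dummy values
            checksum += data[i];
        }
        return (data, checksum);
    }

    // Simulates an async stream by handing out the array in chunks
    // of at most BufferSize bytes.
    public static async IAsyncEnumerable<byte[]> ReadChunksAsync(byte[] data)
    {
        for (var offset = 0; offset < data.Length; offset += BufferSize)
        {
            await Task.Delay(10); // stands in for real I/O latency
            var size = Math.Min(BufferSize, data.Length - offset);
            var chunk = new byte[size];
            Array.Copy(data, offset, chunk, 0, size);
            yield return chunk;
        }
    }

    // The Consumer pulls the next chunk only when it has processed the previous one.
    public static async Task<(int Chunks, int Sum)> ConsumeAsync(byte[] data)
    {
        var chunks = 0;
        var sum = 0;
        await foreach (var chunk in ReadChunksAsync(data))
        {
            chunks++;
            foreach (var b in chunk) sum += b;
        }
        return (chunks, sum);
    }

    public static async Task Main()
    {
        var (data, checksum) = CreatePayload(20000);
        var (chunks, sum) = await ConsumeAsync(data);
        Console.WriteLine($"Chunks: {chunks}"); // 8000 + 8000 + 4000 bytes = 3 chunks
        Console.WriteLine(sum == checksum ? "Checksums match!" : "Checksums differ!");
    }
}
```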

The output window from Microsoft Demo: [image of the console output]

Summary

We have discussed Async Streams, an excellent asynchronous pull technique for writing computations that generate multiple values asynchronously.

The programming concept behind Async Streams is the async pull model: we demand the next element of the sequence, and we eventually get a reply. This differs from the push model of IObservable<T>, which generates values regardless of the consumer’s state. Async Streams provide an excellent way to represent asynchronous data sources that can be controlled by the consumer, for example when the consumer isn't ready to handle more data. Examples include a web application or reading records from a database.

I have demonstrated how to produce an async enumeration and consume it with an external async sequence library. I have also shown how this feature can be advantageous when downloading content from the web. Finally, we have seen the new Async Streams syntax and a complete example based on the Microsoft Build demo code (May 7-9, 2018, Seattle, WA).

About the Author

Bassam Alugili is a Senior Software Specialist and database expert at STRATEC AG. STRATEC is a world-leading partner for fully automated analyzer systems, software for laboratory data management, and smart consumables.

Community comments

  • Small fix

    by Bassam Alugili,

    To make the SumFromOneToCount example clearer, I will change those methods as follows:

    static int SumFromOneToCount(int count)
    {
      ConsoleExt.WriteLine("SumFromOneToCount called!");

      var sum = 0;
      for (var i = 0; i <= count; i++)
      {
        sum = sum + i;
      }
      return sum;
    }

    The output should be 15:

  • Question & Answer

    by Bassam Alugili,

    quotation from: www.reddit.com/r/csharp/comments/9d3usa/async_streams_in_c_8/

    Question:
    I understand the need for IAsyncEnumerable but awaiting a foreach doesn't seem like a big deal. I can just as easily do that with a while loop awaiting each iteration manually. Maybe it is nice to have but certainly not some major improvement. I am not sure I fully understand the article but it seems like what I am missing is the equivalent of yield. The final example includes manually implemented AsyncEnumerator and the great thing about the iterators language feature is that it greatly simplifies writing those. Also I thought sticking a Dispose on an IEnumerable is considered a mistake. Why are they doing it again with the Async version?



    Answer:
    Right now using IEnumerable with async is incredibly expensive in terms of the number of objects that are generated. And you can't inline functions across await calls.

    With the IAsyncEnumerable interface you have two loops. The inner loop that looks like a normal IEnumerable with all the performance that entails. Only when you actually run out of data do you move to the outer loop and make an await call.

    If the data happens to be coming in faster than you can process it, you may never hit an await. But even if you do, you won't be wasting anywhere near as much time on garbage collection and context changes.

  • another important link in InfoQ

    by Bassam Alugili,

  • RxNET

    by Antão Almada ,

    Thanks for the great article! It's only unfortunate that you mention RxJava and RxJS but not RxNET, the .NET version from which all the other Rx* were ported ... github.com/dotnet/reactive

  • Before C# 8.0 Async Streams Come Out

    by Sergii Semenov,

    Since the release date of C# 8.0 is still unknown, you may start utilizing the 'AsyncEnumerator' library today as an interim solution. There will be an easy upgrade path when the new version of the language is out due to similarity in syntax. Some examples are available on the first page:
    github.com/Dasync/AsyncEnumerable
    P.S. I'm the author of the library.

  • Re: RxNET

    by Bassam Alugili,

    Thank you for the additional information! Like +1

  • Re: Before C# 8.0 Async Streams Come Out

    by Bassam Alugili,

    Yes, I agree with Sergii, and I also recommend using it for now! github.com/Dasync/AsyncEnumerable ---> Just try it when you need a good solution for async pull-based problems.
