Dart Stream

Summary: in this tutorial, you’ll learn about Dart streams and how to process them, including reading, handling errors, canceling, and transforming.

Introduction to the Dart Stream

A future represents a single value that will be returned by an asynchronous operation. A stream is like a list of futures, representing multiple values that will be returned in the future.

Dart represents streams with the Stream<T> class. Since Stream<T> is a generic class, you can have a stream of objects of any type.

Typically, you’ll use streams to:

  • Read data from a large file in chunks.
  • Download a resource from a remote server.
  • Listen for requests coming into a server.

In practice, you’ll often consume streams from libraries rather than creating new streams from scratch. To understand how streams work, however, it helps to create a simple one yourself.

Creating a stream

To create a stream, you use the StreamController<T> class. A StreamController<T> object owns a stream: you push events (data or errors) into the controller, and others listen to its stream to receive them.

Here are the steps for creating a stream:

First, create an instance of the StreamController<T> class:

var controller = StreamController<int>();

Second, add an event to the stream via the sink property of the StreamController<T> object. The sink property has the type of StreamSink<T>. For example, the following adds the number 1 to the stream:

controller.sink.add(1);

Third, access the stream via the stream property of the StreamController<T> object:

var stream = controller.stream;

Fourth, to raise an error, you use the addError() method:

controller.addError("An error occurred");

Finally, to close the stream, you use the close() method:

controller.close();
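The steps above can be combined into one small program. The following is a minimal sketch, not a definitive pattern; the function name runSteps and the log list are hypothetical names introduced only for illustration. It subscribes to the stream before pushing a data event, an error event, and finally closing the controller:

```dart
import 'dart:async';

// Runs the steps in order and returns a log of what the listener received.
// (runSteps and log are illustrative names, not part of the Dart API.)
Future<List<String>> runSteps() async {
  final log = <String>[];

  // Create the controller and subscribe to its stream.
  final controller = StreamController<int>();
  controller.stream.listen(
    (value) => log.add('data: $value'),
    onError: (Object err) => log.add('error: $err'),
    onDone: () => log.add('done'),
  );

  controller.sink.add(1); // push a data event
  controller.addError('An error occurred'); // push an error event
  await controller.close(); // completes after the listener receives done
  return log;
}

Future<void> main() async {
  print(await runSteps());
  // prints: [data: 1, error: An error occurred, done]
}
```

Events are delivered to the listener asynchronously but in the order they were added, so the error arrives after the data event and before the done event.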

The following illustrates how to use the StreamController<T> to create a stream that emits an integer every second:

import 'dart:async';

class Number {
  Number() {
    Timer.periodic(
      Duration(seconds: 1),
      (timer) {
        _controller.sink.add(_count);
        _count++;
      },
    );
  }

  //  create a StreamController
  final _controller = StreamController<int>();
  var _count = 1;

  // property that returns the stream object
  Stream<int> get stream => _controller.stream;
}

Reading from a stream

To read data from a stream, you can use either a callback or an await-for loop.

1) Reading from a stream using a callback

The following example shows how to read data from the number stream above using a callback:

import 'number.dart';

void main() {
  var stream = Number().stream;

  var subscription = stream.listen(
    (event) => print(event),
  );
}

The program prints a number every second, starting from 1:

1
2
3
...

How it works.

First, create an instance of the Number class and access the number stream via the stream property:

var stream = Number().stream;

Second, subscribe to the stream by calling the listen() method on the stream object, so that you get notified whenever a new number arrives:

var subscription = stream.listen(
  (event) => print(event),
);

Dart will execute the anonymous function passed to the listen() method whenever the stream emits a number.

By default, a stream accepts a single subscription. In other words, a stream only allows a single listener for its whole lifespan.

If you attempt to listen to a single-subscription stream more than once, Dart throws an exception. For example:

import 'number.dart';

void main() {
  var stream = Number().stream;

  stream.listen(
    (event) => print(event),
  );

  // cause exception
  stream.listen(
    (event) => print(event),
  );
}

Exception:

Unhandled exception:
Bad state: Stream has already been listened to.

To allow multiple subscriptions, you need to convert the stream into a broadcast stream by using the asBroadcastStream() method of the Stream<T> object:

import 'number.dart';

void main() {
  var stream = Number().stream.asBroadcastStream();

  stream.listen(
    (event) => print('listener 1: $event'),
  );

  // a broadcast stream allows a second listener
  stream.listen(
    (event) => print('listener 2: $event'),
  );
}

Output:

listener 1: 1
listener 2: 1
listener 1: 2
listener 2: 2
listener 1: 3
listener 2: 3
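If you own the controller, another option is to create it as a broadcast controller from the start with the StreamController<T>.broadcast() constructor. The following is a minimal sketch under that assumption; broadcastDemo and log are hypothetical names used only for illustration:

```dart
import 'dart:async';

// Attaches two listeners to one broadcast controller and records
// what each listener receives. (broadcastDemo is an illustrative name.)
Future<List<String>> broadcastDemo() async {
  final log = <String>[];

  // A broadcast controller allows any number of listeners.
  final controller = StreamController<int>.broadcast();

  controller.stream.listen((event) => log.add('listener 1: $event'));
  controller.stream.listen((event) => log.add('listener 2: $event'));

  controller.add(1);
  controller.add(2);

  // Wait until both listeners have received the done event.
  await controller.close();
  return log;
}

Future<void> main() async {
  final log = await broadcastDemo();
  log.forEach(print); // each listener receives both 1 and 2
}
```

Note that a broadcast stream doesn't buffer events: anything added while there are no listeners is dropped, which is why the sketch subscribes before adding events.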

2) Reading from a stream using an await-for statement

The following example uses an asynchronous for loop rather than a callback to read data from the stream:

import 'number.dart';

Future<void> main() async {
  // create a new stream
  var stream = Number().stream;

  // reading data from the stream
  await for (var number in stream) {
    print(number);
  }
}

An await-for loop looks like a regular for loop with the await keyword in front of it. The loop waits for the stream to emit the next number, runs the loop body with that number, and ends when the stream closes.

Note that we mark the main() function as an async function because it has an await keyword.
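Because an await-for loop ends only when the stream closes, the code after the loop runs only for streams that finish. The following sketch illustrates this with a finite stream built by Stream.fromIterable(); the helper name sumOf is hypothetical and used only for illustration:

```dart
// Adds up every number in the stream and returns the total once the
// stream closes. (sumOf is an illustrative name.)
Future<int> sumOf(Stream<int> numbers) async {
  var total = 0;
  await for (final n in numbers) {
    total += n;
  }
  // This line is reached only after the stream has sent its done event.
  return total;
}

Future<void> main() async {
  final total = await sumOf(Stream.fromIterable([1, 2, 3]));
  print(total); // prints: 6
}
```

With the Number stream above, which never closes, the same loop would keep running and the return statement would never execute.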

StreamSubscription<T>

The listen() method of the Stream<T> object returns an instance of the StreamSubscription<T>:

var subscription = stream.listen(
  (event) => print(event),
);

The StreamSubscription<T> object has three useful methods:

  • pause() – pauses a subscription.
  • resume() – resumes a subscription after a pause.
  • cancel() – cancels a subscription.

The following example illustrates how to pause a subscription after two seconds and then resume it three seconds later using two timers:

import 'number.dart';
import 'dart:async';

void main() {
  var stream = Number().stream;

  var subscription = stream.listen(
    (event) => print(event),
  );

  // pause after 2 seconds
  Timer(
    Duration(seconds: 2),
    () {
      print('Pausing the subscription');
      subscription.pause();
    },
  );

  // resume three seconds later (five seconds after the start)
  Timer(
    Duration(seconds: 5),
    () {
      print('Resuming the subscription');
      subscription.resume();
    },
  );
}

Output:

1
2
Pausing the subscription
Resuming the subscription
3
4
5
...

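Note that while the subscription is paused, the timer keeps adding numbers to the stream. The stream buffers these numbers and delivers them once the subscription resumes, which is why no number is missing from the output.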
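The cancel() method works in a similar way, except that a canceled subscription receives no further events and cannot be resumed. The following is a minimal sketch, assuming you want to stop after the first three values; the helper name firstThree is hypothetical, and Stream.periodic() stands in for the Number stream so that the example is self-contained:

```dart
import 'dart:async';

// Collects the first three values of the source stream and then cancels
// the subscription. (firstThree is an illustrative name.)
// If the source emits fewer than three values, the future never completes.
Future<List<int>> firstThree(Stream<int> source) {
  final received = <int>[];
  final finished = Completer<List<int>>();

  late final StreamSubscription<int> subscription;
  subscription = source.listen((value) {
    received.add(value);
    if (received.length == 3) {
      // No more events are delivered after cancel().
      subscription.cancel();
      finished.complete(received);
    }
  });

  return finished.future;
}

Future<void> main() async {
  // Emits 1, 2, 3, ... every 100 milliseconds.
  final ticks = Stream.periodic(Duration(milliseconds: 100), (i) => i + 1);
  print(await firstThree(ticks)); // prints: [1, 2, 3]
}
```

Canceling also lets the source release its resources; here, the periodic timer behind the stream stops, so the program can exit.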

Transforming a stream

The Stream<T> object provides you with many useful methods for transforming data in the stream.

For example, the following program uses the take() method to get the first 10 numbers, the where() method to get only even numbers, and the map() method to transform the numbers into strings:

import 'dart:async';
import 'number.dart';

Future<void> main() async {
  Number()
      .stream
      .take(10)
      .where((number) => number % 2 == 0)
      .map((number) => 'number $number')
      .listen(
        (number) => print(number),
        onDone: () => print('Done!'),
      );
}

Output:

number 2
number 4
number 6
number 8
number 10
Done!

Handling errors

The following modifies the number.dart file to raise an error in the number stream when the current number reaches 5:

import 'dart:async';

class Number {
  Number() {
    Timer.periodic(
      Duration(seconds: 1),
      (timer) {
        _controller.sink.add(_count);
        _count++;
        if (_count == 5) {
          _controller.addError("Limit reached");
          _controller.close();
          timer.cancel();
        }
      },
    );
  }

  //  create a StreamController
  final _controller = StreamController<int>();
  var _count = 1;

  // property that returns the stream object
  Stream<int> get stream => _controller.stream;
}

In this example, we use the addError() method of the StreamController<T> class to raise an error and the close() method to close the stream. We also cancel the timer so that it doesn't add numbers to a closed stream.

To handle the errors of a stream, you can use either a callback or a try-catch block.

1) Using a callback to handle errors

The following example illustrates how to use a callback to handle an error that occurs in a stream:

import 'dart:async';
import 'number.dart';

Future<void> main() async {
  Number().stream.listen(
        (number) => print(number),
        onError: (err) => print('Error:$err'),
      );
}

Output:

1
2
3
4
Error:Limit reached

The listen() method also has a parameter onDone that accepts a callback. When the stream closes and sends a done event, the onDone callback is called. If onDone is null, nothing happens. For example:

import 'dart:async';
import 'number.dart';

Future<void> main() async {
  Number().stream.listen(
        (number) => print(number),
        onError: (err) => print('Error:$err'),
        onDone: () => print("Done."),
      );
}

Output:

1
2
3
4
Error:Limit reached
Done.
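By default, a subscription stays active after an error, which is why the done event still arrives in the output above. The listen() method also accepts a cancelOnError parameter that cancels the subscription when the first error is delivered. The following is a minimal sketch of that option; the function name listenUntilError and the log list are hypothetical names used only for illustration:

```dart
import 'dart:async';

// Listens with cancelOnError: true and records what the listener receives.
// (listenUntilError is an illustrative name.)
Future<List<String>> listenUntilError() async {
  final log = <String>[];
  final controller = StreamController<int>();

  controller.stream.listen(
    (number) => log.add('data: $number'),
    onError: (Object err) => log.add('error: $err'),
    onDone: () => log.add('done'),
    cancelOnError: true, // cancel the subscription on the first error
  );

  controller.add(1);
  controller.addError('Limit reached');
  controller.add(2); // added after the error, so it is never delivered

  // Completes once the subscription has been canceled.
  await controller.close();
  return log;
}

Future<void> main() async {
  print(await listenUntilError());
  // prints: [data: 1, error: Limit reached]
}
```

Because the subscription is canceled at the error, neither the later data event nor the done event reaches the listener.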

2) Using a try-catch block to handle errors

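Besides the onError handler, you can also use a try-catch block to handle the errors of a stream. Note that wrapping a call to the listen() method in a try-catch block doesn't work: listen() returns immediately, and the error is delivered later, outside the try block. Instead, read the stream with an await-for loop inside the try block. For example:

import 'dart:async';
import 'number.dart';

Future<void> main() async {
  try {
    // the await-for loop rethrows an error event from the stream
    await for (var number in Number().stream) {
      print(number);
    }
  } catch (err) {
    print('Error:$err');
  }
}

Output:

1
2
3
4
Error:Limit reached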

Summary

  • A stream represents multiple values that will be returned in the future.
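  • Use the StreamController<T> class to create a simple stream: you push data into the controller, and others listen to its stream.
  • Use the methods of the Stream<T> class, such as take(), where(), and map(), to transform a stream.
  • Use the listen() method of the Stream<T> object or an await-for loop to read data from a stream.
  • Use the pause(), resume(), and cancel() methods of the StreamSubscription<T> object to control a subscription.
  • Use the onError callback or a try-catch block around an await-for loop to handle the errors of a stream.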