Skip to content

Watching Streams & Futures

You've learned to watch synchronous data. Now let's handle async data with Streams and Futures.

Why Special Functions?

Streams and Futures are different from Listenable:

  • Stream - Emits multiple values over time
  • Future - Completes once with a value
  • Both have loading/error states

watch_it provides watchStream() and watchFuture() - like StreamBuilder and FutureBuilder, but in one line.

watchStream - Reactive Streams

Replace StreamBuilder with watchStream():

dart
class ChatWidget extends WatchingWidget {
  @override
  Widget build(BuildContext context) {
    // One line instead of StreamBuilder!
    final snapshot = watchStream(
      (ChatService s) => s.messageStream,
      initialValue: 'Waiting for messages...',
    );

    return Text(snapshot.data ?? 'No data');
  }
}

Handling Stream States

dart
class UserActivity extends WatchingWidget {
  @override
  Widget build(BuildContext context) {
    final snapshot = watchStream(
      (UserService s) => s.activityStream,
      initialValue: 'No activity',
    );

    // Check state like StreamBuilder
    if (snapshot.connectionState == ConnectionState.waiting) {
      return CircularProgressIndicator();
    }

    if (snapshot.hasError) {
      return Text('Error: ${snapshot.error}');
    }

    return Text('Activity: ${snapshot.data}');
  }
}

AsyncSnapshot and Null Safety

When you provide a non-null initialValue and use a non-nullable stream type (like Stream<String>), AsyncSnapshot.data won't be null. It starts with your initial value and updates with stream events:

dart
class MessageWidget extends WatchingWidget {
  const MessageWidget({super.key});

  @override
  Widget build(BuildContext context) {
    // Stream<String> - non-nullable type
    final snapshot = watchStream(
      (ChatService s) => s.messageStream,
      initialValue: 'Waiting for messages...',
    );

    // Safe to use ! because:
    // 1. We provided a non-null initialValue
    // 2. Stream type is non-nullable (String, not String?)
    return Column(
      children: [
        Text(snapshot.data!),
        if (snapshot.connectionState == ConnectionState.waiting)
          Text('(connecting...)', style: TextStyle(fontSize: 12)),
      ],
    );
  }
}

Note: If your stream type is nullable (like Stream<String?>), then stream events can emit null values, making snapshot.data null even with a non-null initialValue.

Compare with StreamBuilder

Without watch_it:

dart
class UserActivity extends StatelessWidget {
  const UserActivity({super.key});

  @override
  Widget build(BuildContext context) {
    return StreamBuilder(
      stream: di<UserService>().activityStream,
      initialData: 'No activity',
      builder: (context, snapshot) {
        if (snapshot.connectionState == ConnectionState.waiting) {
          return CircularProgressIndicator();
        }
        if (snapshot.hasError) {
          return Text('Error: ${snapshot.error}');
        }
        return Text('Activity: ${snapshot.data}');
      },
    );
  }
}

Much more nested and verbose!

Advanced watchStream Usage

Watching Local Streams (target parameter)

If your stream isn't registered in get_it, use the target parameter:

dart
class TimerWidget extends WatchingWidget {
  const TimerWidget({super.key, required this.timerStream});

  final Stream<int> timerStream;

  @override
  Widget build(BuildContext context) {
    // Watch a stream passed as parameter (not from get_it)
    final snapshot = watchStream(
      null, // No selector needed
      target: timerStream, // Watch this stream directly
      initialValue: 0,
    );

    return Text('Seconds: ${snapshot.data}');
  }
}

When to use:

  • Stream passed as widget parameter
  • Locally created streams
  • Streams from external packages

Allowing Stream Changes (allowStreamChange)

By default, watchStream behaves differently depending on how you provide the stream:

  • With select function: Calls the selector once to prevent creating multiple streams on every rebuild
  • With target parameter: Throws an error if the stream instance changes between rebuilds

Set allowStreamChange: true if you expect the stream to legitimately change between rebuilds:

dart
class ChatRoomWidget extends WatchingWidget {
  const ChatRoomWidget({super.key, required this.selectedRoomId});

  final String selectedRoomId;

  @override
  Widget build(BuildContext context) {
    final chatService = createOnce(() => ChatRoomService());

    // Stream changes when selectedRoomId changes
    final snapshot = watchStream(
      null,
      target: chatService.getRoomStream(selectedRoomId),
      initialValue: 'No messages yet',
      allowStreamChange: true, // Allow switching between room streams
    );

    return Column(
      children: [
        Text('Room: $selectedRoomId'),
        Text('Last message: ${snapshot.data}'),
      ],
    );
  }
}

What happens with allowStreamChange: true:

  • The selector function is called and evaluated on every build
  • If the stream instance changed, watchStream automatically unsubscribes from the old stream
  • Subscribes to the new stream
  • Widget rebuilds with data from the new stream

When to use:

  • Stream depends on reactive parameters (like selected room ID)
  • Switching between different streams based on user input
  • Important: Only use when the stream should actually change, not when accidentally recreating the same stream

Full Method Signature

dart
AsyncSnapshot<R> watchStream<T extends Object, R>(
  Stream<R> Function(T)? select, {
  T? target,
  R? initialValue,
  bool preserveState = true,
  bool allowStreamChange = false,
  String? instanceName,
  GetIt? getIt,
})

All parameters:

  • select - Function to get Stream from registered object (optional if using target)
  • target - Direct stream to watch (optional, not from get_it)
  • initialValue - Value shown before first stream event (makes data never null)
  • preserveState - Keep last value when stream changes (default: true)
  • allowStreamChange - Allow stream instance to change (default: false)
  • instanceName - For named registrations
  • getIt - Custom GetIt instance (rarely needed)

watchFuture - Reactive Futures

Replace FutureBuilder with watchFuture():

dart
class DataWidget extends WatchingWidget {
  @override
  Widget build(BuildContext context) {
    final snapshot = watchFuture(
      (DataService s) => s.fetchTodos(),
      initialValue: [],
    );

    if (snapshot.connectionState == ConnectionState.waiting) {
      return CircularProgressIndicator();
    }

    if (snapshot.hasError) {
      return Text('Error: ${snapshot.error}');
    }

    return Text('Data: ${snapshot.data?.length} items');
  }
}

AsyncSnapshot and Null Safety

Just like watchStream, when you provide a non-null initialValue to watchFuture with a non-nullable future type (like Future<String>), AsyncSnapshot.data won't be null. See the AsyncSnapshot tip above for details.

Common Pattern: App Initialization

dart
class SplashScreen extends WatchingWidget {
  const SplashScreen({super.key});

  @override
  Widget build(BuildContext context) {
    // Selector function called once - starts initialization automatically
    final snapshot = watchFuture(
      (AppService s) => s.initialize(),
      initialValue: false,
    );

    if (snapshot.connectionState == ConnectionState.waiting) {
      return Column(
        children: [
          CircularProgressIndicator(),
          Text('Initializing...'),
        ],
      );
    }

    if (snapshot.hasError) {
      return ErrorScreen(error: snapshot.error);
    }

    // Initialization complete - navigate
    callOnce((_) {
      Navigator.pushReplacement(
        context,
        MaterialPageRoute(builder: (_) => HomeScreen()),
      );
    });

    return Container(); // Brief moment before navigation
  }
}

Advanced: Wait for Multiple Dependencies

If you need to wait for multiple async services to initialize (like database, auth, config), use allReady() instead of individual futures. See Async Initialization with allReady for more details.

Advanced watchFuture Usage

Allowing Future Changes (allowFutureChange)

By default, watchFuture behaves differently depending on how you provide the future:

  • With select function: Calls the selector once to prevent creating multiple futures on every rebuild
  • With target parameter: Throws an error if the future instance changes between rebuilds

Set allowFutureChange: true if you expect the future to legitimately change between rebuilds (like retry operations):

dart
class DataWidget extends WatchingWidget {
  const DataWidget({super.key});

  @override
  Widget build(BuildContext context) {
    // Create retry counter
    final retryCount = createOnce(() => ValueNotifier<int>(0));

    // Watch the future - future changes when retryCount changes
    final snapshot = watchFuture(
      (DataService s) => s.fetchTodos(),
      initialValue: null,
      allowFutureChange: true, // Allow new future on retry
    );

    return Column(
      children: [
        if (snapshot.data == null && !snapshot.hasError)
          CircularProgressIndicator()
        else if (snapshot.hasError)
          Column(
            children: [
              Text('Error: ${snapshot.error}'),
              ElevatedButton(
                // Trigger new future by changing retryCount
                onPressed: () => retryCount.value++,
                child: Text('Retry'),
              ),
            ],
          )
        else
          Text('Loaded ${snapshot.data!.length} items'),
      ],
    );
  }
}

What happens with allowFutureChange: true:

  • The selector function is called and evaluated on every build
  • If the future instance changed, watchFuture starts watching the new future
  • Widget rebuilds when the new future completes
  • Previous future completion is ignored

When to use:

  • Retry functionality for failed requests
  • Future depends on reactive parameters that change
  • Important: Only use when the future should actually change, not when accidentally recreating the same future

Full Method Signature

dart
AsyncSnapshot<R> watchFuture<T extends Object, R>(
  Future<R> Function(T)? select, {
  T? target,
  required R initialValue,
  bool preserveState = true,
  bool allowFutureChange = false,
  String? instanceName,
  GetIt? getIt,
})

All parameters:

  • select - Function to get Future from registered object (optional if using target)
  • target - Direct future to watch (optional, not from get_it)
  • initialValue - Required. Value shown before future completes (makes data never null)
  • preserveState - Keep last value when future changes (default: true)
  • allowFutureChange - Allow future instance to change (default: false)
  • instanceName - For named registrations
  • getIt - Custom GetIt instance (rarely needed)

Multiple Async Sources

Watch multiple streams or futures:

dart
class Dashboard extends WatchingWidget {
  @override
  Widget build(BuildContext context) {
    final messages = watchStream(
      (MessageService s) => s.messageStream,
      initialValue: <Message>[],
    );

    final notifications = watchStream(
      (NotificationService s) => s.notificationStream,
      initialValue: 0,
    );

    return Column(
      children: [
        Text('Messages: ${messages.data?.length ?? 0}'),
        Text('Notifications: ${notifications.data}'),
      ],
    );
  }
}

Mix Sync and Async

Combine synchronous and asynchronous data:

dart
class UserProfile extends WatchingWidget {
  const UserProfile({super.key});

  @override
  Widget build(BuildContext context) {
    // Synchronous data
    final userId = watchValue((SimpleUserManager m) => m.name);

    // Asynchronous data - fetch stats from API based on current userId
    final statsSnapshot = watchFuture(
      (ApiClient api) => api.get('/users/$userId/stats'),
      initialValue: null,
    );

    return Column(
      children: [
        Text('User: $userId'),
        if (statsSnapshot.data == null)
          CircularProgressIndicator()
        else if (statsSnapshot.hasError)
          Text('Error loading stats')
        else
          Text('Stats: ${statsSnapshot.data!['data']}'),
      ],
    );
  }
}

AsyncSnapshot Quick Guide

Both watchStream() and watchFuture() return AsyncSnapshot<T>:

dart
// Check connection state
snapshot.connectionState == ConnectionState.waiting;
snapshot.connectionState == ConnectionState.done;

// Check for data/errors
snapshot.hasData; // true if data available
snapshot.hasError; // true if error occurred

// Access data/error
snapshot.data; // The value (T?)
snapshot.error; // The error if any

Common Patterns

Pattern 1: Simple Loading

dart
final snapshot = watchFuture(
  (DataService s) => s.fetchTodos(),
  initialValue: null,
);

if (snapshot.connectionState == ConnectionState.waiting) {
  return CircularProgressIndicator();
}

return Text('Data loaded: ${snapshot.data?.length} items');

Pattern 2: Error Handling

dart
final snapshot = watchStream(
  (MessageService s) => s.messageStream,
  initialValue: <Message>[],
);

if (snapshot.hasError) {
  return Column(
    children: [
      Text('Error: ${snapshot.error}'),
      ElevatedButton(
        onPressed: () {}, // Retry logic
        child: Text('Retry'),
      ),
    ],
  );
}

return ListView(children: snapshot.data!.map((m) => Text(m.text)).toList());

No More Nested Builders!

Before:

dart
return FutureBuilder(
  future: initFuture,
  builder: (context, futureSnapshot) {
    if (futureSnapshot.connectionState == ConnectionState.waiting) {
      return CircularProgressIndicator();
    }

    return StreamBuilder(
      stream: dataStream,
      builder: (context, streamSnapshot) {
        if (streamSnapshot.connectionState == ConnectionState.waiting) {
          return CircularProgressIndicator();
        }
        return Text(streamSnapshot.data!);
      },
    );
  },
);

After:

dart
final initSnapshot = watchFuture(
  (AppService s) => s.initialize(),
  initialValue: false,
);

final dataSnapshot = watchStream(
  (ChatService s) => s.messageStream,
  initialValue: '',
);

if (initSnapshot.connectionState == ConnectionState.waiting ||
    dataSnapshot.connectionState == ConnectionState.waiting) {
  return CircularProgressIndicator();
}

return Text(dataSnapshot.data!);

Flat, readable code!

Key Takeaways

watchStream() replaces StreamBuilder - no nesting ✅ watchFuture() replaces FutureBuilder - same benefit ✅ Both return AsyncSnapshot<T> - same API you know ✅ Automatic subscription and cleanup ✅ Combine sync and async data easily

Next: Learn about side effects with handlers.

See Also

Released under the MIT License.