Watching Streams & Futures
You've learned to watch synchronous data. Now let's handle async data with Streams and Futures.
Why Special Functions?
Streams and Futures are different from Listenable:
- Stream - Emits multiple values over time
- Future - Completes once with a value
- Both have loading/error states
watch_it provides watchStream() and watchFuture() - like StreamBuilder and FutureBuilder, but in one line.
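The difference between the two async types can be seen in plain Dart, without any widgets. The following is a minimal sketch; `fetchGreeting` and `countTo` are hypothetical names used only for illustration.

```dart
/// A Future completes exactly once with a single value.
Future<String> fetchGreeting() async => 'hello';

/// A Stream can emit several values over time before it closes.
Stream<int> countTo(int max) async* {
  for (var i = 1; i <= max; i++) {
    yield i;
  }
}

Future<void> main() async {
  final greeting = await fetchGreeting(); // one value
  final counts = await countTo(3).toList(); // every emitted value, in order
  print('$greeting $counts');
}
```

A widget watching the Future rebuilds once when it completes, while a widget watching the Stream rebuilds on each emitted value.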
watchStream - Reactive Streams
Replace StreamBuilder with watchStream():
class ChatWidget extends WatchingWidget {
  @override
  Widget build(BuildContext context) {
    // One line instead of StreamBuilder!
    final snapshot = watchStream(
      (ChatService s) => s.messageStream,
      initialValue: 'Waiting for messages...',
    );
    return Text(snapshot.data ?? 'No data');
  }
}
Handling Stream States
class UserActivity extends WatchingWidget {
  @override
  Widget build(BuildContext context) {
    final snapshot = watchStream(
      (UserService s) => s.activityStream,
      initialValue: 'No activity',
    );
    // Check state like StreamBuilder
    if (snapshot.connectionState == ConnectionState.waiting) {
      return CircularProgressIndicator();
    }
    if (snapshot.hasError) {
      return Text('Error: ${snapshot.error}');
    }
    return Text('Activity: ${snapshot.data}');
  }
}
AsyncSnapshot and Null Safety
When you provide a non-null initialValue and use a non-nullable stream type (like Stream<String>), AsyncSnapshot.data won't be null. It starts with your initial value and updates with stream events:
class MessageWidget extends WatchingWidget {
  const MessageWidget({super.key});
  @override
  Widget build(BuildContext context) {
    // Stream<String> - non-nullable type
    final snapshot = watchStream(
      (ChatService s) => s.messageStream,
      initialValue: 'Waiting for messages...',
    );
    // Safe to use ! because:
    // 1. We provided a non-null initialValue
    // 2. Stream type is non-nullable (String, not String?)
    return Column(
      children: [
        Text(snapshot.data!),
        if (snapshot.connectionState == ConnectionState.waiting)
          Text('(connecting...)', style: TextStyle(fontSize: 12)),
      ],
    );
  }
}
Note: If your stream type is nullable (like Stream<String?>), then stream events can emit null values, making snapshot.data null even with a non-null initialValue.
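For the nullable case, a fallback with ?? is safer than the ! operator. The sketch below assumes a hypothetical StatusService (not part of watch_it) that is already registered in get_it and exposes a Stream<String?>.

```dart
import 'package:flutter/material.dart';
import 'package:watch_it/watch_it.dart';

// Hypothetical service, for illustration only.
class StatusService {
  // Nullable element type: this stream may emit null.
  final Stream<String?> statusStream = Stream<String?>.value(null);
}

class StatusWidget extends WatchingWidget {
  const StatusWidget({super.key});

  @override
  Widget build(BuildContext context) {
    final snapshot = watchStream(
      (StatusService s) => s.statusStream,
      initialValue: 'Unknown',
    );
    // data can become null after a null event, so fall back instead of using `!`.
    return Text(snapshot.data ?? 'No status');
  }
}
```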
Compare with StreamBuilder
Without watch_it:
class UserActivity extends StatelessWidget {
  const UserActivity({super.key});
  @override
  Widget build(BuildContext context) {
    return StreamBuilder(
      stream: di<UserService>().activityStream,
      initialData: 'No activity',
      builder: (context, snapshot) {
        if (snapshot.connectionState == ConnectionState.waiting) {
          return CircularProgressIndicator();
        }
        if (snapshot.hasError) {
          return Text('Error: ${snapshot.error}');
        }
        return Text('Activity: ${snapshot.data}');
      },
    );
  }
}
Much more nested and verbose!
Advanced watchStream Usage
Watching Local Streams (target parameter)
If your stream isn't registered in get_it, use the target parameter:
class TimerWidget extends WatchingWidget {
  const TimerWidget({super.key, required this.timerStream});
  final Stream<int> timerStream;
  @override
  Widget build(BuildContext context) {
    // Watch a stream passed as parameter (not from get_it)
    final snapshot = watchStream(
      null, // No selector needed
      target: timerStream, // Watch this stream directly
      initialValue: 0,
    );
    return Text('Seconds: ${snapshot.data}');
  }
}
When to use:
- Stream passed as widget parameter
- Locally created streams
- Streams from external packages
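For the "locally created streams" case, one possible approach is to build the stream with createOnce so the same instance is reused across rebuilds. This is a sketch under that assumption; TickerWidget and the one-second ticker are hypothetical.

```dart
import 'package:flutter/material.dart';
import 'package:watch_it/watch_it.dart';

class TickerWidget extends WatchingWidget {
  const TickerWidget({super.key});

  @override
  Widget build(BuildContext context) {
    // createOnce runs the factory on the first build only, so the
    // stream instance stays the same on later rebuilds.
    final ticker = createOnce(
      () => Stream<int>.periodic(const Duration(seconds: 1), (i) => i + 1),
    );
    final snapshot = watchStream(
      null, // No selector needed
      target: ticker,
      initialValue: 0,
    );
    return Text('Ticks: ${snapshot.data}');
  }
}
```

Creating the stream directly inside build without createOnce would produce a new instance on every rebuild, which is the situation the next section deals with.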
Allowing Stream Changes (allowStreamChange)
By default, watchStream behaves differently depending on how you provide the stream:
- With select function: Calls the selector once to prevent creating multiple streams on every rebuild
- With target parameter: Throws an error if the stream instance changes between rebuilds
Set allowStreamChange: true if you expect the stream to legitimately change between rebuilds:
class ChatRoomWidget extends WatchingWidget {
  const ChatRoomWidget({super.key, required this.selectedRoomId});
  final String selectedRoomId;
  @override
  Widget build(BuildContext context) {
    final chatService = createOnce(() => ChatRoomService());
    // Stream changes when selectedRoomId changes
    final snapshot = watchStream(
      null,
      target: chatService.getRoomStream(selectedRoomId),
      initialValue: 'No messages yet',
      allowStreamChange: true, // Allow switching between room streams
    );
    return Column(
      children: [
        Text('Room: $selectedRoomId'),
        Text('Last message: ${snapshot.data}'),
      ],
    );
  }
}
What happens with allowStreamChange: true:
- The selector function is called and evaluated on every build
- If the stream instance changed, watchStream automatically unsubscribes from the old stream
- Subscribes to the new stream
- Widget rebuilds with data from the new stream
When to use:
- Stream depends on reactive parameters (like selected room ID)
- Switching between different streams based on user input
- Important: Only use when the stream should actually change, not when accidentally recreating the same stream
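The last point is easy to trip over, because a method that builds its stream on demand returns a different object on every call, even with identical arguments. The plain-Dart sketch below illustrates what "the stream instance changed" means; RoomService is a hypothetical class, and the identity check is an illustration, not watch_it's internal code.

```dart
/// Hypothetical service: every call builds a brand-new Stream object.
class RoomService {
  Stream<String> roomStream(String roomId) =>
      Stream<String>.value('joined $roomId');
}

/// True only when both variables point at the very same stream object.
bool sameInstance(Stream<String> a, Stream<String> b) => identical(a, b);

void main() {
  final service = RoomService();
  final first = service.roomStream('lobby');
  final second = service.roomStream('lobby');

  // Same arguments, but two distinct stream objects.
  print(sameInstance(first, second)); // false
  print(sameInstance(first, first)); // true
}
```

If a service hands out a fresh stream per call like this, caching the stream per room inside the service is one way to avoid needless resubscription on every rebuild.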
Full Method Signature
AsyncSnapshot<R> watchStream<T extends Object, R>(
  Stream<R> Function(T)? select, {
  T? target,
  R? initialValue,
  bool preserveState = true,
  bool allowStreamChange = false,
  String? instanceName,
  GetIt? getIt,
})
All parameters:
- select - Function to get Stream from registered object (optional if using target)
- target - Direct stream to watch (optional, not from get_it)
- initialValue - Value shown before first stream event (makes data never null)
- preserveState - Keep last value when stream changes (default: true)
- allowStreamChange - Allow stream instance to change (default: false)
- instanceName - For named registrations
- getIt - Custom GetIt instance (rarely needed)
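As a sketch of the instanceName parameter, the widget below watches a ChatService that is assumed to have been registered in get_it under the name 'support'. Both the registration name and the SupportChatWidget class are hypothetical.

```dart
import 'package:flutter/material.dart';
import 'package:watch_it/watch_it.dart';

class SupportChatWidget extends WatchingWidget {
  const SupportChatWidget({super.key});

  @override
  Widget build(BuildContext context) {
    final snapshot = watchStream(
      (ChatService s) => s.messageStream,
      // Assumed name of a named ChatService registration.
      instanceName: 'support',
      initialValue: 'Waiting for messages...',
    );
    return Text(snapshot.data ?? 'No data');
  }
}
```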
watchFuture - Reactive Futures
Replace FutureBuilder with watchFuture():
class DataWidget extends WatchingWidget {
  @override
  Widget build(BuildContext context) {
    final snapshot = watchFuture(
      (DataService s) => s.fetchTodos(),
      initialValue: [],
    );
    if (snapshot.connectionState == ConnectionState.waiting) {
      return CircularProgressIndicator();
    }
    if (snapshot.hasError) {
      return Text('Error: ${snapshot.error}');
    }
    return Text('Data: ${snapshot.data?.length} items');
  }
}
AsyncSnapshot and Null Safety
Just like watchStream, when you provide a non-null initialValue to watchFuture with a non-nullable future type (like Future<String>), AsyncSnapshot.data won't be null. See the AsyncSnapshot tip above for details.
Common Pattern: App Initialization
class SplashScreen extends WatchingWidget {
  const SplashScreen({super.key});
  @override
  Widget build(BuildContext context) {
    // Selector function called once - starts initialization automatically
    final snapshot = watchFuture(
      (AppService s) => s.initialize(),
      initialValue: false,
    );
    if (snapshot.connectionState == ConnectionState.waiting) {
      return Column(
        children: [
          CircularProgressIndicator(),
          Text('Initializing...'),
        ],
      );
    }
    if (snapshot.hasError) {
      return ErrorScreen(error: snapshot.error);
    }
    // Initialization complete - navigate
    callOnce((_) {
      Navigator.pushReplacement(
        context,
        MaterialPageRoute(builder: (_) => HomeScreen()),
      );
    });
    return Container(); // Brief moment before navigation
  }
}
Advanced: Wait for Multiple Dependencies
If you need to wait for multiple async services to initialize (like database, auth, config), use allReady() instead of individual futures. See Async Initialization with allReady for more details.
Advanced watchFuture Usage
Allowing Future Changes (allowFutureChange)
By default, watchFuture behaves differently depending on how you provide the future:
- With select function: Calls the selector once to prevent creating multiple futures on every rebuild
- With target parameter: Throws an error if the future instance changes between rebuilds
Set allowFutureChange: true if you expect the future to legitimately change between rebuilds (like retry operations):
class DataWidget extends WatchingWidget {
  const DataWidget({super.key});
  @override
  Widget build(BuildContext context) {
    // Create retry counter
    final retryCount = createOnce(() => ValueNotifier<int>(0));
    // Watch the future - future changes when retryCount changes
    final snapshot = watchFuture(
      (DataService s) => s.fetchTodos(),
      initialValue: null,
      allowFutureChange: true, // Allow new future on retry
    );
    return Column(
      children: [
        if (snapshot.data == null && !snapshot.hasError)
          CircularProgressIndicator()
        else if (snapshot.hasError)
          Column(
            children: [
              Text('Error: ${snapshot.error}'),
              ElevatedButton(
                // Trigger new future by changing retryCount
                onPressed: () => retryCount.value++,
                child: Text('Retry'),
              ),
            ],
          )
        else
          Text('Loaded ${snapshot.data!.length} items'),
      ],
    );
  }
}
What happens with allowFutureChange: true:
- The selector function is called and evaluated on every build
- If the future instance changed, watchFuture starts watching the new future
- Widget rebuilds when the new future completes
- Previous future completion is ignored
When to use:
- Retry functionality for failed requests
- Future depends on reactive parameters that change
- Important: Only use when the future should actually change, not when accidentally recreating the same future
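The idea that a superseded future's completion is ignored can be sketched in plain Dart. This is an illustration of the general pattern only, not watch_it's actual implementation; LatestResult and runDemo are hypothetical names.

```dart
/// Keeps only the result of the most recently tracked future.
class LatestResult<T> {
  Future<T>? _current;
  T? value;

  Future<void> track(Future<T> future) async {
    _current = future;
    final result = await future;
    // Drop the result if a newer future was tracked in the meantime.
    if (identical(_current, future)) {
      value = result;
    }
  }
}

Future<String?> runDemo() async {
  final latest = LatestResult<String>();
  final slow = Future.delayed(const Duration(milliseconds: 50), () => 'old');
  final fast = Future.delayed(const Duration(milliseconds: 10), () => 'new');

  final first = latest.track(slow);
  final second = latest.track(fast); // replaces `slow` as the current future
  await Future.wait([first, second]);
  return latest.value;
}

Future<void> main() async {
  print(await runDemo()); // new
}
```

Even though the slow future finishes last, its result is discarded because it is no longer the current one.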
Full Method Signature
AsyncSnapshot<R> watchFuture<T extends Object, R>(
  Future<R> Function(T)? select, {
  T? target,
  required R initialValue,
  bool preserveState = true,
  bool allowFutureChange = false,
  String? instanceName,
  GetIt? getIt,
})
All parameters:
- select - Function to get Future from registered object (optional if using target)
- target - Direct future to watch (optional, not from get_it)
- initialValue - Required. Value shown before future completes (makes data never null)
- preserveState - Keep last value when future changes (default: true)
- allowFutureChange - Allow future instance to change (default: false)
- instanceName - For named registrations
- getIt - Custom GetIt instance (rarely needed)
Multiple Async Sources
Watch multiple streams or futures:
class Dashboard extends WatchingWidget {
  @override
  Widget build(BuildContext context) {
    final messages = watchStream(
      (MessageService s) => s.messageStream,
      initialValue: <Message>[],
    );
    final notifications = watchStream(
      (NotificationService s) => s.notificationStream,
      initialValue: 0,
    );
    return Column(
      children: [
        Text('Messages: ${messages.data?.length ?? 0}'),
        Text('Notifications: ${notifications.data}'),
      ],
    );
  }
}
Mix Sync and Async
Combine synchronous and asynchronous data:
class UserProfile extends WatchingWidget {
  const UserProfile({super.key});
  @override
  Widget build(BuildContext context) {
    // Synchronous data
    final userId = watchValue((SimpleUserManager m) => m.name);
    // Asynchronous data - fetch stats from API based on current userId
    final statsSnapshot = watchFuture(
      (ApiClient api) => api.get('/users/$userId/stats'),
      initialValue: null,
    );
    return Column(
      children: [
        Text('User: $userId'),
        // Check for errors first: a failed future leaves data null,
        // so a null check placed first would hide the error branch
        if (statsSnapshot.hasError)
          Text('Error loading stats')
        else if (statsSnapshot.data == null)
          CircularProgressIndicator()
        else
          Text('Stats: ${statsSnapshot.data!['data']}'),
      ],
    );
  }
}
AsyncSnapshot Quick Guide
Both watchStream() and watchFuture() return AsyncSnapshot<T>:
// Check connection state
snapshot.connectionState == ConnectionState.waiting;
snapshot.connectionState == ConnectionState.done;
// Check for data/errors
snapshot.hasData; // true if data available
snapshot.hasError; // true if error occurred
// Access data/error
snapshot.data; // The value (T?)
snapshot.error; // The error if any
Common Patterns
Pattern 1: Simple Loading
final snapshot = watchFuture(
  (DataService s) => s.fetchTodos(),
  initialValue: null,
);
if (snapshot.connectionState == ConnectionState.waiting) {
  return CircularProgressIndicator();
}
return Text('Data loaded: ${snapshot.data?.length} items');
Pattern 2: Error Handling
final snapshot = watchStream(
  (MessageService s) => s.messageStream,
  initialValue: <Message>[],
);
if (snapshot.hasError) {
  return Column(
    children: [
      Text('Error: ${snapshot.error}'),
      ElevatedButton(
        onPressed: () {}, // Retry logic
        child: Text('Retry'),
      ),
    ],
  );
}
return ListView(children: snapshot.data!.map((m) => Text(m.text)).toList());
No More Nested Builders!
Before:
return FutureBuilder(
  future: initFuture,
  builder: (context, futureSnapshot) {
    if (futureSnapshot.connectionState == ConnectionState.waiting) {
      return CircularProgressIndicator();
    }
    return StreamBuilder(
      stream: dataStream,
      builder: (context, streamSnapshot) {
        if (streamSnapshot.connectionState == ConnectionState.waiting) {
          return CircularProgressIndicator();
        }
        return Text(streamSnapshot.data!);
      },
    );
  },
);
After:
final initSnapshot = watchFuture(
  (AppService s) => s.initialize(),
  initialValue: false,
);
final dataSnapshot = watchStream(
  (ChatService s) => s.messageStream,
  initialValue: '',
);
if (initSnapshot.connectionState == ConnectionState.waiting ||
    dataSnapshot.connectionState == ConnectionState.waiting) {
  return CircularProgressIndicator();
}
return Text(dataSnapshot.data!);
Flat, readable code!
Key Takeaways
✅ watchStream() replaces StreamBuilder - no nesting
✅ watchFuture() replaces FutureBuilder - same benefit
✅ Both return AsyncSnapshot<T> - same API you know
✅ Automatic subscription and cleanup
✅ Combine sync and async data easily
Next: Learn about side effects with handlers.
See Also
- Your First Watch Functions - Sync data
- Side Effects with Handlers - Navigation, toasts
- Watch Functions Reference - Complete API